[MINOR] Fix typos in Spark client related classes (#4781)
This commit is contained in:
@@ -35,13 +35,14 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
|
|||||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||||
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
|
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
|
||||||
import org.apache.hudi.exception.HoodieException;
|
import org.apache.hudi.exception.HoodieException;
|
||||||
import org.apache.log4j.LogManager;
|
|
||||||
import org.apache.log4j.Logger;
|
|
||||||
import org.apache.avro.generic.GenericRecord;
|
import org.apache.avro.generic.GenericRecord;
|
||||||
import org.apache.avro.generic.IndexedRecord;
|
import org.apache.avro.generic.IndexedRecord;
|
||||||
import org.apache.avro.specific.SpecificData;
|
import org.apache.avro.specific.SpecificData;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.log4j.LogManager;
|
||||||
|
import org.apache.log4j.Logger;
|
||||||
import org.springframework.shell.core.CommandMarker;
|
import org.springframework.shell.core.CommandMarker;
|
||||||
import org.springframework.shell.core.annotation.CliCommand;
|
import org.springframework.shell.core.annotation.CliCommand;
|
||||||
import org.springframework.shell.core.annotation.CliOption;
|
import org.springframework.shell.core.annotation.CliOption;
|
||||||
@@ -69,8 +70,8 @@ public class ExportCommand implements CommandMarker {
|
|||||||
@CliCommand(value = "export instants", help = "Export Instants and their metadata from the Timeline")
|
@CliCommand(value = "export instants", help = "Export Instants and their metadata from the Timeline")
|
||||||
public String exportInstants(
|
public String exportInstants(
|
||||||
@CliOption(key = {"limit"}, help = "Limit Instants", unspecifiedDefaultValue = "-1") final Integer limit,
|
@CliOption(key = {"limit"}, help = "Limit Instants", unspecifiedDefaultValue = "-1") final Integer limit,
|
||||||
@CliOption(key = {"actions"}, help = "Comma seperated list of Instant actions to export",
|
@CliOption(key = {"actions"}, help = "Comma separated list of Instant actions to export",
|
||||||
unspecifiedDefaultValue = "clean,commit,deltacommit,rollback,savepoint,restore") final String filter,
|
unspecifiedDefaultValue = "clean,commit,deltacommit,rollback,savepoint,restore") final String filter,
|
||||||
@CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending,
|
@CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending,
|
||||||
@CliOption(key = {"localFolder"}, help = "Local Folder to export to", mandatory = true) String localFolder)
|
@CliOption(key = {"localFolder"}, help = "Local Folder to export to", mandatory = true) String localFolder)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
|
|||||||
@@ -106,7 +106,7 @@ public class TestUpgradeDowngradeCommand extends CLIFunctionalTestHarness {
|
|||||||
assertEquals(metaClient.getTableConfig().getTableVersion().versionCode(), HoodieTableVersion.ZERO.versionCode());
|
assertEquals(metaClient.getTableConfig().getTableVersion().versionCode(), HoodieTableVersion.ZERO.versionCode());
|
||||||
assertTableVersionFromPropertyFile();
|
assertTableVersionFromPropertyFile();
|
||||||
|
|
||||||
// verify marker files are non existant
|
// verify marker files are non existent
|
||||||
for (String partitionPath : DEFAULT_PARTITION_PATHS) {
|
for (String partitionPath : DEFAULT_PARTITION_PATHS) {
|
||||||
assertEquals(0, FileCreateUtils.getTotalMarkerFileCount(tablePath, partitionPath, "101", IOType.MERGE));
|
assertEquals(0, FileCreateUtils.getTotalMarkerFileCount(tablePath, partitionPath, "101", IOType.MERGE));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -73,9 +73,9 @@ public class HoodieTestCommitMetadataGenerator extends HoodieTestDataGenerator {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public static void createCommitFileWithMetadata(String basePath, String commitTime, Configuration configuration,
|
public static void createCommitFileWithMetadata(String basePath, String commitTime, Configuration configuration,
|
||||||
Option<Integer> writes, Option<Integer> updates, Map<String, String> extraMetdata) throws Exception {
|
Option<Integer> writes, Option<Integer> updates, Map<String, String> extraMetadata) throws Exception {
|
||||||
createCommitFileWithMetadata(basePath, commitTime, configuration, UUID.randomUUID().toString(),
|
createCommitFileWithMetadata(basePath, commitTime, configuration, UUID.randomUUID().toString(),
|
||||||
UUID.randomUUID().toString(), writes, updates, extraMetdata);
|
UUID.randomUUID().toString(), writes, updates, extraMetadata);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void createCommitFileWithMetadata(String basePath, String commitTime, Configuration configuration,
|
public static void createCommitFileWithMetadata(String basePath, String commitTime, Configuration configuration,
|
||||||
|
|||||||
@@ -65,7 +65,7 @@ public class HoodieReadClient<T extends HoodieRecordPayload<T>> implements Seria
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* TODO: We need to persist the index type into hoodie.properties and be able to access the index just with a simple
|
* TODO: We need to persist the index type into hoodie.properties and be able to access the index just with a simple
|
||||||
* basepath pointing to the table. Until, then just always assume a BloomIndex
|
* base path pointing to the table. Until, then just always assume a BloomIndex
|
||||||
*/
|
*/
|
||||||
private final transient HoodieIndex<?, ?> index;
|
private final transient HoodieIndex<?, ?> index;
|
||||||
private HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> hoodieTable;
|
private HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> hoodieTable;
|
||||||
|
|||||||
@@ -504,7 +504,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
|
|||||||
@Override
|
@Override
|
||||||
protected void preCommit(HoodieInstant inflightInstant, HoodieCommitMetadata metadata) {
|
protected void preCommit(HoodieInstant inflightInstant, HoodieCommitMetadata metadata) {
|
||||||
// Create a Hoodie table after startTxn which encapsulated the commits and files visible.
|
// Create a Hoodie table after startTxn which encapsulated the commits and files visible.
|
||||||
// Important to create this after the lock to ensure latest commits show up in the timeline without need for reload
|
// Important to create this after the lock to ensure the latest commits show up in the timeline without need for reload
|
||||||
HoodieTable table = createTable(config, hadoopConf);
|
HoodieTable table = createTable(config, hadoopConf);
|
||||||
TransactionUtils.resolveWriteConflictIfAny(table, this.txnManager.getCurrentTransactionOwner(),
|
TransactionUtils.resolveWriteConflictIfAny(table, this.txnManager.getCurrentTransactionOwner(),
|
||||||
Option.of(metadata), config, txnManager.getLastCompletedTransactionOwner());
|
Option.of(metadata), config, txnManager.getLastCompletedTransactionOwner());
|
||||||
|
|||||||
@@ -87,7 +87,7 @@ public class SparkSizeBasedClusteringPlanStrategy<T extends HoodieRecordPayload<
|
|||||||
|
|
||||||
// Add to the current file-group
|
// Add to the current file-group
|
||||||
currentGroup.add(currentSlice);
|
currentGroup.add(currentSlice);
|
||||||
// assume each filegroup size is ~= parquet.max.file.size
|
// assume each file group size is ~= parquet.max.file.size
|
||||||
totalSizeSoFar += currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize();
|
totalSizeSoFar += currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -118,7 +118,7 @@ public class SparkSizeBasedClusteringPlanStrategy<T extends HoodieRecordPayload<
|
|||||||
@Override
|
@Override
|
||||||
protected Stream<FileSlice> getFileSlicesEligibleForClustering(final String partition) {
|
protected Stream<FileSlice> getFileSlicesEligibleForClustering(final String partition) {
|
||||||
return super.getFileSlicesEligibleForClustering(partition)
|
return super.getFileSlicesEligibleForClustering(partition)
|
||||||
// Only files that have basefile size smaller than small file size are eligible.
|
// Only files that have base file size smaller than small file size are eligible.
|
||||||
.filter(slice -> slice.getBaseFile().map(HoodieBaseFile::getFileSize).orElse(0L) < getWriteConfig().getClusteringSmallFileLimit());
|
.filter(slice -> slice.getBaseFile().map(HoodieBaseFile::getFileSize).orElse(0L) < getWriteConfig().getClusteringSmallFileLimit());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -37,7 +37,7 @@ import java.util.Set;
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Update strategy based on following.
|
* Update strategy based on following.
|
||||||
* if some file group have update record, throw exception
|
* if some file groups have update record, throw exception
|
||||||
*/
|
*/
|
||||||
public class SparkRejectUpdateStrategy<T extends HoodieRecordPayload<T>> extends UpdateStrategy<T, JavaRDD<HoodieRecord<T>>> {
|
public class SparkRejectUpdateStrategy<T extends HoodieRecordPayload<T>> extends UpdateStrategy<T, JavaRDD<HoodieRecord<T>>> {
|
||||||
private static final Logger LOG = LogManager.getLogger(SparkRejectUpdateStrategy.class);
|
private static final Logger LOG = LogManager.getLogger(SparkRejectUpdateStrategy.class);
|
||||||
|
|||||||
@@ -31,13 +31,13 @@ import org.apache.hudi.table.HoodieSparkTable;
|
|||||||
import org.apache.hudi.table.HoodieTable;
|
import org.apache.hudi.table.HoodieTable;
|
||||||
import org.apache.hudi.table.action.HoodieWriteMetadata;
|
import org.apache.hudi.table.action.HoodieWriteMetadata;
|
||||||
import org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor;
|
import org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor;
|
||||||
|
|
||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
import org.apache.spark.api.java.JavaRDD;
|
import org.apache.spark.api.java.JavaRDD;
|
||||||
import org.apache.spark.sql.Dataset;
|
import org.apache.spark.sql.Dataset;
|
||||||
import org.apache.spark.sql.Row;
|
import org.apache.spark.sql.Row;
|
||||||
import org.apache.spark.sql.SQLContext;
|
import org.apache.spark.sql.SQLContext;
|
||||||
import scala.collection.JavaConverters;
|
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
@@ -47,6 +47,8 @@ import java.util.concurrent.CompletableFuture;
|
|||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
import scala.collection.JavaConverters;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Spark validator utils to verify and run any precommit validators configured.
|
* Spark validator utils to verify and run any precommit validators configured.
|
||||||
*/
|
*/
|
||||||
@@ -97,7 +99,7 @@ public class SparkValidatorUtils {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Run validators in a separate threadpool for parallelism. Each of validator can submit a distributed spark job if needed.
|
* Run validators in a separate thread pool for parallelism. Each of validator can submit a distributed spark job if needed.
|
||||||
*/
|
*/
|
||||||
private static CompletableFuture<Boolean> runValidatorAsync(SparkPreCommitValidator validator, HoodieWriteMetadata writeMetadata,
|
private static CompletableFuture<Boolean> runValidatorAsync(SparkPreCommitValidator validator, HoodieWriteMetadata writeMetadata,
|
||||||
Dataset<Row> beforeState, Dataset<Row> afterState, String instantTime) {
|
Dataset<Row> beforeState, Dataset<Row> afterState, String instantTime) {
|
||||||
|
|||||||
@@ -37,8 +37,8 @@ import org.apache.spark.sql.SQLContext;
|
|||||||
* Validator to run sql query and compare table state
|
* Validator to run sql query and compare table state
|
||||||
* 1) before new commit started.
|
* 1) before new commit started.
|
||||||
* 2) current inflight commit (if successful).
|
* 2) current inflight commit (if successful).
|
||||||
*
|
* <p>
|
||||||
* Expects query results dont match.
|
* Expects query results do not match.
|
||||||
*/
|
*/
|
||||||
public class SqlQueryInequalityPreCommitValidator<T extends HoodieRecordPayload, I, K, O extends JavaRDD<WriteStatus>> extends SqlQueryPreCommitValidator<T, I, K, O> {
|
public class SqlQueryInequalityPreCommitValidator<T extends HoodieRecordPayload, I, K, O extends JavaRDD<WriteStatus>> extends SqlQueryPreCommitValidator<T, I, K, O> {
|
||||||
private static final Logger LOG = LogManager.getLogger(SqlQueryInequalityPreCommitValidator.class);
|
private static final Logger LOG = LogManager.getLogger(SqlQueryInequalityPreCommitValidator.class);
|
||||||
@@ -66,7 +66,7 @@ public class SqlQueryInequalityPreCommitValidator<T extends HoodieRecordPayload,
|
|||||||
LOG.info("Completed Inequality Validation, datasets equal? " + areDatasetsEqual);
|
LOG.info("Completed Inequality Validation, datasets equal? " + areDatasetsEqual);
|
||||||
if (areDatasetsEqual) {
|
if (areDatasetsEqual) {
|
||||||
LOG.error("query validation failed. See stdout for sample query results. Query: " + query);
|
LOG.error("query validation failed. See stdout for sample query results. Query: " + query);
|
||||||
System.out.println("Expected query results to be inequal, but they are same. Result (sample records only):");
|
System.out.println("Expected query results to be different, but they are same. Result (sample records only):");
|
||||||
prevRows.show();
|
prevRows.show();
|
||||||
throw new HoodieValidationException("Query validation failed for '" + query
|
throw new HoodieValidationException("Query validation failed for '" + query
|
||||||
+ "'. Expected " + prevRows.count() + " rows, Found " + newRows.count());
|
+ "'. Expected " + prevRows.count() + " rows, Found " + newRows.count());
|
||||||
|
|||||||
@@ -35,9 +35,9 @@ import org.apache.spark.sql.SQLContext;
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Validator to run sql queries on new table state and expects a single result. If the result doesnt match expected result,
|
* Validator to run sql queries on new table state and expects a single result. If the result does not match expected result,
|
||||||
* throw validation error.
|
* throw validation error.
|
||||||
*
|
* <p>
|
||||||
* Example configuration: "query1#expectedResult1;query2#expectedResult2;"
|
* Example configuration: "query1#expectedResult1;query2#expectedResult2;"
|
||||||
*/
|
*/
|
||||||
public class SqlQuerySingleResultPreCommitValidator<T extends HoodieRecordPayload, I, K, O extends JavaRDD<WriteStatus>> extends SqlQueryPreCommitValidator<T, I, K, O> {
|
public class SqlQuerySingleResultPreCommitValidator<T extends HoodieRecordPayload, I, K, O extends JavaRDD<WriteStatus>> extends SqlQueryPreCommitValidator<T, I, K, O> {
|
||||||
|
|||||||
@@ -45,7 +45,7 @@ import java.io.Serializable;
|
|||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create handle with InternalRow for datasource implemention of bulk insert.
|
* Create handle with InternalRow for datasource implementation of bulk insert.
|
||||||
*/
|
*/
|
||||||
public class HoodieRowCreateHandle implements Serializable {
|
public class HoodieRowCreateHandle implements Serializable {
|
||||||
|
|
||||||
|
|||||||
@@ -18,9 +18,10 @@
|
|||||||
|
|
||||||
package org.apache.hudi.keygen;
|
package org.apache.hudi.keygen;
|
||||||
|
|
||||||
import org.apache.avro.generic.GenericRecord;
|
|
||||||
import org.apache.hudi.common.config.TypedProperties;
|
import org.apache.hudi.common.config.TypedProperties;
|
||||||
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
|
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
|
||||||
|
|
||||||
|
import org.apache.avro.generic.GenericRecord;
|
||||||
import org.apache.spark.sql.Row;
|
import org.apache.spark.sql.Row;
|
||||||
import org.apache.spark.sql.catalyst.InternalRow;
|
import org.apache.spark.sql.catalyst.InternalRow;
|
||||||
import org.apache.spark.sql.types.StructType;
|
import org.apache.spark.sql.types.StructType;
|
||||||
@@ -31,7 +32,7 @@ import java.util.List;
|
|||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Simple Key generator for unpartitioned Hive Tables.
|
* Simple Key generator for non-partitioned Hive Tables.
|
||||||
*/
|
*/
|
||||||
public class NonpartitionedKeyGenerator extends BuiltinKeyGenerator {
|
public class NonpartitionedKeyGenerator extends BuiltinKeyGenerator {
|
||||||
|
|
||||||
|
|||||||
@@ -40,9 +40,9 @@ import java.util.stream.IntStream;
|
|||||||
|
|
||||||
import scala.Option;
|
import scala.Option;
|
||||||
|
|
||||||
import static org.apache.hudi.keygen.KeyGenUtils.HUDI_DEFAULT_PARTITION_PATH;
|
|
||||||
import static org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH_SEPARATOR;
|
import static org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH_SEPARATOR;
|
||||||
import static org.apache.hudi.keygen.KeyGenUtils.EMPTY_RECORDKEY_PLACEHOLDER;
|
import static org.apache.hudi.keygen.KeyGenUtils.EMPTY_RECORDKEY_PLACEHOLDER;
|
||||||
|
import static org.apache.hudi.keygen.KeyGenUtils.HUDI_DEFAULT_PARTITION_PATH;
|
||||||
import static org.apache.hudi.keygen.KeyGenUtils.NULL_RECORDKEY_PLACEHOLDER;
|
import static org.apache.hudi.keygen.KeyGenUtils.NULL_RECORDKEY_PLACEHOLDER;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -230,9 +230,10 @@ public class RowKeyGeneratorHelper {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Generate the tree style positions for the field requested for as per the defined struct type.
|
* Generate the tree style positions for the field requested for as per the defined struct type.
|
||||||
* @param structType schema of interest
|
*
|
||||||
* @param field field of interest for which the positions are requested for
|
* @param structType schema of interest
|
||||||
* @param isRecordKey {@code true} if the field requested for is a record key. {@code false} incase of a partition path.
|
* @param field field of interest for which the positions are requested for
|
||||||
|
* @param isRecordKey {@code true} if the field requested for is a record key. {@code false} in case of a partition path.
|
||||||
* @return the positions of the field as per the struct type.
|
* @return the positions of the field as per the struct type.
|
||||||
*/
|
*/
|
||||||
public static List<Integer> getNestedFieldIndices(StructType structType, String field, boolean isRecordKey) {
|
public static List<Integer> getNestedFieldIndices(StructType structType, String field, boolean isRecordKey) {
|
||||||
|
|||||||
@@ -18,7 +18,6 @@
|
|||||||
|
|
||||||
package org.apache.hudi.metadata;
|
package org.apache.hudi.metadata;
|
||||||
|
|
||||||
import org.apache.avro.specific.SpecificRecordBase;
|
|
||||||
import org.apache.hudi.client.SparkRDDWriteClient;
|
import org.apache.hudi.client.SparkRDDWriteClient;
|
||||||
import org.apache.hudi.client.WriteStatus;
|
import org.apache.hudi.client.WriteStatus;
|
||||||
import org.apache.hudi.client.common.HoodieSparkEngineContext;
|
import org.apache.hudi.client.common.HoodieSparkEngineContext;
|
||||||
@@ -35,6 +34,7 @@ import org.apache.hudi.data.HoodieJavaRDD;
|
|||||||
import org.apache.hudi.exception.HoodieMetadataException;
|
import org.apache.hudi.exception.HoodieMetadataException;
|
||||||
import org.apache.hudi.metrics.DistributedRegistry;
|
import org.apache.hudi.metrics.DistributedRegistry;
|
||||||
|
|
||||||
|
import org.apache.avro.specific.SpecificRecordBase;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
@@ -51,8 +51,8 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
|
|||||||
/**
|
/**
|
||||||
* Return a Spark based implementation of {@code HoodieTableMetadataWriter} which can be used to
|
* Return a Spark based implementation of {@code HoodieTableMetadataWriter} which can be used to
|
||||||
* write to the metadata table.
|
* write to the metadata table.
|
||||||
*
|
* <p>
|
||||||
* If the metadata table does not exist, an attempt is made to bootstrap it but there is no guarantted that
|
* If the metadata table does not exist, an attempt is made to bootstrap it but there is no guaranteed that
|
||||||
* table will end up bootstrapping at this time.
|
* table will end up bootstrapping at this time.
|
||||||
*
|
*
|
||||||
* @param conf
|
* @param conf
|
||||||
|
|||||||
@@ -26,10 +26,11 @@ import org.apache.hudi.keygen.KeyGeneratorInterface;
|
|||||||
*/
|
*/
|
||||||
public interface BootstrapMetadataHandler {
|
public interface BootstrapMetadataHandler {
|
||||||
/**
|
/**
|
||||||
* Execute bootstrap with only metatata.
|
* Execute bootstrap with only metadata.
|
||||||
|
*
|
||||||
* @param srcPartitionPath source partition path.
|
* @param srcPartitionPath source partition path.
|
||||||
* @param partitionPath destination partition path.
|
* @param partitionPath destination partition path.
|
||||||
* @param keyGenerator key generator to use.
|
* @param keyGenerator key generator to use.
|
||||||
* @return the {@link BootstrapWriteStatus} which has the result of execution.
|
* @return the {@link BootstrapWriteStatus} which has the result of execution.
|
||||||
*/
|
*/
|
||||||
BootstrapWriteStatus runMetadataBootstrap(String srcPartitionPath, String partitionPath, KeyGeneratorInterface keyGenerator);
|
BootstrapWriteStatus runMetadataBootstrap(String srcPartitionPath, String partitionPath, KeyGeneratorInterface keyGenerator);
|
||||||
|
|||||||
@@ -113,9 +113,9 @@ public class SparkBootstrapCommitActionExecutor<T extends HoodieRecordPayload<T>
|
|||||||
validate();
|
validate();
|
||||||
try {
|
try {
|
||||||
HoodieTableMetaClient metaClient = table.getMetaClient();
|
HoodieTableMetaClient metaClient = table.getMetaClient();
|
||||||
Option<HoodieInstant> completetedInstant =
|
Option<HoodieInstant> completedInstant =
|
||||||
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
|
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
|
||||||
ValidationUtils.checkArgument(!completetedInstant.isPresent(),
|
ValidationUtils.checkArgument(!completedInstant.isPresent(),
|
||||||
"Active Timeline is expected to be empty for bootstrap to be performed. "
|
"Active Timeline is expected to be empty for bootstrap to be performed. "
|
||||||
+ "If you want to re-bootstrap, please rollback bootstrap first !!");
|
+ "If you want to re-bootstrap, please rollback bootstrap first !!");
|
||||||
Map<BootstrapMode, List<Pair<String, List<HoodieFileStatus>>>> partitionSelections = listAndProcessSourcePartitions();
|
Map<BootstrapMode, List<Pair<String, List<HoodieFileStatus>>>> partitionSelections = listAndProcessSourcePartitions();
|
||||||
|
|||||||
@@ -116,7 +116,7 @@ public class SparkExecuteClusteringCommitActionExecutor<T extends HoodieRecordPa
|
|||||||
protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata) {
|
protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata) {
|
||||||
Set<HoodieFileGroupId> newFilesWritten = writeMetadata.getWriteStats().get().stream()
|
Set<HoodieFileGroupId> newFilesWritten = writeMetadata.getWriteStats().get().stream()
|
||||||
.map(s -> new HoodieFileGroupId(s.getPartitionPath(), s.getFileId())).collect(Collectors.toSet());
|
.map(s -> new HoodieFileGroupId(s.getPartitionPath(), s.getFileId())).collect(Collectors.toSet());
|
||||||
// for the below execution strategy, new filegroup id would be same as old filegroup id
|
// for the below execution strategy, new file group id would be same as old file group id
|
||||||
if (SparkSingleFileSortExecutionStrategy.class.getName().equals(config.getClusteringExecutionStrategyClass())) {
|
if (SparkSingleFileSortExecutionStrategy.class.getName().equals(config.getClusteringExecutionStrategyClass())) {
|
||||||
return ClusteringUtils.getFileGroupsFromClusteringPlan(clusteringPlan)
|
return ClusteringUtils.getFileGroupsFromClusteringPlan(clusteringPlan)
|
||||||
.collect(Collectors.groupingBy(fg -> fg.getPartitionPath(), Collectors.mapping(fg -> fg.getFileId(), Collectors.toList())));
|
.collect(Collectors.groupingBy(fg -> fg.getPartitionPath(), Collectors.mapping(fg -> fg.getFileId(), Collectors.toList())));
|
||||||
|
|||||||
@@ -20,16 +20,16 @@ package org.apache.hudi.table.action.commit;
|
|||||||
|
|
||||||
import org.apache.hudi.client.WriteStatus;
|
import org.apache.hudi.client.WriteStatus;
|
||||||
import org.apache.hudi.client.utils.SparkMemoryUtils;
|
import org.apache.hudi.client.utils.SparkMemoryUtils;
|
||||||
import org.apache.hudi.common.config.TypedProperties;
|
|
||||||
import org.apache.hudi.client.utils.SparkValidatorUtils;
|
import org.apache.hudi.client.utils.SparkValidatorUtils;
|
||||||
|
import org.apache.hudi.common.config.TypedProperties;
|
||||||
import org.apache.hudi.common.engine.HoodieEngineContext;
|
import org.apache.hudi.common.engine.HoodieEngineContext;
|
||||||
import org.apache.hudi.common.model.HoodieCommitMetadata;
|
import org.apache.hudi.common.model.HoodieCommitMetadata;
|
||||||
|
import org.apache.hudi.common.model.HoodieFileGroupId;
|
||||||
import org.apache.hudi.common.model.HoodieKey;
|
import org.apache.hudi.common.model.HoodieKey;
|
||||||
import org.apache.hudi.common.model.HoodieRecord;
|
import org.apache.hudi.common.model.HoodieRecord;
|
||||||
import org.apache.hudi.common.model.HoodieRecordLocation;
|
import org.apache.hudi.common.model.HoodieRecordLocation;
|
||||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||||
import org.apache.hudi.common.model.HoodieWriteStat;
|
import org.apache.hudi.common.model.HoodieWriteStat;
|
||||||
import org.apache.hudi.common.model.HoodieFileGroupId;
|
|
||||||
import org.apache.hudi.common.model.WriteOperationType;
|
import org.apache.hudi.common.model.WriteOperationType;
|
||||||
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
||||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||||
@@ -44,9 +44,9 @@ import org.apache.hudi.exception.HoodieIOException;
|
|||||||
import org.apache.hudi.exception.HoodieUpsertException;
|
import org.apache.hudi.exception.HoodieUpsertException;
|
||||||
import org.apache.hudi.execution.SparkLazyInsertIterable;
|
import org.apache.hudi.execution.SparkLazyInsertIterable;
|
||||||
import org.apache.hudi.io.CreateHandleFactory;
|
import org.apache.hudi.io.CreateHandleFactory;
|
||||||
|
import org.apache.hudi.io.HoodieConcatHandle;
|
||||||
import org.apache.hudi.io.HoodieMergeHandle;
|
import org.apache.hudi.io.HoodieMergeHandle;
|
||||||
import org.apache.hudi.io.HoodieSortedMergeHandle;
|
import org.apache.hudi.io.HoodieSortedMergeHandle;
|
||||||
import org.apache.hudi.io.HoodieConcatHandle;
|
|
||||||
import org.apache.hudi.keygen.BaseKeyGenerator;
|
import org.apache.hudi.keygen.BaseKeyGenerator;
|
||||||
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
|
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
|
||||||
import org.apache.hudi.table.HoodieSparkTable;
|
import org.apache.hudi.table.HoodieSparkTable;
|
||||||
@@ -55,27 +55,29 @@ import org.apache.hudi.table.WorkloadProfile;
|
|||||||
import org.apache.hudi.table.WorkloadStat;
|
import org.apache.hudi.table.WorkloadStat;
|
||||||
import org.apache.hudi.table.action.HoodieWriteMetadata;
|
import org.apache.hudi.table.action.HoodieWriteMetadata;
|
||||||
import org.apache.hudi.table.action.cluster.strategy.UpdateStrategy;
|
import org.apache.hudi.table.action.cluster.strategy.UpdateStrategy;
|
||||||
|
|
||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
import org.apache.spark.Partitioner;
|
import org.apache.spark.Partitioner;
|
||||||
import org.apache.spark.api.java.JavaPairRDD;
|
import org.apache.spark.api.java.JavaPairRDD;
|
||||||
import org.apache.spark.api.java.JavaRDD;
|
import org.apache.spark.api.java.JavaRDD;
|
||||||
import org.apache.spark.storage.StorageLevel;
|
import org.apache.spark.storage.StorageLevel;
|
||||||
import scala.Tuple2;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.time.Instant;
|
import java.time.Instant;
|
||||||
import java.util.stream.Collectors;
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import scala.Tuple2;
|
||||||
|
|
||||||
import static org.apache.hudi.common.util.ClusteringUtils.getAllFileGroupsInPendingClusteringPlans;
|
import static org.apache.hudi.common.util.ClusteringUtils.getAllFileGroupsInPendingClusteringPlans;
|
||||||
|
|
||||||
@@ -126,7 +128,7 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
|
|||||||
if (fileGroupsWithUpdatesAndPendingClustering.isEmpty()) {
|
if (fileGroupsWithUpdatesAndPendingClustering.isEmpty()) {
|
||||||
return recordsAndPendingClusteringFileGroups.getLeft();
|
return recordsAndPendingClusteringFileGroups.getLeft();
|
||||||
}
|
}
|
||||||
// there are filegroups pending clustering and receiving updates, so rollback the pending clustering instants
|
// there are file groups pending clustering and receiving updates, so rollback the pending clustering instants
|
||||||
// there could be race condition, for example, if the clustering completes after instants are fetched but before rollback completed
|
// there could be race condition, for example, if the clustering completes after instants are fetched but before rollback completed
|
||||||
if (config.isRollbackPendingClustering()) {
|
if (config.isRollbackPendingClustering()) {
|
||||||
Set<HoodieInstant> pendingClusteringInstantsToRollback = getAllFileGroupsInPendingClusteringPlans(table.getMetaClient()).entrySet().stream()
|
Set<HoodieInstant> pendingClusteringInstantsToRollback = getAllFileGroupsInPendingClusteringPlans(table.getMetaClient()).entrySet().stream()
|
||||||
|
|||||||
@@ -22,6 +22,7 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
|
|||||||
import org.apache.hudi.config.HoodieWriteConfig;
|
import org.apache.hudi.config.HoodieWriteConfig;
|
||||||
import org.apache.hudi.table.HoodieTable;
|
import org.apache.hudi.table.HoodieTable;
|
||||||
import org.apache.hudi.table.WorkloadProfile;
|
import org.apache.hudi.table.WorkloadProfile;
|
||||||
|
|
||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
|
|
||||||
@@ -44,7 +45,7 @@ public class SparkInsertOverwritePartitioner extends UpsertPartitioner {
|
|||||||
* Returns a list of small files in the given partition path.
|
* Returns a list of small files in the given partition path.
|
||||||
*/
|
*/
|
||||||
protected List<SmallFile> getSmallFiles(String partitionPath) {
|
protected List<SmallFile> getSmallFiles(String partitionPath) {
|
||||||
// for overwrite, we ignore all existing files. So dont consider any file to be smallFiles
|
// for overwrite, we ignore all existing files. So do not consider any file to be smallFiles
|
||||||
return Collections.emptyList();
|
return Collections.emptyList();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -171,7 +171,7 @@ public class TestClientRollback extends HoodieClientTestBase {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test Cases for effects of rollbacking completed/inflight commits.
|
* Test Cases for effects of rolling back completed/inflight commits.
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testRollbackCommit() throws Exception {
|
public void testRollbackCommit() throws Exception {
|
||||||
|
|||||||
@@ -584,7 +584,7 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
|
|||||||
private void createCommitWithInserts(HoodieWriteConfig cfg, SparkRDDWriteClient client,
|
private void createCommitWithInserts(HoodieWriteConfig cfg, SparkRDDWriteClient client,
|
||||||
String prevCommitTime, String newCommitTime, int numRecords,
|
String prevCommitTime, String newCommitTime, int numRecords,
|
||||||
boolean doCommit) throws Exception {
|
boolean doCommit) throws Exception {
|
||||||
// Finish first base commmit
|
// Finish first base commit
|
||||||
JavaRDD<WriteStatus> result = insertFirstBatch(cfg, client, newCommitTime, prevCommitTime, numRecords, SparkRDDWriteClient::bulkInsert,
|
JavaRDD<WriteStatus> result = insertFirstBatch(cfg, client, newCommitTime, prevCommitTime, numRecords, SparkRDDWriteClient::bulkInsert,
|
||||||
false, false, numRecords);
|
false, false, numRecords);
|
||||||
if (doCommit) {
|
if (doCommit) {
|
||||||
|
|||||||
@@ -147,7 +147,7 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
|
|||||||
+ TIP_NESTED_SCHEMA + EXTRA_FIELD_SCHEMA + EXTRA_FIELD_SCHEMA.replace("new_field", "new_new_field")
|
+ TIP_NESTED_SCHEMA + EXTRA_FIELD_SCHEMA + EXTRA_FIELD_SCHEMA.replace("new_field", "new_new_field")
|
||||||
+ TRIP_SCHEMA_SUFFIX;
|
+ TRIP_SCHEMA_SUFFIX;
|
||||||
assertTrue(TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, multipleAddedFieldSchema),
|
assertTrue(TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, multipleAddedFieldSchema),
|
||||||
"Multiple added fields with defauls are compatible");
|
"Multiple added fields with defaults are compatible");
|
||||||
|
|
||||||
assertFalse(TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA,
|
assertFalse(TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA,
|
||||||
TRIP_SCHEMA_PREFIX + EXTRA_TYPE_SCHEMA + MAP_TYPE_SCHEMA
|
TRIP_SCHEMA_PREFIX + EXTRA_TYPE_SCHEMA + MAP_TYPE_SCHEMA
|
||||||
@@ -205,7 +205,7 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
|
|||||||
final List<HoodieRecord> failedRecords = generateInsertsWithSchema("004", numRecords, TRIP_EXAMPLE_SCHEMA_DEVOLVED);
|
final List<HoodieRecord> failedRecords = generateInsertsWithSchema("004", numRecords, TRIP_EXAMPLE_SCHEMA_DEVOLVED);
|
||||||
try {
|
try {
|
||||||
// We cannot use insertBatch directly here because we want to insert records
|
// We cannot use insertBatch directly here because we want to insert records
|
||||||
// with a devolved schema and insertBatch inserts records using the TRIP_EXMPLE_SCHEMA.
|
// with a devolved schema and insertBatch inserts records using the TRIP_EXAMPLE_SCHEMA.
|
||||||
writeBatch(client, "005", "004", Option.empty(), "003", numRecords,
|
writeBatch(client, "005", "004", Option.empty(), "003", numRecords,
|
||||||
(String s, Integer a) -> failedRecords, SparkRDDWriteClient::insert, false, 0, 0, 0, false);
|
(String s, Integer a) -> failedRecords, SparkRDDWriteClient::insert, false, 0, 0, 0, false);
|
||||||
fail("Insert with devolved scheme should fail");
|
fail("Insert with devolved scheme should fail");
|
||||||
@@ -233,7 +233,7 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
|
|||||||
client = getHoodieWriteClient(hoodieEvolvedWriteConfig);
|
client = getHoodieWriteClient(hoodieEvolvedWriteConfig);
|
||||||
|
|
||||||
// We cannot use insertBatch directly here because we want to insert records
|
// We cannot use insertBatch directly here because we want to insert records
|
||||||
// with a evolved schemaand insertBatch inserts records using the TRIP_EXMPLE_SCHEMA.
|
// with an evolved schema and insertBatch inserts records using the TRIP_EXAMPLE_SCHEMA.
|
||||||
final List<HoodieRecord> evolvedRecords = generateInsertsWithSchema("005", numRecords, TRIP_EXAMPLE_SCHEMA_EVOLVED);
|
final List<HoodieRecord> evolvedRecords = generateInsertsWithSchema("005", numRecords, TRIP_EXAMPLE_SCHEMA_EVOLVED);
|
||||||
writeBatch(client, "005", "004", Option.empty(), initCommitTime, numRecords,
|
writeBatch(client, "005", "004", Option.empty(), initCommitTime, numRecords,
|
||||||
(String s, Integer a) -> evolvedRecords, SparkRDDWriteClient::insert, false, 0, 0, 0, false);
|
(String s, Integer a) -> evolvedRecords, SparkRDDWriteClient::insert, false, 0, 0, 0, false);
|
||||||
|
|||||||
@@ -80,7 +80,7 @@ public class TestKeyRangeLookupTree {
|
|||||||
* Tests for many duplicate entries in the tree.
|
* Tests for many duplicate entries in the tree.
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testFileGroupLookUpManyDulicateEntries() {
|
public void testFileGroupLookUpManyDuplicateEntries() {
|
||||||
KeyRangeNode toInsert = new KeyRangeNode(Long.toString(1200), Long.toString(2000), UUID.randomUUID().toString());
|
KeyRangeNode toInsert = new KeyRangeNode(Long.toString(1200), Long.toString(2000), UUID.randomUUID().toString());
|
||||||
updateExpectedMatchesToTest(toInsert);
|
updateExpectedMatchesToTest(toInsert);
|
||||||
keyRangeLookupTree.insert(toInsert);
|
keyRangeLookupTree.insert(toInsert);
|
||||||
|
|||||||
@@ -191,7 +191,7 @@ public class TestSparkHoodieHBaseIndex extends SparkClientFunctionalTestHarness
|
|||||||
final String newCommitTime = "001";
|
final String newCommitTime = "001";
|
||||||
final int numRecords = 10;
|
final int numRecords = 10;
|
||||||
final String oldPartitionPath = "1970/01/01";
|
final String oldPartitionPath = "1970/01/01";
|
||||||
final String emptyHoodieRecordPayloadClasssName = EmptyHoodieRecordPayload.class.getName();
|
final String emptyHoodieRecordPayloadClassName = EmptyHoodieRecordPayload.class.getName();
|
||||||
|
|
||||||
List<HoodieRecord> newRecords = dataGen.generateInserts(newCommitTime, numRecords);
|
List<HoodieRecord> newRecords = dataGen.generateInserts(newCommitTime, numRecords);
|
||||||
List<HoodieRecord> oldRecords = new LinkedList();
|
List<HoodieRecord> oldRecords = new LinkedList();
|
||||||
@@ -226,7 +226,7 @@ public class TestSparkHoodieHBaseIndex extends SparkClientFunctionalTestHarness
|
|||||||
assertEquals(numRecords * 2L, taggedRecords.stream().count());
|
assertEquals(numRecords * 2L, taggedRecords.stream().count());
|
||||||
// Verify the number of deleted records
|
// Verify the number of deleted records
|
||||||
assertEquals(numRecords, taggedRecords.stream().filter(record -> record.getKey().getPartitionPath().equals(oldPartitionPath)
|
assertEquals(numRecords, taggedRecords.stream().filter(record -> record.getKey().getPartitionPath().equals(oldPartitionPath)
|
||||||
&& record.getData().getClass().getName().equals(emptyHoodieRecordPayloadClasssName)).count());
|
&& record.getData().getClass().getName().equals(emptyHoodieRecordPayloadClassName)).count());
|
||||||
// Verify the number of inserted records
|
// Verify the number of inserted records
|
||||||
assertEquals(numRecords, taggedRecords.stream().filter(record -> !record.getKey().getPartitionPath().equals(oldPartitionPath)).count());
|
assertEquals(numRecords, taggedRecords.stream().filter(record -> !record.getKey().getPartitionPath().equals(oldPartitionPath)).count());
|
||||||
|
|
||||||
|
|||||||
@@ -18,7 +18,6 @@
|
|||||||
|
|
||||||
package org.apache.hudi.io;
|
package org.apache.hudi.io;
|
||||||
|
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
|
||||||
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
|
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
|
||||||
import org.apache.hudi.client.utils.MetadataConversionUtils;
|
import org.apache.hudi.client.utils.MetadataConversionUtils;
|
||||||
import org.apache.hudi.common.config.HoodieMetadataConfig;
|
import org.apache.hudi.common.config.HoodieMetadataConfig;
|
||||||
@@ -52,6 +51,7 @@ import org.apache.hudi.table.HoodieTimelineArchiveLog;
|
|||||||
import org.apache.hudi.testutils.HoodieClientTestHarness;
|
import org.apache.hudi.testutils.HoodieClientTestHarness;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
@@ -655,7 +655,8 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
|
|||||||
public void testArchiveTableWithCleanCommits(boolean enableMetadata) throws Exception {
|
public void testArchiveTableWithCleanCommits(boolean enableMetadata) throws Exception {
|
||||||
HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(enableMetadata, 2, 4, 2);
|
HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(enableMetadata, 2, 4, 2);
|
||||||
|
|
||||||
// min archival commits is 2 and max archival commits is 4(either clean commits has to be > 4 or commits has to be greater than 4.
|
// min archival commits is 2 and max archival commits is 4
|
||||||
|
// (either clean commits has to be > 4 or commits has to be greater than 4)
|
||||||
// and so, after 5th commit, 3 commits will be archived.
|
// and so, after 5th commit, 3 commits will be archived.
|
||||||
// 1,2,3,4,5,6 : after archival -> 1,5,6 (because, 2,3,4,5 and 6 are clean commits and are eligible for archival)
|
// 1,2,3,4,5,6 : after archival -> 1,5,6 (because, 2,3,4,5 and 6 are clean commits and are eligible for archival)
|
||||||
// after 7th and 8th commit no-op wrt archival.
|
// after 7th and 8th commit no-op wrt archival.
|
||||||
|
|||||||
@@ -25,8 +25,8 @@ import org.apache.hudi.keygen.KeyGenerator;
|
|||||||
import org.apache.hudi.keygen.SimpleKeyGenerator;
|
import org.apache.hudi.keygen.SimpleKeyGenerator;
|
||||||
import org.apache.hudi.keygen.TestComplexKeyGenerator;
|
import org.apache.hudi.keygen.TestComplexKeyGenerator;
|
||||||
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
|
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
|
||||||
|
|
||||||
import org.apache.hudi.keygen.constant.KeyGeneratorType;
|
import org.apache.hudi.keygen.constant.KeyGeneratorType;
|
||||||
|
|
||||||
import org.junit.jupiter.api.Assertions;
|
import org.junit.jupiter.api.Assertions;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
@@ -58,7 +58,7 @@ public class TestHoodieSparkKeyGeneratorFactory {
|
|||||||
// set both class name and keyGenerator type
|
// set both class name and keyGenerator type
|
||||||
props.put(HoodieWriteConfig.KEYGENERATOR_TYPE.key(), KeyGeneratorType.CUSTOM.name());
|
props.put(HoodieWriteConfig.KEYGENERATOR_TYPE.key(), KeyGeneratorType.CUSTOM.name());
|
||||||
KeyGenerator keyGenerator3 = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
|
KeyGenerator keyGenerator3 = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
|
||||||
// KEYGENERATOR_TYPE_PROP was overitten by KEYGENERATOR_CLASS_PROP
|
// KEYGENERATOR_TYPE_PROP was overwritten by KEYGENERATOR_CLASS_PROP
|
||||||
Assertions.assertEquals(SimpleKeyGenerator.class.getName(), keyGenerator3.getClass().getName());
|
Assertions.assertEquals(SimpleKeyGenerator.class.getName(), keyGenerator3.getClass().getName());
|
||||||
|
|
||||||
// set wrong class name
|
// set wrong class name
|
||||||
|
|||||||
@@ -169,9 +169,9 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {
|
|||||||
return getConsistencyGuardConfig(3, 10, 10);
|
return getConsistencyGuardConfig(3, 10, 10);
|
||||||
}
|
}
|
||||||
|
|
||||||
private ConsistencyGuardConfig getConsistencyGuardConfig(int maxChecks, int initalSleep, int maxSleep) {
|
private ConsistencyGuardConfig getConsistencyGuardConfig(int maxChecks, int initialSleep, int maxSleep) {
|
||||||
return ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true)
|
return ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true)
|
||||||
.withInitialConsistencyCheckIntervalMs(initalSleep).withMaxConsistencyCheckIntervalMs(maxSleep)
|
.withInitialConsistencyCheckIntervalMs(initialSleep).withMaxConsistencyCheckIntervalMs(maxSleep)
|
||||||
.withMaxConsistencyChecks(maxChecks).build();
|
.withMaxConsistencyChecks(maxChecks).build();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -18,8 +18,6 @@
|
|||||||
|
|
||||||
package org.apache.hudi.table.action.compact;
|
package org.apache.hudi.table.action.compact;
|
||||||
|
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
|
||||||
import org.apache.hadoop.fs.Path;
|
|
||||||
import org.apache.hudi.client.HoodieReadClient;
|
import org.apache.hudi.client.HoodieReadClient;
|
||||||
import org.apache.hudi.client.SparkRDDWriteClient;
|
import org.apache.hudi.client.SparkRDDWriteClient;
|
||||||
import org.apache.hudi.common.config.HoodieMetadataConfig;
|
import org.apache.hudi.common.config.HoodieMetadataConfig;
|
||||||
@@ -32,6 +30,9 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
|||||||
import org.apache.hudi.config.HoodieWriteConfig;
|
import org.apache.hudi.config.HoodieWriteConfig;
|
||||||
import org.apache.hudi.table.HoodieSparkTable;
|
import org.apache.hudi.table.HoodieSparkTable;
|
||||||
import org.apache.hudi.table.HoodieTable;
|
import org.apache.hudi.table.HoodieTable;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.spark.api.java.JavaRDD;
|
import org.apache.spark.api.java.JavaRDD;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
@@ -204,8 +205,8 @@ public class TestAsyncCompaction extends CompactionTestBase {
|
|||||||
String compactionInstantTime = "006";
|
String compactionInstantTime = "006";
|
||||||
int numRecs = 2000;
|
int numRecs = 2000;
|
||||||
|
|
||||||
final List<HoodieRecord> initalRecords = dataGen.generateInserts(firstInstantTime, numRecs);
|
final List<HoodieRecord> initialRecords = dataGen.generateInserts(firstInstantTime, numRecs);
|
||||||
final List<HoodieRecord> records = runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), initalRecords, cfg, true,
|
final List<HoodieRecord> records = runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), initialRecords, cfg, true,
|
||||||
new ArrayList<>());
|
new ArrayList<>());
|
||||||
|
|
||||||
// Schedule compaction but do not run them
|
// Schedule compaction but do not run them
|
||||||
|
|||||||
@@ -28,6 +28,7 @@ import org.apache.hudi.config.HoodieCompactionConfig;
|
|||||||
import org.apache.hudi.config.HoodieWriteConfig;
|
import org.apache.hudi.config.HoodieWriteConfig;
|
||||||
import org.apache.hudi.table.HoodieSparkTable;
|
import org.apache.hudi.table.HoodieSparkTable;
|
||||||
import org.apache.hudi.table.marker.WriteMarkersFactory;
|
import org.apache.hudi.table.marker.WriteMarkersFactory;
|
||||||
|
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
@@ -62,7 +63,7 @@ public class TestInlineCompaction extends CompactionTestBase {
|
|||||||
runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>());
|
runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>());
|
||||||
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
|
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
|
||||||
|
|
||||||
// Then: ensure no compaction is executedm since there are only 2 delta commits
|
// Then: ensure no compaction is executed since there are only 2 delta commits
|
||||||
assertEquals(2, metaClient.getActiveTimeline().getWriteTimeline().countInstants());
|
assertEquals(2, metaClient.getActiveTimeline().getWriteTimeline().countInstants());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -152,7 +153,7 @@ public class TestInlineCompaction extends CompactionTestBase {
|
|||||||
runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>());
|
runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>());
|
||||||
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
|
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
|
||||||
|
|
||||||
// Then: ensure no compaction is executedm since there are only 3 delta commits
|
// Then: ensure no compaction is executed since there are only 3 delta commits
|
||||||
assertEquals(3, metaClient.getActiveTimeline().getWriteTimeline().countInstants());
|
assertEquals(3, metaClient.getActiveTimeline().getWriteTimeline().countInstants());
|
||||||
// 4th commit, that will trigger compaction
|
// 4th commit, that will trigger compaction
|
||||||
metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
|
metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
|
||||||
|
|||||||
@@ -143,10 +143,10 @@ public class TestHoodieCompactionStrategy {
|
|||||||
"DayBasedCompactionStrategy should have resulted in fewer compactions");
|
"DayBasedCompactionStrategy should have resulted in fewer compactions");
|
||||||
assertEquals(2, returned.size(), "DayBasedCompactionStrategy should have resulted in fewer compactions");
|
assertEquals(2, returned.size(), "DayBasedCompactionStrategy should have resulted in fewer compactions");
|
||||||
|
|
||||||
int comparision = strategy.getComparator().compare(returned.get(returned.size() - 1).getPartitionPath(),
|
int comparison = strategy.getComparator().compare(returned.get(returned.size() - 1).getPartitionPath(),
|
||||||
returned.get(0).getPartitionPath());
|
returned.get(0).getPartitionPath());
|
||||||
// Either the partition paths are sorted in descending order or they are equal
|
// Either the partition paths are sorted in descending order or they are equal
|
||||||
assertTrue(comparision >= 0, "DayBasedCompactionStrategy should sort partitions in descending order");
|
assertTrue(comparison >= 0, "DayBasedCompactionStrategy should sort partitions in descending order");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@@ -192,10 +192,10 @@ public class TestHoodieCompactionStrategy {
|
|||||||
assertEquals(5, returned.size(),
|
assertEquals(5, returned.size(),
|
||||||
"BoundedPartitionAwareCompactionStrategy should have resulted in fewer compactions");
|
"BoundedPartitionAwareCompactionStrategy should have resulted in fewer compactions");
|
||||||
|
|
||||||
int comparision = strategy.getComparator().compare(returned.get(returned.size() - 1).getPartitionPath(),
|
int comparison = strategy.getComparator().compare(returned.get(returned.size() - 1).getPartitionPath(),
|
||||||
returned.get(0).getPartitionPath());
|
returned.get(0).getPartitionPath());
|
||||||
// Either the partition paths are sorted in descending order or they are equal
|
// Either the partition paths are sorted in descending order or they are equal
|
||||||
assertTrue(comparision >= 0, "BoundedPartitionAwareCompactionStrategy should sort partitions in descending order");
|
assertTrue(comparison >= 0, "BoundedPartitionAwareCompactionStrategy should sort partitions in descending order");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|||||||
@@ -33,6 +33,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
|
|||||||
import org.apache.hudi.table.HoodieTable;
|
import org.apache.hudi.table.HoodieTable;
|
||||||
import org.apache.hudi.testutils.Assertions;
|
import org.apache.hudi.testutils.Assertions;
|
||||||
import org.apache.hudi.testutils.HoodieClientTestBase;
|
import org.apache.hudi.testutils.HoodieClientTestBase;
|
||||||
|
|
||||||
import org.apache.spark.api.java.JavaRDD;
|
import org.apache.spark.api.java.JavaRDD;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
@@ -78,18 +79,18 @@ public class HoodieClientRollbackTestBase extends HoodieClientTestBase {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
//2. assert filegroup and get the first partition fileslice
|
//2. assert file group and get the first partition file slice
|
||||||
HoodieTable table = this.getHoodieTable(metaClient, cfg);
|
HoodieTable table = this.getHoodieTable(metaClient, cfg);
|
||||||
SyncableFileSystemView fsView = getFileSystemViewWithUnCommittedSlices(table.getMetaClient());
|
SyncableFileSystemView fsView = getFileSystemViewWithUnCommittedSlices(table.getMetaClient());
|
||||||
List<HoodieFileGroup> firstPartitionCommit2FileGroups = fsView.getAllFileGroups(DEFAULT_FIRST_PARTITION_PATH).collect(Collectors.toList());
|
List<HoodieFileGroup> firstPartitionCommit2FileGroups = fsView.getAllFileGroups(DEFAULT_FIRST_PARTITION_PATH).collect(Collectors.toList());
|
||||||
assertEquals(1, firstPartitionCommit2FileGroups.size());
|
assertEquals(1, firstPartitionCommit2FileGroups.size());
|
||||||
firstPartitionCommit2FileSlices.addAll(firstPartitionCommit2FileGroups.get(0).getAllFileSlices().collect(Collectors.toList()));
|
firstPartitionCommit2FileSlices.addAll(firstPartitionCommit2FileGroups.get(0).getAllFileSlices().collect(Collectors.toList()));
|
||||||
//3. assert filegroup and get the second partition fileslice
|
//3. assert file group and get the second partition file slice
|
||||||
List<HoodieFileGroup> secondPartitionCommit2FileGroups = fsView.getAllFileGroups(DEFAULT_SECOND_PARTITION_PATH).collect(Collectors.toList());
|
List<HoodieFileGroup> secondPartitionCommit2FileGroups = fsView.getAllFileGroups(DEFAULT_SECOND_PARTITION_PATH).collect(Collectors.toList());
|
||||||
assertEquals(1, secondPartitionCommit2FileGroups.size());
|
assertEquals(1, secondPartitionCommit2FileGroups.size());
|
||||||
secondPartitionCommit2FileSlices.addAll(secondPartitionCommit2FileGroups.get(0).getAllFileSlices().collect(Collectors.toList()));
|
secondPartitionCommit2FileSlices.addAll(secondPartitionCommit2FileGroups.get(0).getAllFileSlices().collect(Collectors.toList()));
|
||||||
|
|
||||||
//4. assert fileslice
|
//4. assert file slice
|
||||||
HoodieTableType tableType = this.getTableType();
|
HoodieTableType tableType = this.getTableType();
|
||||||
if (tableType.equals(HoodieTableType.COPY_ON_WRITE)) {
|
if (tableType.equals(HoodieTableType.COPY_ON_WRITE)) {
|
||||||
assertEquals(2, firstPartitionCommit2FileSlices.size());
|
assertEquals(2, firstPartitionCommit2FileSlices.size());
|
||||||
|
|||||||
@@ -112,7 +112,7 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT
|
|||||||
assertTrue(meta.getSuccessDeleteFiles() == null || meta.getSuccessDeleteFiles().size() == 0);
|
assertTrue(meta.getSuccessDeleteFiles() == null || meta.getSuccessDeleteFiles().size() == 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
//4. assert filegroup after rollback, and compare to the rollbackstat
|
//4. assert file group after rollback, and compare to the rollbackstat
|
||||||
// assert the first partition data and log file size
|
// assert the first partition data and log file size
|
||||||
List<HoodieFileGroup> firstPartitionRollBack1FileGroups = table.getFileSystemView().getAllFileGroups(DEFAULT_FIRST_PARTITION_PATH).collect(Collectors.toList());
|
List<HoodieFileGroup> firstPartitionRollBack1FileGroups = table.getFileSystemView().getAllFileGroups(DEFAULT_FIRST_PARTITION_PATH).collect(Collectors.toList());
|
||||||
assertEquals(1, firstPartitionRollBack1FileGroups.size());
|
assertEquals(1, firstPartitionRollBack1FileGroups.size());
|
||||||
|
|||||||
@@ -103,7 +103,7 @@ public abstract class TestWriteMarkersBase extends HoodieCommonTestHarness {
|
|||||||
@ParameterizedTest
|
@ParameterizedTest
|
||||||
@ValueSource(booleans = {true, false})
|
@ValueSource(booleans = {true, false})
|
||||||
public void testDataPathsWhenCreatingOrMerging(boolean isTablePartitioned) throws IOException {
|
public void testDataPathsWhenCreatingOrMerging(boolean isTablePartitioned) throws IOException {
|
||||||
// add markfiles
|
// add marker files
|
||||||
createSomeMarkers(isTablePartitioned);
|
createSomeMarkers(isTablePartitioned);
|
||||||
// add invalid file
|
// add invalid file
|
||||||
createInvalidFile(isTablePartitioned ? "2020/06/01" : "", "invalid_file3");
|
createInvalidFile(isTablePartitioned ? "2020/06/01" : "", "invalid_file3");
|
||||||
|
|||||||
@@ -207,7 +207,7 @@ public class HoodieClientTestUtils {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Reads the paths under the a hoodie table out as a DataFrame.
|
* Reads the paths under the hoodie table out as a DataFrame.
|
||||||
*/
|
*/
|
||||||
public static Dataset<Row> read(JavaSparkContext jsc, String basePath, SQLContext sqlContext, FileSystem fs,
|
public static Dataset<Row> read(JavaSparkContext jsc, String basePath, SQLContext sqlContext, FileSystem fs,
|
||||||
String... paths) {
|
String... paths) {
|
||||||
|
|||||||
@@ -275,7 +275,7 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
|
|||||||
HoodieMetadataBloomFilter combineBloomFilterMetadata = combineBloomFilterMetadata(previousRecord);
|
HoodieMetadataBloomFilter combineBloomFilterMetadata = combineBloomFilterMetadata(previousRecord);
|
||||||
return new HoodieMetadataPayload(key, type, combineBloomFilterMetadata);
|
return new HoodieMetadataPayload(key, type, combineBloomFilterMetadata);
|
||||||
case METADATA_TYPE_COLUMN_STATS:
|
case METADATA_TYPE_COLUMN_STATS:
|
||||||
return new HoodieMetadataPayload(key, type, combineColumnStatsMetadatat(previousRecord));
|
return new HoodieMetadataPayload(key, type, combineColumnStatsMetadata(previousRecord));
|
||||||
default:
|
default:
|
||||||
throw new HoodieMetadataException("Unknown type of HoodieMetadataPayload: " + type);
|
throw new HoodieMetadataException("Unknown type of HoodieMetadataPayload: " + type);
|
||||||
}
|
}
|
||||||
@@ -285,7 +285,7 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
|
|||||||
return this.bloomFilterMetadata;
|
return this.bloomFilterMetadata;
|
||||||
}
|
}
|
||||||
|
|
||||||
private HoodieMetadataColumnStats combineColumnStatsMetadatat(HoodieMetadataPayload previousRecord) {
|
private HoodieMetadataColumnStats combineColumnStatsMetadata(HoodieMetadataPayload previousRecord) {
|
||||||
return this.columnStatMetadata;
|
return this.columnStatMetadata;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -18,9 +18,6 @@
|
|||||||
|
|
||||||
package org.apache.hudi.metadata;
|
package org.apache.hudi.metadata;
|
||||||
|
|
||||||
import org.apache.avro.generic.IndexedRecord;
|
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
|
||||||
import org.apache.hadoop.fs.Path;
|
|
||||||
import org.apache.hudi.avro.model.HoodieCleanMetadata;
|
import org.apache.hudi.avro.model.HoodieCleanMetadata;
|
||||||
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
|
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
|
||||||
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
|
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
|
||||||
@@ -51,6 +48,10 @@ import org.apache.hudi.exception.HoodieException;
|
|||||||
import org.apache.hudi.exception.HoodieMetadataException;
|
import org.apache.hudi.exception.HoodieMetadataException;
|
||||||
import org.apache.hudi.io.storage.HoodieFileReader;
|
import org.apache.hudi.io.storage.HoodieFileReader;
|
||||||
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
|
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
|
||||||
|
|
||||||
|
import org.apache.avro.generic.IndexedRecord;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
|
|
||||||
@@ -798,7 +799,7 @@ public class HoodieTableMetadataUtil {
|
|||||||
/**
|
/**
|
||||||
* Create column stats from write status.
|
* Create column stats from write status.
|
||||||
*
|
*
|
||||||
* @param engineContext - Enging context
|
* @param engineContext - Engine context
|
||||||
* @param datasetMetaClient - Dataset meta client
|
* @param datasetMetaClient - Dataset meta client
|
||||||
* @param allWriteStats - Write status to convert
|
* @param allWriteStats - Write status to convert
|
||||||
* @param isMetaIndexColumnStatsForAllColumns - Are all columns enabled for indexing
|
* @param isMetaIndexColumnStatsForAllColumns - Are all columns enabled for indexing
|
||||||
|
|||||||
@@ -164,7 +164,7 @@ public class HoodieJavaStreamingApp {
|
|||||||
ExecutorService executor = Executors.newFixedThreadPool(2);
|
ExecutorService executor = Executors.newFixedThreadPool(2);
|
||||||
int numInitialCommits = 0;
|
int numInitialCommits = 0;
|
||||||
|
|
||||||
// thread for spark strucutured streaming
|
// thread for spark structured streaming
|
||||||
try {
|
try {
|
||||||
Future<Void> streamFuture = executor.submit(() -> {
|
Future<Void> streamFuture = executor.submit(() -> {
|
||||||
LOG.info("===== Streaming Starting =====");
|
LOG.info("===== Streaming Starting =====");
|
||||||
@@ -211,7 +211,7 @@ public class HoodieJavaStreamingApp {
|
|||||||
Dataset<Row> inputDF3 = newSpark.read().json(jssc.parallelize(deletes, 2));
|
Dataset<Row> inputDF3 = newSpark.read().json(jssc.parallelize(deletes, 2));
|
||||||
executor = Executors.newFixedThreadPool(2);
|
executor = Executors.newFixedThreadPool(2);
|
||||||
|
|
||||||
// thread for spark strucutured streaming
|
// thread for spark structured streaming
|
||||||
try {
|
try {
|
||||||
Future<Void> streamFuture = executor.submit(() -> {
|
Future<Void> streamFuture = executor.submit(() -> {
|
||||||
LOG.info("===== Streaming Starting =====");
|
LOG.info("===== Streaming Starting =====");
|
||||||
|
|||||||
@@ -191,7 +191,7 @@ public class TestDataSourceUtils {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCreateUserDefinedBulkInsertPartitionerRowsWithInValidPartitioner() throws HoodieException {
|
public void testCreateUserDefinedBulkInsertPartitionerRowsWithInValidPartitioner() throws HoodieException {
|
||||||
config = HoodieWriteConfig.newBuilder().withPath("/").withUserDefinedBulkInsertPartitionerClass("NonExistantUserDefinedClass").build();
|
config = HoodieWriteConfig.newBuilder().withPath("/").withUserDefinedBulkInsertPartitionerClass("NonExistentUserDefinedClass").build();
|
||||||
|
|
||||||
Exception exception = assertThrows(HoodieException.class, () -> {
|
Exception exception = assertThrows(HoodieException.class, () -> {
|
||||||
DataSourceUtils.createUserDefinedBulkInsertPartitionerWithRows(config);
|
DataSourceUtils.createUserDefinedBulkInsertPartitionerWithRows(config);
|
||||||
|
|||||||
@@ -179,7 +179,7 @@ public class TestBootstrap extends HoodieClientTestBase {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMetadataBootstrapUnpartitionedCOW() throws Exception {
|
public void testMetadataBootstrapNonpartitionedCOW() throws Exception {
|
||||||
testBootstrapCommon(false, false, EffectiveMode.METADATA_BOOTSTRAP_MODE);
|
testBootstrapCommon(false, false, EffectiveMode.METADATA_BOOTSTRAP_MODE);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -229,7 +229,7 @@ public class TestBootstrap extends HoodieClientTestBase {
|
|||||||
bootstrapInstants = Arrays.asList(bootstrapCommitInstantTs);
|
bootstrapInstants = Arrays.asList(bootstrapCommitInstantTs);
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
bootstrapModeSelectorClass = TestRandomBootstapModeSelector.class.getName();
|
bootstrapModeSelectorClass = TestRandomBootstrapModeSelector.class.getName();
|
||||||
bootstrapCommitInstantTs = HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS;
|
bootstrapCommitInstantTs = HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS;
|
||||||
checkNumRawFiles = false;
|
checkNumRawFiles = false;
|
||||||
isBootstrapIndexCreated = true;
|
isBootstrapIndexCreated = true;
|
||||||
@@ -523,11 +523,11 @@ public class TestBootstrap extends HoodieClientTestBase {
|
|||||||
}).collect(Collectors.toList()));
|
}).collect(Collectors.toList()));
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class TestRandomBootstapModeSelector extends BootstrapModeSelector {
|
public static class TestRandomBootstrapModeSelector extends BootstrapModeSelector {
|
||||||
|
|
||||||
private int currIdx = new Random().nextInt(2);
|
private int currIdx = new Random().nextInt(2);
|
||||||
|
|
||||||
public TestRandomBootstapModeSelector(HoodieWriteConfig writeConfig) {
|
public TestRandomBootstrapModeSelector(HoodieWriteConfig writeConfig) {
|
||||||
super(writeConfig);
|
super(writeConfig);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -172,7 +172,7 @@ public class TestOrcBootstrap extends HoodieClientTestBase {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMetadataBootstrapUnpartitionedCOW() throws Exception {
|
public void testMetadataBootstrapNonpartitionedCOW() throws Exception {
|
||||||
testBootstrapCommon(false, false, EffectiveMode.METADATA_BOOTSTRAP_MODE);
|
testBootstrapCommon(false, false, EffectiveMode.METADATA_BOOTSTRAP_MODE);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -222,7 +222,7 @@ public class TestOrcBootstrap extends HoodieClientTestBase {
|
|||||||
bootstrapInstants = Arrays.asList(bootstrapCommitInstantTs);
|
bootstrapInstants = Arrays.asList(bootstrapCommitInstantTs);
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
bootstrapModeSelectorClass = TestRandomBootstapModeSelector.class.getName();
|
bootstrapModeSelectorClass = TestRandomBootstrapModeSelector.class.getName();
|
||||||
bootstrapCommitInstantTs = HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS;
|
bootstrapCommitInstantTs = HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS;
|
||||||
checkNumRawFiles = false;
|
checkNumRawFiles = false;
|
||||||
isBootstrapIndexCreated = true;
|
isBootstrapIndexCreated = true;
|
||||||
@@ -438,10 +438,10 @@ public class TestOrcBootstrap extends HoodieClientTestBase {
|
|||||||
}).collect(Collectors.toList()));
|
}).collect(Collectors.toList()));
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class TestRandomBootstapModeSelector extends BootstrapModeSelector {
|
public static class TestRandomBootstrapModeSelector extends BootstrapModeSelector {
|
||||||
private int currIdx = new Random().nextInt(2);
|
private int currIdx = new Random().nextInt(2);
|
||||||
|
|
||||||
public TestRandomBootstapModeSelector(HoodieWriteConfig writeConfig) {
|
public TestRandomBootstrapModeSelector(HoodieWriteConfig writeConfig) {
|
||||||
super(writeConfig);
|
super(writeConfig);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -25,7 +25,6 @@ import org.apache.avro.Schema;
|
|||||||
import org.apache.avro.generic.GenericData;
|
import org.apache.avro.generic.GenericData;
|
||||||
import org.apache.avro.generic.GenericRecord;
|
import org.apache.avro.generic.GenericRecord;
|
||||||
import org.apache.avro.generic.IndexedRecord;
|
import org.apache.avro.generic.IndexedRecord;
|
||||||
|
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
@@ -98,7 +97,7 @@ public class TestAWSDmsAvroPayload {
|
|||||||
|
|
||||||
try {
|
try {
|
||||||
Option<IndexedRecord> outputPayload = payload.combineAndGetUpdateValue(oldRecord, avroSchema);
|
Option<IndexedRecord> outputPayload = payload.combineAndGetUpdateValue(oldRecord, avroSchema);
|
||||||
// expect nothing to be comitted to table
|
// expect nothing to be committed to table
|
||||||
assertFalse(outputPayload.isPresent());
|
assertFalse(outputPayload.isPresent());
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
fail("Unexpected exception");
|
fail("Unexpected exception");
|
||||||
@@ -123,7 +122,7 @@ public class TestAWSDmsAvroPayload {
|
|||||||
try {
|
try {
|
||||||
OverwriteWithLatestAvroPayload output = payload.preCombine(insertPayload);
|
OverwriteWithLatestAvroPayload output = payload.preCombine(insertPayload);
|
||||||
Option<IndexedRecord> outputPayload = output.getInsertValue(avroSchema);
|
Option<IndexedRecord> outputPayload = output.getInsertValue(avroSchema);
|
||||||
// expect nothing to be comitted to table
|
// expect nothing to be committed to table
|
||||||
assertFalse(outputPayload.isPresent());
|
assertFalse(outputPayload.isPresent());
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
fail("Unexpected exception");
|
fail("Unexpected exception");
|
||||||
|
|||||||
@@ -99,9 +99,9 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
|
|||||||
var updateDf: DataFrame = null
|
var updateDf: DataFrame = null
|
||||||
if (classOf[TimestampBasedKeyGenerator].getName.equals(keyGenClass)) {
|
if (classOf[TimestampBasedKeyGenerator].getName.equals(keyGenClass)) {
|
||||||
// update current_ts to be same as original record so that partition path does not change with timestamp based key gen
|
// update current_ts to be same as original record so that partition path does not change with timestamp based key gen
|
||||||
val orignalRow = inputDF1.filter(col("_row_key") === verificationRowKey).collectAsList().get(0)
|
val originalRow = inputDF1.filter(col("_row_key") === verificationRowKey).collectAsList().get(0)
|
||||||
updateDf = snapshotDF1.filter(col("_row_key") === verificationRowKey).withColumn(verificationCol, lit(updatedVerificationVal))
|
updateDf = snapshotDF1.filter(col("_row_key") === verificationRowKey).withColumn(verificationCol, lit(updatedVerificationVal))
|
||||||
.withColumn("current_ts", lit(orignalRow.getAs("current_ts")))
|
.withColumn("current_ts", lit(originalRow.getAs("current_ts")))
|
||||||
} else {
|
} else {
|
||||||
updateDf = snapshotDF1.filter(col("_row_key") === verificationRowKey).withColumn(verificationCol, lit(updatedVerificationVal))
|
updateDf = snapshotDF1.filter(col("_row_key") === verificationRowKey).withColumn(verificationCol, lit(updatedVerificationVal))
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -74,7 +74,7 @@ public class TimelineServerPerf implements Serializable {
|
|||||||
public TimelineServerPerf(Config cfg) throws IOException {
|
public TimelineServerPerf(Config cfg) throws IOException {
|
||||||
this.cfg = cfg;
|
this.cfg = cfg;
|
||||||
useExternalTimelineServer = (cfg.serverHost != null);
|
useExternalTimelineServer = (cfg.serverHost != null);
|
||||||
TimelineService.Config timelineServiceConf = cfg.getTimelinServerConfig();
|
TimelineService.Config timelineServiceConf = cfg.getTimelineServerConfig();
|
||||||
this.timelineServer = new TimelineService(
|
this.timelineServer = new TimelineService(
|
||||||
new HoodieLocalEngineContext(FSUtils.prepareHadoopConf(new Configuration())),
|
new HoodieLocalEngineContext(FSUtils.prepareHadoopConf(new Configuration())),
|
||||||
new Configuration(), timelineServiceConf, FileSystem.get(new Configuration()),
|
new Configuration(), timelineServiceConf, FileSystem.get(new Configuration()),
|
||||||
@@ -281,7 +281,7 @@ public class TimelineServerPerf implements Serializable {
|
|||||||
description = " Server Host (Set it for externally managed timeline service")
|
description = " Server Host (Set it for externally managed timeline service")
|
||||||
public String serverHost = null;
|
public String serverHost = null;
|
||||||
|
|
||||||
@Parameter(names = {"--view-storage", "-st"}, description = "View Storage Type. Defaut - SPILLABLE_DISK")
|
@Parameter(names = {"--view-storage", "-st"}, description = "View Storage Type. Default - SPILLABLE_DISK")
|
||||||
public FileSystemViewStorageType viewStorageType = FileSystemViewStorageType.SPILLABLE_DISK;
|
public FileSystemViewStorageType viewStorageType = FileSystemViewStorageType.SPILLABLE_DISK;
|
||||||
|
|
||||||
@Parameter(names = {"--max-view-mem-per-table", "-mv"},
|
@Parameter(names = {"--max-view-mem-per-table", "-mv"},
|
||||||
@@ -310,7 +310,7 @@ public class TimelineServerPerf implements Serializable {
|
|||||||
@Parameter(names = {"--help", "-h"})
|
@Parameter(names = {"--help", "-h"})
|
||||||
public Boolean help = false;
|
public Boolean help = false;
|
||||||
|
|
||||||
public TimelineService.Config getTimelinServerConfig() {
|
public TimelineService.Config getTimelineServerConfig() {
|
||||||
TimelineService.Config c = new TimelineService.Config();
|
TimelineService.Config c = new TimelineService.Config();
|
||||||
c.viewStorageType = viewStorageType;
|
c.viewStorageType = viewStorageType;
|
||||||
c.baseStorePathForFileGroups = baseStorePathForFileGroups;
|
c.baseStorePathForFileGroups = baseStorePathForFileGroups;
|
||||||
|
|||||||
@@ -279,35 +279,35 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
|
|||||||
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
|
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
|
||||||
if (useCustomSchema) {
|
if (useCustomSchema) {
|
||||||
Helpers.saveORCToDFS(Helpers.toGenericRecords(
|
Helpers.saveORCToDFS(Helpers.toGenericRecords(
|
||||||
dataGenerator.generateInsertsAsPerSchema("000", numRecords, schemaStr),
|
dataGenerator.generateInsertsAsPerSchema("000", numRecords, schemaStr),
|
||||||
schema), new Path(path), HoodieTestDataGenerator.ORC_TRIP_SCHEMA);
|
schema), new Path(path), HoodieTestDataGenerator.ORC_TRIP_SCHEMA);
|
||||||
} else {
|
} else {
|
||||||
Helpers.saveORCToDFS(Helpers.toGenericRecords(
|
Helpers.saveORCToDFS(Helpers.toGenericRecords(
|
||||||
dataGenerator.generateInserts("000", numRecords)), new Path(path));
|
dataGenerator.generateInserts("000", numRecords)), new Path(path));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void addCommitToTimeline(HoodieTableMetaClient metaCient) throws IOException {
|
static void addCommitToTimeline(HoodieTableMetaClient metaClient) throws IOException {
|
||||||
addCommitToTimeline(metaCient, Collections.emptyMap());
|
addCommitToTimeline(metaClient, Collections.emptyMap());
|
||||||
}
|
}
|
||||||
|
|
||||||
static void addCommitToTimeline(HoodieTableMetaClient metaCient, Map<String, String> extraMetadata) throws IOException {
|
static void addCommitToTimeline(HoodieTableMetaClient metaClient, Map<String, String> extraMetadata) throws IOException {
|
||||||
addCommitToTimeline(metaCient, WriteOperationType.UPSERT, HoodieTimeline.COMMIT_ACTION, extraMetadata);
|
addCommitToTimeline(metaClient, WriteOperationType.UPSERT, HoodieTimeline.COMMIT_ACTION, extraMetadata);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void addReplaceCommitToTimeline(HoodieTableMetaClient metaCient, Map<String, String> extraMetadata) throws IOException {
|
static void addReplaceCommitToTimeline(HoodieTableMetaClient metaClient, Map<String, String> extraMetadata) throws IOException {
|
||||||
addCommitToTimeline(metaCient, WriteOperationType.CLUSTER, HoodieTimeline.REPLACE_COMMIT_ACTION, extraMetadata);
|
addCommitToTimeline(metaClient, WriteOperationType.CLUSTER, HoodieTimeline.REPLACE_COMMIT_ACTION, extraMetadata);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void addCommitToTimeline(HoodieTableMetaClient metaCient, WriteOperationType writeOperationType, String commitActiontype,
|
static void addCommitToTimeline(HoodieTableMetaClient metaClient, WriteOperationType writeOperationType, String commitActiontype,
|
||||||
Map<String, String> extraMetadata) throws IOException {
|
Map<String, String> extraMetadata) throws IOException {
|
||||||
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
|
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
|
||||||
commitMetadata.setOperationType(writeOperationType);
|
commitMetadata.setOperationType(writeOperationType);
|
||||||
extraMetadata.forEach((k,v) -> commitMetadata.getExtraMetadata().put(k, v));
|
extraMetadata.forEach((k, v) -> commitMetadata.getExtraMetadata().put(k, v));
|
||||||
String commitTime = HoodieActiveTimeline.createNewInstantTime();
|
String commitTime = HoodieActiveTimeline.createNewInstantTime();
|
||||||
metaCient.getActiveTimeline().createNewInstant(new HoodieInstant(HoodieInstant.State.REQUESTED, commitActiontype, commitTime));
|
metaClient.getActiveTimeline().createNewInstant(new HoodieInstant(HoodieInstant.State.REQUESTED, commitActiontype, commitTime));
|
||||||
metaCient.getActiveTimeline().createNewInstant(new HoodieInstant(HoodieInstant.State.INFLIGHT, commitActiontype, commitTime));
|
metaClient.getActiveTimeline().createNewInstant(new HoodieInstant(HoodieInstant.State.INFLIGHT, commitActiontype, commitTime));
|
||||||
metaCient.getActiveTimeline().saveAsComplete(
|
metaClient.getActiveTimeline().saveAsComplete(
|
||||||
new HoodieInstant(HoodieInstant.State.INFLIGHT, commitActiontype, commitTime),
|
new HoodieInstant(HoodieInstant.State.INFLIGHT, commitActiontype, commitTime),
|
||||||
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
|
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -367,22 +367,22 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona
|
|||||||
return true;
|
return true;
|
||||||
};
|
};
|
||||||
|
|
||||||
AtomicBoolean continousFailed = new AtomicBoolean(false);
|
AtomicBoolean continuousFailed = new AtomicBoolean(false);
|
||||||
AtomicBoolean backfillFailed = new AtomicBoolean(false);
|
AtomicBoolean backfillFailed = new AtomicBoolean(false);
|
||||||
try {
|
try {
|
||||||
Future regularIngestionJobFuture = service.submit(() -> {
|
Future regularIngestionJobFuture = service.submit(() -> {
|
||||||
try {
|
try {
|
||||||
deltaStreamerTestRunner(ingestionJob, cfgIngestionJob, conditionForRegularIngestion, jobId);
|
deltaStreamerTestRunner(ingestionJob, cfgIngestionJob, conditionForRegularIngestion, jobId);
|
||||||
} catch (Throwable ex) {
|
} catch (Throwable ex) {
|
||||||
continousFailed.set(true);
|
continuousFailed.set(true);
|
||||||
LOG.error("Continuous job failed " + ex.getMessage());
|
LOG.error("Continuous job failed " + ex.getMessage());
|
||||||
throw new RuntimeException(ex);
|
throw new RuntimeException(ex);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
Future backfillJobFuture = service.submit(() -> {
|
Future backfillJobFuture = service.submit(() -> {
|
||||||
try {
|
try {
|
||||||
// trigger backfill atleast after 1 requested entry is added to timline from continuous job. If not, there is a chance that backfill will complete even before
|
// trigger backfill atleast after 1 requested entry is added to timeline from continuous job. If not, there is a chance that backfill will complete even before
|
||||||
// continous job starts.
|
// continuous job starts.
|
||||||
awaitCondition(new GetCommitsAfterInstant(tableBasePath, lastSuccessfulCommit));
|
awaitCondition(new GetCommitsAfterInstant(tableBasePath, lastSuccessfulCommit));
|
||||||
backfillJob.sync();
|
backfillJob.sync();
|
||||||
} catch (Throwable ex) {
|
} catch (Throwable ex) {
|
||||||
|
|||||||
Reference in New Issue
Block a user