diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java index 3bd887d61..079f88d68 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java @@ -49,7 +49,7 @@ import org.apache.hudi.utilities.sources.ParquetDFSSource; import org.apache.hudi.utilities.sources.TestDataSource; import org.apache.hudi.utilities.testutils.UtilitiesTestBase; import org.apache.hudi.utilities.testutils.sources.DistributedTestDataSource; -import org.apache.hudi.utilities.testutils.sources.config.TestSourceConfig; +import org.apache.hudi.utilities.testutils.sources.config.SourceConfigs; import org.apache.hudi.utilities.transform.SqlQueryBasedTransformer; import org.apache.hudi.utilities.transform.Transformer; @@ -490,7 +490,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, Operation.UPSERT); cfg.continuousMode = true; cfg.tableType = tableType.name(); - cfg.configs.add(String.format("%s=%d", TestSourceConfig.MAX_UNIQUE_RECORDS_PROP, totalRecords)); + cfg.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords)); cfg.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP)); HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc); Future dsFuture = Executors.newSingleThreadExecutor().submit(() -> { @@ -722,9 +722,9 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { @Test public void testDistributedTestDataSource() { TypedProperties props = new TypedProperties(); - props.setProperty(TestSourceConfig.MAX_UNIQUE_RECORDS_PROP, "1000"); - props.setProperty(TestSourceConfig.NUM_SOURCE_PARTITIONS_PROP, "1"); - props.setProperty(TestSourceConfig.USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS, "true"); + props.setProperty(SourceConfigs.MAX_UNIQUE_RECORDS_PROP, "1000"); + props.setProperty(SourceConfigs.NUM_SOURCE_PARTITIONS_PROP, "1"); + props.setProperty(SourceConfigs.USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS, "true"); DistributedTestDataSource distributedTestDataSource = new DistributedTestDataSource(props, jsc, sparkSession, null); InputBatch> batch = distributedTestDataSource.fetchNext(Option.empty(), 10000000); batch.getBatch().get().cache(); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractBaseTestSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractBaseTestSource.java index 67bab4df4..4b42e2d7e 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractBaseTestSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractBaseTestSource.java @@ -26,7 +26,7 @@ import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.testutils.HoodieTestDataGenerator; import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.hudi.utilities.sources.AvroSource; -import org.apache.hudi.utilities.testutils.sources.config.TestSourceConfig; +import org.apache.hudi.utilities.testutils.sources.config.SourceConfigs; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; @@ -57,9 +57,9 @@ public abstract class AbstractBaseTestSource extends AvroSource { public static void initDataGen(TypedProperties props, int partition) { try { - boolean useRocksForTestDataGenKeys = props.getBoolean(TestSourceConfig.USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS, - TestSourceConfig.DEFAULT_USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS); - String baseStoreDir = props.getString(TestSourceConfig.ROCKSDB_BASE_DIR_FOR_TEST_DATAGEN_KEYS, + boolean useRocksForTestDataGenKeys = props.getBoolean(SourceConfigs.USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS, + SourceConfigs.DEFAULT_USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS); + String baseStoreDir = props.getString(SourceConfigs.ROCKSDB_BASE_DIR_FOR_TEST_DATAGEN_KEYS, File.createTempFile("test_data_gen", ".keys").getParent()) + "/" + partition; LOG.info("useRocksForTestDataGenKeys=" + useRocksForTestDataGenKeys + ", BaseStoreDir=" + baseStoreDir); dataGeneratorMap.put(partition, new HoodieTestDataGenerator(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, @@ -84,7 +84,7 @@ public abstract class AbstractBaseTestSource extends AvroSource { protected static Stream fetchNextBatch(TypedProperties props, int sourceLimit, String instantTime, int partition) { int maxUniqueKeys = - props.getInteger(TestSourceConfig.MAX_UNIQUE_RECORDS_PROP, TestSourceConfig.DEFAULT_MAX_UNIQUE_RECORDS); + props.getInteger(SourceConfigs.MAX_UNIQUE_RECORDS_PROP, SourceConfigs.DEFAULT_MAX_UNIQUE_RECORDS); HoodieTestDataGenerator dataGenerator = dataGeneratorMap.get(partition); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/DistributedTestDataSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/DistributedTestDataSource.java index c0da33f4c..0ccddd582 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/DistributedTestDataSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/DistributedTestDataSource.java @@ -22,7 +22,7 @@ import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.util.Option; import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.hudi.utilities.sources.InputBatch; -import org.apache.hudi.utilities.testutils.sources.config.TestSourceConfig; +import org.apache.hudi.utilities.testutils.sources.config.SourceConfigs; import org.apache.avro.generic.GenericRecord; import org.apache.log4j.LogManager; @@ -47,7 +47,7 @@ public class DistributedTestDataSource extends AbstractBaseTestSource { SchemaProvider schemaProvider) { super(props, sparkContext, sparkSession, schemaProvider); this.numTestSourcePartitions = - props.getInteger(TestSourceConfig.NUM_SOURCE_PARTITIONS_PROP, TestSourceConfig.DEFAULT_NUM_SOURCE_PARTITIONS); + props.getInteger(SourceConfigs.NUM_SOURCE_PARTITIONS_PROP, SourceConfigs.DEFAULT_NUM_SOURCE_PARTITIONS); } @Override @@ -66,9 +66,9 @@ public class DistributedTestDataSource extends AbstractBaseTestSource { // Set the maxUniqueRecords per partition for TestDataSource int maxUniqueRecords = - props.getInteger(TestSourceConfig.MAX_UNIQUE_RECORDS_PROP, TestSourceConfig.DEFAULT_MAX_UNIQUE_RECORDS); + props.getInteger(SourceConfigs.MAX_UNIQUE_RECORDS_PROP, SourceConfigs.DEFAULT_MAX_UNIQUE_RECORDS); String maxUniqueRecordsPerPartition = String.valueOf(Math.max(1, maxUniqueRecords / numTestSourcePartitions)); - newProps.setProperty(TestSourceConfig.MAX_UNIQUE_RECORDS_PROP, maxUniqueRecordsPerPartition); + newProps.setProperty(SourceConfigs.MAX_UNIQUE_RECORDS_PROP, maxUniqueRecordsPerPartition); int perPartitionSourceLimit = Math.max(1, (int) (sourceLimit / numTestSourcePartitions)); JavaRDD avroRDD = sparkContext.parallelize(IntStream.range(0, numTestSourcePartitions).boxed().collect(Collectors.toList()), diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/config/TestSourceConfig.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/config/SourceConfigs.java similarity index 98% rename from hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/config/TestSourceConfig.java rename to hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/config/SourceConfigs.java index 04f90d53e..1d3aae7ba 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/config/TestSourceConfig.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/config/SourceConfigs.java @@ -21,7 +21,7 @@ package org.apache.hudi.utilities.testutils.sources.config; /** * Configurations for Test Data Sources. */ -public class TestSourceConfig { +public class SourceConfigs { // Used by DistributedTestDataSource only. Number of partitions where each partitions generates test-data public static final String NUM_SOURCE_PARTITIONS_PROP = "hoodie.deltastreamer.source.test.num_partitions";