[MINOR] Rename TestSourceConfig to SourceConfigs (#1749)
@@ -49,7 +49,7 @@ import org.apache.hudi.utilities.sources.ParquetDFSSource;
 import org.apache.hudi.utilities.sources.TestDataSource;
 import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
 import org.apache.hudi.utilities.testutils.sources.DistributedTestDataSource;
-import org.apache.hudi.utilities.testutils.sources.config.TestSourceConfig;
+import org.apache.hudi.utilities.testutils.sources.config.SourceConfigs;
 import org.apache.hudi.utilities.transform.SqlQueryBasedTransformer;
 import org.apache.hudi.utilities.transform.Transformer;
 
@@ -490,7 +490,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, Operation.UPSERT);
     cfg.continuousMode = true;
     cfg.tableType = tableType.name();
-    cfg.configs.add(String.format("%s=%d", TestSourceConfig.MAX_UNIQUE_RECORDS_PROP, totalRecords));
+    cfg.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
     cfg.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP));
     HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
     Future dsFuture = Executors.newSingleThreadExecutor().submit(() -> {
@@ -722,9 +722,9 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
   @Test
   public void testDistributedTestDataSource() {
     TypedProperties props = new TypedProperties();
-    props.setProperty(TestSourceConfig.MAX_UNIQUE_RECORDS_PROP, "1000");
-    props.setProperty(TestSourceConfig.NUM_SOURCE_PARTITIONS_PROP, "1");
-    props.setProperty(TestSourceConfig.USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS, "true");
+    props.setProperty(SourceConfigs.MAX_UNIQUE_RECORDS_PROP, "1000");
+    props.setProperty(SourceConfigs.NUM_SOURCE_PARTITIONS_PROP, "1");
+    props.setProperty(SourceConfigs.USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS, "true");
     DistributedTestDataSource distributedTestDataSource = new DistributedTestDataSource(props, jsc, sparkSession, null);
     InputBatch<JavaRDD<GenericRecord>> batch = distributedTestDataSource.fetchNext(Option.empty(), 10000000);
     batch.getBatch().get().cache();
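Only the class name changes in call sites like the test above; the property keys themselves are untouched. As a rough illustration of the post-rename usage, here is a minimal, hypothetical test sketch (not part of this commit) that drives DistributedTestDataSource through the renamed SourceConfigs constants; it assumes the jsc and sparkSession handles provided by UtilitiesTestBase and uses arbitrary example values.

import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.hudi.utilities.testutils.sources.DistributedTestDataSource;
import org.apache.hudi.utilities.testutils.sources.config.SourceConfigs;
import org.apache.spark.api.java.JavaRDD;

// Hypothetical example class, not part of this commit.
public class SourceConfigsUsageExample extends UtilitiesTestBase {

  public void fetchSampleBatch() {
    TypedProperties props = new TypedProperties();
    // Key constants come from SourceConfigs; the values below are arbitrary illustration values.
    props.setProperty(SourceConfigs.MAX_UNIQUE_RECORDS_PROP, "5000");
    props.setProperty(SourceConfigs.NUM_SOURCE_PARTITIONS_PROP, "4");
    props.setProperty(SourceConfigs.USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS, "false");

    // Null schema provider, exactly as in testDistributedTestDataSource above.
    DistributedTestDataSource source = new DistributedTestDataSource(props, jsc, sparkSession, null);
    InputBatch<JavaRDD<GenericRecord>> batch = source.fetchNext(Option.empty(), 1000000);
    batch.getBatch().get().cache();
  }
}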
@@ -26,7 +26,7 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.AvroSource;
-import org.apache.hudi.utilities.testutils.sources.config.TestSourceConfig;
+import org.apache.hudi.utilities.testutils.sources.config.SourceConfigs;
 
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
@@ -57,9 +57,9 @@ public abstract class AbstractBaseTestSource extends AvroSource {
 
   public static void initDataGen(TypedProperties props, int partition) {
     try {
-      boolean useRocksForTestDataGenKeys = props.getBoolean(TestSourceConfig.USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS,
-          TestSourceConfig.DEFAULT_USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS);
-      String baseStoreDir = props.getString(TestSourceConfig.ROCKSDB_BASE_DIR_FOR_TEST_DATAGEN_KEYS,
+      boolean useRocksForTestDataGenKeys = props.getBoolean(SourceConfigs.USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS,
+          SourceConfigs.DEFAULT_USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS);
+      String baseStoreDir = props.getString(SourceConfigs.ROCKSDB_BASE_DIR_FOR_TEST_DATAGEN_KEYS,
           File.createTempFile("test_data_gen", ".keys").getParent()) + "/" + partition;
       LOG.info("useRocksForTestDataGenKeys=" + useRocksForTestDataGenKeys + ", BaseStoreDir=" + baseStoreDir);
       dataGeneratorMap.put(partition, new HoodieTestDataGenerator(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS,
@@ -84,7 +84,7 @@ public abstract class AbstractBaseTestSource extends AvroSource {
   protected static Stream<GenericRecord> fetchNextBatch(TypedProperties props, int sourceLimit, String instantTime,
       int partition) {
     int maxUniqueKeys =
-        props.getInteger(TestSourceConfig.MAX_UNIQUE_RECORDS_PROP, TestSourceConfig.DEFAULT_MAX_UNIQUE_RECORDS);
+        props.getInteger(SourceConfigs.MAX_UNIQUE_RECORDS_PROP, SourceConfigs.DEFAULT_MAX_UNIQUE_RECORDS);
 
     HoodieTestDataGenerator dataGenerator = dataGeneratorMap.get(partition);
 
@@ -22,7 +22,7 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.InputBatch;
-import org.apache.hudi.utilities.testutils.sources.config.TestSourceConfig;
+import org.apache.hudi.utilities.testutils.sources.config.SourceConfigs;
 
 import org.apache.avro.generic.GenericRecord;
 import org.apache.log4j.LogManager;
@@ -47,7 +47,7 @@ public class DistributedTestDataSource extends AbstractBaseTestSource {
       SchemaProvider schemaProvider) {
     super(props, sparkContext, sparkSession, schemaProvider);
     this.numTestSourcePartitions =
-        props.getInteger(TestSourceConfig.NUM_SOURCE_PARTITIONS_PROP, TestSourceConfig.DEFAULT_NUM_SOURCE_PARTITIONS);
+        props.getInteger(SourceConfigs.NUM_SOURCE_PARTITIONS_PROP, SourceConfigs.DEFAULT_NUM_SOURCE_PARTITIONS);
   }
 
   @Override
@@ -66,9 +66,9 @@ public class DistributedTestDataSource extends AbstractBaseTestSource {
 
     // Set the maxUniqueRecords per partition for TestDataSource
     int maxUniqueRecords =
-        props.getInteger(TestSourceConfig.MAX_UNIQUE_RECORDS_PROP, TestSourceConfig.DEFAULT_MAX_UNIQUE_RECORDS);
+        props.getInteger(SourceConfigs.MAX_UNIQUE_RECORDS_PROP, SourceConfigs.DEFAULT_MAX_UNIQUE_RECORDS);
     String maxUniqueRecordsPerPartition = String.valueOf(Math.max(1, maxUniqueRecords / numTestSourcePartitions));
-    newProps.setProperty(TestSourceConfig.MAX_UNIQUE_RECORDS_PROP, maxUniqueRecordsPerPartition);
+    newProps.setProperty(SourceConfigs.MAX_UNIQUE_RECORDS_PROP, maxUniqueRecordsPerPartition);
     int perPartitionSourceLimit = Math.max(1, (int) (sourceLimit / numTestSourcePartitions));
     JavaRDD<GenericRecord> avroRDD =
         sparkContext.parallelize(IntStream.range(0, numTestSourcePartitions).boxed().collect(Collectors.toList()),
@@ -21,7 +21,7 @@ package org.apache.hudi.utilities.testutils.sources.config;
 /**
  * Configurations for Test Data Sources.
  */
-public class TestSourceConfig {
+public class SourceConfigs {
 
   // Used by DistributedTestDataSource only. Number of partitions where each partitions generates test-data
   public static final String NUM_SOURCE_PARTITIONS_PROP = "hoodie.deltastreamer.source.test.num_partitions";
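The renamed class itself only carries property keys and defaults for the test sources. A rough sketch of its shape, reconstructed from the constants referenced in the hunks above, follows; apart from NUM_SOURCE_PARTITIONS_PROP, whose key string is quoted in this diff, the key strings and default values below are placeholders rather than values taken from the actual file.

package org.apache.hudi.utilities.testutils.sources.config;

/**
 * Configurations for Test Data Sources.
 */
public class SourceConfigs {

  // Used by DistributedTestDataSource only. Number of partitions in which test data is generated.
  public static final String NUM_SOURCE_PARTITIONS_PROP = "hoodie.deltastreamer.source.test.num_partitions";
  public static final int DEFAULT_NUM_SOURCE_PARTITIONS = 10; // placeholder default, not from the diff

  // Maximum number of unique records produced by a test source (key string is a placeholder).
  public static final String MAX_UNIQUE_RECORDS_PROP = "hoodie.deltastreamer.source.test.max_unique_records";
  public static final int DEFAULT_MAX_UNIQUE_RECORDS = Integer.MAX_VALUE; // placeholder default

  // Whether generated record keys are tracked in RocksDB (key string and default are placeholders).
  public static final String USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS = "hoodie.deltastreamer.source.test.use_rocksdb_for_datagen_keys";
  public static final boolean DEFAULT_USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS = false; // placeholder default

  // Base directory of the RocksDB key store used by the test data generator (key string is a placeholder).
  public static final String ROCKSDB_BASE_DIR_FOR_TEST_DATAGEN_KEYS = "hoodie.deltastreamer.source.test.datagen.rocksdb_base_dir";
}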