1
0

[HUDI-89] Add configOption & refactor all configs based on that (#2833)

Co-authored-by: Wenning Ding <wenningd@amazon.com>
This commit is contained in:
wenningd
2021-06-30 14:26:30 -07:00
committed by GitHub
parent 07e93de8b4
commit d412fb2fe6
173 changed files with 4277 additions and 3309 deletions

View File

@@ -74,7 +74,7 @@ public class HoodieSnapshotCopier implements Serializable {
public Boolean useFileListingFromMetadata = HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
@Parameter(names = {"--verify-metadata-file-listing"}, description = "Verify file listing from Hudi's metadata against file system")
public Boolean verifyMetadataFileListing = HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE;
public Boolean verifyMetadataFileListing = HoodieMetadataConfig.METADATA_VALIDATE_PROP.defaultValue();
}
public void snapshot(JavaSparkContext jsc, String baseDir, final String outputDir,

View File

@@ -20,6 +20,7 @@ package org.apache.hudi.utilities.callback.kafka;
import org.apache.hudi.callback.HoodieWriteCommitCallback;
import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage;
import org.apache.hudi.callback.util.HoodieWriteCommitCallbackUtil;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -48,22 +49,22 @@ public class HoodieWriteCommitKafkaCallback implements HoodieWriteCommitCallback
private static final Logger LOG = LogManager.getLogger(HoodieWriteCommitKafkaCallback.class);
private Properties props;
private HoodieConfig hoodieConfig;
private String bootstrapServers;
private String topic;
public HoodieWriteCommitKafkaCallback(HoodieWriteConfig config) {
this.props = config.getProps();
this.bootstrapServers = props.getProperty(CALLBACK_KAFKA_BOOTSTRAP_SERVERS);
this.topic = props.getProperty(CALLBACK_KAFKA_TOPIC);
this.hoodieConfig = config;
this.bootstrapServers = config.getString(CALLBACK_KAFKA_BOOTSTRAP_SERVERS);
this.topic = config.getString(CALLBACK_KAFKA_TOPIC);
validateKafkaConfig();
}
@Override
public void call(HoodieWriteCommitCallbackMessage callbackMessage) {
String callbackMsg = HoodieWriteCommitCallbackUtil.convertToJsonString(callbackMessage);
try (KafkaProducer<String, String> producer = createProducer(props)) {
ProducerRecord<String, String> record = buildProducerRecord(props, callbackMsg);
try (KafkaProducer<String, String> producer = createProducer(hoodieConfig)) {
ProducerRecord<String, String> record = buildProducerRecord(hoodieConfig, callbackMsg);
producer.send(record);
LOG.info(String.format("Send callback message %s succeed", callbackMsg));
} catch (Exception e) {
@@ -75,17 +76,19 @@ public class HoodieWriteCommitKafkaCallback implements HoodieWriteCommitCallback
* Method helps to create {@link KafkaProducer}. Here we set acks = all and retries = 3 by default to ensure no data
* loss.
*
* @param props Kafka configs
* @param hoodieConfig Kafka configs
* @return A {@link KafkaProducer}
*/
public KafkaProducer<String, String> createProducer(Properties props) {
public KafkaProducer<String, String> createProducer(HoodieConfig hoodieConfig) {
Properties kafkaProducerProps = new Properties();
// bootstrap.servers
kafkaProducerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// default "all" to ensure no message loss
kafkaProducerProps.setProperty(ProducerConfig.ACKS_CONFIG, props.getProperty(CALLBACK_KAFKA_ACKS));
kafkaProducerProps.setProperty(ProducerConfig.ACKS_CONFIG, hoodieConfig
.getString(CALLBACK_KAFKA_ACKS));
// retries 3 times by default
kafkaProducerProps.setProperty(ProducerConfig.RETRIES_CONFIG, props.getProperty(CALLBACK_KAFKA_RETRIES));
kafkaProducerProps.setProperty(ProducerConfig.RETRIES_CONFIG, hoodieConfig
.getString(CALLBACK_KAFKA_RETRIES));
kafkaProducerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
kafkaProducerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
@@ -101,17 +104,18 @@ public class HoodieWriteCommitKafkaCallback implements HoodieWriteCommitCallback
that the callback message of the same table will go to the same partition. Therefore, if the user does not specify
the partition, we can use the table name as {@link ProducerRecord} key.
*
* @param props Kafka configs
* @param hoodieConfig Kafka configs
* @param callbackMsg Callback message
* @return Callback {@link ProducerRecord}
*/
private ProducerRecord<String, String> buildProducerRecord(Properties props, String callbackMsg) {
String partition = props.getProperty(CALLBACK_KAFKA_PARTITION);
private ProducerRecord<String, String> buildProducerRecord(HoodieConfig hoodieConfig, String callbackMsg) {
String partition = hoodieConfig.getString(CALLBACK_KAFKA_PARTITION);
if (null != partition) {
return new ProducerRecord<String, String>(topic, Integer.valueOf(partition), props.getProperty(TABLE_NAME),
return new ProducerRecord<String, String>(topic, Integer.valueOf(partition), hoodieConfig
.getString(TABLE_NAME),
callbackMsg);
} else {
return new ProducerRecord<String, String>(topic, props.getProperty(TABLE_NAME), callbackMsg);
return new ProducerRecord<String, String>(topic, hoodieConfig.getString(TABLE_NAME), callbackMsg);
}
}
@@ -121,9 +125,9 @@ public class HoodieWriteCommitKafkaCallback implements HoodieWriteCommitCallback
*/
private void validateKafkaConfig() {
ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(bootstrapServers), String.format("Config %s can not be "
+ "null or empty", CALLBACK_KAFKA_BOOTSTRAP_SERVERS));
+ "null or empty", CALLBACK_KAFKA_BOOTSTRAP_SERVERS.key()));
ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(topic), String.format("Config %s can not be null or empty",
CALLBACK_KAFKA_TOPIC));
CALLBACK_KAFKA_TOPIC.key()));
}
/**

View File

@@ -17,32 +17,52 @@
package org.apache.hudi.utilities.callback.kafka;
import java.util.Properties;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;
import static org.apache.hudi.common.config.DefaultHoodieConfig.setDefaultOnCondition;
import static org.apache.hudi.config.HoodieWriteCommitCallbackConfig.CALLBACK_PREFIX;
/**
* Kafka write callback related config.
*/
public class HoodieWriteCommitKafkaCallbackConfig {
public class HoodieWriteCommitKafkaCallbackConfig extends HoodieConfig {
public static final String CALLBACK_KAFKA_BOOTSTRAP_SERVERS = CALLBACK_PREFIX + "kafka.bootstrap.servers";
public static final String CALLBACK_KAFKA_TOPIC = CALLBACK_PREFIX + "kafka.topic";
public static final String CALLBACK_KAFKA_PARTITION = CALLBACK_PREFIX + "kafka.partition";
public static final String CALLBACK_KAFKA_ACKS = CALLBACK_PREFIX + "kafka.acks";
public static final String DEFAULT_CALLBACK_KAFKA_ACKS = "all";
public static final String CALLBACK_KAFKA_RETRIES = CALLBACK_PREFIX + "kafka.retries";
public static final int DEFAULT_CALLBACK_KAFKA_RETRIES = 3;
public static final ConfigProperty<String> CALLBACK_KAFKA_BOOTSTRAP_SERVERS = ConfigProperty
.key(CALLBACK_PREFIX + "kafka.bootstrap.servers")
.noDefaultValue()
.sinceVersion("0.7.0")
.withDocumentation("Bootstrap servers of kafka callback cluster");
public static final ConfigProperty<String> CALLBACK_KAFKA_TOPIC = ConfigProperty
.key(CALLBACK_PREFIX + "kafka.topic")
.noDefaultValue()
.sinceVersion("0.7.0")
.withDocumentation("Kafka topic to be sent along with callback messages");
public static final ConfigProperty<String> CALLBACK_KAFKA_PARTITION = ConfigProperty
.key(CALLBACK_PREFIX + "kafka.partition")
.noDefaultValue()
.sinceVersion("0.7.0")
.withDocumentation("partition of CALLBACK_KAFKA_TOPIC, 0 by default");
public static final ConfigProperty<String> CALLBACK_KAFKA_ACKS = ConfigProperty
.key(CALLBACK_PREFIX + "kafka.acks")
.defaultValue("all")
.sinceVersion("0.7.0")
.withDocumentation("kafka acks level, all by default");
public static final ConfigProperty<Integer> CALLBACK_KAFKA_RETRIES = ConfigProperty
.key(CALLBACK_PREFIX + "kafka.retries")
.defaultValue(3)
.sinceVersion("0.7.0")
.withDocumentation("Times to retry. 3 by default");
/**
* Set default value for {@link HoodieWriteCommitKafkaCallbackConfig} if needed.
*/
public static void setCallbackKafkaConfigIfNeeded(Properties props) {
setDefaultOnCondition(props, !props.containsKey(CALLBACK_KAFKA_ACKS), CALLBACK_KAFKA_ACKS,
DEFAULT_CALLBACK_KAFKA_ACKS);
setDefaultOnCondition(props, !props.containsKey(CALLBACK_KAFKA_RETRIES), CALLBACK_KAFKA_RETRIES,
String.valueOf(DEFAULT_CALLBACK_KAFKA_RETRIES));
public static void setCallbackKafkaConfigIfNeeded(HoodieConfig config) {
config.setDefaultValue(CALLBACK_KAFKA_ACKS);
config.setDefaultValue(CALLBACK_KAFKA_RETRIES);
}
}

View File

@@ -49,7 +49,7 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;
import static org.apache.hudi.common.table.HoodieTableConfig.DEFAULT_ARCHIVELOG_FOLDER;
import static org.apache.hudi.common.table.HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP;
/**
* Performs bootstrap from a non-hudi source.
@@ -111,13 +111,14 @@ public class BootstrapExecutor implements Serializable {
this.configuration = conf;
this.props = properties;
ValidationUtils.checkArgument(properties.containsKey(HoodieTableConfig.HOODIE_BOOTSTRAP_BASE_PATH),
HoodieTableConfig.HOODIE_BOOTSTRAP_BASE_PATH + " must be specified.");
this.bootstrapBasePath = properties.getString(HoodieTableConfig.HOODIE_BOOTSTRAP_BASE_PATH);
ValidationUtils.checkArgument(properties.containsKey(HoodieTableConfig.HOODIE_BOOTSTRAP_BASE_PATH_PROP
.key()),
HoodieTableConfig.HOODIE_BOOTSTRAP_BASE_PATH_PROP.key() + " must be specified.");
this.bootstrapBasePath = properties.getString(HoodieTableConfig.HOODIE_BOOTSTRAP_BASE_PATH_PROP.key());
// Add more defaults if full bootstrap requested
this.props.putIfAbsent(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY(),
DataSourceWriteOptions.DEFAULT_PAYLOAD_OPT_VAL());
this.props.putIfAbsent(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY().key(),
DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY().defaultValue());
this.schemaProvider = UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jssc);
HoodieWriteConfig.Builder builder =
HoodieWriteConfig.newBuilder().withPath(cfg.targetBasePath)
@@ -174,7 +175,7 @@ public class BootstrapExecutor implements Serializable {
HoodieTableMetaClient.withPropertyBuilder()
.setTableType(cfg.tableType)
.setTableName(cfg.targetTableName)
.setArchiveLogFolder(DEFAULT_ARCHIVELOG_FOLDER)
.setArchiveLogFolder(HOODIE_ARCHIVELOG_FOLDER_PROP.defaultValue())
.setPayloadClassName(cfg.payloadClassName)
.setBaseFileFormat(cfg.baseFileFormat)
.setBootstrapIndexClass(cfg.bootstrapIndexClass)

View File

@@ -90,7 +90,7 @@ import java.util.stream.Collectors;
import scala.collection.JavaConversions;
import static org.apache.hudi.common.table.HoodieTableConfig.DEFAULT_ARCHIVELOG_FOLDER;
import static org.apache.hudi.common.table.HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP;
import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY;
import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_RESET_KEY;
import static org.apache.hudi.config.HoodieCompactionConfig.INLINE_COMPACT_PROP;
@@ -243,7 +243,7 @@ public class DeltaSync implements Serializable {
HoodieTableMetaClient.withPropertyBuilder()
.setTableType(cfg.tableType)
.setTableName(cfg.targetTableName)
.setArchiveLogFolder(DEFAULT_ARCHIVELOG_FOLDER)
.setArchiveLogFolder(HOODIE_ARCHIVELOG_FOLDER_PROP.defaultValue())
.setPayloadClassName(cfg.payloadClassName)
.setBaseFileFormat(cfg.baseFileFormat)
.setPartitionColumns(partitionColumns)
@@ -336,7 +336,7 @@ public class DeltaSync implements Serializable {
HoodieTableMetaClient.withPropertyBuilder()
.setTableType(cfg.tableType)
.setTableName(cfg.targetTableName)
.setArchiveLogFolder(DEFAULT_ARCHIVELOG_FOLDER)
.setArchiveLogFolder(HOODIE_ARCHIVELOG_FOLDER_PROP.defaultValue())
.setPayloadClassName(cfg.payloadClassName)
.setBaseFileFormat(cfg.baseFileFormat)
.setPartitionColumns(partitionColumns)
@@ -654,18 +654,18 @@ public class DeltaSync implements Serializable {
// set default value for {@link HoodieWriteCommitKafkaCallbackConfig} if needed.
if (config.writeCommitCallbackOn() && HoodieWriteCommitKafkaCallback.class.getName().equals(config.getCallbackClass())) {
HoodieWriteCommitKafkaCallbackConfig.setCallbackKafkaConfigIfNeeded(config.getProps());
HoodieWriteCommitKafkaCallbackConfig.setCallbackKafkaConfigIfNeeded(config);
}
// Validate what deltastreamer assumes of write-config to be really safe
ValidationUtils.checkArgument(config.inlineCompactionEnabled() == cfg.isInlineCompactionEnabled(),
String.format("%s should be set to %s", INLINE_COMPACT_PROP, cfg.isInlineCompactionEnabled()));
String.format("%s should be set to %s", INLINE_COMPACT_PROP.key(), cfg.isInlineCompactionEnabled()));
ValidationUtils.checkArgument(!config.shouldAutoCommit(),
String.format("%s should be set to %s", HOODIE_AUTO_COMMIT_PROP, autoCommit));
String.format("%s should be set to %s", HOODIE_AUTO_COMMIT_PROP.key(), autoCommit));
ValidationUtils.checkArgument(config.shouldCombineBeforeInsert() == cfg.filterDupes,
String.format("%s should be set to %s", COMBINE_BEFORE_INSERT_PROP, cfg.filterDupes));
String.format("%s should be set to %s", COMBINE_BEFORE_INSERT_PROP.key(), cfg.filterDupes));
ValidationUtils.checkArgument(config.shouldCombineBeforeUpsert(),
String.format("%s should be set to %s", COMBINE_BEFORE_UPSERT_PROP, combineBeforeUpsert));
String.format("%s should be set to %s", COMBINE_BEFORE_UPSERT_PROP.key(), combineBeforeUpsert));
return config;
}

View File

@@ -128,7 +128,7 @@ public class HoodieMultiTableDeltaStreamer {
Helpers.deepCopyConfigs(config, cfg);
String overriddenTargetBasePath = tableProperties.getString(Constants.TARGET_BASE_PATH_PROP, "");
cfg.targetBasePath = StringUtils.isNullOrEmpty(overriddenTargetBasePath) ? targetBasePath : overriddenTargetBasePath;
if (cfg.enableHiveSync && StringUtils.isNullOrEmpty(tableProperties.getString(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY(), ""))) {
if (cfg.enableHiveSync && StringUtils.isNullOrEmpty(tableProperties.getString(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY().key(), ""))) {
throw new HoodieException("Hive sync table field not provided!");
}
populateSchemaProviderProps(cfg, tableProperties);

View File

@@ -18,13 +18,13 @@
package org.apache.hudi.utilities.deser;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.Schema;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.exception.HoodieException;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.Schema;
import org.apache.kafka.common.errors.SerializationException;
import java.util.Map;
@@ -48,7 +48,7 @@ public class KafkaAvroSchemaDeserializer extends KafkaAvroDeserializer {
super.configure(configs, isKey);
try {
TypedProperties props = getConvertToTypedProperties(configs);
sourceSchema = new Schema.Parser().parse(props.getString(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA()));
sourceSchema = new Schema.Parser().parse(props.getString(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA().key()));
} catch (Throwable e) {
throw new HoodieException(e);
}

View File

@@ -289,10 +289,10 @@ public class TimelineServerPerf implements Serializable {
@Parameter(names = {"--base-store-path", "-sp"},
description = "Directory where spilled view entries will be stored. Used for SPILLABLE_DISK storage type")
public String baseStorePathForFileGroups = FileSystemViewStorageConfig.DEFAULT_VIEW_SPILLABLE_DIR;
public String baseStorePathForFileGroups = FileSystemViewStorageConfig.FILESYSTEM_VIEW_SPILLABLE_DIR.defaultValue();
@Parameter(names = {"--rocksdb-path", "-rp"}, description = "Root directory for RocksDB")
public String rocksDBPath = FileSystemViewStorageConfig.DEFAULT_ROCKSDB_BASE_PATH;
public String rocksDBPath = FileSystemViewStorageConfig.ROCKSDB_BASE_PATH_PROP.defaultValue();
@Parameter(names = {"--wait-for-manual-queries", "-ww"})
public Boolean waitForManualQueries = false;
@@ -301,7 +301,7 @@ public class TimelineServerPerf implements Serializable {
public Boolean useFileListingFromMetadata = HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
@Parameter(names = {"--verify-metadata-file-listing"}, description = "Verify file listing from Hudi's metadata against file system")
public Boolean verifyMetadataFileListing = HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE;
public Boolean verifyMetadataFileListing = HoodieMetadataConfig.METADATA_VALIDATE_PROP.defaultValue();
@Parameter(names = {"--help", "-h"})
public Boolean help = false;

View File

@@ -60,7 +60,7 @@ public class AvroKafkaSource extends AvroSource {
super(props, sparkContext, sparkSession, schemaProvider);
props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class);
String deserializerClassName = props.getString(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER(), "");
String deserializerClassName = props.getString(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER().key(), "");
if (deserializerClassName.isEmpty()) {
props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, KafkaAvroDeserializer.class);
@@ -70,7 +70,7 @@ public class AvroKafkaSource extends AvroSource {
if (schemaProvider == null) {
throw new HoodieIOException("SchemaProvider has to be set to use custom Deserializer");
}
props.put(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA(), schemaProvider.getSourceSchema().toString());
props.put(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA().key(), schemaProvider.getSourceSchema().toString());
} catch (ClassNotFoundException e) {
String error = "Could not load custom avro kafka deserializer: " + deserializerClassName;
LOG.error(error);

View File

@@ -115,9 +115,9 @@ public class HoodieIncrSource extends RowSource {
// Do Incr pull. Set end instant if available
DataFrameReader reader = sparkSession.read().format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL())
.option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY(), instantEndpts.getLeft())
.option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY(), instantEndpts.getRight());
.option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY().key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL())
.option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY().key(), instantEndpts.getLeft())
.option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY().key(), instantEndpts.getRight());
Dataset<Row> source = reader.load(srcPath);

View File

@@ -34,7 +34,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class TestKafkaConnectHdfsProvider extends HoodieCommonTestHarness {
private static final String BASE_FILE_EXTENSION = HoodieTableConfig.DEFAULT_BASE_FILE_FORMAT.getFileExtension();
private static final String BASE_FILE_EXTENSION = HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP.defaultValue().getFileExtension();
@Test
public void testValidKafkaConnectPath() throws Exception {

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.utilities.deser;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.sources.helpers.SchemaTestProvider;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
@@ -99,7 +100,7 @@ public class TestKafkaAvroSchemaDeserializer extends UtilitiesTestBase {
public void testKafkaAvroSchemaDeserializer() {
byte[] bytesOrigRecord;
IndexedRecord avroRecord = createUserRecord();
config.put("hoodie.deltastreamer.source.kafka.value.deserializer.schema", origSchema.toString());
config.put(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA().key(), origSchema.toString());
KafkaAvroSchemaDeserializer avroDeserializer = new KafkaAvroSchemaDeserializer(schemaRegistry, new HashMap(config));
avroDeserializer.configure(new HashMap(config), false);
@@ -111,7 +112,7 @@ public class TestKafkaAvroSchemaDeserializer extends UtilitiesTestBase {
byte[] bytesExtendedRecord = avroSerializer.serialize(topic, avroRecordWithAllField);
SchemaTestProvider.schemaToReturn.set(evolSchema);
config.put("hoodie.deltastreamer.source.kafka.value.deserializer.schema", evolSchema.toString());
config.put(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA().key(), evolSchema.toString());
avroDeserializer = new KafkaAvroSchemaDeserializer(schemaRegistry, new HashMap(config));
avroDeserializer.configure(new HashMap(config), false);
// record is serialized w/ evolved schema, and deserialized w/ evolved schema

View File

@@ -24,6 +24,7 @@ import java.util.ConcurrentModificationException;
import java.util.concurrent.ExecutorService;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.config.DFSPropertiesConfiguration;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -195,11 +196,11 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
// Hive Configs
props.setProperty(DataSourceWriteOptions.HIVE_URL_OPT_KEY(), "jdbc:hive2://127.0.0.1:9999/");
props.setProperty(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY(), "testdb1");
props.setProperty(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY(), "hive_trips");
props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY(), "datestr");
props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY(),
props.setProperty(DataSourceWriteOptions.HIVE_URL_OPT_KEY().key(), "jdbc:hive2://127.0.0.1:9999/");
props.setProperty(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY().key(), "testdb1");
props.setProperty(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY().key(), "hive_trips");
props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY().key(), "datestr");
props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY().key(),
MultiPartKeysValueExtractor.class.getName());
UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE);
@@ -280,11 +281,11 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
protected static void populateCommonHiveProps(TypedProperties props) {
// Hive Configs
props.setProperty(DataSourceWriteOptions.HIVE_URL_OPT_KEY(), "jdbc:hive2://127.0.0.1:9999/");
props.setProperty(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY(), "testdb2");
props.setProperty(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY(), "false");
props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY(), "datestr");
props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY(),
props.setProperty(DataSourceWriteOptions.HIVE_URL_OPT_KEY().key(), "jdbc:hive2://127.0.0.1:9999/");
props.setProperty(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY().key(), "testdb2");
props.setProperty(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY().key(), "false");
props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY().key(), "datestr");
props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY().key(),
MultiPartKeysValueExtractor.class.getName());
}
@@ -738,7 +739,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
cfg.continuousMode = true;
cfg.tableType = tableType.name();
cfg.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
cfg.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP));
cfg.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP.key()));
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
deltaStreamerTestRunner(ds, cfg, (r) -> {
if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
@@ -769,7 +770,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
cfgIngestionJob.continuousMode = true;
cfgIngestionJob.tableType = tableType.name();
cfgIngestionJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
cfgIngestionJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP));
cfgIngestionJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP.key()));
HoodieDeltaStreamer ingestionJob = new HoodieDeltaStreamer(cfgIngestionJob, jsc);
// Prepare base dataset with some commits
@@ -796,7 +797,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
.fromBytes(timeline.getInstantDetails(timeline.firstInstant().get()).get(), HoodieCommitMetadata.class);
cfgBackfillJob.checkpoint = commitMetadata.getMetadata(CHECKPOINT_KEY);
cfgBackfillJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
cfgBackfillJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP));
cfgBackfillJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP.key()));
HoodieDeltaStreamer backfillJob = new HoodieDeltaStreamer(cfgBackfillJob, jsc);
// re-init ingestion job to start sync service
@@ -822,14 +823,14 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
.fromBytes(timeline.getInstantDetails(timeline.firstInstant().get()).get(), HoodieCommitMetadata.class);
cfgBackfillJob.checkpoint = commitMetadata.getMetadata(CHECKPOINT_KEY);
cfgBackfillJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
cfgBackfillJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP));
cfgBackfillJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP.key()));
cfgIngestionJob = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT,
Arrays.asList(TestIdentityTransformer.class.getName()), PROPS_FILENAME_TEST_MULTI_WRITER, false);
cfgIngestionJob.continuousMode = true;
cfgIngestionJob.tableType = tableType.name();
cfgIngestionJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
cfgIngestionJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP));
cfgIngestionJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP.key()));
// re-init ingestion job
HoodieDeltaStreamer ingestionJob3 = new HoodieDeltaStreamer(cfgIngestionJob, jsc);
// re-init backfill job
@@ -856,7 +857,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
cfgIngestionJob.continuousMode = true;
cfgIngestionJob.tableType = tableType.name();
cfgIngestionJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
cfgIngestionJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP));
cfgIngestionJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP.key()));
HoodieDeltaStreamer ingestionJob = new HoodieDeltaStreamer(cfgIngestionJob, jsc);
// Prepare base dataset with some commits
@@ -898,7 +899,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
// checkpoint will move from 00000 to 00001 for this backfill job
cfgBackfillJob.checkpoint = commitMetadataForFirstInstant.getMetadata(CHECKPOINT_KEY);
cfgBackfillJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
cfgBackfillJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP));
cfgBackfillJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP.key()));
HoodieDeltaStreamer backfillJob = new HoodieDeltaStreamer(cfgBackfillJob, jsc);
backfillJob.sync();
@@ -987,9 +988,9 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
cfg.continuousMode = true;
cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
cfg.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
cfg.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP));
cfg.configs.add(String.format("%s=%s", HoodieClusteringConfig.INLINE_CLUSTERING_PROP, "true"));
cfg.configs.add(String.format("%s=%s", HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT_PROP, "2"));
cfg.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP.key()));
cfg.configs.add(String.format("%s=%s", HoodieClusteringConfig.INLINE_CLUSTERING_PROP.key(), "true"));
cfg.configs.add(String.format("%s=%s", HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT_PROP.key(), "2"));
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
deltaStreamerTestRunner(ds, cfg, (r) -> {
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(this.dfs.getConf()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build();
@@ -1023,8 +1024,8 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
cfg.continuousMode = true;
cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
cfg.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
cfg.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP));
cfg.configs.add(String.format("%s=true", HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE_OPT_KEY));
cfg.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP.key()));
cfg.configs.add(String.format("%s=true", HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE_OPT_KEY.key()));
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
deltaStreamerTestRunner(ds, cfg, (r) -> {
TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);
@@ -1178,7 +1179,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
props.load(inputStream);
}
assertEquals(props.getProperty(HoodieTableConfig.HOODIE_PAYLOAD_CLASS_PROP_NAME), DummyAvroPayload.class.getName());
assertEquals(new HoodieConfig(props).getString(HoodieTableConfig.HOODIE_PAYLOAD_CLASS_PROP), DummyAvroPayload.class.getName());
}
@Test
@@ -1204,7 +1205,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
props.load(inputStream);
}
assertFalse(props.containsKey(HoodieTableConfig.HOODIE_PAYLOAD_CLASS_PROP_NAME));
assertFalse(props.containsKey(HoodieTableConfig.HOODIE_PAYLOAD_CLASS_PROP.key()));
}
@Test
@@ -1236,7 +1237,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
cfg2.filterDupes = false;
cfg2.sourceLimit = 2000;
cfg2.operation = WriteOperationType.UPSERT;
cfg2.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP));
cfg2.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP.key()));
HoodieDeltaStreamer ds2 = new HoodieDeltaStreamer(cfg2, jsc);
ds2.sync();
mClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build();

View File

@@ -112,8 +112,8 @@ public class TestHoodieMultiTableDeltaStreamer extends TestHoodieDeltaStreamer {
assertEquals(dfsBasePath + "/multi_table_dataset/uber_db/dummy_table_uber", executionContext.getConfig().targetBasePath);
assertEquals("uber_db.dummy_table_uber", executionContext.getConfig().targetTableName);
assertEquals("topic1", executionContext.getProperties().getString(HoodieMultiTableDeltaStreamer.Constants.KAFKA_TOPIC_PROP));
assertEquals("_row_key", executionContext.getProperties().getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()));
assertEquals(TestHoodieDeltaStreamer.TestGenerator.class.getName(), executionContext.getProperties().getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY()));
assertEquals("_row_key", executionContext.getProperties().getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY().key()));
assertEquals(TestHoodieDeltaStreamer.TestGenerator.class.getName(), executionContext.getProperties().getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY().key()));
assertEquals("uber_hive_dummy_table", executionContext.getProperties().getString(HoodieMultiTableDeltaStreamer.Constants.HIVE_SYNC_TABLE_PROP));
}
@@ -226,11 +226,11 @@ public class TestHoodieMultiTableDeltaStreamer extends TestHoodieDeltaStreamer {
tableExecutionContexts.forEach(tableExecutionContext -> {
switch (tableExecutionContext.getTableName()) {
case "dummy_table_short_trip":
String tableLevelKeyGeneratorClass = tableExecutionContext.getProperties().getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY());
String tableLevelKeyGeneratorClass = tableExecutionContext.getProperties().getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY().key());
assertEquals(TestHoodieDeltaStreamer.TestTableLevelGenerator.class.getName(), tableLevelKeyGeneratorClass);
break;
default:
String defaultKeyGeneratorClass = tableExecutionContext.getProperties().getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY());
String defaultKeyGeneratorClass = tableExecutionContext.getProperties().getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY().key());
assertEquals(TestHoodieDeltaStreamer.TestGenerator.class.getName(), defaultKeyGeneratorClass);
}
});

View File

@@ -70,7 +70,7 @@ public class TestHoodieSnapshotCopier extends FunctionalTestHarness {
HoodieSnapshotCopier copier = new HoodieSnapshotCopier();
copier.snapshot(jsc(), basePath, outputPath, true,
HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS,
HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE);
HoodieMetadataConfig.METADATA_VALIDATE_PROP.defaultValue());
// Nothing changed; we just bail out
assertEquals(fs.listStatus(new Path(basePath)).length, 1);
@@ -124,7 +124,7 @@ public class TestHoodieSnapshotCopier extends FunctionalTestHarness {
// Do a snapshot copy
HoodieSnapshotCopier copier = new HoodieSnapshotCopier();
copier.snapshot(jsc(), basePath, outputPath, false, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS,
HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE);
HoodieMetadataConfig.METADATA_VALIDATE_PROP.defaultValue());
// Check results
assertTrue(fs.exists(new Path(outputPath + "/2016/05/01/" + file11.getName())));