1
0

[HUDI-2294] Adding virtual keys support to deltastreamer (#3450)

This commit is contained in:
Sivabalan Narayanan
2021-08-12 08:02:39 -04:00
committed by GitHub
parent c0fc9cdaf3
commit b651336454

View File

@@ -35,6 +35,7 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -54,6 +55,7 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.HiveSyncTool;
 import org.apache.hudi.keygen.KeyGenerator;
+import org.apache.hudi.keygen.SimpleKeyGenerator;
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
 import org.apache.hudi.sync.common.AbstractSyncTool;
 import org.apache.hudi.utilities.UtilHelpers;
@@ -213,13 +215,12 @@ public class DeltaSync implements Serializable {
this.props = props; this.props = props;
this.userProvidedSchemaProvider = schemaProvider; this.userProvidedSchemaProvider = schemaProvider;
this.processedSchema = new SchemaSet(); this.processedSchema = new SchemaSet();
this.keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
refreshTimeline(); refreshTimeline();
// Register User Provided schema first // Register User Provided schema first
registerAvroSchemas(schemaProvider); registerAvroSchemas(schemaProvider);
this.transformer = UtilHelpers.createTransformer(cfg.transformerClassNames); this.transformer = UtilHelpers.createTransformer(cfg.transformerClassNames);
this.keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
this.metrics = new HoodieDeltaStreamerMetrics(getHoodieClientConfig(this.schemaProvider)); this.metrics = new HoodieDeltaStreamerMetrics(getHoodieClientConfig(this.schemaProvider));
@@ -249,7 +250,6 @@ public class DeltaSync implements Serializable {
} else { } else {
this.commitTimelineOpt = Option.empty(); this.commitTimelineOpt = Option.empty();
String partitionColumns = HoodieWriterUtils.getPartitionColumns(keyGenerator); String partitionColumns = HoodieWriterUtils.getPartitionColumns(keyGenerator);
HoodieTableMetaClient.withPropertyBuilder() HoodieTableMetaClient.withPropertyBuilder()
.setTableType(cfg.tableType) .setTableType(cfg.tableType)
.setTableName(cfg.targetTableName) .setTableName(cfg.targetTableName)
@@ -257,6 +257,11 @@ public class DeltaSync implements Serializable {
 .setPayloadClassName(cfg.payloadClassName)
 .setBaseFileFormat(cfg.baseFileFormat)
 .setPartitionFields(partitionColumns)
+.setRecordKeyFields(props.getProperty(DataSourceWriteOptions.RECORDKEY_FIELD().key()))
+.setPopulateMetaFields(props.getBoolean(HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.key(),
+Boolean.parseBoolean(HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.defaultValue())))
+.setKeyGeneratorClassProp(props.getProperty(DataSourceWriteOptions.KEYGENERATOR_CLASS().key(),
+SimpleKeyGenerator.class.getName()))
 .setPreCombineField(cfg.sourceOrderingField)
 .initTable(new Configuration(jssc.hadoopConfiguration()),
 cfg.targetBasePath);
@@ -356,6 +361,11 @@ public class DeltaSync implements Serializable {
 .setPayloadClassName(cfg.payloadClassName)
 .setBaseFileFormat(cfg.baseFileFormat)
 .setPartitionFields(partitionColumns)
+.setRecordKeyFields(props.getProperty(DataSourceWriteOptions.RECORDKEY_FIELD().key()))
+.setPopulateMetaFields(props.getBoolean(HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.key(),
+Boolean.parseBoolean(HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.defaultValue())))
+.setKeyGeneratorClassProp(props.getProperty(DataSourceWriteOptions.KEYGENERATOR_CLASS().key(),
+SimpleKeyGenerator.class.getName()))
 .initTable(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath);
 }