diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index a838af4ca..5272d20a1 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -35,6 +35,7 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; @@ -54,6 +55,7 @@ import org.apache.hudi.exception.HoodieException; import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.hive.HiveSyncTool; import org.apache.hudi.keygen.KeyGenerator; +import org.apache.hudi.keygen.SimpleKeyGenerator; import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory; import org.apache.hudi.sync.common.AbstractSyncTool; import org.apache.hudi.utilities.UtilHelpers; @@ -213,13 +215,12 @@ public class DeltaSync implements Serializable { this.props = props; this.userProvidedSchemaProvider = schemaProvider; this.processedSchema = new SchemaSet(); - + this.keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props); refreshTimeline(); // Register User Provided schema first registerAvroSchemas(schemaProvider); this.transformer = UtilHelpers.createTransformer(cfg.transformerClassNames); - this.keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props); this.metrics = new HoodieDeltaStreamerMetrics(getHoodieClientConfig(this.schemaProvider)); @@ -249,7 +250,6 @@ public class DeltaSync implements Serializable { } else { this.commitTimelineOpt = Option.empty(); String partitionColumns = HoodieWriterUtils.getPartitionColumns(keyGenerator); - HoodieTableMetaClient.withPropertyBuilder() .setTableType(cfg.tableType) .setTableName(cfg.targetTableName) @@ -257,6 +257,11 @@ public class DeltaSync implements Serializable { .setPayloadClassName(cfg.payloadClassName) .setBaseFileFormat(cfg.baseFileFormat) .setPartitionFields(partitionColumns) + .setRecordKeyFields(props.getProperty(DataSourceWriteOptions.RECORDKEY_FIELD().key())) + .setPopulateMetaFields(props.getBoolean(HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.key(), + Boolean.parseBoolean(HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.defaultValue()))) + .setKeyGeneratorClassProp(props.getProperty(DataSourceWriteOptions.KEYGENERATOR_CLASS().key(), + SimpleKeyGenerator.class.getName())) .setPreCombineField(cfg.sourceOrderingField) .initTable(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath); @@ -356,6 +361,11 @@ public class DeltaSync implements Serializable { .setPayloadClassName(cfg.payloadClassName) .setBaseFileFormat(cfg.baseFileFormat) .setPartitionFields(partitionColumns) + .setRecordKeyFields(props.getProperty(DataSourceWriteOptions.RECORDKEY_FIELD().key())) + .setPopulateMetaFields(props.getBoolean(HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.key(), + Boolean.parseBoolean(HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.defaultValue()))) + .setKeyGeneratorClassProp(props.getProperty(DataSourceWriteOptions.KEYGENERATOR_CLASS().key(), + SimpleKeyGenerator.class.getName())) .initTable(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath); }