[HUDI-2709] Add more options when initializing table (#3939)
This commit is contained in:
@@ -28,6 +28,7 @@ import org.apache.hudi.common.config.TypedProperties;
|
|||||||
import org.apache.hudi.common.engine.EngineType;
|
import org.apache.hudi.common.engine.EngineType;
|
||||||
import org.apache.hudi.common.fs.FSUtils;
|
import org.apache.hudi.common.fs.FSUtils;
|
||||||
import org.apache.hudi.common.model.HoodieCleaningPolicy;
|
import org.apache.hudi.common.model.HoodieCleaningPolicy;
|
||||||
|
import org.apache.hudi.common.table.HoodieTableConfig;
|
||||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||||
import org.apache.hudi.common.table.log.HoodieLogFormat;
|
import org.apache.hudi.common.table.log.HoodieLogFormat;
|
||||||
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
||||||
@@ -210,7 +211,6 @@ public class StreamerUtil {
|
|||||||
.withPayloadOrderingField(conf.getString(FlinkOptions.PRECOMBINE_FIELD))
|
.withPayloadOrderingField(conf.getString(FlinkOptions.PRECOMBINE_FIELD))
|
||||||
.withPayloadEventTimeField(conf.getString(FlinkOptions.PRECOMBINE_FIELD))
|
.withPayloadEventTimeField(conf.getString(FlinkOptions.PRECOMBINE_FIELD))
|
||||||
.build())
|
.build())
|
||||||
.withKeyGenerator(conf.getString(FlinkOptions.KEYGEN_CLASS_NAME)) // needed by TwoToThreeUpgradeHandler
|
|
||||||
.withEmbeddedTimelineServerEnabled(enableEmbeddedTimelineService)
|
.withEmbeddedTimelineServerEnabled(enableEmbeddedTimelineService)
|
||||||
.withEmbeddedTimelineServerReuseEnabled(true) // make write client embedded timeline service singleton
|
.withEmbeddedTimelineServerReuseEnabled(true) // make write client embedded timeline service singleton
|
||||||
.withAutoCommit(false)
|
.withAutoCommit(false)
|
||||||
@@ -239,6 +239,8 @@ public class StreamerUtil {
|
|||||||
Properties properties = new Properties();
|
Properties properties = new Properties();
|
||||||
// put all the set options
|
// put all the set options
|
||||||
flatConf.addAllToProperties(properties);
|
flatConf.addAllToProperties(properties);
|
||||||
|
// ugly: table keygen clazz, needed by TwoToThreeUpgradeHandler
|
||||||
|
properties.put(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key(), conf.getString(FlinkOptions.KEYGEN_CLASS_NAME));
|
||||||
// put all the default options
|
// put all the default options
|
||||||
for (ConfigOption<?> option : FlinkOptions.optionalOptions()) {
|
for (ConfigOption<?> option : FlinkOptions.optionalOptions()) {
|
||||||
if (!flatConf.contains(option) && option.hasDefaultValue()) {
|
if (!flatConf.contains(option) && option.hasDefaultValue()) {
|
||||||
@@ -268,9 +270,12 @@ public class StreamerUtil {
|
|||||||
.setTableName(conf.getString(FlinkOptions.TABLE_NAME))
|
.setTableName(conf.getString(FlinkOptions.TABLE_NAME))
|
||||||
.setRecordKeyFields(conf.getString(FlinkOptions.RECORD_KEY_FIELD, null))
|
.setRecordKeyFields(conf.getString(FlinkOptions.RECORD_KEY_FIELD, null))
|
||||||
.setPayloadClassName(conf.getString(FlinkOptions.PAYLOAD_CLASS_NAME))
|
.setPayloadClassName(conf.getString(FlinkOptions.PAYLOAD_CLASS_NAME))
|
||||||
|
.setPreCombineField(OptionsResolver.getPreCombineField(conf))
|
||||||
.setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue())
|
.setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue())
|
||||||
.setPartitionFields(conf.getString(FlinkOptions.PARTITION_PATH_FIELD, null))
|
.setPartitionFields(conf.getString(FlinkOptions.PARTITION_PATH_FIELD, null))
|
||||||
.setPreCombineField(conf.getString(FlinkOptions.PRECOMBINE_FIELD))
|
.setKeyGeneratorClassProp(conf.getString(FlinkOptions.KEYGEN_CLASS_NAME))
|
||||||
|
.setHiveStylePartitioningEnable(conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING))
|
||||||
|
.setUrlEncodePartitioning(conf.getBoolean(FlinkOptions.URL_ENCODE_PARTITIONING))
|
||||||
.setTimelineLayoutVersion(1)
|
.setTimelineLayoutVersion(1)
|
||||||
.initTable(hadoopConf, basePath);
|
.initTable(hadoopConf, basePath);
|
||||||
LOG.info("Table initialized under base path {}", basePath);
|
LOG.info("Table initialized under base path {}", basePath);
|
||||||
|
|||||||
Reference in New Issue
Block a user