[HUDI-2067] Sync FlinkOptions config to FlinkStreamerConfig (#3151)
This commit is contained in:
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
|
||||
import org.apache.hudi.common.model.WriteOperationType;
|
||||
import org.apache.hudi.common.util.StringUtils;
|
||||
import org.apache.hudi.configuration.FlinkOptions;
|
||||
import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
|
||||
import org.apache.hudi.keygen.constant.KeyGeneratorType;
|
||||
import org.apache.hudi.util.StreamerUtil;
|
||||
|
||||
@@ -130,6 +131,150 @@ public class FlinkStreamerConfig extends Configuration {
|
||||
@Parameter(names = {"--write-task-num"}, description = "Parallelism of tasks that do actual write, default is 4.")
|
||||
public Integer writeTaskNum = 4;
|
||||
|
||||
@Parameter(names = {"--partition-default-name"},
|
||||
description = "The default partition name in case the dynamic partition column value is null/empty string")
|
||||
public String partitionDefaultName = "__DEFAULT_PARTITION__";
|
||||
|
||||
@Parameter(names = {"--index-bootstrap-enabled"},
|
||||
description = "Whether to bootstrap the index state from existing hoodie table, default false")
|
||||
public Boolean indexBootstrapEnabled = false;
|
||||
|
||||
@Parameter(names = {"--index-state-ttl"}, description = "Index state ttl in days, default 1.5 day")
|
||||
public Double indexStateTtl = 1.5D;
|
||||
|
||||
@Parameter(names = {"--index-global-enabled"}, description = "Whether to update index for the old partition path "
|
||||
+ "if same key record with different partition path came in, default false")
|
||||
public Boolean indexGlobalEnabled = false;
|
||||
|
||||
@Parameter(names = {"--index-partition-regex"},
|
||||
description = "Whether to load partitions in state if partition path matching, default *")
|
||||
public String indexPartitionRegex = ".*";
|
||||
|
||||
@Parameter(names = {"--avro-schema-path"}, description = "Avro schema file path, the parsed schema is used for deserialization")
|
||||
public String avroSchemaPath = "";
|
||||
|
||||
@Parameter(names = {"--avro-schema"}, description = "Avro schema string, the parsed schema is used for deserialization")
|
||||
public String avroSchema = "";
|
||||
|
||||
@Parameter(names = {"--utc-timezone"}, description = "Use UTC timezone or local timezone to the conversion between epoch"
|
||||
+ " time and LocalDateTime. Hive 0.x/1.x/2.x use local timezone. But Hive 3.x"
|
||||
+ " use UTC timezone, by default true")
|
||||
public Boolean utcTimezone = true;
|
||||
|
||||
@Parameter(names = {"--write-partition-url-encode"}, description = "Whether to encode the partition path url, default false")
|
||||
public Boolean writePartitionUrlEncode;
|
||||
|
||||
@Parameter(names = {"--write-task-max-size"}, description = "Maximum memory in MB for a write task, when the threshold hits,\n"
|
||||
+ "it flushes the max size data bucket to avoid OOM, default 1GB")
|
||||
public Double writeTaskMaxSize = 1024D;
|
||||
|
||||
@Parameter(names = {"--write-batch-size"},
|
||||
description = "Batch buffer size in MB to flush data into the underneath filesystem, default 64MB")
|
||||
public Double writeBatchSize = 64D;
|
||||
|
||||
@Parameter(names = {"--write-log-block-size"}, description = "Max log block size in MB for log file, default 128MB")
|
||||
public Integer writeLogBlockSize = 128;
|
||||
|
||||
@Parameter(names = {"--write-log-max-size"},
|
||||
description = "Maximum size allowed in MB for a log file before it is rolled over to the next version, default 1GB")
|
||||
public Integer writeLogMaxSize = 1024;
|
||||
|
||||
@Parameter(names = {"--write-merge-max-memory"}, description = "Max memory in MB for merge, default 100MB")
|
||||
public Integer writeMergeMaxMemory = 100;
|
||||
|
||||
@Parameter(names = {"--compaction-async-enabled"}, description = "Async Compaction, enabled by default for MOR")
|
||||
public Boolean compactionAsyncEnabled = true;
|
||||
|
||||
@Parameter(names = {"--compaction-tasks"}, description = "Parallelism of tasks that do actual compaction, default is 10")
|
||||
public Integer compactionTasks = 10;
|
||||
|
||||
@Parameter(names = {"--compaction-trigger-strategy"},
|
||||
description = "Strategy to trigger compaction, options are 'num_commits': trigger compaction when reach N delta commits;\n"
|
||||
+ "'time_elapsed': trigger compaction when time elapsed > N seconds since last compaction;\n"
|
||||
+ "'num_and_time': trigger compaction when both NUM_COMMITS and TIME_ELAPSED are satisfied;\n"
|
||||
+ "'num_or_time': trigger compaction when NUM_COMMITS or TIME_ELAPSED is satisfied.\n"
|
||||
+ "Default is 'num_commits'")
|
||||
public String compactionTriggerStrategy = FlinkOptions.NUM_COMMITS;
|
||||
|
||||
@Parameter(names = {"--compaction-delta-commits"}, description = "Max delta commits needed to trigger compaction, default 5 commits")
|
||||
public Integer compactionDeltaCommits = 5;
|
||||
|
||||
@Parameter(names = {"--compaction-delta-seconds"}, description = "Max delta seconds time needed to trigger compaction, default 1 hour")
|
||||
public Integer compactionDeltaSeconds = 3600;
|
||||
|
||||
@Parameter(names = {"--compaction-max-memory"}, description = "Max memory in MB for compaction spillable map, default 100MB")
|
||||
public Integer compactionMaxMemory = 100;
|
||||
|
||||
@Parameter(names = {"--compaction-target-io"}, description = "Target IO per compaction (both read and write), default 5 GB")
|
||||
public Long compactionTargetIo = 5120L;
|
||||
|
||||
@Parameter(names = {"--clean-async-enabled"}, description = "Whether to cleanup the old commits immediately on new commits, enabled by default")
|
||||
public Boolean cleanAsyncEnabled = true;
|
||||
|
||||
@Parameter(names = {"--clean-retain-commits"},
|
||||
description = "Number of commits to retain. So data will be retained for num_of_commits * time_between_commits (scheduled).\n"
|
||||
+ "This also directly translates into how much you can incrementally pull on this table, default 10")
|
||||
public Integer cleanRetainCommits = 10;
|
||||
|
||||
@Parameter(names = {"--archive-max-commits"},
|
||||
description = "Max number of commits to keep before archiving older commits into a sequential log, default 30")
|
||||
public Integer archiveMaxCommits = 30;
|
||||
|
||||
@Parameter(names = {"--archive-min-commits"},
|
||||
description = "Min number of commits to keep before archiving older commits into a sequential log, default 20")
|
||||
public Integer archiveMinCommits = 20;
|
||||
|
||||
@Parameter(names = {"--hive-sync-enable"}, description = "Asynchronously sync Hive meta to HMS, default false")
|
||||
public Boolean hiveSyncEnabled = false;
|
||||
|
||||
@Parameter(names = {"--hive-sync-db"}, description = "Database name for hive sync, default 'default'")
|
||||
public String hiveSyncDb = "default";
|
||||
|
||||
@Parameter(names = {"--hive-sync-table"}, description = "Table name for hive sync, default 'unknown'")
|
||||
public String hiveSyncTable = "unknown";
|
||||
|
||||
@Parameter(names = {"--hive-sync-file-format"}, description = "File format for hive sync, default 'PARQUET'")
|
||||
public String hiveSyncFileFormat = "PARQUET";
|
||||
|
||||
@Parameter(names = {"--hive-sync-username"}, description = "Username for hive sync, default 'hive'")
|
||||
public String hiveSyncUsername = "hive";
|
||||
|
||||
@Parameter(names = {"--hive-sync-password"}, description = "Password for hive sync, default 'hive'")
|
||||
public String hiveSyncPassword = "hive";
|
||||
|
||||
@Parameter(names = {"--hive-sync-jdbc-url"}, description = "Jdbc URL for hive sync, default 'jdbc:hive2://localhost:10000'")
|
||||
public String hiveSyncJdbcUrl = "jdbc:hive2://localhost:10000";
|
||||
|
||||
@Parameter(names = {"--hive-sync-metastore-uris"}, description = "Metastore uris for hive sync, default ''")
|
||||
public String hiveSyncMetastoreUri = "";
|
||||
|
||||
@Parameter(names = {"--hive-sync-partition-fields"}, description = "Partition fields for hive sync, default ''")
|
||||
public String hiveSyncPartitionFields = "";
|
||||
|
||||
@Parameter(names = {"--hive-sync-partition-extractor-class"}, description = "Tool to extract the partition value from HDFS path, "
|
||||
+ "default 'SlashEncodedDayPartitionValueExtractor'")
|
||||
public String hiveSyncPartitionExtractorClass = SlashEncodedDayPartitionValueExtractor.class.getCanonicalName();
|
||||
|
||||
@Parameter(names = {"--hive-sync-assume-date-partitioning"}, description = "Assume partitioning is yyyy/mm/dd, default false")
|
||||
public Boolean hiveSyncAssumeDatePartition = false;
|
||||
|
||||
@Parameter(names = {"--hive-sync-use-jdbc"}, description = "Use JDBC when hive synchronization is enabled, default true")
|
||||
public Boolean hiveSyncUseJdbc = true;
|
||||
|
||||
@Parameter(names = {"--hive-sync-auto-create-db"}, description = "Auto create hive database if it does not exists, default true")
|
||||
public Boolean hiveSyncAutoCreateDb = true;
|
||||
|
||||
@Parameter(names = {"--hive-sync-ignore-exceptions"}, description = "Ignore exceptions during hive synchronization, default false")
|
||||
public Boolean hiveSyncIgnoreExceptions = false;
|
||||
|
||||
@Parameter(names = {"--hive-sync-skip-ro-suffix"}, description = "Skip the _ro suffix for Read optimized table when registering, default false")
|
||||
public Boolean hiveSyncSkipRoSuffix = false;
|
||||
|
||||
@Parameter(names = {"--hive-sync-support-timestamp"}, description = "INT64 with original type TIMESTAMP_MICROS is converted to hive timestamp type.\n"
|
||||
+ "Disabled by default for backward compatibility.")
|
||||
public Boolean hiveSyncSupportTimestamp = false;
|
||||
|
||||
|
||||
/**
|
||||
* Transforms a {@code HoodieFlinkStreamer.Config} into {@code Configuration}.
|
||||
* The latter is more suitable for the table APIs. It reads all the properties
|
||||
@@ -161,7 +306,47 @@ public class FlinkStreamerConfig extends Configuration {
|
||||
conf.setString(FlinkOptions.KEYGEN_TYPE, config.keygenType);
|
||||
}
|
||||
conf.setInteger(FlinkOptions.WRITE_TASKS, config.writeTaskNum);
|
||||
|
||||
conf.setString(FlinkOptions.PARTITION_DEFAULT_NAME, config.partitionDefaultName);
|
||||
conf.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, config.indexBootstrapEnabled);
|
||||
conf.setDouble(FlinkOptions.INDEX_STATE_TTL, config.indexStateTtl);
|
||||
conf.setBoolean(FlinkOptions.INDEX_GLOBAL_ENABLED, config.indexGlobalEnabled);
|
||||
conf.setString(FlinkOptions.INDEX_PARTITION_REGEX, config.indexPartitionRegex);
|
||||
conf.setString(FlinkOptions.READ_AVRO_SCHEMA_PATH, config.avroSchemaPath);
|
||||
conf.setString(FlinkOptions.READ_AVRO_SCHEMA, config.avroSchema);
|
||||
conf.setBoolean(FlinkOptions.UTC_TIMEZONE, config.utcTimezone);
|
||||
conf.setBoolean(FlinkOptions.PARTITION_PATH_URL_ENCODE, config.writePartitionUrlEncode);
|
||||
conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, config.writeTaskMaxSize);
|
||||
conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, config.writeBatchSize);
|
||||
conf.setInteger(FlinkOptions.WRITE_LOG_BLOCK_SIZE, config.writeLogBlockSize);
|
||||
conf.setInteger(FlinkOptions.WRITE_LOG_MAX_SIZE, config.writeLogMaxSize);
|
||||
conf.setInteger(FlinkOptions.WRITE_MERGE_MAX_MEMORY, config.writeMergeMaxMemory);
|
||||
conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, config.compactionAsyncEnabled);
|
||||
conf.setInteger(FlinkOptions.COMPACTION_TASKS, config.compactionTasks);
|
||||
conf.setString(FlinkOptions.COMPACTION_TRIGGER_STRATEGY, config.compactionTriggerStrategy);
|
||||
conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, config.compactionDeltaCommits);
|
||||
conf.setInteger(FlinkOptions.COMPACTION_DELTA_SECONDS, config.compactionDeltaSeconds);
|
||||
conf.setInteger(FlinkOptions.COMPACTION_MAX_MEMORY, config.compactionMaxMemory);
|
||||
conf.setLong(FlinkOptions.COMPACTION_TARGET_IO, config.compactionTargetIo);
|
||||
conf.setBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED, config.cleanAsyncEnabled);
|
||||
conf.setInteger(FlinkOptions.CLEAN_RETAIN_COMMITS, config.cleanRetainCommits);
|
||||
conf.setInteger(FlinkOptions.ARCHIVE_MAX_COMMITS, config.archiveMaxCommits);
|
||||
conf.setInteger(FlinkOptions.ARCHIVE_MIN_COMMITS, config.archiveMinCommits);
|
||||
conf.setBoolean(FlinkOptions.HIVE_SYNC_ENABLED, config.hiveSyncEnabled);
|
||||
conf.setString(FlinkOptions.HIVE_SYNC_DB, config.hiveSyncDb);
|
||||
conf.setString(FlinkOptions.HIVE_SYNC_TABLE, config.hiveSyncTable);
|
||||
conf.setString(FlinkOptions.HIVE_SYNC_FILE_FORMAT, config.hiveSyncFileFormat);
|
||||
conf.setString(FlinkOptions.HIVE_SYNC_USERNAME, config.hiveSyncUsername);
|
||||
conf.setString(FlinkOptions.HIVE_SYNC_PASSWORD, config.hiveSyncPassword);
|
||||
conf.setString(FlinkOptions.HIVE_SYNC_JDBC_URL, config.hiveSyncJdbcUrl);
|
||||
conf.setString(FlinkOptions.HIVE_SYNC_METASTORE_URIS, config.hiveSyncMetastoreUri);
|
||||
conf.setString(FlinkOptions.HIVE_SYNC_PARTITION_FIELDS, config.hiveSyncPartitionFields);
|
||||
conf.setString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS, config.hiveSyncPartitionExtractorClass);
|
||||
conf.setBoolean(FlinkOptions.HIVE_SYNC_ASSUME_DATE_PARTITION, config.hiveSyncAssumeDatePartition);
|
||||
conf.setBoolean(FlinkOptions.HIVE_SYNC_USE_JDBC, config.hiveSyncUseJdbc);
|
||||
conf.setBoolean(FlinkOptions.HIVE_SYNC_AUTO_CREATE_DB, config.hiveSyncAutoCreateDb);
|
||||
conf.setBoolean(FlinkOptions.HIVE_SYNC_IGNORE_EXCEPTIONS, config.hiveSyncIgnoreExceptions);
|
||||
conf.setBoolean(FlinkOptions.HIVE_SYNC_SKIP_RO_SUFFIX, config.hiveSyncSkipRoSuffix);
|
||||
conf.setBoolean(FlinkOptions.HIVE_SYNC_SUPPORT_TIMESTAMP, config.hiveSyncSupportTimestamp);
|
||||
return conf;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user