diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java index 2cbe152cc..de2218acb 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java +++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java @@ -35,6 +35,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.apache.hudi.configuration.FlinkOptions.PARTITION_FORMAT_DAY; + /** * Configurations for Hoodie Flink streamer. */ @@ -124,6 +126,30 @@ public class FlinkStreamerConfig extends Configuration { + "writing. Default : Not set. Pass a comma-separated list of subclass names to chain the transformations.") public List transformerClassNames = null; + @Parameter(names = {"--metadata-enabled"}, description = "Enable the internal metadata table which serves table metadata like level file listings, default false.") + public Boolean metadataEnabled = false; + + @Parameter(names = {"--metadata-compaction-delta_commits"}, description = "Max delta commits for metadata table to trigger compaction, default 10.") + public Integer metadataCompactionDeltaCommits = 10; + + @Parameter(names = {"--write-partition-format"}, description = "Partition path format, default is 'yyyyMMdd'.") + public String writePartitionFormat = PARTITION_FORMAT_DAY; + + @Parameter(names = {"--write-rate-limit"}, description = "Write record rate limit per second to prevent traffic jitter and improve stability, default 0 (no limit).") + public Long writeRateLimit = 0L; + + @Parameter(names = {"--write-parquet-block-size"}, description = "Parquet RowGroup size. It's recommended to make this large enough that scan costs can be" + + " amortized by packing enough column values into a single row group.") + public Integer writeParquetBlockSize = 120; + + @Parameter(names = {"--write-parquet-max-file-size"}, description = "Target size for parquet files produced by Hudi write phases. " + + "For DFS, this needs to be aligned with the underlying filesystem block size for optimal performance.") + public Integer writeParquetMaxFileSize = 120; + + @Parameter(names = {"--parquet-page-size"}, description = "Parquet page size. Page is the unit of read within a parquet file. " + + "Within a block, pages are compressed separately.") + public Integer parquetPageSize = 1; + /** * Flink checkpoint interval. */ @@ -144,18 +170,18 @@ public class FlinkStreamerConfig extends Configuration { @Parameter(names = {"--partition-default-name"}, description = "The default partition name in case the dynamic partition column value is null/empty string") - public String partitionDefaultName = "__DEFAULT_PARTITION__"; + public String partitionDefaultName = "default"; @Parameter(names = {"--index-bootstrap-enabled"}, description = "Whether to bootstrap the index state from existing hoodie table, default false") public Boolean indexBootstrapEnabled = false; - @Parameter(names = {"--index-state-ttl"}, description = "Index state ttl in days, default 1.5 day") - public Double indexStateTtl = 1.5D; + @Parameter(names = {"--index-state-ttl"}, description = "Index state ttl in days, default stores the index permanently") + public Double indexStateTtl = 0D; @Parameter(names = {"--index-global-enabled"}, description = "Whether to update index for the old partition path " - + "if same key record with different partition path came in, default false") - public Boolean indexGlobalEnabled = false; + + "if same key record with different partition path came in, default true") + public Boolean indexGlobalEnabled = true; @Parameter(names = {"--index-partition-regex"}, description = "Whether to load partitions in state if partition path matching, default *") @@ -319,6 +345,13 @@ public class FlinkStreamerConfig extends Configuration { conf.setBoolean(FlinkOptions.IGNORE_FAILED, config.commitOnErrors); conf.setString(FlinkOptions.RECORD_KEY_FIELD, config.recordKeyField); conf.setString(FlinkOptions.PARTITION_PATH_FIELD, config.partitionPathField); + conf.setBoolean(FlinkOptions.METADATA_ENABLED, config.metadataEnabled); + conf.setInteger(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS, config.metadataCompactionDeltaCommits); + conf.setString(FlinkOptions.PARTITION_FORMAT, config.writePartitionFormat); + conf.setLong(FlinkOptions.WRITE_RATE_LIMIT, config.writeRateLimit); + conf.setInteger(FlinkOptions.WRITE_PARQUET_BLOCK_SIZE, config.writeParquetBlockSize); + conf.setInteger(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE, config.writeParquetMaxFileSize); + conf.setInteger(FlinkOptions.WRITE_PARQUET_PAGE_SIZE, config.parquetPageSize); if (!StringUtils.isNullOrEmpty(config.keygenClass)) { conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, config.keygenClass); } else {