1
0

[HUDI-2651] Sync all the missing sql options for HoodieFlinkStreamer (#3903)

Co-authored-by: yuzhaojing <yuzhaojing@bytedance.com>
This commit is contained in:
yuzhaojing
2021-11-05 12:16:21 +08:00
committed by GitHub
parent 33436aa359
commit 2c1e259329

View File

@@ -35,6 +35,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.hudi.configuration.FlinkOptions.PARTITION_FORMAT_DAY;
/**
* Configurations for Hoodie Flink streamer.
*/
@@ -124,6 +126,30 @@ public class FlinkStreamerConfig extends Configuration {
+ "writing. Default : Not set. Pass a comma-separated list of subclass names to chain the transformations.")
public List<String> transformerClassNames = null;
@Parameter(names = {"--metadata-enabled"}, description = "Enable the internal metadata table which serves table metadata like level file listings, default false.")
public Boolean metadataEnabled = false;
@Parameter(names = {"--metadata-compaction-delta_commits"}, description = "Max delta commits for metadata table to trigger compaction, default 10.")
public Integer metadataCompactionDeltaCommits = 10;
@Parameter(names = {"--write-partition-format"}, description = "Partition path format, default is 'yyyyMMdd'.")
public String writePartitionFormat = PARTITION_FORMAT_DAY;
@Parameter(names = {"--write-rate-limit"}, description = "Write record rate limit per second to prevent traffic jitter and improve stability, default 0 (no limit).")
public Long writeRateLimit = 0L;
@Parameter(names = {"--write-parquet-block-size"}, description = "Parquet RowGroup size. It's recommended to make this large enough that scan costs can be"
+ " amortized by packing enough column values into a single row group.")
public Integer writeParquetBlockSize = 120;
@Parameter(names = {"--write-parquet-max-file-size"}, description = "Target size for parquet files produced by Hudi write phases. "
+ "For DFS, this needs to be aligned with the underlying filesystem block size for optimal performance.")
public Integer writeParquetMaxFileSize = 120;
@Parameter(names = {"--parquet-page-size"}, description = "Parquet page size. Page is the unit of read within a parquet file. "
+ "Within a block, pages are compressed separately.")
public Integer parquetPageSize = 1;
/**
* Flink checkpoint interval.
*/
@@ -144,18 +170,18 @@ public class FlinkStreamerConfig extends Configuration {
@Parameter(names = {"--partition-default-name"},
description = "The default partition name in case the dynamic partition column value is null/empty string")
public String partitionDefaultName = "__DEFAULT_PARTITION__";
public String partitionDefaultName = "default";
@Parameter(names = {"--index-bootstrap-enabled"},
description = "Whether to bootstrap the index state from existing hoodie table, default false")
public Boolean indexBootstrapEnabled = false;
@Parameter(names = {"--index-state-ttl"}, description = "Index state ttl in days, default 1.5 day")
public Double indexStateTtl = 1.5D;
@Parameter(names = {"--index-state-ttl"}, description = "Index state ttl in days, default stores the index permanently")
public Double indexStateTtl = 0D;
@Parameter(names = {"--index-global-enabled"}, description = "Whether to update index for the old partition path "
+ "if same key record with different partition path came in, default false")
public Boolean indexGlobalEnabled = false;
+ "if same key record with different partition path came in, default true")
public Boolean indexGlobalEnabled = true;
@Parameter(names = {"--index-partition-regex"},
description = "Whether to load partitions in state if partition path matching default *")
@@ -319,6 +345,13 @@ public class FlinkStreamerConfig extends Configuration {
conf.setBoolean(FlinkOptions.IGNORE_FAILED, config.commitOnErrors);
conf.setString(FlinkOptions.RECORD_KEY_FIELD, config.recordKeyField);
conf.setString(FlinkOptions.PARTITION_PATH_FIELD, config.partitionPathField);
conf.setBoolean(FlinkOptions.METADATA_ENABLED, config.metadataEnabled);
conf.setInteger(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS, config.metadataCompactionDeltaCommits);
conf.setString(FlinkOptions.PARTITION_FORMAT, config.writePartitionFormat);
conf.setLong(FlinkOptions.WRITE_RATE_LIMIT, config.writeRateLimit);
conf.setInteger(FlinkOptions.WRITE_PARQUET_BLOCK_SIZE, config.writeParquetBlockSize);
conf.setInteger(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE, config.writeParquetMaxFileSize);
conf.setInteger(FlinkOptions.WRITE_PARQUET_PAGE_SIZE, config.parquetPageSize);
if (!StringUtils.isNullOrEmpty(config.keygenClass)) {
conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, config.keygenClass);
} else {