1
0

[MINOR] support different cleaning policy for flink (#5459)

This commit is contained in:
Gary Li
2022-04-29 09:48:44 +08:00
committed by GitHub
parent 4e928a6fe1
commit b27e8b51d8
3 changed files with 28 additions and 1 deletions

View File

@@ -21,6 +21,7 @@ package org.apache.hudi.configuration;
import org.apache.hudi.common.config.ConfigClassProperty;
import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.config.HoodieIndexConfig;
@@ -550,6 +551,13 @@ public class FlinkOptions extends HoodieConfig {
.defaultValue(true)
.withDescription("Whether to cleanup the old commits immediately on new commits, enabled by default");
public static final ConfigOption<String> CLEAN_POLICY = ConfigOptions
.key("clean.policy")
.stringType()
.defaultValue(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name())
.withDescription("Clean policy to manage the Hudi table. Available option: KEEP_LATEST_COMMITS, KEEP_LATEST_FILE_VERSIONS, KEEP_LATEST_BY_HOURS."
+ "Default is KEEP_LATEST_COMMITS.");
public static final ConfigOption<Integer> CLEAN_RETAIN_COMMITS = ConfigOptions
.key("clean.retain_commits")
.intType()
@@ -557,6 +565,12 @@ public class FlinkOptions extends HoodieConfig {
.withDescription("Number of commits to retain. So data will be retained for num_of_commits * time_between_commits (scheduled).\n"
+ "This also directly translates into how much you can incrementally pull on this table, default 30");
public static final ConfigOption<Integer> CLEAN_RETAIN_FILE_VERSIONS = ConfigOptions
.key("clean.retain_file_versions")
.intType()
.defaultValue(5)// default 5 version
.withDescription("Number of file versions to retain. default 5");
public static final ConfigOption<Integer> ARCHIVE_MAX_COMMITS = ConfigOptions
.key("archive.max_commits")
.intType()

View File

@@ -19,6 +19,7 @@
package org.apache.hudi.streamer;
import org.apache.hudi.client.utils.OperationConverter;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.StringUtils;
@@ -260,11 +261,20 @@ public class FlinkStreamerConfig extends Configuration {
@Parameter(names = {"--clean-async-enabled"}, description = "Whether to cleanup the old commits immediately on new commits, enabled by default")
public Boolean cleanAsyncEnabled = true;
@Parameter(names = {"--clean-policy"},
description = "Clean policy to manage the Hudi table. Available option: KEEP_LATEST_COMMITS, KEEP_LATEST_FILE_VERSIONS, KEEP_LATEST_BY_HOURS."
+ "Default is KEEP_LATEST_COMMITS.")
public String cleanPolicy = HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name();
@Parameter(names = {"--clean-retain-commits"},
description = "Number of commits to retain. So data will be retained for num_of_commits * time_between_commits (scheduled).\n"
+ "This also directly translates into how much you can incrementally pull on this table, default 10")
public Integer cleanRetainCommits = 10;
@Parameter(names = {"--clean-retain-file-versions"},
description = "Number of file versions to retain. Each file group will be retained for this number of version. default 5")
public Integer cleanRetainFileVersions = 5;
@Parameter(names = {"--archive-max-commits"},
description = "Max number of commits to keep before archiving older commits into a sequential log, default 30")
public Integer archiveMaxCommits = 30;
@@ -392,7 +402,9 @@ public class FlinkStreamerConfig extends Configuration {
conf.setInteger(FlinkOptions.COMPACTION_MAX_MEMORY, config.compactionMaxMemory);
conf.setLong(FlinkOptions.COMPACTION_TARGET_IO, config.compactionTargetIo);
conf.setBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED, config.cleanAsyncEnabled);
conf.setString(FlinkOptions.CLEAN_POLICY, config.cleanPolicy);
conf.setInteger(FlinkOptions.CLEAN_RETAIN_COMMITS, config.cleanRetainCommits);
conf.setInteger(FlinkOptions.CLEAN_RETAIN_FILE_VERSIONS, config.cleanRetainFileVersions);
conf.setInteger(FlinkOptions.ARCHIVE_MAX_COMMITS, config.archiveMaxCommits);
conf.setInteger(FlinkOptions.ARCHIVE_MIN_COMMITS, config.archiveMinCommits);
conf.setBoolean(FlinkOptions.HIVE_SYNC_ENABLED, config.hiveSyncEnabled);

View File

@@ -176,11 +176,12 @@ public class StreamerUtil {
.withMaxDeltaSecondsBeforeCompaction(conf.getInteger(FlinkOptions.COMPACTION_DELTA_SECONDS))
.withAsyncClean(conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED))
.retainCommits(conf.getInteger(FlinkOptions.CLEAN_RETAIN_COMMITS))
.retainFileVersions(conf.getInteger(FlinkOptions.CLEAN_RETAIN_FILE_VERSIONS))
// override and hardcode to 20,
// actually Flink cleaning is always with parallelism 1 now
.withCleanerParallelism(20)
.archiveCommitsWith(conf.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS), conf.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS))
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
.withCleanerPolicy(HoodieCleaningPolicy.valueOf(conf.getString(FlinkOptions.CLEAN_POLICY)))
.build())
.withMemoryConfig(
HoodieMemoryConfig.newBuilder()