From 1d6978cde4c98c82c184f6404f49ef0f4832fbd6 Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Tue, 6 Jul 2021 14:11:20 +0800 Subject: [PATCH] [HUDI-2135] Add compaction schedule option for flink (#3226) --- .../hudi/configuration/FlinkOptions.java | 6 ++++ .../sink/StreamWriteOperatorCoordinator.java | 8 ++--- .../sink/compact/FlinkCompactionConfig.java | 20 +++++++++++-- .../sink/compact/HoodieFlinkCompactor.java | 30 ++++++++++++++----- .../org/apache/hudi/util/CompactionUtil.java | 9 ++++++ .../org/apache/hudi/util/StreamerUtil.java | 12 ++++++++ 6 files changed, 70 insertions(+), 15 deletions(-) diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index b426cb150..e921b68aa 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -329,6 +329,12 @@ public class FlinkOptions { // Compaction Options // ------------------------------------------------------------------------ + public static final ConfigOption COMPACTION_SCHEDULE_ENABLED = ConfigOptions + .key("compaction.schedule.enabled") + .booleanType() + .defaultValue(true) // default true for MOR write + .withDescription("Schedule the compaction plan, enabled by default for MOR"); + public static final ConfigOption COMPACTION_ASYNC_ENABLED = ConfigOptions .key("compaction.async.enabled") .booleanType() diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index 9c11a68d0..303532aa9 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -108,9 +108,9 @@ public class StreamWriteOperatorCoordinator private final int parallelism; /** - * Whether to schedule asynchronous compaction task on finished checkpoints. + * Whether to schedule compaction plan on finished checkpoints. */ - private final boolean asyncCompaction; + private final boolean scheduleCompaction; /** * A single-thread executor to handle all the asynchronous jobs of the coordinator. @@ -144,7 +144,7 @@ public class StreamWriteOperatorCoordinator this.conf = conf; this.context = context; this.parallelism = context.currentParallelism(); - this.asyncCompaction = StreamerUtil.needsAsyncCompaction(conf); + this.scheduleCompaction = StreamerUtil.needsScheduleCompaction(conf); } @Override @@ -205,7 +205,7 @@ public class StreamWriteOperatorCoordinator final boolean committed = commitInstant(this.instant); if (committed) { // if async compaction is on, schedule the compaction - if (asyncCompaction) { + if (scheduleCompaction) { writeClient.scheduleCompaction(Option.empty()); } // start new instant. diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java index 26ad824ce..d8f96dbe3 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java @@ -62,7 +62,7 @@ public class FlinkCompactionConfig extends Configuration { public Integer compactionDeltaSeconds = 3600; @Parameter(names = {"--clean-async-enabled"}, description = "Whether to cleanup the old commits immediately on new commits, enabled by default", required = false) - public Boolean cleanAsyncEnable = false; + public Boolean cleanAsyncEnable = false; @Parameter(names = {"--clean-retain-commits"}, description = "Number of commits to retain. So data will be retained for num_of_commits * time_between_commits (scheduled).\n" @@ -89,12 +89,25 @@ public class FlinkCompactionConfig extends Configuration { @Parameter(names = {"--compaction-tasks"}, description = "Parallelism of tasks that do actual compaction, default is -1", required = false) public Integer compactionTasks = -1; + @Parameter(names = {"--schedule", "-sc"}, description = "Not recommended. Schedule the compaction plan in this job.\n" + + "There is a risk of losing data when scheduling compaction outside the writer job.\n" + + "Scheduling compaction in the writer job and only let this job do the compaction execution is recommended.\n" + + "Default is false", required = false) + public Boolean schedule = false; + + public static final String SEQ_FIFO = "FIFO"; + public static final String SEQ_LIFO = "LIFO"; + @Parameter(names = {"--seq"}, description = "Compaction plan execution sequence, two options are supported:\n" + + "1). FIFO: execute the oldest plan first;\n" + + "2). LIFO: execute the latest plan first, by default LIFO", required = false) + public String compactionSeq = SEQ_LIFO; + /** * Transforms a {@code HoodieFlinkCompaction.config} into {@code Configuration}. * The latter is more suitable for the table APIs. It reads all the properties * in the properties file (set by `--props` option) and cmd line options - * (set by `--hoodie-conf` option). - * */ + * (set by `--hoodie-conf` option). + */ public static org.apache.flink.configuration.Configuration toFlinkConfig(FlinkCompactionConfig config) { org.apache.flink.configuration.Configuration conf = new Configuration(); @@ -111,6 +124,7 @@ public class FlinkCompactionConfig extends Configuration { conf.setBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED, config.cleanAsyncEnable); // use synchronous compaction always conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false); + conf.setBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, config.schedule); return conf; } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java index 8ee6c111e..edd5acf70 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java @@ -75,15 +75,29 @@ public class HoodieFlinkCompactor { // judge whether have operation // to compute the compaction instant time and do compaction. - String compactionInstantTime = CompactionUtil.getCompactionInstantTime(metaClient); - boolean scheduled = writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty()); - if (!scheduled) { - // do nothing. - LOG.info("No compaction plan for this job "); - return; + if (cfg.schedule) { + String compactionInstantTime = CompactionUtil.getCompactionInstantTime(metaClient); + boolean scheduled = writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty()); + if (!scheduled) { + // do nothing. + LOG.info("No compaction plan for this job "); + return; + } } table.getMetaClient().reloadActiveTimeline(); + + // fetch the instant based on the configured execution sequence + HoodieTimeline timeline = table.getActiveTimeline().filterPendingCompactionTimeline() + .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED); + Option requested = CompactionUtil.isLIFO(cfg.compactionSeq) ? timeline.lastInstant() : timeline.firstInstant(); + if (!requested.isPresent()) { + // do nothing. + LOG.info("No compaction plan scheduled, turns on the compaction plan schedule with --schedule option"); + return; + } + + String compactionInstantTime = requested.get().getTimestamp(); // generate compaction plan // should support configurable commit metadata HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan( @@ -92,7 +106,7 @@ public class HoodieFlinkCompactor { if (compactionPlan == null || (compactionPlan.getOperations() == null) || (compactionPlan.getOperations().isEmpty())) { // No compaction plan, do nothing and return. - LOG.info("No compaction plan for this job and instant " + compactionInstantTime); + LOG.info("No compaction plan for instant " + compactionInstantTime); return; } @@ -113,7 +127,7 @@ public class HoodieFlinkCompactor { // get compactionParallelism. int compactionParallelism = conf.getInteger(FlinkOptions.COMPACTION_TASKS) == -1 - ? compactionPlan.getOperations().size() : conf.getInteger(FlinkOptions.COMPACTION_TASKS); + ? compactionPlan.getOperations().size() : conf.getInteger(FlinkOptions.COMPACTION_TASKS); env.addSource(new CompactionPlanSourceFunction(table, instant, compactionPlan, compactionInstantTime)) .name("compaction_source") diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java index df856742e..93ad59e82 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java @@ -26,6 +26,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.sink.compact.FlinkCompactionConfig; import org.apache.hudi.table.HoodieFlinkTable; import org.apache.avro.Schema; @@ -35,6 +36,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.Locale; /** * Utilities for flink hudi compaction. @@ -106,4 +108,11 @@ public class CompactionUtil { table.getMetaClient().reloadActiveTimeline(); }); } + + /** + * Returns whether the execution sequence is LIFO. + */ + public static boolean isLIFO(String seq) { + return seq.toUpperCase(Locale.ROOT).equals(FlinkCompactionConfig.SEQ_LIFO); + } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index d69481dc0..b60e6aaca 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -246,6 +246,18 @@ public class StreamerUtil { && conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED); } + /** + * Returns whether needs to schedule the compaction plan. + * + * @param conf The flink configuration. + */ + public static boolean needsScheduleCompaction(Configuration conf) { + return conf.getString(FlinkOptions.TABLE_TYPE) + .toUpperCase(Locale.ROOT) + .equals(FlinkOptions.TABLE_TYPE_MERGE_ON_READ) + && conf.getBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED); + } + /** * Creates the meta client. */