1
0

[HUDI-2135] Add compaction schedule option for flink (#3226)

This commit is contained in:
Danny Chan
2021-07-06 14:11:20 +08:00
committed by GitHub
parent a4dcbb5c5a
commit 1d6978cde4
6 changed files with 70 additions and 15 deletions

View File

@@ -329,6 +329,12 @@ public class FlinkOptions {
// Compaction Options
// ------------------------------------------------------------------------
public static final ConfigOption<Boolean> COMPACTION_SCHEDULE_ENABLED = ConfigOptions
.key("compaction.schedule.enabled")
.booleanType()
.defaultValue(true) // default true for MOR write
.withDescription("Schedule the compaction plan, enabled by default for MOR");
public static final ConfigOption<Boolean> COMPACTION_ASYNC_ENABLED = ConfigOptions
.key("compaction.async.enabled")
.booleanType()

View File

@@ -108,9 +108,9 @@ public class StreamWriteOperatorCoordinator
private final int parallelism;
/**
* Whether to schedule asynchronous compaction task on finished checkpoints.
* Whether to schedule compaction plan on finished checkpoints.
*/
private final boolean asyncCompaction;
private final boolean scheduleCompaction;
/**
* A single-thread executor to handle all the asynchronous jobs of the coordinator.
@@ -144,7 +144,7 @@ public class StreamWriteOperatorCoordinator
this.conf = conf;
this.context = context;
this.parallelism = context.currentParallelism();
this.asyncCompaction = StreamerUtil.needsAsyncCompaction(conf);
this.scheduleCompaction = StreamerUtil.needsScheduleCompaction(conf);
}
@Override
@@ -205,7 +205,7 @@ public class StreamWriteOperatorCoordinator
final boolean committed = commitInstant(this.instant);
if (committed) {
// if async compaction is on, schedule the compaction
if (asyncCompaction) {
if (scheduleCompaction) {
writeClient.scheduleCompaction(Option.empty());
}
// start new instant.

View File

@@ -62,7 +62,7 @@ public class FlinkCompactionConfig extends Configuration {
public Integer compactionDeltaSeconds = 3600;
@Parameter(names = {"--clean-async-enabled"}, description = "Whether to cleanup the old commits immediately on new commits, enabled by default", required = false)
public Boolean cleanAsyncEnable = false;
public Boolean cleanAsyncEnable = false;
@Parameter(names = {"--clean-retain-commits"},
description = "Number of commits to retain. So data will be retained for num_of_commits * time_between_commits (scheduled).\n"
@@ -89,12 +89,25 @@ public class FlinkCompactionConfig extends Configuration {
@Parameter(names = {"--compaction-tasks"}, description = "Parallelism of tasks that do actual compaction, default is -1", required = false)
public Integer compactionTasks = -1;
@Parameter(names = {"--schedule", "-sc"}, description = "Not recommended. Schedule the compaction plan in this job.\n"
+ "There is a risk of losing data when scheduling compaction outside the writer job.\n"
+ "Scheduling compaction in the writer job and only let this job do the compaction execution is recommended.\n"
+ "Default is false", required = false)
public Boolean schedule = false;
public static final String SEQ_FIFO = "FIFO";
public static final String SEQ_LIFO = "LIFO";
@Parameter(names = {"--seq"}, description = "Compaction plan execution sequence, two options are supported:\n"
+ "1). FIFO: execute the oldest plan first;\n"
+ "2). LIFO: execute the latest plan first, by default LIFO", required = false)
public String compactionSeq = SEQ_LIFO;
/**
* Transforms a {@code HoodieFlinkCompaction.config} into {@code Configuration}.
* The latter is more suitable for the table APIs. It reads all the properties
* in the properties file (set by `--props` option) and cmd line options
* (set by `--hoodie-conf` option).
* */
* (set by `--hoodie-conf` option).
*/
public static org.apache.flink.configuration.Configuration toFlinkConfig(FlinkCompactionConfig config) {
org.apache.flink.configuration.Configuration conf = new Configuration();
@@ -111,6 +124,7 @@ public class FlinkCompactionConfig extends Configuration {
conf.setBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED, config.cleanAsyncEnable);
// use synchronous compaction always
conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
conf.setBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, config.schedule);
return conf;
}

View File

@@ -75,15 +75,29 @@ public class HoodieFlinkCompactor {
// judge whether have operation
// to compute the compaction instant time and do compaction.
String compactionInstantTime = CompactionUtil.getCompactionInstantTime(metaClient);
boolean scheduled = writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
if (!scheduled) {
// do nothing.
LOG.info("No compaction plan for this job ");
return;
if (cfg.schedule) {
String compactionInstantTime = CompactionUtil.getCompactionInstantTime(metaClient);
boolean scheduled = writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
if (!scheduled) {
// do nothing.
LOG.info("No compaction plan for this job ");
return;
}
}
table.getMetaClient().reloadActiveTimeline();
// fetch the instant based on the configured execution sequence
HoodieTimeline timeline = table.getActiveTimeline().filterPendingCompactionTimeline()
.filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED);
Option<HoodieInstant> requested = CompactionUtil.isLIFO(cfg.compactionSeq) ? timeline.lastInstant() : timeline.firstInstant();
if (!requested.isPresent()) {
// do nothing.
LOG.info("No compaction plan scheduled, turns on the compaction plan schedule with --schedule option");
return;
}
String compactionInstantTime = requested.get().getTimestamp();
// generate compaction plan
// should support configurable commit metadata
HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(
@@ -92,7 +106,7 @@ public class HoodieFlinkCompactor {
if (compactionPlan == null || (compactionPlan.getOperations() == null)
|| (compactionPlan.getOperations().isEmpty())) {
// No compaction plan, do nothing and return.
LOG.info("No compaction plan for this job and instant " + compactionInstantTime);
LOG.info("No compaction plan for instant " + compactionInstantTime);
return;
}
@@ -113,7 +127,7 @@ public class HoodieFlinkCompactor {
// get compactionParallelism.
int compactionParallelism = conf.getInteger(FlinkOptions.COMPACTION_TASKS) == -1
? compactionPlan.getOperations().size() : conf.getInteger(FlinkOptions.COMPACTION_TASKS);
? compactionPlan.getOperations().size() : conf.getInteger(FlinkOptions.COMPACTION_TASKS);
env.addSource(new CompactionPlanSourceFunction(table, instant, compactionPlan, compactionInstantTime))
.name("compaction_source")

View File

@@ -26,6 +26,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.sink.compact.FlinkCompactionConfig;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.avro.Schema;
@@ -35,6 +36,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Locale;
/**
* Utilities for flink hudi compaction.
@@ -106,4 +108,11 @@ public class CompactionUtil {
table.getMetaClient().reloadActiveTimeline();
});
}
/**
* Returns whether the execution sequence is LIFO.
*/
public static boolean isLIFO(String seq) {
return seq.toUpperCase(Locale.ROOT).equals(FlinkCompactionConfig.SEQ_LIFO);
}
}

View File

@@ -246,6 +246,18 @@ public class StreamerUtil {
&& conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED);
}
/**
* Returns whether needs to schedule the compaction plan.
*
* @param conf The flink configuration.
*/
public static boolean needsScheduleCompaction(Configuration conf) {
return conf.getString(FlinkOptions.TABLE_TYPE)
.toUpperCase(Locale.ROOT)
.equals(FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
&& conf.getBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED);
}
/**
* Creates the meta client.
*/