diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java index 0ba2351b9..f1102e849 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java @@ -21,6 +21,7 @@ package org.apache.hudi.sink.compact; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.CompactionUtils; @@ -45,6 +46,7 @@ public class HoodieFlinkCompactor { protected static final Logger LOG = LoggerFactory.getLogger(HoodieFlinkCompactor.class); + @SuppressWarnings("unchecked, rawtypes") public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -66,17 +68,32 @@ public class HoodieFlinkCompactor { // set table schema CompactionUtil.setAvroSchema(conf, metaClient); + HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, null); + HoodieFlinkTable table = writeClient.getHoodieTable(); + + // rolls back inflight compaction first + // condition: the scheduled compaction has been in INFLIGHT state for more than the configured delta seconds. 
+ String curInstantTime = HoodieActiveTimeline.createNewInstantTime(); + int deltaSeconds = conf.getInteger(FlinkOptions.COMPACTION_DELTA_SECONDS); + HoodieTimeline inflightCompactionTimeline = metaClient.getActiveTimeline() + .filterPendingCompactionTimeline() + .filter(instant -> + instant.getState() == HoodieInstant.State.INFLIGHT + && StreamerUtil.instantTimeDiff(curInstantTime, instant.getTimestamp()) >= deltaSeconds); + inflightCompactionTimeline.getInstants().forEach(inflightInstant -> { + writeClient.rollbackInflightCompaction(inflightInstant, table); + table.getMetaClient().reloadActiveTimeline(); + }); + // judge whether have operation // to compute the compaction instant time and do compaction. String compactionInstantTime = CompactionUtil.getCompactionInstantTime(metaClient); - HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, null); boolean scheduled = writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty()); if (!scheduled) { // do nothing. LOG.info("No compaction plan for this job "); return; } - HoodieFlinkTable table = writeClient.getHoodieTable(); // generate compaction plan // should support configurable commit metadata HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan( diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index fcbdb21b5..b49343d01 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -312,4 +312,11 @@ public class StreamerUtil { long median = low + (high - low) / 2; return String.valueOf(median); } + + /** + * Returns the time interval between the given instant times, computed as the plain numeric difference of the two timestamps — NOTE(review): this is not an exact count of seconds across minute/hour boundaries for formatted instant timestamps; confirm against the {@code deltaSeconds} comparison at the call site. + */ + public static long instantTimeDiff(String newInstantTime, String oldInstantTime) { + return Long.parseLong(newInstantTime) - Long.parseLong(oldInstantTime); + } }