diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java index ad3dd577a..e271c84d3 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java @@ -26,6 +26,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.table.HoodieFlinkTable; +import org.apache.hudi.util.CompactionUtil; import org.apache.hudi.util.StreamerUtil; import org.apache.flink.annotation.VisibleForTesting; @@ -80,18 +81,18 @@ public class CompactionPlanOperator extends AbstractStreamOperator table = writeClient.getHoodieTable(); - + private void scheduleCompaction(HoodieFlinkTable table, long checkpointId) throws IOException { // the last instant takes the highest priority. 
Option lastRequested = table.getActiveTimeline().filterPendingCompactionTimeline() .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED).lastInstant(); diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java index f1102e849..f95f3e3ae 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java @@ -21,7 +21,6 @@ package org.apache.hudi.sink.compact; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.CompactionUtils; @@ -46,7 +45,6 @@ public class HoodieFlinkCompactor { protected static final Logger LOG = LoggerFactory.getLogger(HoodieFlinkCompactor.class); - @SuppressWarnings("unchecked, rawtypes") public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -73,17 +71,7 @@ public class HoodieFlinkCompactor { // rolls back inflight compaction first // condition: the schedule compaction is in INFLIGHT state for max delta seconds. 
- String curInstantTime = HoodieActiveTimeline.createNewInstantTime(); - int deltaSeconds = conf.getInteger(FlinkOptions.COMPACTION_DELTA_SECONDS); - HoodieTimeline inflightCompactionTimeline = metaClient.getActiveTimeline() - .filterPendingCompactionTimeline() - .filter(instant -> - instant.getState() == HoodieInstant.State.INFLIGHT - && StreamerUtil.instantTimeDiff(curInstantTime, instant.getTimestamp()) >= deltaSeconds); - inflightCompactionTimeline.getInstants().forEach(inflightInstant -> { - writeClient.rollbackInflightCompaction(inflightInstant, table); - table.getMetaClient().reloadActiveTimeline(); - }); + CompactionUtil.rollbackCompaction(table, conf); // judge whether have operation // to compute the compaction instant time and do compaction. @@ -94,6 +82,8 @@ public class HoodieFlinkCompactor { LOG.info("No compaction plan for this job "); return; } + + table.getMetaClient().reloadActiveTimeline(); // generate compaction plan // should support configurable commit metadata HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan( diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java index 46d727262..3d98ce9f3 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java @@ -22,6 +22,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieIOException; @@ -29,6 +30,7 @@ import org.apache.hudi.exception.HoodieIOException; import org.apache.avro.Schema; import 
org.apache.flink.configuration.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hudi.table.HoodieFlinkTable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -96,4 +98,19 @@ public class CompactionUtil { throw new HoodieIOException("Could not remove requested commit " + commitFilePath, e); } } + + /** Rolls back pending compaction instants of the given table that are in INFLIGHT state and whose StreamerUtil.instantTimeDiff against a freshly created instant time is at least the value of FlinkOptions.COMPACTION_DELTA_SECONDS. Each matching instant is logged, rolled back through table.rollback under a newly created instant time, and the table meta client's active timeline is reloaded afterwards. @param table the table whose active timeline is scanned and on which the rollback is invoked @param conf the configuration that supplies FlinkOptions.COMPACTION_DELTA_SECONDS */ public static void rollbackCompaction(HoodieFlinkTable table, Configuration conf) { + String curInstantTime = HoodieActiveTimeline.createNewInstantTime(); + int deltaSeconds = conf.getInteger(FlinkOptions.COMPACTION_DELTA_SECONDS); + HoodieTimeline inflightCompactionTimeline = table.getActiveTimeline() + .filterPendingCompactionTimeline() + .filter(instant -> + instant.getState() == HoodieInstant.State.INFLIGHT + && StreamerUtil.instantTimeDiff(curInstantTime, instant.getTimestamp()) >= deltaSeconds); + inflightCompactionTimeline.getInstants().forEach(inflightInstant -> { + LOG.info("Rollback the pending compaction instant: " + inflightInstant); + /* NOTE(review): the meaning of the trailing boolean argument to rollback is not visible in this patch - confirm against HoodieFlinkTable.rollback */ table.rollback(table.getContext(), HoodieActiveTimeline.createNewInstantTime(), inflightInstant, true); + /* reload after every rollback, once per rolled-back instant */ table.getMetaClient().reloadActiveTimeline(); + }); + } }