diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java index e271c84d3..a17ea0404 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java @@ -84,7 +84,7 @@ public class CompactionPlanOperator extends AbstractStreamOperator operations = this.compactionPlan.getOperations().stream() .map(CompactionOperation::convertFromAvroRecordInstance).collect(toList()); LOG.info("CompactionPlanFunction compacting " + operations + " files"); diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java index edd5acf70..84b68136d 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java @@ -69,10 +69,6 @@ public class HoodieFlinkCompactor { HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, null); HoodieFlinkTable table = writeClient.getHoodieTable(); - // rolls back inflight compaction first - // condition: the schedule compaction is in INFLIGHT state for max delta seconds. - CompactionUtil.rollbackCompaction(table, conf); - // judge whether have operation // to compute the compaction instant time and do compaction. 
if (cfg.schedule) { @@ -83,13 +79,11 @@ public class HoodieFlinkCompactor { LOG.info("No compaction plan for this job "); return; } + table.getMetaClient().reloadActiveTimeline(); } - table.getMetaClient().reloadActiveTimeline(); - // fetch the instant based on the configured execution sequence - HoodieTimeline timeline = table.getActiveTimeline().filterPendingCompactionTimeline() - .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED); + HoodieTimeline timeline = table.getActiveTimeline().filterPendingCompactionTimeline(); Option requested = CompactionUtil.isLIFO(cfg.compactionSeq) ? timeline.lastInstant() : timeline.firstInstant(); if (!requested.isPresent()) { // do nothing. @@ -98,6 +92,14 @@ public class HoodieFlinkCompactor { } String compactionInstantTime = requested.get().getTimestamp(); + + HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime); + if (timeline.containsInstant(inflightInstant)) { + LOG.info("Rollback inflight compaction instant: [" + compactionInstantTime + "]"); + writeClient.rollbackInflightCompaction(inflightInstant, table); + table.getMetaClient().reloadActiveTimeline(); + } + // generate compaction plan // should support configurable commit metadata HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan( @@ -129,6 +131,10 @@ public class HoodieFlinkCompactor { int compactionParallelism = conf.getInteger(FlinkOptions.COMPACTION_TASKS) == -1 ? 
compactionPlan.getOperations().size() : conf.getInteger(FlinkOptions.COMPACTION_TASKS); + // Mark instant as compaction inflight + table.getActiveTimeline().transitionCompactionRequestedToInflight(instant); + table.getMetaClient().reloadActiveTimeline(); + env.addSource(new CompactionPlanSourceFunction(table, instant, compactionPlan, compactionInstantTime)) .name("compaction_source") .uid("uid_compaction_source") diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java index 0df028600..a0de79d91 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java @@ -18,6 +18,7 @@ package org.apache.hudi.util; +import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; @@ -94,7 +95,7 @@ public class CompactionUtil { } } - public static void rollbackCompaction(HoodieFlinkTable table, Configuration conf) { + public static void rollbackCompaction(HoodieFlinkTable table, HoodieFlinkWriteClient writeClient, Configuration conf) { String curInstantTime = HoodieActiveTimeline.createNewInstantTime(); int deltaSeconds = conf.getInteger(FlinkOptions.COMPACTION_DELTA_SECONDS); HoodieTimeline inflightCompactionTimeline = table.getActiveTimeline() @@ -104,7 +105,7 @@ public class CompactionUtil { && StreamerUtil.instantTimeDiffSeconds(curInstantTime, instant.getTimestamp()) >= deltaSeconds); inflightCompactionTimeline.getInstants().forEach(inflightInstant -> { LOG.info("Rollback the pending compaction instant: " + inflightInstant); - table.rollback(table.getContext(), HoodieActiveTimeline.createNewInstantTime(), inflightInstant, true); + writeClient.rollbackInflightCompaction(inflightInstant, table); 
table.getMetaClient().reloadActiveTimeline(); }); } diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java new file mode 100644 index 000000000..9bd03e115 --- /dev/null +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.utils; + +import org.apache.hudi.avro.model.HoodieCompactionOperation; +import org.apache.hudi.avro.model.HoodieCompactionPlan; +import org.apache.hudi.client.HoodieFlinkWriteClient; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.table.HoodieFlinkTable; +import org.apache.hudi.util.CompactionUtil; +import org.apache.hudi.util.StreamerUtil; + +import org.apache.flink.configuration.Configuration; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +/** + * Test cases for {@link org.apache.hudi.util.CompactionUtil}. 
+ */
+public class TestCompactionUtil {
+
+  /** Temporary directory injected by JUnit; its absolute path is handed to the default test configuration. */
+  @TempDir
+  File tempFile;
+
+  /**
+   * Checks that {@link CompactionUtil#rollbackCompaction} leaves a previously inflight compaction
+   * instant in the {@code REQUESTED} state with an unchanged timestamp.
+   *
+   * <p>The test writes a compaction plan as a requested instant, transitions it to inflight,
+   * invokes the rollback and then inspects the last pending compaction instant on the timeline.
+   *
+   * @throws IOException if one of the set-up calls outside the {@code try} block fails with an I/O error
+   */
+  @Test
+  void rollbackCompaction() throws IOException {
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    // A delta of 0 seconds is meant to let the instant created below satisfy the
+    // ">= deltaSeconds" age check that CompactionUtil#rollbackCompaction applies.
+    conf.setInteger(FlinkOptions.COMPACTION_DELTA_SECONDS, 0);
+
+    StreamerUtil.initTableIfNotExists(conf);
+
+    HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, null);
+    HoodieFlinkTable table = writeClient.getHoodieTable();
+    HoodieTableMetaClient metaClient = table.getMetaClient();
+
+    // Persist a plan holding a single empty operation as REQUESTED, then move it to INFLIGHT.
+    HoodieCompactionOperation operation = new HoodieCompactionOperation();
+    HoodieCompactionPlan plan = new HoodieCompactionPlan(Collections.singletonList(operation), Collections.emptyMap(), 1);
+    String instantTime = HoodieActiveTimeline.createNewInstantTime();
+    HoodieInstant compactionInstant =
+        new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, instantTime);
+    try {
+      metaClient.getActiveTimeline().saveToCompactionRequested(compactionInstant,
+          TimelineMetadataUtils.serializeCompactionPlan(plan));
+      table.getActiveTimeline().transitionCompactionRequestedToInflight(compactionInstant);
+    } catch (IOException ioe) {
+      throw new HoodieIOException("Exception scheduling compaction", ioe);
+    }
+    metaClient.reloadActiveTimeline();
+
+    // Assert presence before unwrapping so that a missing instant is reported as a readable
+    // assertion failure instead of a NullPointerException on the next line.
+    HoodieTimeline pendingBeforeRollback = metaClient.getActiveTimeline().filterPendingCompactionTimeline();
+    assertThat("A pending compaction instant should exist before the rollback",
+        pendingBeforeRollback.lastInstant().isPresent(), is(true));
+    HoodieInstant instant = pendingBeforeRollback.lastInstant().get();
+    assertThat(instant.getTimestamp(), is(instantTime));
+
+    CompactionUtil.rollbackCompaction(table, writeClient, conf);
+
+    // Same guard after the rollback: the instant must still be pending before its state is checked.
+    HoodieTimeline pendingAfterRollback = table.getActiveTimeline().filterPendingCompactionTimeline();
+    assertThat("A pending compaction instant should still exist after the rollback",
+        pendingAfterRollback.lastInstant().isPresent(), is(true));
+    HoodieInstant rollbackInstant = pendingAfterRollback.lastInstant().get();
+    assertThat(rollbackInstant.getState(), is(HoodieInstant.State.REQUESTED));
+    assertThat(rollbackInstant.getTimestamp(), is(instantTime));
+  }
+}
+