diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index 6f8c1ff5b..65a95ed7c 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -492,6 +492,12 @@ public class FlinkOptions extends HoodieConfig { .defaultValue(3600) // default 1 hour .withDescription("Max delta seconds time needed to trigger compaction, default 1 hour"); + public static final ConfigOption COMPACTION_TIMEOUT_SECONDS = ConfigOptions + .key("compaction.timeout.seconds") + .intType() + .defaultValue(1200) // default 20 minutes + .withDescription("Max timeout time in seconds for online compaction to rollback, default 20 minutes"); + public static final ConfigOption COMPACTION_MAX_MEMORY = ConfigOptions .key("compaction.max_memory") .intType() diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java index 81ab836bb..76e669574 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java @@ -18,9 +18,6 @@ package org.apache.hudi.sink.bootstrap; -import org.apache.hudi.client.FlinkTaskContextSupplier; -import org.apache.hudi.client.common.HoodieFlinkEngineContext; -import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieKey; @@ -37,9 +34,9 @@ import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.table.HoodieFlinkTable; import 
org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.format.FormatUtils; +import org.apache.hudi.util.FlinkTables; import org.apache.hudi.util.StreamerUtil; import org.apache.avro.Schema; @@ -119,7 +116,7 @@ public class BootstrapOperator this.hadoopConf = StreamerUtil.getHadoopConf(); this.writeConfig = StreamerUtil.getHoodieClientConfig(this.conf, true); - this.hoodieTable = getTable(); + this.hoodieTable = FlinkTables.createTable(writeConfig, hadoopConf, getRuntimeContext()); preLoadIndexRecords(); } @@ -146,13 +143,6 @@ public class BootstrapOperator output.collect((StreamRecord) element); } - private HoodieFlinkTable getTable() { - HoodieFlinkEngineContext context = new HoodieFlinkEngineContext( - new SerializableConfiguration(this.hadoopConf), - new FlinkTaskContextSupplier(getRuntimeContext())); - return HoodieFlinkTable.create(this.writeConfig, context); - } - /** * Loads all the indices of give partition path into the backup state. * diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java index 945d4288b..9c0549ac8 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java @@ -19,7 +19,6 @@ package org.apache.hudi.sink.compact; import org.apache.hudi.avro.model.HoodieCompactionPlan; -import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.common.model.CompactionOperation; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; @@ -27,7 +26,7 @@ import org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.table.HoodieFlinkTable; import org.apache.hudi.util.CompactionUtil; -import org.apache.hudi.util.StreamerUtil; +import org.apache.hudi.util.FlinkTables; import 
org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; @@ -54,14 +53,10 @@ public class CompactionPlanOperator extends AbstractStreamOperator table, long checkpointId) throws IOException { // the last instant takes the highest priority. - Option lastRequested = table.getActiveTimeline().filterPendingCompactionTimeline() - .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED).lastInstant(); - if (!lastRequested.isPresent()) { + Option firstRequested = table.getActiveTimeline().filterPendingCompactionTimeline() + .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED).firstInstant(); + if (!firstRequested.isPresent()) { // do nothing. LOG.info("No compaction plan for checkpoint " + checkpointId); return; } - String compactionInstantTime = lastRequested.get().getTimestamp(); + String compactionInstantTime = firstRequested.get().getTimestamp(); // generate compaction plan // should support configurable commit metadata diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java index 89ffef381..e064a058e 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java @@ -112,19 +112,34 @@ public class CompactionUtil { public static void rollbackCompaction(HoodieFlinkTable table, Configuration conf) { String curInstantTime = HoodieActiveTimeline.createNewInstantTime(); - int deltaSeconds = conf.getInteger(FlinkOptions.COMPACTION_DELTA_SECONDS); + int deltaSeconds = conf.getInteger(FlinkOptions.COMPACTION_TIMEOUT_SECONDS); HoodieTimeline inflightCompactionTimeline = table.getActiveTimeline() .filterPendingCompactionTimeline() .filter(instant -> instant.getState() == HoodieInstant.State.INFLIGHT && StreamerUtil.instantTimeDiffSeconds(curInstantTime, instant.getTimestamp()) >= deltaSeconds); 
inflightCompactionTimeline.getInstants().forEach(inflightInstant -> { - LOG.info("Rollback the pending compaction instant: " + inflightInstant); + LOG.info("Rollback the inflight compaction instant: " + inflightInstant + " for timeout(" + deltaSeconds + "s)"); table.rollbackInflightCompaction(inflightInstant); table.getMetaClient().reloadActiveTimeline(); }); } + /** + * Rolls back the earliest compaction if one exists. + */ + public static void rollbackEarliestCompaction(HoodieFlinkTable table) { + Option earliestInflight = table.getActiveTimeline() + .filterPendingCompactionTimeline() + .filter(instant -> + instant.getState() == HoodieInstant.State.INFLIGHT).firstInstant(); + if (earliestInflight.isPresent()) { + LOG.info("Rollback the inflight compaction instant: " + earliestInflight.get() + " for failover"); + table.rollbackInflightCompaction(earliestInflight.get()); + table.getMetaClient().reloadActiveTimeline(); + } + } + /** + * Returns whether the execution sequence is LIFO. + */ diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/FlinkTables.java b/hudi-flink/src/main/java/org/apache/hudi/util/FlinkTables.java new file mode 100644 index 000000000..6918a06b1 --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/util/FlinkTables.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.util; + +import org.apache.hudi.client.FlinkTaskContextSupplier; +import org.apache.hudi.client.common.HoodieFlinkEngineContext; +import org.apache.hudi.common.config.SerializableConfiguration; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieFlinkTable; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.configuration.Configuration; + +import static org.apache.hudi.util.StreamerUtil.getHadoopConf; +import static org.apache.hudi.util.StreamerUtil.getHoodieClientConfig; + +/** + * Utilities for {@link org.apache.hudi.table.HoodieFlinkTable}. + */ +public class FlinkTables { + private FlinkTables() { + } + + /** + * Creates the hoodie flink table. + * + *

This is expected to be used by the client. + */ + public static HoodieFlinkTable createTable(Configuration conf, RuntimeContext runtimeContext) { + HoodieFlinkEngineContext context = new HoodieFlinkEngineContext( + new SerializableConfiguration(getHadoopConf()), + new FlinkTaskContextSupplier(runtimeContext)); + HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, true); + return HoodieFlinkTable.create(writeConfig, context); + } + + /** + * Creates the hoodie flink table. + + *

This is expected to be used by the client. + */ + public static HoodieFlinkTable createTable( + HoodieWriteConfig writeConfig, + org.apache.hadoop.conf.Configuration hadoopConf, + RuntimeContext runtimeContext) { + HoodieFlinkEngineContext context = new HoodieFlinkEngineContext( + new SerializableConfiguration(hadoopConf), + new FlinkTaskContextSupplier(runtimeContext)); + return HoodieFlinkTable.create(writeConfig, context); + } + + /** + * Creates the hoodie flink table. + + *

This expects to be used by driver. + */ + public static HoodieFlinkTable createTable(Configuration conf) { + HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(conf, true, false); + return HoodieFlinkTable.create(writeConfig, HoodieFlinkEngineContext.DEFAULT); + } +} diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java index 473a33e8c..8b937073d 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java @@ -20,7 +20,6 @@ package org.apache.hudi.utils; import org.apache.hudi.avro.model.HoodieCompactionOperation; import org.apache.hudi.avro.model.HoodieCompactionPlan; -import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; @@ -30,38 +29,90 @@ import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.table.HoodieFlinkTable; import org.apache.hudi.util.CompactionUtil; +import org.apache.hudi.util.FlinkTables; import org.apache.hudi.util.StreamerUtil; import org.apache.flink.configuration.Configuration; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import java.io.File; import java.io.IOException; import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertTrue; /** * Test cases for {@link org.apache.hudi.util.CompactionUtil}. 
*/ public class TestCompactionUtil { + private HoodieFlinkTable table; + private HoodieTableMetaClient metaClient; + private Configuration conf; + @TempDir File tempFile; - @Test - void rollbackCompaction() throws IOException { - Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); - conf.setInteger(FlinkOptions.COMPACTION_DELTA_SECONDS, 0); - + @BeforeEach + void beforeEach() throws IOException { + this.conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); StreamerUtil.initTableIfNotExists(conf); + this.table = FlinkTables.createTable(conf); + this.metaClient = table.getMetaClient(); + } - HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf); - HoodieFlinkTable table = writeClient.getHoodieTable(); - HoodieTableMetaClient metaClient = table.getMetaClient(); + @Test + void rollbackCompaction() { + conf.setInteger(FlinkOptions.COMPACTION_TIMEOUT_SECONDS, 0); + List oriInstants = IntStream.range(0, 3) + .mapToObj(i -> generateCompactionPlan()).collect(Collectors.toList()); + List instants = metaClient.getActiveTimeline() + .filterPendingCompactionTimeline() + .filter(instant -> instant.getState() == HoodieInstant.State.INFLIGHT) + .getInstants() + .collect(Collectors.toList()); + assertThat("all the instants should be in pending state", instants.size(), is(3)); + CompactionUtil.rollbackCompaction(table, conf); + boolean allRolledBack = metaClient.getActiveTimeline().filterPendingCompactionTimeline().getInstants() + .allMatch(instant -> instant.getState() == HoodieInstant.State.REQUESTED); + assertTrue(allRolledBack, "all the instants should be rolled back"); + List actualInstants = metaClient.getActiveTimeline() + .filterPendingCompactionTimeline().getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toList()); + assertThat(actualInstants, is(oriInstants)); + } + @Test + void rollbackEarliestCompaction() { + List oriInstants = IntStream.range(0, 3) + .mapToObj(i -> 
generateCompactionPlan()).collect(Collectors.toList()); + List instants = metaClient.getActiveTimeline() + .filterPendingCompactionTimeline() + .filter(instant -> instant.getState() == HoodieInstant.State.INFLIGHT) + .getInstants() + .collect(Collectors.toList()); + assertThat("all the instants should be in pending state", instants.size(), is(3)); + CompactionUtil.rollbackEarliestCompaction(table); + long requestedCnt = metaClient.getActiveTimeline().filterPendingCompactionTimeline().getInstants() + .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED).count(); + assertThat("Only the first instant expects to be rolled back", requestedCnt, is(1L)); + + String instantTime = metaClient.getActiveTimeline() + .filterPendingCompactionTimeline().filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED) + .firstInstant().get().getTimestamp(); + assertThat(instantTime, is(oriInstants.get(0))); + } + + /** + * Generates a compaction plan on the timeline and returns its instant time. + */ + private String generateCompactionPlan() { HoodieCompactionOperation operation = new HoodieCompactionOperation(); HoodieCompactionPlan plan = new HoodieCompactionPlan(Collections.singletonList(operation), Collections.emptyMap(), 1); String instantTime = HoodieActiveTimeline.createNewInstantTime(); @@ -75,13 +126,7 @@ public class TestCompactionUtil { throw new HoodieIOException("Exception scheduling compaction", ioe); } metaClient.reloadActiveTimeline(); - HoodieInstant instant = metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().orElse(null); - assertThat(instant.getTimestamp(), is(instantTime)); - - CompactionUtil.rollbackCompaction(table, conf); - HoodieInstant rollbackInstant = table.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().get(); - assertThat(rollbackInstant.getState(), is(HoodieInstant.State.REQUESTED)); - assertThat(rollbackInstant.getTimestamp(), is(instantTime)); + return instantTime; } }