diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java index e75fad5fd..26ac9f3ad 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java @@ -47,6 +47,7 @@ public class CleanFunction extends AbstractRichFunction private final Configuration conf; protected HoodieFlinkWriteClient writeClient; + private NonThrownExecutor executor; private volatile boolean isCleaning; @@ -60,7 +61,7 @@ public class CleanFunction extends AbstractRichFunction super.open(parameters); if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) { this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext()); - this.executor = new NonThrownExecutor(LOG); + this.executor = NonThrownExecutor.builder(LOG).build(); } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index f9c810861..feb348fe3 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -32,7 +32,6 @@ import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.sink.event.CommitAckEvent; import org.apache.hudi.sink.event.WriteMetadataEvent; -import org.apache.hudi.sink.utils.CoordinatorExecutor; import org.apache.hudi.sink.utils.HiveSyncContext; import org.apache.hudi.sink.utils.NonThrownExecutor; import org.apache.hudi.util.StreamerUtil; @@ -120,7 +119,7 @@ public class StreamWriteOperatorCoordinator /** * A single-thread executor to handle all the asynchronous jobs of the coordinator. */ - private CoordinatorExecutor executor; + private NonThrownExecutor executor; /** * A single-thread executor to handle asynchronous hive sync. @@ -165,7 +164,9 @@ public class StreamWriteOperatorCoordinator this.writeClient = StreamerUtil.createWriteClient(conf); this.tableState = TableState.create(conf); // start the executor - this.executor = new CoordinatorExecutor(this.context, LOG); + this.executor = NonThrownExecutor.builder(LOG) + .exceptionHook((errMsg, t) -> this.context.failJob(new HoodieException(errMsg, t))) + .waitForTasksFinish(true).build(); // start the executor if required if (tableState.syncHive) { initHiveSync(); @@ -290,7 +291,7 @@ public class StreamWriteOperatorCoordinator // ------------------------------------------------------------------------- private void initHiveSync() { - this.hiveSyncExecutor = new NonThrownExecutor(LOG, true); + this.hiveSyncExecutor = NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build(); this.hiveSyncContext = HiveSyncContext.create(conf); } @@ -518,7 +519,7 @@ public class StreamWriteOperatorCoordinator } @VisibleForTesting - public void setExecutor(CoordinatorExecutor executor) throws Exception { + public void setExecutor(NonThrownExecutor executor) throws Exception { if (this.executor != null) { this.executor.close(); } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java index 98726d273..560b5ffba 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java @@ -78,7 +78,7 @@ public class CompactFunction extends ProcessFunction doCompaction(instantTime, compactionOperation, collector), + (errMsg, t) -> collector.collect(new CompactionCommitEvent(instantTime, compactionOperation.getFileId(), taskID)), "Execute compaction for instant %s from task %d", instantTime, taskID); } else { // executes the compaction task synchronously for batch mode. diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitEvent.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitEvent.java index 04449441c..398dfcf61 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitEvent.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitEvent.java @@ -51,6 +51,13 @@ public class CompactionCommitEvent implements Serializable { public CompactionCommitEvent() { } + /** + * An event with NULL write statuses that represents a failed compaction. + */ + public CompactionCommitEvent(String instant, String fileId, int taskID) { + this(instant, fileId, null, taskID); + } + public CompactionCommitEvent(String instant, String fileId, List writeStatuses, int taskID) { this.instant = instant; this.fileId = fileId; @@ -58,6 +65,14 @@ public class CompactionCommitEvent implements Serializable { this.taskID = taskID; } + public boolean isFailed() { + return this.writeStatuses == null; + } + + // ------------------------------------------------------------------------- + // Getter/Setter + // ------------------------------------------------------------------------- + public void setInstant(String instant) { this.instant = instant; } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java index d90af2c32..0309278f4 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java @@ -24,6 +24,8 @@ import org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.sink.CleanFunction; +import org.apache.hudi.table.HoodieFlinkTable; +import org.apache.hudi.util.CompactionUtil; import org.apache.hudi.util.StreamerUtil; import org.apache.flink.configuration.Configuration; @@ -65,6 +67,11 @@ public class CompactionCommitSink extends CleanFunction { */ private transient Map> commitBuffer; + /** + * The hoodie table. + */ + private transient HoodieFlinkTable table; + public CompactionCommitSink(Configuration conf) { super(conf); this.conf = conf; @@ -77,11 +84,17 @@ public class CompactionCommitSink extends CleanFunction { this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext()); } this.commitBuffer = new HashMap<>(); + this.table = this.writeClient.getHoodieTable(); } @Override public void invoke(CompactionCommitEvent event, Context context) throws Exception { final String instant = event.getInstant(); + if (event.isFailed()) { + // handle failure case + CompactionUtil.rollbackCompaction(table, event.getInstant()); + return; + } commitBuffer.computeIfAbsent(instant, k -> new HashMap<>()) .put(event.getFileId(), event); commitIfNecessary(instant, commitBuffer.get(instant).values()); diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java index 9c0549ac8..f6dd241ec 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java @@ -67,10 +67,10 @@ public class CompactionPlanOperator extends AbstractStreamOperatorWe need this because the coordinator methods are called by - * the Job Manager's main thread (mailbox thread), executes the methods asynchronously - * to avoid blocking the main thread. - */ -public class CoordinatorExecutor extends NonThrownExecutor { - private final OperatorCoordinator.Context context; - - public CoordinatorExecutor(OperatorCoordinator.Context context, Logger logger) { - super(logger, true); - this.context = context; - } - - @Override - protected void exceptionHook(String actionString, Throwable t) { - this.context.failJob(new HoodieException(actionString, t)); - } -} diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/NonThrownExecutor.java b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/NonThrownExecutor.java index 446cb854c..242b3ee0d 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/NonThrownExecutor.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/NonThrownExecutor.java @@ -18,16 +18,23 @@ package org.apache.hudi.sink.utils; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.function.ThrowingRunnable; import org.slf4j.Logger; +import javax.annotation.Nullable; + +import java.util.Objects; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * An executor service that catches all the throwable with logging. + * + *

A post-exception hook {@link ExceptionHook} can be defined on construction + * or on each execution. */ public class NonThrownExecutor implements AutoCloseable { private final Logger logger; @@ -37,19 +44,27 @@ public class NonThrownExecutor implements AutoCloseable { */ private final ExecutorService executor; + /** + * Exception hook for post-exception handling. + */ + @VisibleForTesting + protected final ExceptionHook exceptionHook; + /** * Flag saying whether to wait for the tasks finish on #close. */ - private final boolean waitForTaskFinishOnClose; + private final boolean waitForTasksFinish; - public NonThrownExecutor(Logger logger, boolean waitForTaskFinishOnClose) { + @VisibleForTesting + protected NonThrownExecutor(Logger logger, @Nullable ExceptionHook exceptionHook, boolean waitForTasksFinish) { this.executor = Executors.newSingleThreadExecutor(); this.logger = logger; - this.waitForTaskFinishOnClose = waitForTaskFinishOnClose; + this.exceptionHook = exceptionHook; + this.waitForTasksFinish = waitForTasksFinish; } - public NonThrownExecutor(Logger logger) { - this(logger, false); + public static Builder builder(Logger logger) { + return new Builder(logger); } /** @@ -59,6 +74,17 @@ public class NonThrownExecutor implements AutoCloseable { final ThrowingRunnable action, final String actionName, final Object... actionParams) { + execute(action, this.exceptionHook, actionName, actionParams); + } + + /** + * Run the action in a loop. + */ + public void execute( + final ThrowingRunnable action, + final ExceptionHook hook, + final String actionName, + final Object... actionParams) { executor.execute( () -> { @@ -73,19 +99,17 @@ public class NonThrownExecutor implements AutoCloseable { ExceptionUtils.rethrowIfFatalErrorOrOOM(t); final String errMsg = String.format("Executor executes action [%s] error", actionString); logger.error(errMsg, t); - exceptionHook(errMsg, t); + if (hook != null) { + hook.apply(errMsg, t); + } } }); } - protected void exceptionHook(String errMsg, Throwable t) { - // for sub-class to override. - } - @Override public void close() throws Exception { if (executor != null) { - if (waitForTaskFinishOnClose) { + if (waitForTasksFinish) { executor.shutdown(); } else { executor.shutdownNow(); @@ -95,4 +119,38 @@ public class NonThrownExecutor implements AutoCloseable { executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); } } + + // ------------------------------------------------------------------------- + // Inner Class + // ------------------------------------------------------------------------- + public interface ExceptionHook { + void apply(String errMsg, Throwable t); + } + + /** + * Builder for {@link NonThrownExecutor}. + */ + public static class Builder { + private final Logger logger; + private ExceptionHook exceptionHook; + private boolean waitForTasksFinish = false; + + private Builder(Logger logger) { + this.logger = Objects.requireNonNull(logger); + } + + public NonThrownExecutor build() { + return new NonThrownExecutor(logger, exceptionHook, waitForTasksFinish); + } + + public Builder exceptionHook(ExceptionHook exceptionHook) { + this.exceptionHook = exceptionHook; + return this; + } + + public Builder waitForTasksFinish(boolean waitForTasksFinish) { + this.waitForTasksFinish = waitForTasksFinish; + return this; + } + } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java index e064a058e..e0056f9a1 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java @@ -110,16 +110,26 @@ public class CompactionUtil { } } - public static void rollbackCompaction(HoodieFlinkTable table, Configuration conf) { - String curInstantTime = HoodieActiveTimeline.createNewInstantTime(); - int deltaSeconds = conf.getInteger(FlinkOptions.COMPACTION_TIMEOUT_SECONDS); + public static void rollbackCompaction(HoodieFlinkTable table, String instantTime) { + HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(instantTime); + if (table.getMetaClient().reloadActiveTimeline().filterPendingCompactionTimeline().containsInstant(inflightInstant)) { + LOG.warn("Rollback failed compaction instant: [" + instantTime + "]"); + table.rollbackInflightCompaction(inflightInstant); + } + } + + /** + * Force rolls back all the inflight compaction instants, especially for job failover restart. + * + * @param table The hoodie table + */ + public static void rollbackCompaction(HoodieFlinkTable table) { HoodieTimeline inflightCompactionTimeline = table.getActiveTimeline() .filterPendingCompactionTimeline() .filter(instant -> - instant.getState() == HoodieInstant.State.INFLIGHT - && StreamerUtil.instantTimeDiffSeconds(curInstantTime, instant.getTimestamp()) >= deltaSeconds); + instant.getState() == HoodieInstant.State.INFLIGHT); inflightCompactionTimeline.getInstants().forEach(inflightInstant -> { - LOG.info("Rollback the inflight compaction instant: " + inflightInstant + " for timeout(" + deltaSeconds + "s)"); + LOG.info("Rollback the inflight compaction instant: " + inflightInstant + " for failover"); table.rollbackInflightCompaction(inflightInstant); table.getMetaClient().reloadActiveTimeline(); }); @@ -127,16 +137,25 @@ public class CompactionUtil { /** * Rolls back the earliest compaction if there exists. + * + *

Makes the strategy not that radical: firstly check whether there exists inflight compaction instants, + * rolls back the first inflight instant only if it has timed out. That means, if there are + * multiple timed out instants on the timeline, we only roll back the first one at a time. */ - public static void rollbackEarliestCompaction(HoodieFlinkTable table) { + public static void rollbackEarliestCompaction(HoodieFlinkTable table, Configuration conf) { Option earliestInflight = table.getActiveTimeline() .filterPendingCompactionTimeline() .filter(instant -> instant.getState() == HoodieInstant.State.INFLIGHT).firstInstant(); if (earliestInflight.isPresent()) { - LOG.info("Rollback the inflight compaction instant: " + earliestInflight.get() + " for failover"); - table.rollbackInflightCompaction(earliestInflight.get()); - table.getMetaClient().reloadActiveTimeline(); + HoodieInstant instant = earliestInflight.get(); + String currentTime = HoodieActiveTimeline.createNewInstantTime(); + int timeout = conf.getInteger(FlinkOptions.COMPACTION_TIMEOUT_SECONDS); + if (StreamerUtil.instantTimeDiffSeconds(currentTime, instant.getTimestamp()) >= timeout) { + LOG.info("Rollback the inflight compaction instant: " + instant + " for timeout(" + timeout + "s)"); + table.rollbackInflightCompaction(instant); + table.getMetaClient().reloadActiveTimeline(); + } } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockCoordinatorExecutor.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockCoordinatorExecutor.java index 099dfd63f..7e84453aa 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockCoordinatorExecutor.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockCoordinatorExecutor.java @@ -18,6 +18,8 @@ package org.apache.hudi.sink.utils; +import org.apache.hudi.exception.HoodieException; + import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.function.ThrowingRunnable; @@ -25,17 +27,21 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * A mock {@link CoordinatorExecutor} that executes the actions synchronously. + * A mock {@link NonThrownExecutor} that executes the actions synchronously. */ -public class MockCoordinatorExecutor extends CoordinatorExecutor { +public class MockCoordinatorExecutor extends NonThrownExecutor { private static final Logger LOG = LoggerFactory.getLogger(MockCoordinatorExecutor.class); public MockCoordinatorExecutor(OperatorCoordinator.Context context) { - super(context, LOG); + super(LOG, (errMsg, t) -> context.failJob(new HoodieException(errMsg, t)), true); } @Override - public void execute(ThrowingRunnable action, String actionName, Object... actionParams) { + public void execute( + ThrowingRunnable action, + ExceptionHook hook, + String actionName, + Object... actionParams) { final String actionString = String.format(actionName, actionParams); try { action.run(); @@ -43,9 +49,12 @@ public class MockCoordinatorExecutor extends CoordinatorExecutor { } catch (Throwable t) { // if we have a JVM critical error, promote it immediately, there is a good // chance the - // logging or job failing will not succeed any more + // logging or job failing will not succeed anymore ExceptionUtils.rethrowIfFatalErrorOrOOM(t); - exceptionHook(actionString, t); + final String errMsg = String.format("Executor executes action [%s] error", actionString); + if (hook != null) { + hook.apply(errMsg, t); + } } } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java index 8b937073d..a5fed83ea 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java @@ -70,7 +70,6 @@ public class TestCompactionUtil { @Test void rollbackCompaction() { - conf.setInteger(FlinkOptions.COMPACTION_TIMEOUT_SECONDS, 0); List oriInstants = IntStream.range(0, 3) .mapToObj(i -> generateCompactionPlan()).collect(Collectors.toList()); List instants = metaClient.getActiveTimeline() @@ -79,7 +78,7 @@ public class TestCompactionUtil { .getInstants() .collect(Collectors.toList()); assertThat("all the instants should be in pending state", instants.size(), is(3)); - CompactionUtil.rollbackCompaction(table, conf); + CompactionUtil.rollbackCompaction(table); boolean allRolledBack = metaClient.getActiveTimeline().filterPendingCompactionTimeline().getInstants() .allMatch(instant -> instant.getState() == HoodieInstant.State.REQUESTED); assertTrue(allRolledBack, "all the instants should be rolled back"); @@ -90,6 +89,7 @@ public class TestCompactionUtil { @Test void rollbackEarliestCompaction() { + conf.setInteger(FlinkOptions.COMPACTION_TIMEOUT_SECONDS, 0); List oriInstants = IntStream.range(0, 3) .mapToObj(i -> generateCompactionPlan()).collect(Collectors.toList()); List instants = metaClient.getActiveTimeline() @@ -98,7 +98,7 @@ public class TestCompactionUtil { .getInstants() .collect(Collectors.toList()); assertThat("all the instants should be in pending state", instants.size(), is(3)); - CompactionUtil.rollbackEarliestCompaction(table); + CompactionUtil.rollbackEarliestCompaction(table, conf); long requestedCnt = metaClient.getActiveTimeline().filterPendingCompactionTimeline().getInstants() .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED).count(); assertThat("Only the first instant expects to be rolled back", requestedCnt, is(1L));