1
0

[HUDI-2654] Add compaction failed event(part2) (#3896)

This commit is contained in:
Danny Chan
2021-10-31 17:51:11 +08:00
committed by GitHub
parent 92a3c458bd
commit 87c6f9cd07
11 changed files with 162 additions and 89 deletions

View File

@@ -47,6 +47,7 @@ public class CleanFunction<T> extends AbstractRichFunction
private final Configuration conf;
protected HoodieFlinkWriteClient writeClient;
private NonThrownExecutor executor;
private volatile boolean isCleaning;
@@ -60,7 +61,7 @@ public class CleanFunction<T> extends AbstractRichFunction
super.open(parameters);
if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext());
this.executor = new NonThrownExecutor(LOG);
this.executor = NonThrownExecutor.builder(LOG).build();
}
}

View File

@@ -32,7 +32,6 @@ import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.event.CommitAckEvent;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.utils.CoordinatorExecutor;
import org.apache.hudi.sink.utils.HiveSyncContext;
import org.apache.hudi.sink.utils.NonThrownExecutor;
import org.apache.hudi.util.StreamerUtil;
@@ -120,7 +119,7 @@ public class StreamWriteOperatorCoordinator
/**
* A single-thread executor to handle all the asynchronous jobs of the coordinator.
*/
private CoordinatorExecutor executor;
private NonThrownExecutor executor;
/**
* A single-thread executor to handle asynchronous hive sync.
@@ -165,7 +164,9 @@ public class StreamWriteOperatorCoordinator
this.writeClient = StreamerUtil.createWriteClient(conf);
this.tableState = TableState.create(conf);
// start the executor
this.executor = new CoordinatorExecutor(this.context, LOG);
this.executor = NonThrownExecutor.builder(LOG)
.exceptionHook((errMsg, t) -> this.context.failJob(new HoodieException(errMsg, t)))
.waitForTasksFinish(true).build();
// start the executor if required
if (tableState.syncHive) {
initHiveSync();
@@ -290,7 +291,7 @@ public class StreamWriteOperatorCoordinator
// -------------------------------------------------------------------------
private void initHiveSync() {
this.hiveSyncExecutor = new NonThrownExecutor(LOG, true);
this.hiveSyncExecutor = NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build();
this.hiveSyncContext = HiveSyncContext.create(conf);
}
@@ -518,7 +519,7 @@ public class StreamWriteOperatorCoordinator
}
@VisibleForTesting
public void setExecutor(CoordinatorExecutor executor) throws Exception {
public void setExecutor(NonThrownExecutor executor) throws Exception {
if (this.executor != null) {
this.executor.close();
}

View File

@@ -78,7 +78,7 @@ public class CompactFunction extends ProcessFunction<CompactionPlanEvent, Compac
this.taskID = getRuntimeContext().getIndexOfThisSubtask();
this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext());
if (this.asyncCompaction) {
this.executor = new NonThrownExecutor(LOG);
this.executor = NonThrownExecutor.builder(LOG).build();
}
}
@@ -90,6 +90,7 @@ public class CompactFunction extends ProcessFunction<CompactionPlanEvent, Compac
// executes the compaction task asynchronously to not block the checkpoint barrier propagate.
executor.execute(
() -> doCompaction(instantTime, compactionOperation, collector),
(errMsg, t) -> collector.collect(new CompactionCommitEvent(instantTime, compactionOperation.getFileId(), taskID)),
"Execute compaction for instant %s from task %d", instantTime, taskID);
} else {
// executes the compaction task synchronously for batch mode.

View File

@@ -51,6 +51,13 @@ public class CompactionCommitEvent implements Serializable {
public CompactionCommitEvent() {
}
/**
* An event with NULL write statuses that represents a failed compaction.
*/
public CompactionCommitEvent(String instant, String fileId, int taskID) {
this(instant, fileId, null, taskID);
}
public CompactionCommitEvent(String instant, String fileId, List<WriteStatus> writeStatuses, int taskID) {
this.instant = instant;
this.fileId = fileId;
@@ -58,6 +65,14 @@ public class CompactionCommitEvent implements Serializable {
this.taskID = taskID;
}
public boolean isFailed() {
return this.writeStatuses == null;
}
// -------------------------------------------------------------------------
// Getter/Setter
// -------------------------------------------------------------------------
public void setInstant(String instant) {
this.instant = instant;
}

View File

@@ -24,6 +24,8 @@ import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.CleanFunction;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.configuration.Configuration;
@@ -65,6 +67,11 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
*/
private transient Map<String, Map<String, CompactionCommitEvent>> commitBuffer;
/**
* The hoodie table.
*/
private transient HoodieFlinkTable<?> table;
public CompactionCommitSink(Configuration conf) {
super(conf);
this.conf = conf;
@@ -77,11 +84,17 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext());
}
this.commitBuffer = new HashMap<>();
this.table = this.writeClient.getHoodieTable();
}
@Override
public void invoke(CompactionCommitEvent event, Context context) throws Exception {
final String instant = event.getInstant();
if (event.isFailed()) {
// handle failure case
CompactionUtil.rollbackCompaction(table, event.getInstant());
return;
}
commitBuffer.computeIfAbsent(instant, k -> new HashMap<>())
.put(event.getFileId(), event);
commitIfNecessary(instant, commitBuffer.get(instant).values());

View File

@@ -67,10 +67,10 @@ public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPla
public void open() throws Exception {
super.open();
this.table = FlinkTables.createTable(conf, getRuntimeContext());
// when starting up, rolls back the first inflight compaction instant if there exists,
// the instant is the next one to schedule for scheduling task because the compaction instants are
// when starting up, rolls back all the inflight compaction instants if any exist;
// these instants take priority for the scheduling task because the compaction instants are
// scheduled from the earliest (FIFO sequence).
CompactionUtil.rollbackEarliestCompaction(this.table);
CompactionUtil.rollbackCompaction(table);
}
@Override
@@ -85,9 +85,11 @@ public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPla
// There is no good way to infer when the compaction task for an instant crashed
// or is still undergoing. So we use a configured timeout threshold to control the rollback:
// {@code FlinkOptions.COMPACTION_TIMEOUT_SECONDS},
// when the threshold hits, but an instant is still in pending(inflight) state, assumes it has failed
// already and just roll it back.
CompactionUtil.rollbackCompaction(table, conf);
// when the earliest inflight instant has timed out, assumes it has failed
// already and just rolls it back.
// comment out: do we really need the timeout rollback?
// CompactionUtil.rollbackEarliestCompaction(table, conf);
scheduleCompaction(table, checkpointId);
} catch (Throwable throwable) {
// make it fail-safe

View File

@@ -1,46 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.utils;
import org.apache.hudi.exception.HoodieException;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.slf4j.Logger;
/**
* Coordinator executor that executes the tasks asynchronously, it fails the job
* for any task exceptions.
*
* <p>We need this because the coordinator methods are called by
* the Job Manager's main thread (mailbox thread), executes the methods asynchronously
* to avoid blocking the main thread.
*/
public class CoordinatorExecutor extends NonThrownExecutor {
private final OperatorCoordinator.Context context;
public CoordinatorExecutor(OperatorCoordinator.Context context, Logger logger) {
super(logger, true);
this.context = context;
}
@Override
protected void exceptionHook(String actionString, Throwable t) {
this.context.failJob(new HoodieException(actionString, t));
}
}

View File

@@ -18,16 +18,23 @@
package org.apache.hudi.sink.utils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.function.ThrowingRunnable;
import org.slf4j.Logger;
import javax.annotation.Nullable;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* An executor service that catches all the throwable with logging.
*
* <p>A post-exception hook {@link ExceptionHook} can be defined on construction
* or on each execution.
*/
public class NonThrownExecutor implements AutoCloseable {
private final Logger logger;
@@ -37,19 +44,27 @@ public class NonThrownExecutor implements AutoCloseable {
*/
private final ExecutorService executor;
/**
* Exception hook for post-exception handling.
*/
@VisibleForTesting
protected final ExceptionHook exceptionHook;
/**
* Flag saying whether to wait for the tasks finish on #close.
*/
private final boolean waitForTaskFinishOnClose;
private final boolean waitForTasksFinish;
public NonThrownExecutor(Logger logger, boolean waitForTaskFinishOnClose) {
@VisibleForTesting
protected NonThrownExecutor(Logger logger, @Nullable ExceptionHook exceptionHook, boolean waitForTasksFinish) {
this.executor = Executors.newSingleThreadExecutor();
this.logger = logger;
this.waitForTaskFinishOnClose = waitForTaskFinishOnClose;
this.exceptionHook = exceptionHook;
this.waitForTasksFinish = waitForTasksFinish;
}
public NonThrownExecutor(Logger logger) {
this(logger, false);
public static Builder builder(Logger logger) {
return new Builder(logger);
}
/**
@@ -59,6 +74,17 @@ public class NonThrownExecutor implements AutoCloseable {
final ThrowingRunnable<Throwable> action,
final String actionName,
final Object... actionParams) {
execute(action, this.exceptionHook, actionName, actionParams);
}
/**
* Run the action in a loop.
*/
public void execute(
final ThrowingRunnable<Throwable> action,
final ExceptionHook hook,
final String actionName,
final Object... actionParams) {
executor.execute(
() -> {
@@ -73,19 +99,17 @@ public class NonThrownExecutor implements AutoCloseable {
ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
final String errMsg = String.format("Executor executes action [%s] error", actionString);
logger.error(errMsg, t);
exceptionHook(errMsg, t);
if (hook != null) {
hook.apply(errMsg, t);
}
}
});
}
protected void exceptionHook(String errMsg, Throwable t) {
// for sub-class to override.
}
@Override
public void close() throws Exception {
if (executor != null) {
if (waitForTaskFinishOnClose) {
if (waitForTasksFinish) {
executor.shutdown();
} else {
executor.shutdownNow();
@@ -95,4 +119,38 @@ public class NonThrownExecutor implements AutoCloseable {
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
}
}
// -------------------------------------------------------------------------
// Inner Class
// -------------------------------------------------------------------------
public interface ExceptionHook {
void apply(String errMsg, Throwable t);
}
/**
* Builder for {@link NonThrownExecutor}.
*/
public static class Builder {
private final Logger logger;
private ExceptionHook exceptionHook;
private boolean waitForTasksFinish = false;
private Builder(Logger logger) {
this.logger = Objects.requireNonNull(logger);
}
public NonThrownExecutor build() {
return new NonThrownExecutor(logger, exceptionHook, waitForTasksFinish);
}
public Builder exceptionHook(ExceptionHook exceptionHook) {
this.exceptionHook = exceptionHook;
return this;
}
public Builder waitForTasksFinish(boolean waitForTasksFinish) {
this.waitForTasksFinish = waitForTasksFinish;
return this;
}
}
}

View File

@@ -110,16 +110,26 @@ public class CompactionUtil {
}
}
public static void rollbackCompaction(HoodieFlinkTable<?> table, Configuration conf) {
String curInstantTime = HoodieActiveTimeline.createNewInstantTime();
int deltaSeconds = conf.getInteger(FlinkOptions.COMPACTION_TIMEOUT_SECONDS);
public static void rollbackCompaction(HoodieFlinkTable<?> table, String instantTime) {
HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(instantTime);
if (table.getMetaClient().reloadActiveTimeline().filterPendingCompactionTimeline().containsInstant(inflightInstant)) {
LOG.warn("Rollback failed compaction instant: [" + instantTime + "]");
table.rollbackInflightCompaction(inflightInstant);
}
}
/**
* Force rolls back all the inflight compaction instants, especially for job failover restart.
*
* @param table The hoodie table
*/
public static void rollbackCompaction(HoodieFlinkTable<?> table) {
HoodieTimeline inflightCompactionTimeline = table.getActiveTimeline()
.filterPendingCompactionTimeline()
.filter(instant ->
instant.getState() == HoodieInstant.State.INFLIGHT
&& StreamerUtil.instantTimeDiffSeconds(curInstantTime, instant.getTimestamp()) >= deltaSeconds);
instant.getState() == HoodieInstant.State.INFLIGHT);
inflightCompactionTimeline.getInstants().forEach(inflightInstant -> {
LOG.info("Rollback the inflight compaction instant: " + inflightInstant + " for timeout(" + deltaSeconds + "s)");
LOG.info("Rollback the inflight compaction instant: " + inflightInstant + " for failover");
table.rollbackInflightCompaction(inflightInstant);
table.getMetaClient().reloadActiveTimeline();
});
@@ -127,18 +137,27 @@ public class CompactionUtil {
/**
* Rolls back the earliest compaction if there exists.
*
* <p>Makes the strategy less radical: first checks whether any inflight compaction instants exist,
* then rolls back the first inflight instant only if it has timed out. That means, if there are
* multiple timed-out instants on the timeline, we only roll back the first one at a time.
*/
public static void rollbackEarliestCompaction(HoodieFlinkTable<?> table) {
public static void rollbackEarliestCompaction(HoodieFlinkTable<?> table, Configuration conf) {
Option<HoodieInstant> earliestInflight = table.getActiveTimeline()
.filterPendingCompactionTimeline()
.filter(instant ->
instant.getState() == HoodieInstant.State.INFLIGHT).firstInstant();
if (earliestInflight.isPresent()) {
LOG.info("Rollback the inflight compaction instant: " + earliestInflight.get() + " for failover");
table.rollbackInflightCompaction(earliestInflight.get());
HoodieInstant instant = earliestInflight.get();
String currentTime = HoodieActiveTimeline.createNewInstantTime();
int timeout = conf.getInteger(FlinkOptions.COMPACTION_TIMEOUT_SECONDS);
if (StreamerUtil.instantTimeDiffSeconds(currentTime, instant.getTimestamp()) >= timeout) {
LOG.info("Rollback the inflight compaction instant: " + instant + " for timeout(" + timeout + "s)");
table.rollbackInflightCompaction(instant);
table.getMetaClient().reloadActiveTimeline();
}
}
}
/**
* Returns whether the execution sequence is LIFO.

View File

@@ -18,6 +18,8 @@
package org.apache.hudi.sink.utils;
import org.apache.hudi.exception.HoodieException;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.function.ThrowingRunnable;
@@ -25,17 +27,21 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A mock {@link CoordinatorExecutor} that executes the actions synchronously.
* A mock {@link NonThrownExecutor} that executes the actions synchronously.
*/
public class MockCoordinatorExecutor extends CoordinatorExecutor {
public class MockCoordinatorExecutor extends NonThrownExecutor {
private static final Logger LOG = LoggerFactory.getLogger(MockCoordinatorExecutor.class);
public MockCoordinatorExecutor(OperatorCoordinator.Context context) {
super(context, LOG);
super(LOG, (errMsg, t) -> context.failJob(new HoodieException(errMsg, t)), true);
}
@Override
public void execute(ThrowingRunnable<Throwable> action, String actionName, Object... actionParams) {
public void execute(
ThrowingRunnable<Throwable> action,
ExceptionHook hook,
String actionName,
Object... actionParams) {
final String actionString = String.format(actionName, actionParams);
try {
action.run();
@@ -45,7 +51,10 @@ public class MockCoordinatorExecutor extends CoordinatorExecutor {
// chance the
// logging or job failing will not succeed anymore
ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
exceptionHook(actionString, t);
final String errMsg = String.format("Executor executes action [%s] error", actionString);
if (hook != null) {
hook.apply(errMsg, t);
}
}
}
}

View File

@@ -70,7 +70,6 @@ public class TestCompactionUtil {
@Test
void rollbackCompaction() {
conf.setInteger(FlinkOptions.COMPACTION_TIMEOUT_SECONDS, 0);
List<String> oriInstants = IntStream.range(0, 3)
.mapToObj(i -> generateCompactionPlan()).collect(Collectors.toList());
List<HoodieInstant> instants = metaClient.getActiveTimeline()
@@ -79,7 +78,7 @@ public class TestCompactionUtil {
.getInstants()
.collect(Collectors.toList());
assertThat("all the instants should be in pending state", instants.size(), is(3));
CompactionUtil.rollbackCompaction(table, conf);
CompactionUtil.rollbackCompaction(table);
boolean allRolledBack = metaClient.getActiveTimeline().filterPendingCompactionTimeline().getInstants()
.allMatch(instant -> instant.getState() == HoodieInstant.State.REQUESTED);
assertTrue(allRolledBack, "all the instants should be rolled back");
@@ -90,6 +89,7 @@ public class TestCompactionUtil {
@Test
void rollbackEarliestCompaction() {
conf.setInteger(FlinkOptions.COMPACTION_TIMEOUT_SECONDS, 0);
List<String> oriInstants = IntStream.range(0, 3)
.mapToObj(i -> generateCompactionPlan()).collect(Collectors.toList());
List<HoodieInstant> instants = metaClient.getActiveTimeline()
@@ -98,7 +98,7 @@ public class TestCompactionUtil {
.getInstants()
.collect(Collectors.toList());
assertThat("all the instants should be in pending state", instants.size(), is(3));
CompactionUtil.rollbackEarliestCompaction(table);
CompactionUtil.rollbackEarliestCompaction(table, conf);
long requestedCnt = metaClient.getActiveTimeline().filterPendingCompactionTimeline().getInstants()
.filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED).count();
assertThat("Only the first instant expects to be rolled back", requestedCnt, is(1L));