[HUDI-4258] Fix when HoodieTable removes data file before the end of Flink job (#5876)
* [HUDI-4258] Fix when HoodieTable removes data file before the end of Flink job
This commit is contained in:
committed by
GitHub
parent
7c6bedff25
commit
f1103281d2
@@ -285,7 +285,8 @@ public class StreamWriteOperatorCoordinator
|
|||||||
|
|
||||||
if (event.isEndInput()) {
|
if (event.isEndInput()) {
|
||||||
// handle end input event synchronously
|
// handle end input event synchronously
|
||||||
handleEndInputEvent(event);
|
// wrap handleEndInputEvent in executeSync to preserve the order of events
|
||||||
|
executor.executeSync(() -> handleEndInputEvent(event), "handle end input event for instant %s", this.instant);
|
||||||
} else {
|
} else {
|
||||||
executor.execute(
|
executor.execute(
|
||||||
() -> {
|
() -> {
|
||||||
|
|||||||
@@ -163,6 +163,18 @@ public class WriteMetadataEvent implements OperatorEvent {
|
|||||||
return lastBatch && this.instantTime.equals(currentInstant);
|
return lastBatch && this.instantTime.equals(currentInstant);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "WriteMetadataEvent{"
|
||||||
|
+ "writeStatusesSize=" + writeStatuses.size()
|
||||||
|
+ ", taskID=" + taskID
|
||||||
|
+ ", instantTime='" + instantTime + '\''
|
||||||
|
+ ", lastBatch=" + lastBatch
|
||||||
|
+ ", endInput=" + endInput
|
||||||
|
+ ", bootstrap=" + bootstrap
|
||||||
|
+ '}';
|
||||||
|
}
|
||||||
|
|
||||||
// -------------------------------------------------------------------------
|
// -------------------------------------------------------------------------
|
||||||
// Utilities
|
// Utilities
|
||||||
// -------------------------------------------------------------------------
|
// -------------------------------------------------------------------------
|
||||||
|
|||||||
@@ -26,9 +26,11 @@ import org.slf4j.Logger;
|
|||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
|
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An executor service that catches all the throwable with logging.
|
* An executor service that catches all the throwable with logging.
|
||||||
@@ -85,25 +87,21 @@ public class NonThrownExecutor implements AutoCloseable {
|
|||||||
final ExceptionHook hook,
|
final ExceptionHook hook,
|
||||||
final String actionName,
|
final String actionName,
|
||||||
final Object... actionParams) {
|
final Object... actionParams) {
|
||||||
|
executor.execute(wrapAction(action, hook, actionName, actionParams));
|
||||||
|
}
|
||||||
|
|
||||||
executor.execute(
|
/**
|
||||||
() -> {
|
* Run the action in a loop and wait for completion.
|
||||||
final String actionString = String.format(actionName, actionParams);
|
*/
|
||||||
try {
|
public void executeSync(ThrowingRunnable<Throwable> action, String actionName, Object... actionParams) {
|
||||||
action.run();
|
try {
|
||||||
logger.info("Executor executes action [{}] success!", actionString);
|
executor.submit(wrapAction(action, this.exceptionHook, actionName, actionParams)).get();
|
||||||
} catch (Throwable t) {
|
} catch (InterruptedException e) {
|
||||||
// if we have a JVM critical error, promote it immediately, there is a good
|
handleException(e, this.exceptionHook, getActionString(actionName, actionParams));
|
||||||
// chance the
|
} catch (ExecutionException e) {
|
||||||
// logging or job failing will not succeed any more
|
// nonfatal exceptions are handled by wrapAction
|
||||||
ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
|
ExceptionUtils.rethrowIfFatalErrorOrOOM(e.getCause());
|
||||||
final String errMsg = String.format("Executor executes action [%s] error", actionString);
|
}
|
||||||
logger.error(errMsg, t);
|
|
||||||
if (hook != null) {
|
|
||||||
hook.apply(errMsg, t);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -120,6 +118,40 @@ public class NonThrownExecutor implements AutoCloseable {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private <E extends Throwable> Runnable wrapAction(
|
||||||
|
final ThrowingRunnable<E> action,
|
||||||
|
final ExceptionHook hook,
|
||||||
|
final String actionName,
|
||||||
|
final Object... actionParams) {
|
||||||
|
|
||||||
|
return () -> {
|
||||||
|
final Supplier<String> actionString = getActionString(actionName, actionParams);
|
||||||
|
try {
|
||||||
|
action.run();
|
||||||
|
logger.info("Executor executes action [{}] success!", actionString.get());
|
||||||
|
} catch (Throwable t) {
|
||||||
|
handleException(t, hook, actionString);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private void handleException(Throwable t, ExceptionHook hook, Supplier<String> actionString) {
|
||||||
|
// if we have a JVM critical error, promote it immediately, there is a good
|
||||||
|
// chance the
|
||||||
|
// logging or job failing will not succeed any more
|
||||||
|
ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
|
||||||
|
final String errMsg = String.format("Executor executes action [%s] error", actionString.get());
|
||||||
|
logger.error(errMsg, t);
|
||||||
|
if (hook != null) {
|
||||||
|
hook.apply(errMsg, t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Supplier<String> getActionString(String actionName, Object... actionParams) {
|
||||||
|
// avoid String.format before OOM rethrown
|
||||||
|
return () -> String.format(actionName, actionParams);
|
||||||
|
}
|
||||||
|
|
||||||
// -------------------------------------------------------------------------
|
// -------------------------------------------------------------------------
|
||||||
// Inner Class
|
// Inner Class
|
||||||
// -------------------------------------------------------------------------
|
// -------------------------------------------------------------------------
|
||||||
|
|||||||
@@ -30,6 +30,7 @@ import org.apache.hudi.configuration.HadoopConfigurations;
|
|||||||
import org.apache.hudi.metadata.HoodieTableMetadata;
|
import org.apache.hudi.metadata.HoodieTableMetadata;
|
||||||
import org.apache.hudi.sink.event.WriteMetadataEvent;
|
import org.apache.hudi.sink.event.WriteMetadataEvent;
|
||||||
import org.apache.hudi.sink.utils.MockCoordinatorExecutor;
|
import org.apache.hudi.sink.utils.MockCoordinatorExecutor;
|
||||||
|
import org.apache.hudi.sink.utils.NonThrownExecutor;
|
||||||
import org.apache.hudi.util.StreamerUtil;
|
import org.apache.hudi.util.StreamerUtil;
|
||||||
import org.apache.hudi.utils.TestConfigurations;
|
import org.apache.hudi.utils.TestConfigurations;
|
||||||
import org.apache.hudi.utils.TestUtils;
|
import org.apache.hudi.utils.TestUtils;
|
||||||
@@ -46,11 +47,14 @@ import org.junit.jupiter.api.AfterEach;
|
|||||||
import org.junit.jupiter.api.BeforeEach;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.junit.jupiter.api.io.TempDir;
|
import org.junit.jupiter.api.io.TempDir;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import static org.hamcrest.CoreMatchers.instanceOf;
|
import static org.hamcrest.CoreMatchers.instanceOf;
|
||||||
import static org.hamcrest.CoreMatchers.is;
|
import static org.hamcrest.CoreMatchers.is;
|
||||||
@@ -298,6 +302,40 @@ public class TestStreamWriteOperatorCoordinator {
|
|||||||
assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(instant));
|
assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(instant));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testEndInputIsTheLastEvent() throws Exception {
|
||||||
|
Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
|
||||||
|
MockOperatorCoordinatorContext context = new MockOperatorCoordinatorContext(new OperatorID(), 1);
|
||||||
|
Logger logger = Mockito.mock(Logger.class); // avoid too many logs by executor
|
||||||
|
NonThrownExecutor executor = NonThrownExecutor.builder(logger).waitForTasksFinish(true).build();
|
||||||
|
|
||||||
|
try (StreamWriteOperatorCoordinator coordinator = new StreamWriteOperatorCoordinator(conf, context)) {
|
||||||
|
coordinator.start();
|
||||||
|
coordinator.setExecutor(executor);
|
||||||
|
coordinator.handleEventFromOperator(0, WriteMetadataEvent.emptyBootstrap(0));
|
||||||
|
TimeUnit.SECONDS.sleep(5); // wait for handled bootstrap event
|
||||||
|
|
||||||
|
int eventCount = 20_000; // big enough to fill executor's queue
|
||||||
|
for (int i = 0; i < eventCount; i++) {
|
||||||
|
coordinator.handleEventFromOperator(0, createOperatorEvent(0, coordinator.getInstant(), "par1", true, 0.1));
|
||||||
|
}
|
||||||
|
|
||||||
|
WriteMetadataEvent endInput = WriteMetadataEvent.builder()
|
||||||
|
.taskID(0)
|
||||||
|
.instantTime(coordinator.getInstant())
|
||||||
|
.writeStatus(Collections.emptyList())
|
||||||
|
.endInput(true)
|
||||||
|
.build();
|
||||||
|
coordinator.handleEventFromOperator(0, endInput);
|
||||||
|
|
||||||
|
// wait for submitted events completed
|
||||||
|
executor.close();
|
||||||
|
|
||||||
|
// there should be no events after endInput
|
||||||
|
assertNull(coordinator.getEventBuffer()[0]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// -------------------------------------------------------------------------
|
// -------------------------------------------------------------------------
|
||||||
// Utilities
|
// Utilities
|
||||||
// -------------------------------------------------------------------------
|
// -------------------------------------------------------------------------
|
||||||
|
|||||||
Reference in New Issue
Block a user