1
0

[HUDI-3917] Flink write task hangs if last checkpoint has no data input (#5360)

This commit is contained in:
Danny Chan
2022-04-20 12:48:24 +08:00
committed by GitHub
parent 28fdddfee0
commit 7a9e411e9d
3 changed files with 40 additions and 2 deletions

View File

@@ -30,6 +30,7 @@ import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.event.CommitAckEvent;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.meta.CkpMetadata;
import org.apache.hudi.sink.utils.HiveSyncContext;
@@ -42,6 +43,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -429,6 +431,31 @@ public class StreamWriteOperatorCoordinator
addEventToBuffer(event);
}
/**
 * Sends a {@link CommitAckEvent} carrying the given checkpoint id through every non-null gateway.
 *
 * <p>The coordinator reuses the instant if there is no data for this round of checkpoint,
 * so the commit ack events are sent to unblock the flushing.
 *
 * @param checkpointId the checkpoint id used to build the commit ack events
 */
private void sendCommitAckEvents(long checkpointId) {
  CompletableFuture<?>[] futures = Arrays.stream(this.gateways).filter(Objects::nonNull)
      .map(gw -> gw.sendEvent(CommitAckEvent.getInstance(checkpointId)))
      .toArray(CompletableFuture<?>[]::new);
  CompletableFuture.allOf(futures).whenComplete((resp, error) -> {
    // 'error' is null when every event was sent successfully, in that case
    // there is nothing to classify and dereferencing it would throw an NPE.
    if (error != null && !sendToFinishedTasks(error)) {
      // NOTE(review): the stage returned by whenComplete is discarded here, so this
      // exception only completes that stage exceptionally and is not propagated to
      // the caller -- confirm whether it should also be logged or fail the job.
      throw new HoodieException("Error while waiting for the commit ack events to finish sending", error);
    }
  });
}
/**
 * Decides whether the given exception is caused by sending events to FINISHED tasks.
 *
 * <p>Ugly impl: the decision relies on the type and the message of the exception cause,
 * both of which may change in the future.
 *
 * @param throwable the exception to inspect, may be {@code null}
 * @return {@code true} if the cause of the exception is a {@link TaskNotRunningException}
 *         or the message of the cause contains "running"; {@code false} otherwise, including
 *         when the exception, its cause or the cause message is {@code null}
 */
private static boolean sendToFinishedTasks(Throwable throwable) {
  if (throwable == null) {
    // no exception at all, so it can not be caused by finished tasks
    return false;
  }
  final Throwable cause = throwable.getCause();
  if (cause == null) {
    // nothing to inspect: do not classify the exception as a finished-task failure
    return false;
  }
  if (cause instanceof TaskNotRunningException) {
    return true;
  }
  // the message is optional for a Throwable, guard against null before matching
  final String message = cause.getMessage();
  return message != null && message.contains("running");
}
/**
* Commits the instant.
*/
@@ -456,6 +483,10 @@ public class StreamWriteOperatorCoordinator
if (writeResults.size() == 0) {
// No data has been written, reset the buffer and return early
reset();
// Send commit ack event to the write function to unblock the flushing
// If this checkpoint has no inputs while the next checkpoint has inputs,
// the 'isConfirming' flag should be switched with the ack event.
sendCommitAckEvents(checkpointId);
return false;
}
doCommit(instant, writeResults);

View File

@@ -123,9 +123,9 @@ public class AppendWriteFunction<I> extends AbstractStreamWriteFunction<I> {
writeStatus = this.writerHelper.getWriteStatuses(this.taskID);
instant = this.writerHelper.getInstantTime();
} else {
LOG.info("No data to write in subtask [{}] for instant [{}]", taskID, currentInstant);
writeStatus = Collections.emptyList();
instant = instantToWrite(false);
LOG.info("No data to write in subtask [{}] for instant [{}]", taskID, instant);
}
final WriteMetadataEvent event = WriteMetadataEvent.builder()
.taskID(taskID)

View File

@@ -247,7 +247,7 @@ public abstract class AbstractStreamWriteFunction<I>
// wait condition:
// 1. there is no inflight instant
// 2. the inflight instant does not change and the checkpoint has buffering data
if (instant == null || (instant.equals(this.currentInstant) && hasData && !this.ckpMetadata.isAborted(instant))) {
if (instant == null || invalidInstant(instant, hasData)) {
// sleep for a while
timeWait.waitFor();
// refresh the inflight instant
@@ -260,4 +260,11 @@ public abstract class AbstractStreamWriteFunction<I>
}
return instant;
}
/**
 * Checks whether the given pending instant can not be used for writing yet.
 *
 * <p>The instant is reported as invalid only when all of the following hold:
 * it equals the current instant, the checkpoint has buffering data,
 * and the checkpoint metadata does not report the instant as aborted.
 *
 * @param instant the pending instant to check, must not be null
 * @param hasData whether the checkpoint has buffering data
 * @return true if the instant is invalid to write with
 */
private boolean invalidInstant(String instant, boolean hasData) {
  // a different instant than the current one is always valid
  if (!instant.equals(this.currentInstant)) {
    return false;
  }
  // without buffering data there is no need to wait for a new instant
  if (!hasData) {
    return false;
  }
  // the same instant can still be used if it was aborted
  final boolean aborted = this.ckpMetadata.isAborted(instant);
  return !aborted;
}
}