From ba11082282d48af41126b94e233ea12fcf89c279 Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Sun, 24 Jul 2022 08:44:22 +0800 Subject: [PATCH] [HUDI-4450] Revert the checkpoint abort notification (#6181) --- .../apache/hudi/sink/StreamWriteOperatorCoordinator.java | 9 --------- .../hudi/sink/common/AbstractStreamWriteFunction.java | 9 +-------- 2 files changed, 1 insertion(+), 17 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index d5ca307a0..d636bcde3 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -267,15 +267,6 @@ public class StreamWriteOperatorCoordinator ); } - @Override - public void notifyCheckpointAborted(long checkpointId) { - if (checkpointId == this.checkpointId && !WriteMetadataEvent.BOOTSTRAP_INSTANT.equals(this.instant)) { - executor.execute(() -> { - this.ckpMetadata.abortInstant(this.instant); - }, "abort instant %s", this.instant); - } - } - @Override public void resetToCheckpoint(long checkpointID, byte[] checkpointData) { // no operation diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java index 04b7f4354..674cd3588 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java @@ -243,13 +243,6 @@ public abstract class AbstractStreamWriteFunction return this.ckpMetadata.lastPendingInstant(); } - /** - * Returns whether the instant is fresh new(not aborted). - */ - protected boolean freshInstant(String instant) { - return !this.ckpMetadata.isAborted(instant); - } - /** * Prepares the instant time to write with for next checkpoint. * @@ -286,6 +279,6 @@ public abstract class AbstractStreamWriteFunction * Returns whether the pending instant is invalid to write with. */ private boolean invalidInstant(String instant, boolean hasData) { - return instant.equals(this.currentInstant) && hasData && freshInstant(instant); + return instant.equals(this.currentInstant) && hasData; } }