From 3a1fd22841ab40b98589f5dc17717548500bde4b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=B4=E7=A5=A5=E5=B9=B3?= <408317717@qq.com> Date: Mon, 27 Jun 2022 16:09:44 +0800 Subject: [PATCH] [HUDI-4311] Fix Flink lose data on some rollback scene (#5950) --- .../org/apache/hudi/sink/StreamWriteOperatorCoordinator.java | 2 +- .../src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index 44553820b..6aa4c0b1f 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -271,7 +271,7 @@ public class StreamWriteOperatorCoordinator @Override public void notifyCheckpointAborted(long checkpointId) { - if (checkpointId == this.checkpointId) { + if (checkpointId == this.checkpointId && !WriteMetadataEvent.BOOTSTRAP_INSTANT.equals(this.instant)) { executor.execute(() -> { this.ckpMetadata.abortInstant(this.instant); }, "abort instant %s", this.instant); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java index 45a4e04ba..f059c7050 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java @@ -97,8 +97,6 @@ public class CkpMetadata implements Serializable { public void bootstrap(HoodieTableMetaClient metaClient) throws IOException { fs.delete(path, true); fs.mkdirs(path); - metaClient.getActiveTimeline().getCommitsTimeline().filterPendingExcludingCompaction() - .lastInstant().ifPresent(instant -> startInstant(instant.getTimestamp())); } public void startInstant(String instant) {