From 408663c42b93dfffd55547e6e0262d3ef0388c78 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=B4=E7=A5=A5=E5=B9=B3?= <408317717@qq.com> Date: Wed, 20 Apr 2022 19:23:39 +0800 Subject: [PATCH] [HUDI-3912] Fix lose data when rollback in flink async compact (#5357) * stop add event when has failed compact event Co-authored-by: wxp --- .../hudi/sink/compact/CompactionCommitSink.java | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java index ecd66936e..c9fb7aceb 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java @@ -101,11 +101,6 @@ public class CompactionCommitSink extends CleanFunction { @Override public void invoke(CompactionCommitEvent event, Context context) throws Exception { final String instant = event.getInstant(); - if (event.isFailed()) { - // handle failure case - CompactionUtil.rollbackCompaction(table, event.getInstant()); - return; - } commitBuffer.computeIfAbsent(instant, k -> new HashMap<>()) .put(event.getFileId(), event); commitIfNecessary(instant, commitBuffer.get(instant).values()); @@ -132,6 +127,18 @@ public class CompactionCommitSink extends CleanFunction { if (!isReady) { return; } + + if (events.stream().anyMatch(CompactionCommitEvent::isFailed)) { + try { + // handle failure case + CompactionUtil.rollbackCompaction(table, instant); + } finally { + // remove commitBuffer to avoid obsolete metadata commit + reset(instant); + return; + } + } + try { doCommit(instant, events); } catch (Throwable throwable) {