diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java index ecd66936e..c9fb7aceb 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java @@ -101,11 +101,6 @@ public class CompactionCommitSink extends CleanFunction { @Override public void invoke(CompactionCommitEvent event, Context context) throws Exception { final String instant = event.getInstant(); - if (event.isFailed()) { - // handle failure case - CompactionUtil.rollbackCompaction(table, event.getInstant()); - return; - } commitBuffer.computeIfAbsent(instant, k -> new HashMap<>()) .put(event.getFileId(), event); commitIfNecessary(instant, commitBuffer.get(instant).values()); @@ -132,6 +127,18 @@ public class CompactionCommitSink extends CleanFunction { if (!isReady) { return; } + + if (events.stream().anyMatch(CompactionCommitEvent::isFailed)) { + try { + // handle failure case + CompactionUtil.rollbackCompaction(table, instant); + } finally { + // remove commitBuffer to avoid obsolete metadata commit + reset(instant); + return; + } + } + try { doCommit(instant, events); } catch (Throwable throwable) {