[HUDI-3912] Fix lose data when rollback in flink async compact (#5357)
* stop add event when has failed compact event Co-authored-by: wxp <wxp4532@outlook.com>
This commit is contained in:
@@ -101,11 +101,6 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
|
|||||||
@Override
|
@Override
|
||||||
public void invoke(CompactionCommitEvent event, Context context) throws Exception {
|
public void invoke(CompactionCommitEvent event, Context context) throws Exception {
|
||||||
final String instant = event.getInstant();
|
final String instant = event.getInstant();
|
||||||
if (event.isFailed()) {
|
|
||||||
// handle failure case
|
|
||||||
CompactionUtil.rollbackCompaction(table, event.getInstant());
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
commitBuffer.computeIfAbsent(instant, k -> new HashMap<>())
|
commitBuffer.computeIfAbsent(instant, k -> new HashMap<>())
|
||||||
.put(event.getFileId(), event);
|
.put(event.getFileId(), event);
|
||||||
commitIfNecessary(instant, commitBuffer.get(instant).values());
|
commitIfNecessary(instant, commitBuffer.get(instant).values());
|
||||||
@@ -132,6 +127,18 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
|
|||||||
if (!isReady) {
|
if (!isReady) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (events.stream().anyMatch(CompactionCommitEvent::isFailed)) {
|
||||||
|
try {
|
||||||
|
// handle failure case
|
||||||
|
CompactionUtil.rollbackCompaction(table, instant);
|
||||||
|
} finally {
|
||||||
|
// remove commitBuffer to avoid obsolete metadata commit
|
||||||
|
reset(instant);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
doCommit(instant, events);
|
doCommit(instant, events);
|
||||||
} catch (Throwable throwable) {
|
} catch (Throwable throwable) {
|
||||||
|
|||||||
Reference in New Issue
Block a user