[HUDI-3870] Add timeout rollback for flink online compaction (#5314)
This commit is contained in:
@@ -88,8 +88,7 @@ public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPla
|
|||||||
// when the earliest inflight instant has timed out, assumes it has failed
|
// when the earliest inflight instant has timed out, assumes it has failed
|
||||||
// already and just rolls it back.
|
// already and just rolls it back.
|
||||||
|
|
||||||
// comment out: do we really need the timeout rollback ?
|
CompactionUtil.rollbackEarliestCompaction(table, conf);
|
||||||
// CompactionUtil.rollbackEarliestCompaction(table, conf);
|
|
||||||
scheduleCompaction(table, checkpointId);
|
scheduleCompaction(table, checkpointId);
|
||||||
} catch (Throwable throwable) {
|
} catch (Throwable throwable) {
|
||||||
// make it fail-safe
|
// make it fail-safe
|
||||||
@@ -99,7 +98,8 @@ public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPla
|
|||||||
|
|
||||||
private void scheduleCompaction(HoodieFlinkTable<?> table, long checkpointId) throws IOException {
|
private void scheduleCompaction(HoodieFlinkTable<?> table, long checkpointId) throws IOException {
|
||||||
// the first instant takes the highest priority.
|
// the first instant takes the highest priority.
|
||||||
Option<HoodieInstant> firstRequested = table.getActiveTimeline().filterPendingCompactionTimeline()
|
HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
|
||||||
|
Option<HoodieInstant> firstRequested = pendingCompactionTimeline
|
||||||
.filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED).firstInstant();
|
.filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED).firstInstant();
|
||||||
if (!firstRequested.isPresent()) {
|
if (!firstRequested.isPresent()) {
|
||||||
// do nothing.
|
// do nothing.
|
||||||
@@ -107,6 +107,13 @@ public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPla
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Option<HoodieInstant> firstInflight = pendingCompactionTimeline
|
||||||
|
.filter(instant -> instant.getState() == HoodieInstant.State.INFLIGHT).firstInstant();
|
||||||
|
if (firstInflight.isPresent()) {
|
||||||
|
LOG.warn("Waiting for pending compaction instant : " + firstInflight + " to complete, skip scheduling new compaction plans");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
String compactionInstantTime = firstRequested.get().getTimestamp();
|
String compactionInstantTime = firstRequested.get().getTimestamp();
|
||||||
|
|
||||||
// generate compaction plan
|
// generate compaction plan
|
||||||
|
|||||||
Reference in New Issue
Block a user