[HUDI-2038] Support rollback inflight compaction instances for CompactionPlanOperator (#3105)
Co-authored-by: 喻兆靖 <yuzhaojing@bilibili.com>
This commit is contained in:
@@ -26,6 +26,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
|||||||
import org.apache.hudi.common.util.CompactionUtils;
|
import org.apache.hudi.common.util.CompactionUtils;
|
||||||
import org.apache.hudi.common.util.Option;
|
import org.apache.hudi.common.util.Option;
|
||||||
import org.apache.hudi.table.HoodieFlinkTable;
|
import org.apache.hudi.table.HoodieFlinkTable;
|
||||||
|
import org.apache.hudi.util.CompactionUtil;
|
||||||
import org.apache.hudi.util.StreamerUtil;
|
import org.apache.hudi.util.StreamerUtil;
|
||||||
|
|
||||||
import org.apache.flink.annotation.VisibleForTesting;
|
import org.apache.flink.annotation.VisibleForTesting;
|
||||||
@@ -80,18 +81,18 @@ public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPla
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void notifyCheckpointComplete(long checkpointId) throws IOException {
|
public void notifyCheckpointComplete(long checkpointId) {
|
||||||
try {
|
try {
|
||||||
scheduleCompaction(checkpointId);
|
HoodieFlinkTable hoodieTable = writeClient.getHoodieTable();
|
||||||
|
CompactionUtil.rollbackCompaction(hoodieTable, conf);
|
||||||
|
scheduleCompaction(hoodieTable, checkpointId);
|
||||||
} catch (Throwable throwable) {
|
} catch (Throwable throwable) {
|
||||||
// make it fail safe
|
// make it fail safe
|
||||||
LOG.error("Error while scheduling compaction at instant: " + compactionInstantTime, throwable);
|
LOG.error("Error while scheduling compaction at instant: " + compactionInstantTime, throwable);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void scheduleCompaction(long checkpointId) throws IOException {
|
private void scheduleCompaction(HoodieFlinkTable<?> table, long checkpointId) throws IOException {
|
||||||
HoodieFlinkTable<?> table = writeClient.getHoodieTable();
|
|
||||||
|
|
||||||
// the last instant takes the highest priority.
|
// the last instant takes the highest priority.
|
||||||
Option<HoodieInstant> lastRequested = table.getActiveTimeline().filterPendingCompactionTimeline()
|
Option<HoodieInstant> lastRequested = table.getActiveTimeline().filterPendingCompactionTimeline()
|
||||||
.filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED).lastInstant();
|
.filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED).lastInstant();
|
||||||
|
|||||||
@@ -21,7 +21,6 @@ package org.apache.hudi.sink.compact;
|
|||||||
import org.apache.hudi.avro.model.HoodieCompactionPlan;
|
import org.apache.hudi.avro.model.HoodieCompactionPlan;
|
||||||
import org.apache.hudi.client.HoodieFlinkWriteClient;
|
import org.apache.hudi.client.HoodieFlinkWriteClient;
|
||||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||||
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
|
||||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||||
import org.apache.hudi.common.util.CompactionUtils;
|
import org.apache.hudi.common.util.CompactionUtils;
|
||||||
@@ -46,7 +45,6 @@ public class HoodieFlinkCompactor {
|
|||||||
|
|
||||||
protected static final Logger LOG = LoggerFactory.getLogger(HoodieFlinkCompactor.class);
|
protected static final Logger LOG = LoggerFactory.getLogger(HoodieFlinkCompactor.class);
|
||||||
|
|
||||||
@SuppressWarnings("unchecked, rawtypes")
|
|
||||||
public static void main(String[] args) throws Exception {
|
public static void main(String[] args) throws Exception {
|
||||||
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
|
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
|
||||||
|
|
||||||
@@ -73,17 +71,7 @@ public class HoodieFlinkCompactor {
|
|||||||
|
|
||||||
// rolls back inflight compaction first
|
// rolls back inflight compaction first
|
||||||
// condition: the schedule compaction is in INFLIGHT state for max delta seconds.
|
// condition: the schedule compaction is in INFLIGHT state for max delta seconds.
|
||||||
String curInstantTime = HoodieActiveTimeline.createNewInstantTime();
|
CompactionUtil.rollbackCompaction(table, conf);
|
||||||
int deltaSeconds = conf.getInteger(FlinkOptions.COMPACTION_DELTA_SECONDS);
|
|
||||||
HoodieTimeline inflightCompactionTimeline = metaClient.getActiveTimeline()
|
|
||||||
.filterPendingCompactionTimeline()
|
|
||||||
.filter(instant ->
|
|
||||||
instant.getState() == HoodieInstant.State.INFLIGHT
|
|
||||||
&& StreamerUtil.instantTimeDiff(curInstantTime, instant.getTimestamp()) >= deltaSeconds);
|
|
||||||
inflightCompactionTimeline.getInstants().forEach(inflightInstant -> {
|
|
||||||
writeClient.rollbackInflightCompaction(inflightInstant, table);
|
|
||||||
table.getMetaClient().reloadActiveTimeline();
|
|
||||||
});
|
|
||||||
|
|
||||||
// judge whether have operation
|
// judge whether have operation
|
||||||
// to compute the compaction instant time and do compaction.
|
// to compute the compaction instant time and do compaction.
|
||||||
@@ -94,6 +82,8 @@ public class HoodieFlinkCompactor {
|
|||||||
LOG.info("No compaction plan for this job ");
|
LOG.info("No compaction plan for this job ");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
table.getMetaClient().reloadActiveTimeline();
|
||||||
// generate compaction plan
|
// generate compaction plan
|
||||||
// should support configurable commit metadata
|
// should support configurable commit metadata
|
||||||
HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(
|
HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(
|
||||||
|
|||||||
@@ -22,6 +22,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
|
|||||||
import org.apache.hudi.common.table.TableSchemaResolver;
|
import org.apache.hudi.common.table.TableSchemaResolver;
|
||||||
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
||||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||||
|
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||||
import org.apache.hudi.common.util.Option;
|
import org.apache.hudi.common.util.Option;
|
||||||
import org.apache.hudi.configuration.FlinkOptions;
|
import org.apache.hudi.configuration.FlinkOptions;
|
||||||
import org.apache.hudi.exception.HoodieIOException;
|
import org.apache.hudi.exception.HoodieIOException;
|
||||||
@@ -29,6 +30,7 @@ import org.apache.hudi.exception.HoodieIOException;
|
|||||||
import org.apache.avro.Schema;
|
import org.apache.avro.Schema;
|
||||||
import org.apache.flink.configuration.Configuration;
|
import org.apache.flink.configuration.Configuration;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hudi.table.HoodieFlinkTable;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
@@ -96,4 +98,19 @@ public class CompactionUtil {
|
|||||||
throw new HoodieIOException("Could not remove requested commit " + commitFilePath, e);
|
throw new HoodieIOException("Could not remove requested commit " + commitFilePath, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static void rollbackCompaction(HoodieFlinkTable<?> table, Configuration conf) {
|
||||||
|
String curInstantTime = HoodieActiveTimeline.createNewInstantTime();
|
||||||
|
int deltaSeconds = conf.getInteger(FlinkOptions.COMPACTION_DELTA_SECONDS);
|
||||||
|
HoodieTimeline inflightCompactionTimeline = table.getActiveTimeline()
|
||||||
|
.filterPendingCompactionTimeline()
|
||||||
|
.filter(instant ->
|
||||||
|
instant.getState() == HoodieInstant.State.INFLIGHT
|
||||||
|
&& StreamerUtil.instantTimeDiff(curInstantTime, instant.getTimestamp()) >= deltaSeconds);
|
||||||
|
inflightCompactionTimeline.getInstants().forEach(inflightInstant -> {
|
||||||
|
LOG.info("Rollback the pending compaction instant: " + inflightInstant);
|
||||||
|
table.rollback(table.getContext(), HoodieActiveTimeline.createNewInstantTime(), inflightInstant, true);
|
||||||
|
table.getMetaClient().reloadActiveTimeline();
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user