1
0

[HUDI-2050] Support rollback inflight compaction instances for batch flink compactor (#3124)

This commit is contained in:
swuferhong
2021-06-21 20:32:48 +08:00
committed by GitHub
parent adf167991a
commit f8d9242372
2 changed files with 26 additions and 2 deletions

View File

@@ -21,6 +21,7 @@ package org.apache.hudi.sink.compact;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CompactionUtils;
@@ -45,6 +46,7 @@ public class HoodieFlinkCompactor {
protected static final Logger LOG = LoggerFactory.getLogger(HoodieFlinkCompactor.class);
@SuppressWarnings("unchecked, rawtypes")
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -66,17 +68,32 @@ public class HoodieFlinkCompactor {
// set table schema
CompactionUtil.setAvroSchema(conf, metaClient);
HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, null);
HoodieFlinkTable<?> table = writeClient.getHoodieTable();
// rolls back inflight compaction first
// condition: the schedule compaction is in INFLIGHT state for max delta seconds.
String curInstantTime = HoodieActiveTimeline.createNewInstantTime();
int deltaSeconds = conf.getInteger(FlinkOptions.COMPACTION_DELTA_SECONDS);
HoodieTimeline inflightCompactionTimeline = metaClient.getActiveTimeline()
.filterPendingCompactionTimeline()
.filter(instant ->
instant.getState() == HoodieInstant.State.INFLIGHT
&& StreamerUtil.instantTimeDiff(curInstantTime, instant.getTimestamp()) >= deltaSeconds);
inflightCompactionTimeline.getInstants().forEach(inflightInstant -> {
writeClient.rollbackInflightCompaction(inflightInstant, table);
table.getMetaClient().reloadActiveTimeline();
});
// judge whether have operation
// to compute the compaction instant time and do compaction.
String compactionInstantTime = CompactionUtil.getCompactionInstantTime(metaClient);
HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, null);
boolean scheduled = writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
if (!scheduled) {
// do nothing.
LOG.info("No compaction plan for this job ");
return;
}
HoodieFlinkTable<?> table = writeClient.getHoodieTable();
// generate compaction plan
// should support configurable commit metadata
HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(

View File

@@ -312,4 +312,11 @@ public class StreamerUtil {
long median = low + (high - low) / 2;
return String.valueOf(median);
}
/**
* Returns the time interval in seconds between the given instant time.
*/
public static long instantTimeDiff(String newInstantTime, String oldInstantTime) {
return Long.parseLong(newInstantTime) - Long.parseLong(oldInstantTime);
}
}