1
0

[HUDI-3120] Cache compactionPlan in buffer (#4463)

Co-authored-by: yuzhaojing <yuzhaojing@bytedance.com>
This commit is contained in:
yuzhaojing
2021-12-31 13:12:32 +08:00
committed by GitHub
parent a4e622ac61
commit e88b5fd450

View File

@@ -23,6 +23,7 @@ import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.CleanFunction;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.util.CompactionUtil;
@@ -67,6 +68,12 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
*/
private transient Map<String, Map<String, CompactionCommitEvent>> commitBuffer;
/**
* Cache to store compaction plan for each instant.
* Stores the mapping of instant_time -> compactionPlan.
*/
private transient Map<String, HoodieCompactionPlan> compactionPlanCache;
/**
* The hoodie table.
*/
@@ -84,6 +91,7 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext());
}
this.commitBuffer = new HashMap<>();
this.compactionPlanCache = new HashMap<>();
this.table = this.writeClient.getHoodieTable();
}
@@ -108,8 +116,15 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
* @param events Commit events ever received for the instant
*/
private void commitIfNecessary(String instant, Collection<CompactionCommitEvent> events) throws IOException {
HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(
this.writeClient.getHoodieTable().getMetaClient(), instant);
HoodieCompactionPlan compactionPlan = compactionPlanCache.computeIfAbsent(instant, k -> {
try {
return CompactionUtils.getCompactionPlan(
this.writeClient.getHoodieTable().getMetaClient(), instant);
} catch (IOException e) {
throw new HoodieException(e);
}
});
boolean isReady = compactionPlan.getOperations().size() == events.size();
if (!isReady) {
return;
@@ -143,5 +158,6 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
private void reset(String instant) {
this.commitBuffer.remove(instant);
this.compactionPlanCache.remove(instant);
}
}