diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java index 560b5ffba..a43fcd5ad 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java @@ -21,9 +21,11 @@ package org.apache.hudi.sink.compact; import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.model.CompactionOperation; +import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.sink.utils.NonThrownExecutor; import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable; import org.apache.hudi.table.action.compact.HoodieFlinkMergeOnReadTableCompactor; +import org.apache.hudi.util.CompactionUtil; import org.apache.hudi.util.StreamerUtil; import org.apache.flink.annotation.VisibleForTesting; @@ -51,7 +53,7 @@ public class CompactFunction extends ProcessFunction writeClient; /** * Whether to execute compaction asynchronously. @@ -89,21 +91,24 @@ public class CompactFunction extends ProcessFunction doCompaction(instantTime, compactionOperation, collector), + () -> doCompaction(instantTime, compactionOperation, collector, reloadWriteConfig()), (errMsg, t) -> collector.collect(new CompactionCommitEvent(instantTime, compactionOperation.getFileId(), taskID)), "Execute compaction for instant %s from task %d", instantTime, taskID); } else { // executes the compaction task synchronously for batch mode. LOG.info("Execute compaction for instant {} from task {}", instantTime, taskID); - doCompaction(instantTime, compactionOperation, collector); + doCompaction(instantTime, compactionOperation, collector, writeClient.getConfig()); } } - private void doCompaction(String instantTime, CompactionOperation compactionOperation, Collector collector) throws IOException { - HoodieFlinkMergeOnReadTableCompactor compactor = new HoodieFlinkMergeOnReadTableCompactor(); + private void doCompaction(String instantTime, + CompactionOperation compactionOperation, + Collector collector, + HoodieWriteConfig writeConfig) throws IOException { + HoodieFlinkMergeOnReadTableCompactor compactor = new HoodieFlinkMergeOnReadTableCompactor<>(); List writeStatuses = compactor.compact( new HoodieFlinkCopyOnWriteTable<>( - writeClient.getConfig(), + writeConfig, writeClient.getEngineContext(), writeClient.getHoodieTable().getMetaClient()), writeClient.getHoodieTable().getMetaClient(), @@ -114,6 +119,12 @@ public class CompactFunction extends ProcessFunction