From 3b2da9f13847475be3dcef13b3d25df8818cecc7 Mon Sep 17 00:00:00 2001 From: yuzhaojing <32435329+yuzhaojing@users.noreply.github.com> Date: Wed, 2 Mar 2022 11:18:17 +0800 Subject: [PATCH] [HUDI-2631] In CompactFunction, set up the write schema each time with the latest schema (#4000) Co-authored-by: yuzhaojing --- .../hudi/sink/compact/CompactFunction.java | 23 ++++++++++++++----- .../org/apache/hudi/util/CompactionUtil.java | 13 +++++++++++ 2 files changed, 30 insertions(+), 6 deletions(-) diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java index 560b5ffba..a43fcd5ad 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java @@ -21,9 +21,11 @@ package org.apache.hudi.sink.compact; import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.model.CompactionOperation; +import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.sink.utils.NonThrownExecutor; import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable; import org.apache.hudi.table.action.compact.HoodieFlinkMergeOnReadTableCompactor; +import org.apache.hudi.util.CompactionUtil; import org.apache.hudi.util.StreamerUtil; import org.apache.flink.annotation.VisibleForTesting; @@ -51,7 +53,7 @@ public class CompactFunction extends ProcessFunction writeClient; /** * Whether to execute compaction asynchronously. @@ -89,21 +91,24 @@ public class CompactFunction extends ProcessFunction doCompaction(instantTime, compactionOperation, collector), + () -> doCompaction(instantTime, compactionOperation, collector, reloadWriteConfig()), (errMsg, t) -> collector.collect(new CompactionCommitEvent(instantTime, compactionOperation.getFileId(), taskID)), "Execute compaction for instant %s from task %d", instantTime, taskID); } else { // executes the compaction task synchronously for batch mode. LOG.info("Execute compaction for instant {} from task {}", instantTime, taskID); - doCompaction(instantTime, compactionOperation, collector); + doCompaction(instantTime, compactionOperation, collector, writeClient.getConfig()); } } - private void doCompaction(String instantTime, CompactionOperation compactionOperation, Collector collector) throws IOException { - HoodieFlinkMergeOnReadTableCompactor compactor = new HoodieFlinkMergeOnReadTableCompactor(); + private void doCompaction(String instantTime, + CompactionOperation compactionOperation, + Collector collector, + HoodieWriteConfig writeConfig) throws IOException { + HoodieFlinkMergeOnReadTableCompactor compactor = new HoodieFlinkMergeOnReadTableCompactor<>(); List writeStatuses = compactor.compact( new HoodieFlinkCopyOnWriteTable<>( - writeClient.getConfig(), + writeConfig, writeClient.getEngineContext(), writeClient.getHoodieTable().getMetaClient()), writeClient.getHoodieTable().getMetaClient(), @@ -114,6 +119,12 @@ public class CompactFunction extends ProcessFunction