[HUDI-2631] In CompactFunction, set up the write schema each time with the latest schema (#4000)

Co-authored-by: yuzhaojing <yuzhaojing@bytedance.com>
Author: yuzhaojing
Date:   2022-03-02 11:18:17 +08:00
Committed by: GitHub
parent 3cfb52c413
commit 3b2da9f138

2 changed files with 30 additions and 6 deletions
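Summary: doCompaction previously always compacted with the HoodieWriteConfig cached in the write client, so a long-running job could keep writing with the Avro schema captured at startup even after the table schema had evolved. The write config is now threaded through doCompaction as an explicit parameter: the asynchronous path rebuilds it for every compaction via the new reloadWriteConfig(), which reads the latest table schema through the new CompactionUtil.setAvroSchema() helper, while the synchronous batch path still passes writeClient.getConfig().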

org/apache/hudi/sink/compact/CompactFunction.java

@@ -21,9 +21,11 @@ package org.apache.hudi.sink.compact;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.sink.utils.NonThrownExecutor;
 import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable;
 import org.apache.hudi.table.action.compact.HoodieFlinkMergeOnReadTableCompactor;
+import org.apache.hudi.util.CompactionUtil;
 import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.annotation.VisibleForTesting;
@@ -51,7 +53,7 @@ public class CompactFunction extends ProcessFunction<CompactionPlanEvent, CompactionCommitEvent> {
   /**
    * Write Client.
    */
-  private transient HoodieFlinkWriteClient writeClient;
+  private transient HoodieFlinkWriteClient<?> writeClient;
 
   /**
    * Whether to execute compaction asynchronously.
@@ -89,21 +91,24 @@ public class CompactFunction extends ProcessFunction<CompactionPlanEvent, CompactionCommitEvent> {
     if (asyncCompaction) {
       // executes the compaction task asynchronously so as not to block checkpoint barrier propagation.
       executor.execute(
-          () -> doCompaction(instantTime, compactionOperation, collector),
+          () -> doCompaction(instantTime, compactionOperation, collector, reloadWriteConfig()),
           (errMsg, t) -> collector.collect(new CompactionCommitEvent(instantTime, compactionOperation.getFileId(), taskID)),
           "Execute compaction for instant %s from task %d", instantTime, taskID);
     } else {
       // executes the compaction task synchronously for batch mode.
       LOG.info("Execute compaction for instant {} from task {}", instantTime, taskID);
-      doCompaction(instantTime, compactionOperation, collector);
+      doCompaction(instantTime, compactionOperation, collector, writeClient.getConfig());
     }
   }
 
-  private void doCompaction(String instantTime, CompactionOperation compactionOperation, Collector<CompactionCommitEvent> collector) throws IOException {
-    HoodieFlinkMergeOnReadTableCompactor compactor = new HoodieFlinkMergeOnReadTableCompactor();
+  private void doCompaction(String instantTime,
+                            CompactionOperation compactionOperation,
+                            Collector<CompactionCommitEvent> collector,
+                            HoodieWriteConfig writeConfig) throws IOException {
+    HoodieFlinkMergeOnReadTableCompactor<?> compactor = new HoodieFlinkMergeOnReadTableCompactor<>();
     List<WriteStatus> writeStatuses = compactor.compact(
         new HoodieFlinkCopyOnWriteTable<>(
-            writeClient.getConfig(),
+            writeConfig,
             writeClient.getEngineContext(),
             writeClient.getHoodieTable().getMetaClient()),
         writeClient.getHoodieTable().getMetaClient(),
@@ -114,6 +119,12 @@ public class CompactFunction extends ProcessFunction<CompactionPlanEvent, CompactionCommitEvent> {
     collector.collect(new CompactionCommitEvent(instantTime, compactionOperation.getFileId(), writeStatuses, taskID));
   }
 
+  private HoodieWriteConfig reloadWriteConfig() throws Exception {
+    HoodieWriteConfig writeConfig = writeClient.getConfig();
+    CompactionUtil.setAvroSchema(writeConfig, writeClient.getHoodieTable().getMetaClient());
+    return writeConfig;
+  }
+
   @VisibleForTesting
   public void setExecutor(NonThrownExecutor executor) {
     this.executor = executor;
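
Design note: only the asynchronous (streaming) branch refreshes the schema; in batch mode the client was just created, so its cached schema is presumably still current. Note also that reloadWriteConfig() is invoked inside the lambda, so the schema is resolved when the compaction task actually runs on the executor, not when it is scheduled. Below is a minimal standalone sketch of the refresh step, using only calls that appear in this diff; the wrapper class itself is hypothetical:

import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.util.CompactionUtil;

public class ReloadSketch {
  private final HoodieFlinkWriteClient<?> writeClient;

  public ReloadSketch(HoodieFlinkWriteClient<?> writeClient) {
    this.writeClient = writeClient;
  }

  // Same shape as the new private reloadWriteConfig() above: start from the
  // client's cached config, then overwrite its schema with the latest table
  // schema resolved from the meta client.
  public HoodieWriteConfig reloadWriteConfig() throws Exception {
    HoodieWriteConfig writeConfig = writeClient.getConfig();
    CompactionUtil.setAvroSchema(writeConfig, writeClient.getHoodieTable().getMetaClient());
    return writeConfig;
  }
}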

org/apache/hudi/util/CompactionUtil.java

@@ -26,6 +26,7 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.sink.compact.FlinkCompactionConfig;
@@ -106,6 +107,18 @@ public class CompactionUtil {
     conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA, tableAvroSchema.toString());
   }
 
+  /**
+   * Sets up the avro schema string into the given write config {@code HoodieWriteConfig}
+   * through reading from the hoodie table metadata.
+   *
+   * @param writeConfig The HoodieWriteConfig
+   */
+  public static void setAvroSchema(HoodieWriteConfig writeConfig, HoodieTableMetaClient metaClient) throws Exception {
+    TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
+    Schema tableAvroSchema = tableSchemaResolver.getTableAvroSchema(false);
+    writeConfig.setSchema(tableAvroSchema.toString());
+  }
+
   /**
    * Infers the changelog mode based on the data file schema (including metadata fields).
    *
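
For reference, a hedged usage sketch of the new helper (not part of the commit): the Hadoop configuration, the base path, and the minimal write-config construction are assumptions for illustration; the builder calls are the standard ones in this codebase.

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.util.CompactionUtil;

public class SetAvroSchemaExample {
  public static void main(String[] args) throws Exception {
    Configuration hadoopConf = new Configuration();
    String basePath = "file:///tmp/hoodie_table"; // placeholder table path

    // Meta client for the existing Hudi table.
    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
        .setConf(hadoopConf)
        .setBasePath(basePath)
        .build();

    // A write config whose schema may be stale (built here without one).
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .build();

    // Overwrite the config's schema with the latest table schema.
    CompactionUtil.setAvroSchema(writeConfig, metaClient);
    System.out.println("Refreshed write schema: " + writeConfig.getSchema());
  }
}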