From f897e6d73ebc26d32017774d452389023f53f742 Mon Sep 17 00:00:00 2001
From: Danny Chan
Date: Thu, 14 Oct 2021 13:46:53 +0800
Subject: [PATCH] [HUDI-2551] Support DefaultHoodieRecordPayload for flink (#3792)

---
 .../execution/FlinkLazyInsertIterable.java  |  2 +-
 .../hudi/configuration/FlinkOptions.java    |  4 ++--
 .../sink/bootstrap/BootstrapOperator.java   |  7 +++++++
 .../batch/BatchBootstrapOperator.java       |  5 +++++
 .../org/apache/hudi/util/StreamerUtil.java  |  5 +++++
 .../hudi/table/HoodieDataSourceITCase.java  | 31 ++++++++++++++++++++
 6 files changed, 51 insertions(+), 3 deletions(-)

diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java
index 8769f63e3..b0674b2a1 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java
@@ -65,7 +65,7 @@ public class FlinkLazyInsertIterable<T extends HoodieRecordPayload> extends HoodieLazyInsertIterable<T> {
     try {
       final Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema());
       bufferedIteratorExecutor =
-          new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(inputItr), Option.of(getInsertHandler()), getTransformFunction(schema));
+          new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(inputItr), Option.of(getInsertHandler()), getTransformFunction(schema, hoodieConfig));
       final List<WriteStatus> result = bufferedIteratorExecutor.execute();
       assert result != null && !result.isEmpty() && !bufferedIteratorExecutor.isRemaining();
       return result;
diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 81bd51748..b2359f4b3 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -100,7 +100,7 @@ public class FlinkOptions extends HoodieConfig {
   public static final ConfigOption<Integer> METADATA_COMPACTION_DELTA_COMMITS = ConfigOptions
       .key("metadata.compaction.delta_commits")
       .intType()
-      .defaultValue(24)
-      .withDescription("Max delta commits for metadata table to trigger compaction, default 24");
+      .defaultValue(10)
+      .withDescription("Max delta commits for metadata table to trigger compaction, default 10");
 
   // ------------------------------------------------------------------------
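A note on the semantics this patch enables: OverwriteWithLatestAvroPayload, the previous default for the Flink writer, always lets the incoming record win a merge, whereas DefaultHoodieRecordPayload compares the configured ordering (precombine) field and keeps whichever record is newer. The sketch below illustrates that rule only; OrderingFieldMerge is a hypothetical helper written for this note, not a class from Hudi.

    import org.apache.avro.generic.GenericRecord;

    final class OrderingFieldMerge {

      /**
       * Returns the record that should survive the merge, assuming both records
       * carry a mutually comparable ordering field (e.g. a "ts" long column).
       */
      @SuppressWarnings({"rawtypes", "unchecked"})
      static GenericRecord merge(GenericRecord stored, GenericRecord incoming, String orderingField) {
        Comparable storedOrdering = (Comparable) stored.get(orderingField);
        Comparable incomingOrdering = (Comparable) incoming.get(orderingField);
        // Keep the stored record when it is newer; the old default,
        // OverwriteWithLatestAvroPayload, would take the incoming record unconditionally.
        return storedOrdering.compareTo(incomingOrdering) > 0 ? stored : incoming;
      }
    }

The operator changes below refactor the bootstrap path so this payload can be plugged in for both streaming and batch writes.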
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
index 3ac7aa1e6..0e7bb5472 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
@@ -129,6 +129,13 @@ public class BootstrapOperator<I, O extends HoodieRecord<?>>
         WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)),
         HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE)));
 
+    preLoadIndexRecords();
+  }
+
+  /**
+   * Load the index records before {@link #processElement}.
+   */
+  protected void preLoadIndexRecords() throws Exception {
     String basePath = hoodieTable.getMetaClient().getBasePath();
     int taskID = getRuntimeContext().getIndexOfThisSubtask();
     LOG.info("Start loading records in table {} into the index state, taskId = {}", basePath, taskID);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/batch/BatchBootstrapOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/batch/BatchBootstrapOperator.java
index ac4c2b179..258f88403 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/batch/BatchBootstrapOperator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/batch/BatchBootstrapOperator.java
@@ -56,6 +56,11 @@ public class BatchBootstrapOperator<I, O extends HoodieRecord<?>>
     this.haveSuccessfulCommits = StreamerUtil.haveSuccessfulCommits(hoodieTable.getMetaClient());
   }
 
+  @Override
+  protected void preLoadIndexRecords() {
+    // no operation
+  }
+
   @Override
   @SuppressWarnings("unchecked")
   public void processElement(StreamRecord<I> element) throws Exception {
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index cfa29801c..7fb550d47 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -37,6 +37,7 @@ import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieMemoryConfig;
+import org.apache.hudi.config.HoodiePayloadConfig;
 import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
@@ -189,6 +190,10 @@ public class StreamerUtil {
             .enable(conf.getBoolean(FlinkOptions.METADATA_ENABLED))
             .withMaxNumDeltaCommitsBeforeCompaction(conf.getInteger(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS))
             .build())
+        .withPayloadConfig(HoodiePayloadConfig.newBuilder()
+            .withPayloadOrderingField(conf.getString(FlinkOptions.PRECOMBINE_FIELD))
+            .withPayloadEventTimeField(conf.getString(FlinkOptions.PRECOMBINE_FIELD))
+            .build())
         .withEmbeddedTimelineServerReuseEnabled(true) // make write client embedded timeline service singleton
         .withAutoCommit(false)
         .withAllowOperationMetadataField(conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED))
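The StreamerUtil change above is what makes the payload aware of the table's precombine field: the Flink write config now carries it as both the payload ordering field and the payload event time field. A minimal sketch of what that wiring amounts to follows; the builder methods come from this patch itself, while getProps() is assumed to expose the backing properties (as other Hoodie config classes of this era do) and "ts" stands in for the table's precombine field.

    import java.util.Properties;

    import org.apache.hudi.config.HoodiePayloadConfig;

    public class PayloadConfigSketch {
      public static void main(String[] args) {
        HoodiePayloadConfig payloadConfig = HoodiePayloadConfig.newBuilder()
            .withPayloadOrderingField("ts")   // sets hoodie.payload.ordering.field
            .withPayloadEventTimeField("ts")  // sets hoodie.payload.event.time.field
            .build();
        // DefaultHoodieRecordPayload reads these two keys from the write config's
        // properties when deciding whether an update should replace the stored record.
        Properties props = payloadConfig.getProps();
        System.out.println(props.getProperty("hoodie.payload.ordering.field"));
      }
    }

The test below exercises exactly this path end to end through the Flink SQL writer.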
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
index e31f974f3..621cd1c43 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table;
 
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.configuration.FlinkOptions;
@@ -584,6 +585,36 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
     assertRowsEquals(result, "[+I[id1, Sophia, 18, 1970-01-01T00:00:05, par1]]");
   }
 
+  @Test
+  void testUpdateWithDefaultHoodieRecordPayload() {
+    TableEnvironment tableEnv = batchTableEnv;
+    String hoodieTableDDL = sql("t1")
+        .field("id int")
+        .field("name string")
+        .field("price double")
+        .field("ts bigint")
+        .pkField("id")
+        .noPartition()
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.PAYLOAD_CLASS_NAME, DefaultHoodieRecordPayload.class.getName())
+        .end();
+    tableEnv.executeSql(hoodieTableDDL);
+
+    final String insertInto1 = "insert into t1 values\n"
+        + "(1,'a1',20,20)";
+    execInsertSql(tableEnv, insertInto1);
+
+    // The second write carries a smaller ordering value (ts = 1 < 20), so
+    // DefaultHoodieRecordPayload should discard it and keep the first record.
+    final String insertInto2 = "insert into t1 values\n"
+        + "(1,'a1',20,1)";
+    execInsertSql(tableEnv, insertInto2);
+
+    List<Row> result = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+    assertRowsEquals(result, "[+I[1, a1, 20.0, 20]]");
+  }
+
   @ParameterizedTest
   @MethodSource("executionModeAndTableTypeParams")
   void testWriteNonPartitionedTable(ExecMode execMode, HoodieTableType tableType) {
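Finally, on the operator refactoring: extracting preLoadIndexRecords() turns the index load into a template-method hook. The streaming bootstrap eagerly loads every index record when the operator initializes, while BatchBootstrapOperator overrides the hook as a no-op and bootstraps the index lazily, partition by partition, inside processElement. A minimal sketch of that shape, with hypothetical class names rather than the real Hudi operators:

    abstract class BootstrapSketch {

      public void initialize() throws Exception {
        // ... write-client and table setup elided ...
        preLoadIndexRecords(); // hook: subclasses may skip the eager load
      }

      /** Eagerly scans the table and loads every index record up front. */
      protected void preLoadIndexRecords() throws Exception {
        // default streaming behavior
      }
    }

    class BatchBootstrapSketch extends BootstrapSketch {

      @Override
      protected void preLoadIndexRecords() {
        // no operation: batch mode only bootstraps the partitions it actually sees
      }
    }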