From 1fed44af84b4726d40c57f1dad012c8e4a510f91 Mon Sep 17 00:00:00 2001
From: swuferhong <337361684@qq.com>
Date: Thu, 19 Aug 2021 17:15:26 +0800
Subject: [PATCH] [HUDI-2316] Support Flink batch upsert (#3494)

---
 .../sink/bootstrap/BootstrapFunction.java     |  10 +-
 .../batch/BatchBootstrapFunction.java         |  65 +++++++
 .../org/apache/hudi/sink/utils/Pipelines.java | 165 ++++++++++++++++++
 .../apache/hudi/table/HoodieTableSink.java    | 105 ++---------
 .../org/apache/hudi/util/StreamerUtil.java    |   4 +
 .../hudi/table/HoodieDataSourceITCase.java    |  99 ++++++++---
 .../apache/hudi/utils/TestConfigurations.java |  18 +-
 .../java/org/apache/hudi/utils/TestSQL.java   |  10 ++
 8 files changed, 343 insertions(+), 133 deletions(-)
 create mode 100644 hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/batch/BatchBootstrapFunction.java
 create mode 100644 hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java

diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java
index 49233626f..fc3c5f4ed 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java
@@ -72,12 +72,12 @@ public class BootstrapFunction
 
   private static final Logger LOG = LoggerFactory.getLogger(BootstrapFunction.class);
 
-  private HoodieTable hoodieTable;
+  protected HoodieTable hoodieTable;
 
-  private final Configuration conf;
+  protected final Configuration conf;
 
-  private transient org.apache.hadoop.conf.Configuration hadoopConf;
-  private transient HoodieWriteConfig writeConfig;
+  protected transient org.apache.hadoop.conf.Configuration hadoopConf;
+  protected transient HoodieWriteConfig writeConfig;
 
   private GlobalAggregateManager aggregateManager;
 
@@ -153,7 +153,7 @@ public class BootstrapFunction
    * @param partitionPath The partition path
    */
   @SuppressWarnings("unchecked")
-  private void loadRecords(String partitionPath, Collector<O> out) throws Exception {
+  protected void loadRecords(String partitionPath, Collector<O> out) throws Exception {
     long start = System.currentTimeMillis();
 
     BaseFileUtils fileUtils = BaseFileUtils.getInstance(this.hoodieTable.getBaseFileFormat());
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/batch/BatchBootstrapFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/batch/BatchBootstrapFunction.java
new file mode 100644
index 000000000..8b136b4cc
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/batch/BatchBootstrapFunction.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bootstrap.batch;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.sink.bootstrap.BootstrapFunction;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Collector;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * The function that loads the index for the incoming partitions from the existing hoodie table,
+ * used by the batch (bounded) write pipeline.
+ */
+public class BatchBootstrapFunction<I, O extends HoodieRecord<?>>
+    extends BootstrapFunction<I, O> {
+
+  private Set<String> partitionPathSet;
+  private boolean haveSuccessfulCommits;
+
+  public BatchBootstrapFunction(Configuration conf) {
+    super(conf);
+  }
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    super.open(parameters);
+    this.partitionPathSet = new HashSet<>();
+    this.haveSuccessfulCommits = StreamerUtil.haveSuccessfulCommits(hoodieTable.getMetaClient());
+  }
+
+  @Override
+  public void processElement(I value, Context context, Collector<O> out) throws Exception {
+    final HoodieRecord<?> record = (HoodieRecord<?>) value;
+    final String partitionPath = record.getKey().getPartitionPath();
+
+    if (haveSuccessfulCommits && !partitionPathSet.contains(partitionPath)) {
+      loadRecords(partitionPath, out);
+      partitionPathSet.add(partitionPath);
+    }
+
+    // send the trigger record
+    out.collect((O) value);
+  }
+
+}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
new file mode 100644
index 000000000..4808eb79d
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.hudi.sink.utils; + +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.sink.CleanFunction; +import org.apache.hudi.sink.StreamWriteOperatorFactory; +import org.apache.hudi.sink.bootstrap.BootstrapFunction; +import org.apache.hudi.sink.bootstrap.batch.BatchBootstrapFunction; +import org.apache.hudi.sink.bulk.BulkInsertWriteOperator; +import org.apache.hudi.sink.bulk.RowDataKeyGen; +import org.apache.hudi.sink.bulk.sort.SortOperatorGen; +import org.apache.hudi.sink.compact.CompactFunction; +import org.apache.hudi.sink.compact.CompactionCommitEvent; +import org.apache.hudi.sink.compact.CompactionCommitSink; +import org.apache.hudi.sink.compact.CompactionPlanEvent; +import org.apache.hudi.sink.compact.CompactionPlanOperator; +import org.apache.hudi.sink.partitioner.BucketAssignFunction; +import org.apache.hudi.sink.partitioner.BucketAssignOperator; +import org.apache.hudi.sink.transform.RowDataToHoodieFunctions; +import org.apache.hudi.table.format.FilePathUtils; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.operators.ProcessOperator; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil; +import org.apache.flink.table.types.logical.RowType; + +/** + * Utilities to generate all kinds of sub-pipelines. + */ +public class Pipelines { + + public static DataStreamSink bulkInsert(Configuration conf, RowType rowType, DataStream dataStream) { + BulkInsertWriteOperator.OperatorFactory operatorFactory = BulkInsertWriteOperator.getFactory(conf, rowType); + + final String[] partitionFields = FilePathUtils.extractPartitionKeys(conf); + if (partitionFields.length > 0) { + RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType); + if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_BY_PARTITION)) { + + // shuffle by partition keys + dataStream = dataStream.keyBy(rowDataKeyGen::getPartitionPath); + } + if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_BY_PARTITION)) { + SortOperatorGen sortOperatorGen = new SortOperatorGen(rowType, partitionFields); + // sort by partition keys + dataStream = dataStream + .transform("partition_key_sorter", + TypeInformation.of(RowData.class), + sortOperatorGen.createSortOperator()) + .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); + ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(), + conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L); + } + } + return dataStream + .transform("hoodie_bulk_insert_write", + TypeInformation.of(Object.class), + operatorFactory) + // follow the parallelism of upstream operators to avoid shuffle + .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)) + .addSink(new CleanFunction<>(conf)) + .setParallelism(1) + .name("clean_commits"); + } + + public static DataStream bootstrap(Configuration conf, RowType rowType, int defaultParallelism, DataStream dataStream) { + DataStream dataStream1 = rowDataToHoodieRecord(conf, rowType, dataStream); + + if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) { + dataStream1 = dataStream1.rebalance() + .transform( + "index_bootstrap", + TypeInformation.of(HoodieRecord.class), + new ProcessOperator<>(new BootstrapFunction<>(conf))) + 
.setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(defaultParallelism)) + .uid("uid_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME)); + } + + return dataStream1; + } + + public static DataStream batchBootstrap(Configuration conf, RowType rowType, int defaultParallelism, DataStream dataStream) { + // shuffle and sort by partition keys + final String[] partitionFields = FilePathUtils.extractPartitionKeys(conf); + if (partitionFields.length > 0) { + RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType); + // shuffle by partition keys + dataStream = dataStream + .keyBy(rowDataKeyGen::getPartitionPath); + } + + return rowDataToHoodieRecord(conf, rowType, dataStream) + .transform( + "batch_index_bootstrap", + TypeInformation.of(HoodieRecord.class), + new ProcessOperator<>(new BatchBootstrapFunction<>(conf))) + .setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(defaultParallelism)) + .uid("uid_batch_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME)); + } + + public static DataStream rowDataToHoodieRecord(Configuration conf, RowType rowType, DataStream dataStream) { + return dataStream.map(RowDataToHoodieFunctions.create(rowType, conf), TypeInformation.of(HoodieRecord.class)); + } + + public static DataStream hoodieStreamWrite(Configuration conf, int defaultParallelism, DataStream dataStream) { + StreamWriteOperatorFactory operatorFactory = new StreamWriteOperatorFactory<>(conf); + return dataStream + // Key-by record key, to avoid multiple subtasks write to a bucket at the same time + .keyBy(HoodieRecord::getRecordKey) + .transform( + "bucket_assigner", + TypeInformation.of(HoodieRecord.class), + new BucketAssignOperator<>(new BucketAssignFunction<>(conf))) + .uid("uid_bucket_assigner_" + conf.getString(FlinkOptions.TABLE_NAME)) + .setParallelism(conf.getOptional(FlinkOptions.BUCKET_ASSIGN_TASKS).orElse(defaultParallelism)) + // shuffle by fileId(bucket id) + .keyBy(record -> record.getCurrentLocation().getFileId()) + .transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory) + .uid("uid_hoodie_stream_write" + conf.getString(FlinkOptions.TABLE_NAME)) + .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); + } + + public static DataStreamSink compact(Configuration conf, DataStream dataStream) { + return dataStream.transform("compact_plan_generate", + TypeInformation.of(CompactionPlanEvent.class), + new CompactionPlanOperator(conf)) + .setParallelism(1) // plan generate must be singleton + .rebalance() + .transform("compact_task", + TypeInformation.of(CompactionCommitEvent.class), + new ProcessOperator<>(new CompactFunction(conf))) + .setParallelism(conf.getInteger(FlinkOptions.COMPACTION_TASKS)) + .addSink(new CompactionCommitSink(conf)) + .name("compact_commit") + .setParallelism(1); // compaction commit should be singleton + } + + public static DataStreamSink clean(Configuration conf, DataStream dataStream) { + return dataStream.addSink(new CleanFunction<>(conf)) + .setParallelism(1) + .name("clean_commits"); + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java index ca6d33a54..d49fdf193 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java @@ -21,37 +21,19 @@ package org.apache.hudi.table; import org.apache.hudi.common.model.HoodieRecord; import 
org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.configuration.FlinkOptions; -import org.apache.hudi.sink.CleanFunction; -import org.apache.hudi.sink.StreamWriteOperatorFactory; -import org.apache.hudi.sink.bootstrap.BootstrapFunction; -import org.apache.hudi.sink.bulk.BulkInsertWriteOperator; -import org.apache.hudi.sink.bulk.RowDataKeyGen; -import org.apache.hudi.sink.bulk.sort.SortOperatorGen; -import org.apache.hudi.sink.compact.CompactFunction; -import org.apache.hudi.sink.compact.CompactionCommitEvent; -import org.apache.hudi.sink.compact.CompactionCommitSink; -import org.apache.hudi.sink.compact.CompactionPlanEvent; -import org.apache.hudi.sink.compact.CompactionPlanOperator; -import org.apache.hudi.sink.partitioner.BucketAssignFunction; -import org.apache.hudi.sink.partitioner.BucketAssignOperator; -import org.apache.hudi.sink.transform.RowDataToHoodieFunctions; -import org.apache.hudi.table.format.FilePathUtils; +import org.apache.hudi.sink.utils.Pipelines; import org.apache.hudi.util.ChangelogModes; import org.apache.hudi.util.StreamerUtil; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.operators.ProcessOperator; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.sink.DataStreamSinkProvider; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite; import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning; -import org.apache.flink.table.data.RowData; -import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil; import org.apache.flink.table.types.logical.RowType; import java.util.Map; @@ -90,90 +72,23 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning, // bulk_insert mode final String writeOperation = this.conf.get(FlinkOptions.OPERATION); if (WriteOperationType.fromValue(writeOperation) == WriteOperationType.BULK_INSERT) { - BulkInsertWriteOperator.OperatorFactory operatorFactory = BulkInsertWriteOperator.getFactory(this.conf, rowType); - - final String[] partitionFields = FilePathUtils.extractPartitionKeys(this.conf); - if (partitionFields.length > 0) { - RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType); - if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_BY_PARTITION)) { - - // shuffle by partition keys - dataStream = dataStream.keyBy(rowDataKeyGen::getPartitionPath); - } - if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_BY_PARTITION)) { - SortOperatorGen sortOperatorGen = new SortOperatorGen(rowType, partitionFields); - // sort by partition keys - dataStream = dataStream - .transform("partition_key_sorter", - TypeInformation.of(RowData.class), - sortOperatorGen.createSortOperator()) - .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); - ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(), - conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L); - } - } - return dataStream - .transform("hoodie_bulk_insert_write", - TypeInformation.of(Object.class), - operatorFactory) - // follow the parallelism of upstream operators to avoid shuffle - .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)) - .addSink(new CleanFunction<>(conf)) - 
.setParallelism(1) - .name("clean_commits"); + return Pipelines.bulkInsert(conf, rowType, dataStream); } - // stream write + // default parallelism int parallelism = dataStream.getExecutionConfig().getParallelism(); - StreamWriteOperatorFactory operatorFactory = new StreamWriteOperatorFactory<>(conf); + final DataStream dataStream1 = context.isBounded() + ? Pipelines.batchBootstrap(conf, rowType, parallelism, dataStream) + : Pipelines.bootstrap(conf, rowType, parallelism, dataStream); - DataStream dataStream1 = dataStream - .map(RowDataToHoodieFunctions.create(rowType, conf), TypeInformation.of(HoodieRecord.class)); + // write pipeline + DataStream pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, dataStream1); - // bootstrap index - // TODO: This is a very time-consuming operation, will optimization - if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) { - dataStream1 = dataStream1.rebalance() - .transform( - "index_bootstrap", - TypeInformation.of(HoodieRecord.class), - new ProcessOperator<>(new BootstrapFunction<>(conf))) - .setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(parallelism)) - .uid("uid_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME)); - } - - DataStream pipeline = dataStream1 - // Key-by record key, to avoid multiple subtasks write to a bucket at the same time - .keyBy(HoodieRecord::getRecordKey) - .transform( - "bucket_assigner", - TypeInformation.of(HoodieRecord.class), - new BucketAssignOperator<>(new BucketAssignFunction<>(conf))) - .uid("uid_bucket_assigner_" + conf.getString(FlinkOptions.TABLE_NAME)) - .setParallelism(conf.getOptional(FlinkOptions.BUCKET_ASSIGN_TASKS).orElse(parallelism)) - // shuffle by fileId(bucket id) - .keyBy(record -> record.getCurrentLocation().getFileId()) - .transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory) - .uid("uid_hoodie_stream_write" + conf.getString(FlinkOptions.TABLE_NAME)) - .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); // compaction if (StreamerUtil.needsAsyncCompaction(conf)) { - return pipeline.transform("compact_plan_generate", - TypeInformation.of(CompactionPlanEvent.class), - new CompactionPlanOperator(conf)) - .setParallelism(1) // plan generate must be singleton - .rebalance() - .transform("compact_task", - TypeInformation.of(CompactionCommitEvent.class), - new ProcessOperator<>(new CompactFunction(conf))) - .setParallelism(conf.getInteger(FlinkOptions.COMPACTION_TASKS)) - .addSink(new CompactionCommitSink(conf)) - .name("compact_commit") - .setParallelism(1); // compaction commit should be singleton + return Pipelines.compact(conf, pipeline); } else { - return pipeline.addSink(new CleanFunction<>(conf)) - .setParallelism(1) - .name("clean_commits"); + return Pipelines.clean(conf, pipeline); } }; } diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index 0fac2cc49..f89d089ae 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -379,4 +379,8 @@ public class StreamerUtil { WriteOperationType operationType = WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)); return operationType == WriteOperationType.INSERT && !conf.getBoolean(FlinkOptions.INSERT_DEDUP); } + + public static boolean haveSuccessfulCommits(HoodieTableMetaClient metaClient) { + return !metaClient.getCommitsTimeline().filterCompletedInstants().empty(); + } } diff 
--git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java index 0764f5586..99effba3e 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java @@ -292,7 +292,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase { } @ParameterizedTest - @MethodSource("configParams") + @MethodSource("executionModeAndPartitioningParams") void testWriteAndRead(ExecMode execMode, boolean hiveStylePartitioning) { TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv; Map options = new HashMap<>(); @@ -317,6 +317,56 @@ public class HoodieDataSourceITCase extends AbstractTestBase { + "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]"); } + @ParameterizedTest + @EnumSource(value = HoodieTableType.class) + void testBatchModeUpsertWithoutPartition(HoodieTableType tableType) { + TableEnvironment tableEnv = batchTableEnv; + Map options = new HashMap<>(); + options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath()); + options.put(FlinkOptions.TABLE_NAME.key(), tableType.name()); + String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options, false); + tableEnv.executeSql(hoodieTableDDL); + + execInsertSql(tableEnv, TestSQL.INSERT_T1); + + List result1 = CollectionUtil.iterableToList( + () -> tableEnv.sqlQuery("select * from t1").execute().collect()); + assertRowsEquals(result1, TestData.DATA_SET_SOURCE_INSERT); + + // batchMode update + execInsertSql(tableEnv, TestSQL.UPDATE_INSERT_T1); + List result2 = CollectionUtil.iterableToList( + () -> tableEnv.sqlQuery("select * from t1").execute().collect()); + assertRowsEquals(result2, TestData.DATA_SET_SOURCE_MERGED); + } + + @ParameterizedTest + @MethodSource("tableTypeAndPartitioningParams") + void testBatchModeUpsert(HoodieTableType tableType, boolean hiveStylePartitioning) { + TableEnvironment tableEnv = batchTableEnv; + Map options = new HashMap<>(); + options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath()); + options.put(FlinkOptions.TABLE_NAME.key(), tableType.name()); + if (hiveStylePartitioning) { + options.put(FlinkOptions.HIVE_STYLE_PARTITIONING.key(), "true"); + } + String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options); + tableEnv.executeSql(hoodieTableDDL); + + execInsertSql(tableEnv, TestSQL.INSERT_T1); + + List result1 = CollectionUtil.iterableToList( + () -> tableEnv.sqlQuery("select * from t1").execute().collect()); + assertRowsEquals(result1, TestData.DATA_SET_SOURCE_INSERT); + + // batchMode update + execInsertSql(tableEnv, TestSQL.UPDATE_INSERT_T1); + + List result2 = CollectionUtil.iterableToList( + () -> tableEnv.sqlQuery("select * from t1").execute().collect()); + assertRowsEquals(result2, TestData.DATA_SET_SOURCE_MERGED); + } + @ParameterizedTest @EnumSource(value = ExecMode.class) void testWriteAndReadParMiddle(ExecMode execMode) throws Exception { @@ -436,18 +486,9 @@ public class HoodieDataSourceITCase extends AbstractTestBase { @EnumSource(value = ExecMode.class) void testWriteNonPartitionedTable(ExecMode execMode) { TableEnvironment tableEnv = execMode == ExecMode.BATCH ? 
batchTableEnv : streamTableEnv; - String hoodieTableDDL = "create table t1(\n" - + " uuid varchar(20),\n" - + " name varchar(10),\n" - + " age int,\n" - + " ts timestamp(3),\n" - + " `partition` varchar(20),\n" - + " PRIMARY KEY(uuid) NOT ENFORCED\n" - + ")\n" - + "with (\n" - + " 'connector' = 'hudi',\n" - + " 'path' = '" + tempFile.getAbsolutePath() + "'\n" - + ")"; + Map options = new HashMap<>(); + options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath()); + String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options, false); tableEnv.executeSql(hoodieTableDDL); final String insertInto1 = "insert into t1 values\n" @@ -627,19 +668,10 @@ public class HoodieDataSourceITCase extends AbstractTestBase { @Test void testBulkInsertNonPartitionedTable() { TableEnvironment tableEnv = batchTableEnv; - String hoodieTableDDL = "create table t1(\n" - + " uuid varchar(20),\n" - + " name varchar(10),\n" - + " age int,\n" - + " ts timestamp(3),\n" - + " `partition` varchar(20),\n" - + " PRIMARY KEY(uuid) NOT ENFORCED\n" - + ")\n" - + "with (\n" - + " 'connector' = 'hudi',\n" - + " 'path' = '" + tempFile.getAbsolutePath() + "',\n" - + " 'write.operation' = 'bulk_insert'\n" - + ")"; + Map options = new HashMap<>(); + options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath()); + options.put(FlinkOptions.OPERATION.key(), "bulk_insert"); + String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options, false); tableEnv.executeSql(hoodieTableDDL); final String insertInto1 = "insert into t1 values\n" @@ -675,7 +707,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase { /** * Return test params => (execution mode, hive style partitioning). */ - private static Stream configParams() { + private static Stream executionModeAndPartitioningParams() { Object[][] data = new Object[][] { {ExecMode.BATCH, false}, @@ -685,6 +717,19 @@ public class HoodieDataSourceITCase extends AbstractTestBase { return Stream.of(data).map(Arguments::of); } + /** + * Return test params => (HoodieTableType, hive style partitioning). 
+ */ + private static Stream tableTypeAndPartitioningParams() { + Object[][] data = + new Object[][] { + {HoodieTableType.COPY_ON_WRITE, false}, + {HoodieTableType.COPY_ON_WRITE, true}, + {HoodieTableType.MERGE_ON_READ, false}, + {HoodieTableType.MERGE_ON_READ, true}}; + return Stream.of(data).map(Arguments::of); + } + private void execInsertSql(TableEnvironment tEnv, String insert) { TableResult tableResult = tEnv.executeSql(insert); // wait to finish diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java index a7e38c0d9..f9db04c95 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java @@ -56,18 +56,24 @@ public class TestConfigurations { .build(); public static String getCreateHoodieTableDDL(String tableName, Map options) { - String createTable = "create table " + tableName + "(\n" + return getCreateHoodieTableDDL(tableName, options, true); + } + + public static String getCreateHoodieTableDDL(String tableName, Map options, boolean havePartition) { + StringBuilder builder = new StringBuilder(); + builder.append("create table " + tableName + "(\n" + " uuid varchar(20),\n" + " name varchar(10),\n" + " age int,\n" + " ts timestamp(3),\n" + " `partition` varchar(20),\n" + " PRIMARY KEY(uuid) NOT ENFORCED\n" - + ")\n" - + "PARTITIONED BY (`partition`)\n" - + "with (\n" - + " 'connector' = 'hudi'"; - StringBuilder builder = new StringBuilder(createTable); + + ")\n"); + if (havePartition) { + builder.append("PARTITIONED BY (`partition`)\n"); + } + builder.append("with (\n" + + " 'connector' = 'hudi'"); options.forEach((k, v) -> builder.append(",\n") .append(" '").append(k).append("' = '").append(v).append("'")); builder.append("\n)"); diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java index 8fb15edf2..6b0b71c71 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java @@ -33,4 +33,14 @@ public class TestSQL { + "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),\n" + "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n" + "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')"; + + public static final String UPDATE_INSERT_T1 = "insert into t1 values\n" + + "('id1','Danny',24,TIMESTAMP '1970-01-01 00:00:01','par1'),\n" + + "('id2','Stephen',34,TIMESTAMP '1970-01-01 00:00:02','par1'),\n" + + "('id3','Julian',54,TIMESTAMP '1970-01-01 00:00:03','par2'),\n" + + "('id4','Fabian',32,TIMESTAMP '1970-01-01 00:00:04','par2'),\n" + + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),\n" + + "('id9','Jane',19,TIMESTAMP '1970-01-01 00:00:06','par3'),\n" + + "('id10','Ella',38,TIMESTAMP '1970-01-01 00:00:07','par4'),\n" + + "('id11','Phoebe',52,TIMESTAMP '1970-01-01 00:00:08','par4')"; }
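
Usage sketch: with the pieces above, HoodieTableSink routes a bounded (batch) input through Pipelines.batchBootstrap(...), which keys the input by partition path and lets BatchBootstrapFunction load the existing index for each incoming partition before the bucket assigner and stream-write operators run, so a second batch insert with overlapping primary keys merges into the existing file groups instead of duplicating rows. The snippet below shows what such a batch upsert looks like from the Flink SQL API; it assumes Flink 1.13 with the hudi-flink bundle on the classpath, and the table path /tmp/hudi/t1 and the sample rows are hypothetical stand-ins for the fixtures used in HoodieDataSourceITCase.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class BatchUpsertExample {

  public static void main(String[] args) throws Exception {
    // Bounded execution: context.isBounded() is true in HoodieTableSink,
    // so the write pipeline goes through Pipelines.batchBootstrap(...).
    TableEnvironment tableEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().inBatchMode().build());

    tableEnv.executeSql("create table t1(\n"
        + "  uuid varchar(20),\n"
        + "  name varchar(10),\n"
        + "  age int,\n"
        + "  ts timestamp(3),\n"
        + "  `partition` varchar(20),\n"
        + "  PRIMARY KEY(uuid) NOT ENFORCED\n"
        + ")\n"
        + "PARTITIONED BY (`partition`)\n"
        + "with (\n"
        + "  'connector' = 'hudi',\n"
        + "  'path' = '/tmp/hudi/t1'\n" // hypothetical table path
        + ")");

    // First bounded job: plain insert, there is no completed commit to bootstrap from yet.
    tableEnv.executeSql("insert into t1 values\n"
        + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1')").await();

    // Second bounded job: same primary key, different age. BatchBootstrapFunction
    // loads the index for partition 'par1' first, so this row is merged (upserted)
    // rather than written as a duplicate.
    tableEnv.executeSql("insert into t1 values\n"
        + "('id1','Danny',24,TIMESTAMP '1970-01-01 00:00:01','par1')").await();
  }
}

Selecting from t1 after the second job should return a single row for id1 with age 24, which is the same merged result the new testBatchModeUpsert cases assert against DATA_SET_SOURCE_MERGED.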