diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java
index fc3c5f4ed..a73831b7b 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java
@@ -148,7 +148,7 @@ public class BootstrapFunction
   }

   /**
-   * Load all the indices of give partition path into the backup state.
+   * Loads all the indices of the given partition path into the backup state.
    *
    * @param partitionPath The partition path
    */
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/batch/BatchBootstrapFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/batch/BatchBootstrapFunction.java
index 8b136b4cc..237858da8 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/batch/BatchBootstrapFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/batch/BatchBootstrapFunction.java
@@ -29,7 +29,15 @@ import java.util.HashSet;
 import java.util.Set;

 /**
- * The function to load specify partition index from existing hoodieTable.
+ * The function to load the index from the existing hoodieTable.
+ *
+ * <p>This function should only be used for bounded sources.
+ *
+ * <p>When a record comes in, the function first checks whether its partition path has been loaded;
+ * if not, it loads the entire partition and sends the index records downstream before forwarding
+ * the input record; if the partition is already loaded, it forwards the input record directly.
+ *
+ * <p>The input records should be shuffled by partition path to avoid repeated loading.
  */
 public class BatchBootstrapFunction
     extends BootstrapFunction {
@@ -61,5 +69,4 @@ public class BatchBootstrapFunction
     // send the trigger record
     out.collect((O) value);
   }
-
 }
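A minimal, self-contained Java sketch of the loading pattern the new javadoc describes (load a partition's index once, when its first record arrives, then forward records directly). It is illustrative only; the class and method names below are made up and this is not the Hudi operator itself:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class PartitionLoadOnceSketch {
      // partitions whose index has already been loaded by this task
      private final Set<String> loadedPartitions = new HashSet<>();

      void processRecord(String partitionPath, String recordKey) {
        if (!loadedPartitions.contains(partitionPath)) {
          // first record of this partition: load the whole partition index and
          // emit the index records downstream before forwarding the input record
          System.out.println("loading index for partition " + partitionPath);
          loadedPartitions.add(partitionPath);
        }
        // forward the input record itself
        System.out.println("forwarding record " + recordKey + " (" + partitionPath + ")");
      }

      public static void main(String[] args) {
        PartitionLoadOnceSketch sketch = new PartitionLoadOnceSketch();
        // input is assumed to be shuffled by partition path, as the javadoc requires
        List<String[]> records = Arrays.asList(
            new String[] {"par1", "id1"},
            new String[] {"par1", "id2"},
            new String[] {"par2", "id3"});
        records.forEach(r -> sketch.processRecord(r[0], r[1]));
      }
    }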
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 4808eb79d..b71b18013 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -85,7 +85,22 @@ public class Pipelines {
         .name("clean_commits");
   }

-  public static DataStream bootstrap(Configuration conf, RowType rowType, int defaultParallelism, DataStream dataStream) {
+  public static DataStream bootstrap(
+      Configuration conf,
+      RowType rowType,
+      int defaultParallelism,
+      DataStream dataStream,
+      boolean bounded) {
+    return bounded
+        ? boundedBootstrap(conf, rowType, defaultParallelism, dataStream)
+        : streamBootstrap(conf, rowType, defaultParallelism, dataStream);
+  }
+
+  private static DataStream streamBootstrap(
+      Configuration conf,
+      RowType rowType,
+      int defaultParallelism,
+      DataStream dataStream) {
     DataStream dataStream1 = rowDataToHoodieRecord(conf, rowType, dataStream);

     if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
@@ -101,8 +116,11 @@ public class Pipelines {
     return dataStream1;
   }

-  public static DataStream batchBootstrap(Configuration conf, RowType rowType, int defaultParallelism, DataStream dataStream) {
-    // shuffle and sort by partition keys
+  private static DataStream boundedBootstrap(
+      Configuration conf,
+      RowType rowType,
+      int defaultParallelism,
+      DataStream dataStream) {
     final String[] partitionFields = FilePathUtils.extractPartitionKeys(conf);
     if (partitionFields.length > 0) {
       RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
index d49fdf193..feab7b7be 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
@@ -77,9 +77,9 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
       // default parallelism
       int parallelism = dataStream.getExecutionConfig().getParallelism();

-      final DataStream dataStream1 = context.isBounded()
-          ? Pipelines.batchBootstrap(conf, rowType, parallelism, dataStream)
-          : Pipelines.bootstrap(conf, rowType, parallelism, dataStream);
+
+      // bootstrap
+      final DataStream dataStream1 = Pipelines.bootstrap(conf, rowType, parallelism, dataStream, context.isBounded());

       // write pipeline
       DataStream pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, dataStream1);
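Since the old public batchBootstrap becomes the private boundedBootstrap, any external caller of batchBootstrap (none appears in this patch, so the snippet below is a hypothetical migration example) would switch to the unified entry point and pass the boundedness flag explicitly:

    // hypothetical migration for a caller of the old Pipelines.batchBootstrap(...)
    DataStream bootstrapped = Pipelines.bootstrap(conf, rowType, defaultParallelism, dataStream, true); // bounded input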
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
index 99effba3e..9effdcc8c 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
@@ -62,6 +62,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;

+import static org.apache.hudi.utils.TestConfigurations.sql;
 import static org.apache.hudi.utils.TestData.assertRowsEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -98,19 +99,23 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
     String createSource = TestConfigurations.getFileSourceDDL("source");
     streamTableEnv.executeSql(createSource);

-    Map options = new HashMap<>();
-    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
-    options.put(FlinkOptions.READ_AS_STREAMING.key(), "true");
-    options.put(FlinkOptions.TABLE_TYPE.key(), tableType.name());
-    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.READ_AS_STREAMING, "true")
+        .option(FlinkOptions.TABLE_TYPE, tableType.name())
+        .end();
     streamTableEnv.executeSql(hoodieTableDDL);
     String insertInto = "insert into t1 select * from source";
     execInsertSql(streamTableEnv, insertInto);

     String firstCommit = TestUtils.getFirstCommit(tempFile.getAbsolutePath());
-    options.put(FlinkOptions.READ_STREAMING_START_COMMIT.key(), firstCommit);
     streamTableEnv.executeSql("drop table t1");
-    hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
+    hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.READ_AS_STREAMING, "true")
+        .option(FlinkOptions.TABLE_TYPE, tableType.name())
+        .option(FlinkOptions.READ_STREAMING_START_COMMIT, firstCommit)
+        .end();
     streamTableEnv.executeSql(hoodieTableDDL);
     List rows = execSelectSql(streamTableEnv, "select * from t1", 10);
     assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
@@ -128,11 +133,11 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
     String createSource = TestConfigurations.getFileSourceDDL("source");
     streamTableEnv.executeSql(createSource);

-    Map options = new HashMap<>();
-    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
-    options.put(FlinkOptions.READ_AS_STREAMING.key(), "true");
-    options.put(FlinkOptions.TABLE_TYPE.key(), tableType.name());
-    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.READ_AS_STREAMING, "true")
+        .option(FlinkOptions.TABLE_TYPE, tableType.name())
+        .end();
     streamTableEnv.executeSql(hoodieTableDDL);
     String insertInto = "insert into t1 select * from source";
     execInsertSql(streamTableEnv, insertInto);
@@ -156,11 +161,11 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
     streamTableEnv.executeSql(createSource);
     streamTableEnv.executeSql(createSource2);

-    Map options = new HashMap<>();
-    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
-    options.put(FlinkOptions.READ_AS_STREAMING.key(), "true");
-    options.put(FlinkOptions.TABLE_TYPE.key(), tableType.name());
-    String createHoodieTable = TestConfigurations.getCreateHoodieTableDDL("t1", options);
+    String createHoodieTable = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.READ_AS_STREAMING, "true")
+        .option(FlinkOptions.TABLE_TYPE, tableType.name())
+        .end();
     streamTableEnv.executeSql(createHoodieTable);
     String insertInto = "insert into t1 select * from source";
     // execute 2 times
@@ -171,8 +176,12 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
     String insertInto2 = "insert into t1 select * from source2";
     execInsertSql(streamTableEnv, insertInto2);
     // now we consume starting from the oldest commit
-    options.put(FlinkOptions.READ_STREAMING_START_COMMIT.key(), specifiedCommit);
-    String createHoodieTable2 = TestConfigurations.getCreateHoodieTableDDL("t2", options);
+    String createHoodieTable2 = sql("t2")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.READ_AS_STREAMING, "true")
+        .option(FlinkOptions.TABLE_TYPE, tableType.name())
+        .option(FlinkOptions.READ_STREAMING_START_COMMIT, specifiedCommit)
+        .end();
     streamTableEnv.executeSql(createHoodieTable2);
     List rows = execSelectSql(streamTableEnv, "select * from t2", 10);
     // all the data with same keys are appended within one data bucket and one log file,
@@ -186,9 +195,9 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
     String createSource = TestConfigurations.getFileSourceDDL("source");
     streamTableEnv.executeSql(createSource);

-    Map options = new HashMap<>();
-    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
-    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .end();
     streamTableEnv.executeSql(hoodieTableDDL);
     String insertInto = "insert into t1 select * from source";
     execInsertSql(streamTableEnv, insertInto);
@@ -204,16 +213,16 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
     String createSource = TestConfigurations.getFileSourceDDL("source");
     streamTableEnv.executeSql(createSource);

-    Map options = new HashMap<>();
-    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
-    // read optimized is supported for both MOR and COR table,
-    // test MOR streaming write with compaction then reads as
-    // query type 'read_optimized'.
-    options.put(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
-    options.put(FlinkOptions.QUERY_TYPE.key(), FlinkOptions.QUERY_TYPE_READ_OPTIMIZED);
-    options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "1");
-    options.put(FlinkOptions.COMPACTION_TASKS.key(), "1");
-    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
+        // read optimized is supported for both MOR and COW tables;
+        // test MOR streaming write with compaction, then read as
+        // query type 'read_optimized'.
+        .option(FlinkOptions.QUERY_TYPE, FlinkOptions.QUERY_TYPE_READ_OPTIMIZED)
+        .option(FlinkOptions.COMPACTION_DELTA_COMMITS, "1")
+        .option(FlinkOptions.COMPACTION_TASKS, "1")
+        .end();
     streamTableEnv.executeSql(hoodieTableDDL);
     String insertInto = "insert into t1 select * from source";
     execInsertSql(streamTableEnv, insertInto);
@@ -233,10 +242,10 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
         "source", "test_source_3.data", 4);
     streamTableEnv.executeSql(createSource);

-    Map options = new HashMap<>();
-    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
-    options.put(FlinkOptions.CLEAN_RETAIN_COMMITS.key(), "1"); // only keep 1 commits
-    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.CLEAN_RETAIN_COMMITS, "1")
+        .end();
     streamTableEnv.executeSql(hoodieTableDDL);
     String insertInto = "insert into t1 select * from source";
     execInsertSql(streamTableEnv, insertInto);
@@ -269,14 +278,14 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
     String latestCommit = StreamerUtil.createWriteClient(conf, null)
         .getLastCompletedInstant(HoodieTableType.MERGE_ON_READ);

-    Map options = new HashMap<>();
-    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
-    options.put(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
-    options.put(FlinkOptions.READ_AS_STREAMING.key(), "true");
-    options.put(FlinkOptions.READ_STREAMING_CHECK_INTERVAL.key(), "2");
-    options.put(FlinkOptions.READ_STREAMING_START_COMMIT.key(), latestCommit);
-    options.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true");
-    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
+        .option(FlinkOptions.READ_AS_STREAMING, "true")
+        .option(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, "2")
+        .option(FlinkOptions.READ_STREAMING_START_COMMIT, latestCommit)
+        .option(FlinkOptions.CHANGELOG_ENABLED, "true")
+        .end();
     streamTableEnv.executeSql(hoodieTableDDL);

     final String sinkDDL = "create table sink(\n"
@@ -295,12 +304,10 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
   @MethodSource("executionModeAndPartitioningParams")
   void testWriteAndRead(ExecMode execMode, boolean hiveStylePartitioning) {
     TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv;
-    Map options = new HashMap<>();
-    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
-    if (hiveStylePartitioning) {
-      options.put(FlinkOptions.HIVE_STYLE_PARTITIONING.key(), "true");
-    }
-    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning)
+        .end();
     tableEnv.executeSql(hoodieTableDDL);

     execInsertSql(tableEnv, TestSQL.INSERT_T1);
@@ -321,10 +328,11 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
   @EnumSource(value = HoodieTableType.class)
   void testBatchModeUpsertWithoutPartition(HoodieTableType tableType) {
     TableEnvironment tableEnv = batchTableEnv;
-    Map options = new HashMap<>();
-    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
-    options.put(FlinkOptions.TABLE_NAME.key(), tableType.name());
-    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options, false);
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.TABLE_NAME, tableType.name())
+        .withPartition(false)
+        .end();
     tableEnv.executeSql(hoodieTableDDL);

     execInsertSql(tableEnv, TestSQL.INSERT_T1);
@@ -344,13 +352,11 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
   @MethodSource("tableTypeAndPartitioningParams")
   void testBatchModeUpsert(HoodieTableType tableType, boolean hiveStylePartitioning) {
     TableEnvironment tableEnv = batchTableEnv;
-    Map options = new HashMap<>();
-    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
-    options.put(FlinkOptions.TABLE_NAME.key(), tableType.name());
-    if (hiveStylePartitioning) {
-      options.put(FlinkOptions.HIVE_STYLE_PARTITIONING.key(), "true");
-    }
-    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.TABLE_NAME, tableType.name())
+        .option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning)
+        .end();
     tableEnv.executeSql(hoodieTableDDL);

     execInsertSql(tableEnv, TestSQL.INSERT_T1);
@@ -421,9 +427,9 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
   @EnumSource(value = ExecMode.class)
   void testInsertOverwrite(ExecMode execMode) {
     TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv;
-    Map options = new HashMap<>();
-    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
-    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .end();
     tableEnv.executeSql(hoodieTableDDL);

     execInsertSql(tableEnv, TestSQL.INSERT_T1);
@@ -458,10 +464,10 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
   @EnumSource(value = ExecMode.class)
   void testUpsertWithMiniBatches(ExecMode execMode) {
     TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv;
-    Map options = new HashMap<>();
-    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
-    options.put(FlinkOptions.WRITE_BATCH_SIZE.key(), "0.001");
-    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.WRITE_BATCH_SIZE, "0.001")
+        .end();
     tableEnv.executeSql(hoodieTableDDL);

     final String insertInto1 = "insert into t1 values\n"
@@ -486,9 +492,10 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
   @EnumSource(value = ExecMode.class)
   void testWriteNonPartitionedTable(ExecMode execMode) {
     TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv;
-    Map options = new HashMap<>();
-    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
-    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options, false);
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .withPartition(false)
+        .end();
     tableEnv.executeSql(hoodieTableDDL);

     final String insertInto1 = "insert into t1 values\n"
@@ -516,11 +523,11 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
         "source", "test_source_4.data", 4);
     streamTableEnv.executeSql(createSource);

-    Map options = new HashMap<>();
-    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
-    options.put(FlinkOptions.INDEX_GLOBAL_ENABLED.key(), "true");
-    options.put(FlinkOptions.INSERT_DROP_DUPS.key(), "true");
-    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.INDEX_GLOBAL_ENABLED, "true")
+        .option(FlinkOptions.INSERT_DROP_DUPS, "true")
+        .end();
     streamTableEnv.executeSql(hoodieTableDDL);

     final String insertInto2 = "insert into t1 select * from source";
@@ -538,12 +545,11 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
     String createSource = TestConfigurations.getFileSourceDDL(
         "source", "test_source_4.data", 4);
     streamTableEnv.executeSql(createSource);
-
-    Map options = new HashMap<>();
-    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
-    options.put(FlinkOptions.INDEX_GLOBAL_ENABLED.key(), "false");
-    options.put(FlinkOptions.INSERT_DROP_DUPS.key(), "true");
-    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.INDEX_GLOBAL_ENABLED, "false")
+        .option(FlinkOptions.INSERT_DROP_DUPS, "true")
+        .end();
     streamTableEnv.executeSql(hoodieTableDDL);

     final String insertInto2 = "insert into t1 select * from source";
@@ -567,11 +573,11 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
     StreamerUtil.initTableIfNotExists(conf);

     // create a flink source table
-    Map options = new HashMap<>();
-    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
-    options.put(FlinkOptions.READ_AS_STREAMING.key(), "true");
-    options.put(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
-    String createHoodieTable = TestConfigurations.getCreateHoodieTableDDL("t1", options);
+    String createHoodieTable = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.READ_AS_STREAMING, "true")
+        .option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
+        .end();
     streamTableEnv.executeSql(createHoodieTable);

     // execute query and assert throws exception
@@ -640,14 +646,12 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
     String csvSourceDDL = TestConfigurations.getCsvSourceDDL("csv_source", "test_source_5.data");
     tableEnv.executeSql(csvSourceDDL);

-    Map options = new HashMap<>();
-    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
-    options.put(FlinkOptions.OPERATION.key(), "bulk_insert");
-    options.put(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_BY_PARTITION.key(), "true");
-    if (hiveStylePartitioning) {
-      options.put(FlinkOptions.HIVE_STYLE_PARTITIONING.key(), "true");
-    }
-    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("hoodie_sink", options);
+    String hoodieTableDDL = sql("hoodie_sink")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.OPERATION, "bulk_insert")
+        .option(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_BY_PARTITION, "true")
+        .option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning)
+        .end();
     tableEnv.executeSql(hoodieTableDDL);

     String insertInto = "insert into hoodie_sink select * from csv_source";
@@ -668,10 +672,11 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
   @Test
   void testBulkInsertNonPartitionedTable() {
     TableEnvironment tableEnv = batchTableEnv;
-    Map options = new HashMap<>();
-    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
-    options.put(FlinkOptions.OPERATION.key(), "bulk_insert");
-    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options, false);
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.OPERATION, "bulk_insert")
+        .withPartition(false)
+        .end();
     tableEnv.executeSql(hoodieTableDDL);

     final String insertInto1 = "insert into t1 values\n"
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
index f9db04c95..6824090a2 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
@@ -23,6 +23,7 @@ import org.apache.hudi.streamer.FlinkStreamerConfig;
 import org.apache.hudi.utils.factory.CollectSinkTableFactory;
 import org.apache.hudi.utils.factory.ContinuousFileSourceFactory;

+import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
@@ -31,6 +32,7 @@ import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;

+import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;

@@ -182,4 +184,48 @@ public class TestConfigurations {
     streamerConf.checkpointInterval = 4000L;
     return streamerConf;
   }
+
+  /**
+   * Creates the tool to build hoodie table DDL.
+   */
+  public static Sql sql(String tableName) {
+    return new Sql(tableName);
+  }
+
+  // -------------------------------------------------------------------------
+  //  Utilities
+  // -------------------------------------------------------------------------
+
+  /**
+   * Tool to build hoodie table DDL with schema {@link #TABLE_SCHEMA}.
+   */
+  public static class Sql {
+    private final Map options;
+    private String tableName;
+    private boolean withPartition = true;
+
+    public Sql(String tableName) {
+      options = new HashMap<>();
+      this.tableName = tableName;
+    }
+
+    public Sql option(ConfigOption option, String val) {
+      this.options.put(option.key(), val);
+      return this;
+    }
+
+    public Sql option(ConfigOption option, boolean val) {
+      this.options.put(option.key(), val + "");
+      return this;
+    }
+
+    public Sql withPartition(boolean withPartition) {
+      this.withPartition = withPartition;
+      return this;
+    }
+
+    public String end() {
+      return TestConfigurations.getCreateHoodieTableDDL(this.tableName, options, this.withPartition);
+    }
+  }
 }
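A usage example of the new builder, mirrored from the updated call sites in HoodieDataSourceITCase (FlinkOptions, tempFile and tableEnv come from that test class and are shown only for illustration):

    String hoodieTableDDL = TestConfigurations.sql("t1")
        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
        .option(FlinkOptions.OPERATION, "bulk_insert")
        .withPartition(false)
        .end();
    tableEnv.executeSql(hoodieTableDDL);

The boolean overload of option(...) stringifies its value, so tests can pass flags such as hiveStylePartitioning directly and drop the old if (hiveStylePartitioning) branches around options.put(...).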