1
0

[MINOR] Some cosmetic changes for Flink (#3503)

This commit is contained in:
Danny Chan
2021-08-19 23:21:20 +08:00
committed by GitHub
parent 7dddd54406
commit 9762e4c08c
6 changed files with 184 additions and 108 deletions

View File

@@ -148,7 +148,7 @@ public class BootstrapFunction<I, O extends HoodieRecord>
} }
/** /**
* Load all the indices of give partition path into the backup state. * Loads all the indices of give partition path into the backup state.
* *
* @param partitionPath The partition path * @param partitionPath The partition path
*/ */

View File

@@ -29,7 +29,15 @@ import java.util.HashSet;
import java.util.Set; import java.util.Set;
/** /**
* The function to load specify partition index from existing hoodieTable. * The function to load index from existing hoodieTable.
*
* <p>This function should only be used for bounded source.
*
* <p>When a record comes in, the function firstly checks whether the partition path of the record is already loaded,
* if the partition is not loaded yet, loads the entire partition and sends the index records to downstream operators
* before it sends the input record; if the partition is loaded already, sends the input record directly.
*
* <p>The input records should shuffle by the partition path to avoid repeated loading.
*/ */
public class BatchBootstrapFunction<I, O extends HoodieRecord> public class BatchBootstrapFunction<I, O extends HoodieRecord>
extends BootstrapFunction<I, O> { extends BootstrapFunction<I, O> {
@@ -61,5 +69,4 @@ public class BatchBootstrapFunction<I, O extends HoodieRecord>
// send the trigger record // send the trigger record
out.collect((O) value); out.collect((O) value);
} }
} }

View File

@@ -85,7 +85,22 @@ public class Pipelines {
.name("clean_commits"); .name("clean_commits");
} }
public static DataStream<HoodieRecord> bootstrap(Configuration conf, RowType rowType, int defaultParallelism, DataStream<RowData> dataStream) { public static DataStream<HoodieRecord> bootstrap(
Configuration conf,
RowType rowType,
int defaultParallelism,
DataStream<RowData> dataStream,
boolean bounded) {
return bounded
? boundedBootstrap(conf, rowType, defaultParallelism, dataStream)
: streamBootstrap(conf, rowType, defaultParallelism, dataStream);
}
private static DataStream<HoodieRecord> streamBootstrap(
Configuration conf,
RowType rowType,
int defaultParallelism,
DataStream<RowData> dataStream) {
DataStream<HoodieRecord> dataStream1 = rowDataToHoodieRecord(conf, rowType, dataStream); DataStream<HoodieRecord> dataStream1 = rowDataToHoodieRecord(conf, rowType, dataStream);
if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) { if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
@@ -101,8 +116,11 @@ public class Pipelines {
return dataStream1; return dataStream1;
} }
public static DataStream<HoodieRecord> batchBootstrap(Configuration conf, RowType rowType, int defaultParallelism, DataStream<RowData> dataStream) { private static DataStream<HoodieRecord> boundedBootstrap(
// shuffle and sort by partition keys Configuration conf,
RowType rowType,
int defaultParallelism,
DataStream<RowData> dataStream) {
final String[] partitionFields = FilePathUtils.extractPartitionKeys(conf); final String[] partitionFields = FilePathUtils.extractPartitionKeys(conf);
if (partitionFields.length > 0) { if (partitionFields.length > 0) {
RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType); RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType);

View File

@@ -77,9 +77,9 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
// default parallelism // default parallelism
int parallelism = dataStream.getExecutionConfig().getParallelism(); int parallelism = dataStream.getExecutionConfig().getParallelism();
final DataStream<HoodieRecord> dataStream1 = context.isBounded()
? Pipelines.batchBootstrap(conf, rowType, parallelism, dataStream) // bootstrap
: Pipelines.bootstrap(conf, rowType, parallelism, dataStream); final DataStream<HoodieRecord> dataStream1 = Pipelines.bootstrap(conf, rowType, parallelism, dataStream, context.isBounded());
// write pipeline // write pipeline
DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, dataStream1); DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, dataStream1);

View File

@@ -62,6 +62,7 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import static org.apache.hudi.utils.TestConfigurations.sql;
import static org.apache.hudi.utils.TestData.assertRowsEquals; import static org.apache.hudi.utils.TestData.assertRowsEquals;
import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -98,19 +99,23 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
String createSource = TestConfigurations.getFileSourceDDL("source"); String createSource = TestConfigurations.getFileSourceDDL("source");
streamTableEnv.executeSql(createSource); streamTableEnv.executeSql(createSource);
Map<String, String> options = new HashMap<>(); String hoodieTableDDL = sql("t1")
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath()); .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
options.put(FlinkOptions.READ_AS_STREAMING.key(), "true"); .option(FlinkOptions.READ_AS_STREAMING, "true")
options.put(FlinkOptions.TABLE_TYPE.key(), tableType.name()); .option(FlinkOptions.TABLE_TYPE, tableType.name())
String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options); .end();
streamTableEnv.executeSql(hoodieTableDDL); streamTableEnv.executeSql(hoodieTableDDL);
String insertInto = "insert into t1 select * from source"; String insertInto = "insert into t1 select * from source";
execInsertSql(streamTableEnv, insertInto); execInsertSql(streamTableEnv, insertInto);
String firstCommit = TestUtils.getFirstCommit(tempFile.getAbsolutePath()); String firstCommit = TestUtils.getFirstCommit(tempFile.getAbsolutePath());
options.put(FlinkOptions.READ_STREAMING_START_COMMIT.key(), firstCommit);
streamTableEnv.executeSql("drop table t1"); streamTableEnv.executeSql("drop table t1");
hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options); hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.option(FlinkOptions.READ_AS_STREAMING, "true")
.option(FlinkOptions.TABLE_TYPE, tableType.name())
.option(FlinkOptions.READ_STREAMING_START_COMMIT, firstCommit)
.end();
streamTableEnv.executeSql(hoodieTableDDL); streamTableEnv.executeSql(hoodieTableDDL);
List<Row> rows = execSelectSql(streamTableEnv, "select * from t1", 10); List<Row> rows = execSelectSql(streamTableEnv, "select * from t1", 10);
assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT); assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
@@ -128,11 +133,11 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
String createSource = TestConfigurations.getFileSourceDDL("source"); String createSource = TestConfigurations.getFileSourceDDL("source");
streamTableEnv.executeSql(createSource); streamTableEnv.executeSql(createSource);
Map<String, String> options = new HashMap<>(); String hoodieTableDDL = sql("t1")
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath()); .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
options.put(FlinkOptions.READ_AS_STREAMING.key(), "true"); .option(FlinkOptions.READ_AS_STREAMING, "true")
options.put(FlinkOptions.TABLE_TYPE.key(), tableType.name()); .option(FlinkOptions.TABLE_TYPE, tableType.name())
String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options); .end();
streamTableEnv.executeSql(hoodieTableDDL); streamTableEnv.executeSql(hoodieTableDDL);
String insertInto = "insert into t1 select * from source"; String insertInto = "insert into t1 select * from source";
execInsertSql(streamTableEnv, insertInto); execInsertSql(streamTableEnv, insertInto);
@@ -156,11 +161,11 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
streamTableEnv.executeSql(createSource); streamTableEnv.executeSql(createSource);
streamTableEnv.executeSql(createSource2); streamTableEnv.executeSql(createSource2);
Map<String, String> options = new HashMap<>(); String createHoodieTable = sql("t1")
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath()); .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
options.put(FlinkOptions.READ_AS_STREAMING.key(), "true"); .option(FlinkOptions.READ_AS_STREAMING, "true")
options.put(FlinkOptions.TABLE_TYPE.key(), tableType.name()); .option(FlinkOptions.TABLE_TYPE, tableType.name())
String createHoodieTable = TestConfigurations.getCreateHoodieTableDDL("t1", options); .end();
streamTableEnv.executeSql(createHoodieTable); streamTableEnv.executeSql(createHoodieTable);
String insertInto = "insert into t1 select * from source"; String insertInto = "insert into t1 select * from source";
// execute 2 times // execute 2 times
@@ -171,8 +176,12 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
String insertInto2 = "insert into t1 select * from source2"; String insertInto2 = "insert into t1 select * from source2";
execInsertSql(streamTableEnv, insertInto2); execInsertSql(streamTableEnv, insertInto2);
// now we consume starting from the oldest commit // now we consume starting from the oldest commit
options.put(FlinkOptions.READ_STREAMING_START_COMMIT.key(), specifiedCommit); String createHoodieTable2 = sql("t2")
String createHoodieTable2 = TestConfigurations.getCreateHoodieTableDDL("t2", options); .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.option(FlinkOptions.READ_AS_STREAMING, "true")
.option(FlinkOptions.TABLE_TYPE, tableType.name())
.option(FlinkOptions.READ_STREAMING_START_COMMIT, specifiedCommit)
.end();
streamTableEnv.executeSql(createHoodieTable2); streamTableEnv.executeSql(createHoodieTable2);
List<Row> rows = execSelectSql(streamTableEnv, "select * from t2", 10); List<Row> rows = execSelectSql(streamTableEnv, "select * from t2", 10);
// all the data with same keys are appended within one data bucket and one log file, // all the data with same keys are appended within one data bucket and one log file,
@@ -186,9 +195,9 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
String createSource = TestConfigurations.getFileSourceDDL("source"); String createSource = TestConfigurations.getFileSourceDDL("source");
streamTableEnv.executeSql(createSource); streamTableEnv.executeSql(createSource);
Map<String, String> options = new HashMap<>(); String hoodieTableDDL = sql("t1")
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath()); .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options); .end();
streamTableEnv.executeSql(hoodieTableDDL); streamTableEnv.executeSql(hoodieTableDDL);
String insertInto = "insert into t1 select * from source"; String insertInto = "insert into t1 select * from source";
execInsertSql(streamTableEnv, insertInto); execInsertSql(streamTableEnv, insertInto);
@@ -204,16 +213,16 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
String createSource = TestConfigurations.getFileSourceDDL("source"); String createSource = TestConfigurations.getFileSourceDDL("source");
streamTableEnv.executeSql(createSource); streamTableEnv.executeSql(createSource);
Map<String, String> options = new HashMap<>(); String hoodieTableDDL = sql("t1")
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath()); .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
// read optimized is supported for both MOR and COR table, .option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
// test MOR streaming write with compaction then reads as // read optimized is supported for both MOR and COR table,
// query type 'read_optimized'. // test MOR streaming write with compaction then reads as
options.put(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_MERGE_ON_READ); // query type 'read_optimized'.
options.put(FlinkOptions.QUERY_TYPE.key(), FlinkOptions.QUERY_TYPE_READ_OPTIMIZED); .option(FlinkOptions.QUERY_TYPE, FlinkOptions.QUERY_TYPE_READ_OPTIMIZED)
options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "1"); .option(FlinkOptions.COMPACTION_DELTA_COMMITS, "1")
options.put(FlinkOptions.COMPACTION_TASKS.key(), "1"); .option(FlinkOptions.COMPACTION_TASKS, "1")
String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options); .end();
streamTableEnv.executeSql(hoodieTableDDL); streamTableEnv.executeSql(hoodieTableDDL);
String insertInto = "insert into t1 select * from source"; String insertInto = "insert into t1 select * from source";
execInsertSql(streamTableEnv, insertInto); execInsertSql(streamTableEnv, insertInto);
@@ -233,10 +242,10 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
"source", "test_source_3.data", 4); "source", "test_source_3.data", 4);
streamTableEnv.executeSql(createSource); streamTableEnv.executeSql(createSource);
Map<String, String> options = new HashMap<>(); String hoodieTableDDL = sql("t1")
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath()); .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
options.put(FlinkOptions.CLEAN_RETAIN_COMMITS.key(), "1"); // only keep 1 commits .option(FlinkOptions.CLEAN_RETAIN_COMMITS, "1")
String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options); .end();
streamTableEnv.executeSql(hoodieTableDDL); streamTableEnv.executeSql(hoodieTableDDL);
String insertInto = "insert into t1 select * from source"; String insertInto = "insert into t1 select * from source";
execInsertSql(streamTableEnv, insertInto); execInsertSql(streamTableEnv, insertInto);
@@ -269,14 +278,14 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
String latestCommit = StreamerUtil.createWriteClient(conf, null) String latestCommit = StreamerUtil.createWriteClient(conf, null)
.getLastCompletedInstant(HoodieTableType.MERGE_ON_READ); .getLastCompletedInstant(HoodieTableType.MERGE_ON_READ);
Map<String, String> options = new HashMap<>(); String hoodieTableDDL = sql("t1")
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath()); .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
options.put(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_MERGE_ON_READ); .option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
options.put(FlinkOptions.READ_AS_STREAMING.key(), "true"); .option(FlinkOptions.READ_AS_STREAMING, "true")
options.put(FlinkOptions.READ_STREAMING_CHECK_INTERVAL.key(), "2"); .option(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, "2")
options.put(FlinkOptions.READ_STREAMING_START_COMMIT.key(), latestCommit); .option(FlinkOptions.READ_STREAMING_START_COMMIT, latestCommit)
options.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true"); .option(FlinkOptions.CHANGELOG_ENABLED, "true")
String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options); .end();
streamTableEnv.executeSql(hoodieTableDDL); streamTableEnv.executeSql(hoodieTableDDL);
final String sinkDDL = "create table sink(\n" final String sinkDDL = "create table sink(\n"
@@ -295,12 +304,10 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
@MethodSource("executionModeAndPartitioningParams") @MethodSource("executionModeAndPartitioningParams")
void testWriteAndRead(ExecMode execMode, boolean hiveStylePartitioning) { void testWriteAndRead(ExecMode execMode, boolean hiveStylePartitioning) {
TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv; TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv;
Map<String, String> options = new HashMap<>(); String hoodieTableDDL = sql("t1")
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath()); .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
if (hiveStylePartitioning) { .option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning)
options.put(FlinkOptions.HIVE_STYLE_PARTITIONING.key(), "true"); .end();
}
String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
tableEnv.executeSql(hoodieTableDDL); tableEnv.executeSql(hoodieTableDDL);
execInsertSql(tableEnv, TestSQL.INSERT_T1); execInsertSql(tableEnv, TestSQL.INSERT_T1);
@@ -321,10 +328,11 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
@EnumSource(value = HoodieTableType.class) @EnumSource(value = HoodieTableType.class)
void testBatchModeUpsertWithoutPartition(HoodieTableType tableType) { void testBatchModeUpsertWithoutPartition(HoodieTableType tableType) {
TableEnvironment tableEnv = batchTableEnv; TableEnvironment tableEnv = batchTableEnv;
Map<String, String> options = new HashMap<>(); String hoodieTableDDL = sql("t1")
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath()); .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
options.put(FlinkOptions.TABLE_NAME.key(), tableType.name()); .option(FlinkOptions.TABLE_NAME, tableType.name())
String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options, false); .withPartition(false)
.end();
tableEnv.executeSql(hoodieTableDDL); tableEnv.executeSql(hoodieTableDDL);
execInsertSql(tableEnv, TestSQL.INSERT_T1); execInsertSql(tableEnv, TestSQL.INSERT_T1);
@@ -344,13 +352,11 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
@MethodSource("tableTypeAndPartitioningParams") @MethodSource("tableTypeAndPartitioningParams")
void testBatchModeUpsert(HoodieTableType tableType, boolean hiveStylePartitioning) { void testBatchModeUpsert(HoodieTableType tableType, boolean hiveStylePartitioning) {
TableEnvironment tableEnv = batchTableEnv; TableEnvironment tableEnv = batchTableEnv;
Map<String, String> options = new HashMap<>(); String hoodieTableDDL = sql("t1")
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath()); .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
options.put(FlinkOptions.TABLE_NAME.key(), tableType.name()); .option(FlinkOptions.TABLE_NAME, tableType.name())
if (hiveStylePartitioning) { .option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning)
options.put(FlinkOptions.HIVE_STYLE_PARTITIONING.key(), "true"); .end();
}
String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
tableEnv.executeSql(hoodieTableDDL); tableEnv.executeSql(hoodieTableDDL);
execInsertSql(tableEnv, TestSQL.INSERT_T1); execInsertSql(tableEnv, TestSQL.INSERT_T1);
@@ -421,9 +427,9 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
@EnumSource(value = ExecMode.class) @EnumSource(value = ExecMode.class)
void testInsertOverwrite(ExecMode execMode) { void testInsertOverwrite(ExecMode execMode) {
TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv; TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv;
Map<String, String> options = new HashMap<>(); String hoodieTableDDL = sql("t1")
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath()); .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options); .end();
tableEnv.executeSql(hoodieTableDDL); tableEnv.executeSql(hoodieTableDDL);
execInsertSql(tableEnv, TestSQL.INSERT_T1); execInsertSql(tableEnv, TestSQL.INSERT_T1);
@@ -458,10 +464,10 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
@EnumSource(value = ExecMode.class) @EnumSource(value = ExecMode.class)
void testUpsertWithMiniBatches(ExecMode execMode) { void testUpsertWithMiniBatches(ExecMode execMode) {
TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv; TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv;
Map<String, String> options = new HashMap<>(); String hoodieTableDDL = sql("t1")
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath()); .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
options.put(FlinkOptions.WRITE_BATCH_SIZE.key(), "0.001"); .option(FlinkOptions.WRITE_BATCH_SIZE, "0.001")
String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options); .end();
tableEnv.executeSql(hoodieTableDDL); tableEnv.executeSql(hoodieTableDDL);
final String insertInto1 = "insert into t1 values\n" final String insertInto1 = "insert into t1 values\n"
@@ -486,9 +492,10 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
@EnumSource(value = ExecMode.class) @EnumSource(value = ExecMode.class)
void testWriteNonPartitionedTable(ExecMode execMode) { void testWriteNonPartitionedTable(ExecMode execMode) {
TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv; TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv;
Map<String, String> options = new HashMap<>(); String hoodieTableDDL = sql("t1")
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath()); .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options, false); .withPartition(false)
.end();
tableEnv.executeSql(hoodieTableDDL); tableEnv.executeSql(hoodieTableDDL);
final String insertInto1 = "insert into t1 values\n" final String insertInto1 = "insert into t1 values\n"
@@ -516,11 +523,11 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
"source", "test_source_4.data", 4); "source", "test_source_4.data", 4);
streamTableEnv.executeSql(createSource); streamTableEnv.executeSql(createSource);
Map<String, String> options = new HashMap<>(); String hoodieTableDDL = sql("t1")
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath()); .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
options.put(FlinkOptions.INDEX_GLOBAL_ENABLED.key(), "true"); .option(FlinkOptions.INDEX_GLOBAL_ENABLED, "true")
options.put(FlinkOptions.INSERT_DROP_DUPS.key(), "true"); .option(FlinkOptions.INSERT_DROP_DUPS, "true")
String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options); .end();
streamTableEnv.executeSql(hoodieTableDDL); streamTableEnv.executeSql(hoodieTableDDL);
final String insertInto2 = "insert into t1 select * from source"; final String insertInto2 = "insert into t1 select * from source";
@@ -538,12 +545,11 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
String createSource = TestConfigurations.getFileSourceDDL( String createSource = TestConfigurations.getFileSourceDDL(
"source", "test_source_4.data", 4); "source", "test_source_4.data", 4);
streamTableEnv.executeSql(createSource); streamTableEnv.executeSql(createSource);
String hoodieTableDDL = sql("t1")
Map<String, String> options = new HashMap<>(); .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath()); .option(FlinkOptions.INDEX_GLOBAL_ENABLED, "false")
options.put(FlinkOptions.INDEX_GLOBAL_ENABLED.key(), "false"); .option(FlinkOptions.INSERT_DROP_DUPS, "true")
options.put(FlinkOptions.INSERT_DROP_DUPS.key(), "true"); .end();
String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
streamTableEnv.executeSql(hoodieTableDDL); streamTableEnv.executeSql(hoodieTableDDL);
final String insertInto2 = "insert into t1 select * from source"; final String insertInto2 = "insert into t1 select * from source";
@@ -567,11 +573,11 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
StreamerUtil.initTableIfNotExists(conf); StreamerUtil.initTableIfNotExists(conf);
// create a flink source table // create a flink source table
Map<String, String> options = new HashMap<>(); String createHoodieTable = sql("t1")
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath()); .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
options.put(FlinkOptions.READ_AS_STREAMING.key(), "true"); .option(FlinkOptions.READ_AS_STREAMING, "true")
options.put(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_MERGE_ON_READ); .option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
String createHoodieTable = TestConfigurations.getCreateHoodieTableDDL("t1", options); .end();
streamTableEnv.executeSql(createHoodieTable); streamTableEnv.executeSql(createHoodieTable);
// execute query and assert throws exception // execute query and assert throws exception
@@ -640,14 +646,12 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
String csvSourceDDL = TestConfigurations.getCsvSourceDDL("csv_source", "test_source_5.data"); String csvSourceDDL = TestConfigurations.getCsvSourceDDL("csv_source", "test_source_5.data");
tableEnv.executeSql(csvSourceDDL); tableEnv.executeSql(csvSourceDDL);
Map<String, String> options = new HashMap<>(); String hoodieTableDDL = sql("hoodie_sink")
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath()); .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
options.put(FlinkOptions.OPERATION.key(), "bulk_insert"); .option(FlinkOptions.OPERATION, "bulk_insert")
options.put(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_BY_PARTITION.key(), "true"); .option(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_BY_PARTITION, "true")
if (hiveStylePartitioning) { .option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning)
options.put(FlinkOptions.HIVE_STYLE_PARTITIONING.key(), "true"); .end();
}
String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("hoodie_sink", options);
tableEnv.executeSql(hoodieTableDDL); tableEnv.executeSql(hoodieTableDDL);
String insertInto = "insert into hoodie_sink select * from csv_source"; String insertInto = "insert into hoodie_sink select * from csv_source";
@@ -668,10 +672,11 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
@Test @Test
void testBulkInsertNonPartitionedTable() { void testBulkInsertNonPartitionedTable() {
TableEnvironment tableEnv = batchTableEnv; TableEnvironment tableEnv = batchTableEnv;
Map<String, String> options = new HashMap<>(); String hoodieTableDDL = sql("t1")
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath()); .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
options.put(FlinkOptions.OPERATION.key(), "bulk_insert"); .option(FlinkOptions.OPERATION, "bulk_insert")
String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options, false); .withPartition(false)
.end();
tableEnv.executeSql(hoodieTableDDL); tableEnv.executeSql(hoodieTableDDL);
final String insertInto1 = "insert into t1 values\n" final String insertInto1 = "insert into t1 values\n"

View File

@@ -23,6 +23,7 @@ import org.apache.hudi.streamer.FlinkStreamerConfig;
import org.apache.hudi.utils.factory.CollectSinkTableFactory; import org.apache.hudi.utils.factory.CollectSinkTableFactory;
import org.apache.hudi.utils.factory.ContinuousFileSourceFactory; import org.apache.hudi.utils.factory.ContinuousFileSourceFactory;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.TableSchema;
@@ -31,6 +32,7 @@ import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.RowType;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
@@ -182,4 +184,48 @@ public class TestConfigurations {
streamerConf.checkpointInterval = 4000L; streamerConf.checkpointInterval = 4000L;
return streamerConf; return streamerConf;
} }
/**
* Creates the tool to build hoodie table DDL.
*/
public static Sql sql(String tableName) {
return new Sql(tableName);
}
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
/**
* Tool to build hoodie table DDL with schema {@link #TABLE_SCHEMA}.
*/
public static class Sql {
private final Map<String, String> options;
private String tableName;
private boolean withPartition = true;
public Sql(String tableName) {
options = new HashMap<>();
this.tableName = tableName;
}
public Sql option(ConfigOption<?> option, String val) {
this.options.put(option.key(), val);
return this;
}
public Sql option(ConfigOption<?> option, boolean val) {
this.options.put(option.key(), val + "");
return this;
}
public Sql withPartition(boolean withPartition) {
this.withPartition = withPartition;
return this;
}
public String end() {
return TestConfigurations.getCreateHoodieTableDDL(this.tableName, options, this.withPartition);
}
}
} }