[HUDI-2746] Do not bootstrap for flink insert overwrite (#3980)
This commit is contained in:
@@ -98,15 +98,42 @@ public class Pipelines {
|
|||||||
.name("dummy");
|
.name("dummy");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructs bootstrap pipeline as streaming.
|
||||||
|
*/
|
||||||
|
public static DataStream<HoodieRecord> bootstrap(
|
||||||
|
Configuration conf,
|
||||||
|
RowType rowType,
|
||||||
|
int defaultParallelism,
|
||||||
|
DataStream<RowData> dataStream) {
|
||||||
|
return bootstrap(conf, rowType, defaultParallelism, dataStream, false, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructs bootstrap pipeline.
|
||||||
|
*
|
||||||
|
* @param conf The configuration
|
||||||
|
* @param rowType The row type
|
||||||
|
* @param defaultParallelism The default parallelism
|
||||||
|
* @param dataStream The data stream
|
||||||
|
* @param bounded Whether the source is bounded
|
||||||
|
* @param overwrite Whether it is insert overwrite
|
||||||
|
*/
|
||||||
public static DataStream<HoodieRecord> bootstrap(
|
public static DataStream<HoodieRecord> bootstrap(
|
||||||
Configuration conf,
|
Configuration conf,
|
||||||
RowType rowType,
|
RowType rowType,
|
||||||
int defaultParallelism,
|
int defaultParallelism,
|
||||||
DataStream<RowData> dataStream,
|
DataStream<RowData> dataStream,
|
||||||
boolean bounded) {
|
boolean bounded,
|
||||||
return bounded
|
boolean overwrite) {
|
||||||
? boundedBootstrap(conf, rowType, defaultParallelism, dataStream)
|
final boolean globalIndex = conf.getBoolean(FlinkOptions.INDEX_GLOBAL_ENABLED);
|
||||||
: streamBootstrap(conf, rowType, defaultParallelism, dataStream);
|
if (overwrite) {
|
||||||
|
return rowDataToHoodieRecord(conf, rowType, dataStream);
|
||||||
|
} else if (bounded && !globalIndex) {
|
||||||
|
return boundedBootstrap(conf, rowType, defaultParallelism, dataStream);
|
||||||
|
} else {
|
||||||
|
return streamBootstrap(conf, rowType, defaultParallelism, dataStream);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static DataStream<HoodieRecord> streamBootstrap(
|
private static DataStream<HoodieRecord> streamBootstrap(
|
||||||
|
|||||||
@@ -96,7 +96,7 @@ public class HoodieFlinkStreamer {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream, false);
|
DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream);
|
||||||
DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
|
DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
|
||||||
if (StreamerUtil.needsAsyncCompaction(conf)) {
|
if (StreamerUtil.needsAsyncCompaction(conf)) {
|
||||||
Pipelines.compact(conf, pipeline);
|
Pipelines.compact(conf, pipeline);
|
||||||
|
|||||||
@@ -86,7 +86,8 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
|
|||||||
DataStream<Object> pipeline;
|
DataStream<Object> pipeline;
|
||||||
|
|
||||||
// bootstrap
|
// bootstrap
|
||||||
final DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream, context.isBounded());
|
final DataStream<HoodieRecord> hoodieRecordDataStream =
|
||||||
|
Pipelines.bootstrap(conf, rowType, parallelism, dataStream, context.isBounded(), overwrite);
|
||||||
// write pipeline
|
// write pipeline
|
||||||
pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
|
pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
|
||||||
// compaction
|
// compaction
|
||||||
|
|||||||
@@ -168,7 +168,7 @@ public class StreamWriteITCase extends TestLogger {
|
|||||||
.map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
|
.map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
|
||||||
.setParallelism(parallelism);
|
.setParallelism(parallelism);
|
||||||
|
|
||||||
DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream, false);
|
DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream);
|
||||||
DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
|
DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
|
||||||
Pipelines.clean(conf, pipeline);
|
Pipelines.clean(conf, pipeline);
|
||||||
Pipelines.compact(conf, pipeline);
|
Pipelines.compact(conf, pipeline);
|
||||||
@@ -225,7 +225,7 @@ public class StreamWriteITCase extends TestLogger {
|
|||||||
}
|
}
|
||||||
|
|
||||||
int parallelism = execEnv.getParallelism();
|
int parallelism = execEnv.getParallelism();
|
||||||
DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream, false);
|
DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream);
|
||||||
DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
|
DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
|
||||||
execEnv.addOperator(pipeline.getTransformation());
|
execEnv.addOperator(pipeline.getTransformation());
|
||||||
|
|
||||||
|
|||||||
@@ -606,6 +606,36 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
|
|||||||
assertRowsEquals(result, "[+I[id1, Sophia, 18, 1970-01-01T00:00:05, par1]]");
|
assertRowsEquals(result, "[+I[id1, Sophia, 18, 1970-01-01T00:00:05, par1]]");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ParameterizedTest
|
||||||
|
@MethodSource("executionModeAndTableTypeParams")
|
||||||
|
void testBatchUpsertWithMiniBatchesGlobalIndex(ExecMode execMode, HoodieTableType tableType) {
|
||||||
|
TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv;
|
||||||
|
String hoodieTableDDL = sql("t1")
|
||||||
|
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
|
||||||
|
.option(FlinkOptions.WRITE_BATCH_SIZE, "0.001")
|
||||||
|
.option(FlinkOptions.TABLE_TYPE, tableType)
|
||||||
|
.option(FlinkOptions.INDEX_GLOBAL_ENABLED, true)
|
||||||
|
.end();
|
||||||
|
tableEnv.executeSql(hoodieTableDDL);
|
||||||
|
|
||||||
|
final String insertInto1 = "insert into t1 values\n"
|
||||||
|
+ "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1')";
|
||||||
|
|
||||||
|
execInsertSql(tableEnv, insertInto1);
|
||||||
|
|
||||||
|
final String insertInto2 = "insert into t1 values\n"
|
||||||
|
+ "('id1','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par2'),\n"
|
||||||
|
+ "('id1','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par1'),\n"
|
||||||
|
+ "('id1','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),\n"
|
||||||
|
+ "('id1','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3')";
|
||||||
|
|
||||||
|
execInsertSql(tableEnv, insertInto2);
|
||||||
|
|
||||||
|
List<Row> result = CollectionUtil.iterableToList(
|
||||||
|
() -> tableEnv.sqlQuery("select * from t1").execute().collect());
|
||||||
|
assertRowsEquals(result, "[+I[id1, Sophia, 18, 1970-01-01T00:00:05, par3]]");
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void testUpdateWithDefaultHoodieRecordPayload() {
|
void testUpdateWithDefaultHoodieRecordPayload() {
|
||||||
TableEnvironment tableEnv = batchTableEnv;
|
TableEnvironment tableEnv = batchTableEnv;
|
||||||
|
|||||||
Reference in New Issue
Block a user