[HUDI-2229] Refactor HoodieFlinkStreamer to reuse the pipeline of HoodieTableSink (#3495)
Co-authored-by: mikewu <xingbo.wxb@alibaba-inc.com>
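
The core of the refactoring: instead of hand-building the bootstrap, bucket-assign, stream-write, compaction and cleaning operator chain, HoodieFlinkStreamer now drives the same `Pipelines` utilities that `HoodieTableSink` already delegates to. A minimal sketch of that shared wiring, assembled from the lines added in this diff (the wrapper class, the method name, and the explicit `bounded` parameter are illustrative only, not part of the patch):

```java
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.sink.utils.Pipelines;
import org.apache.hudi.util.StreamerUtil;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;

/** Illustrative sketch of the write path shared by the streamer and the table sink. */
final class HoodieWritePathSketch {

  static void wire(Configuration conf, RowType rowType, int parallelism,
                   DataStream<RowData> dataStream, boolean bounded) {
    // RowData -> HoodieRecord conversion plus the optional index bootstrap
    // (replaces the inline RowDataToHoodieFunctions/BootstrapOperator chain).
    DataStream<HoodieRecord> hoodieRecordDataStream =
        Pipelines.bootstrap(conf, rowType, parallelism, dataStream, bounded);

    // Bucket assignment and the Hoodie stream-write operator
    // (replaces the hand-written keyBy/BucketAssignOperator/StreamWriteOperatorFactory chain).
    DataStream<Object> pipeline =
        Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);

    // Async compaction branch when it is needed, otherwise the standalone cleaning sink.
    if (StreamerUtil.needsAsyncCompaction(conf)) {
      Pipelines.compact(conf, pipeline);
    } else {
      Pipelines.clean(conf, pipeline);
    }
  }
}
```

In the patch itself the streamer passes `false` for the bounded flag (its Kafka source is unbounded), while `HoodieTableSink` passes `context.isBounded()`.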
@@ -21,30 +21,18 @@ package org.apache.hudi.streamer;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.sink.CleanFunction;
-import org.apache.hudi.sink.StreamWriteOperatorFactory;
-import org.apache.hudi.sink.bootstrap.BootstrapOperator;
-import org.apache.hudi.sink.compact.CompactFunction;
-import org.apache.hudi.sink.compact.CompactionCommitEvent;
-import org.apache.hudi.sink.compact.CompactionCommitSink;
-import org.apache.hudi.sink.compact.CompactionPlanEvent;
-import org.apache.hudi.sink.compact.CompactionPlanOperator;
-import org.apache.hudi.sink.partitioner.BucketAssignFunction;
-import org.apache.hudi.sink.partitioner.BucketAssignOperator;
-import org.apache.hudi.sink.transform.RowDataToHoodieFunctions;
 import org.apache.hudi.sink.transform.Transformer;
+import org.apache.hudi.sink.utils.Pipelines;
 import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.StreamerUtil;
 
 import com.beust.jcommander.JCommander;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.formats.common.TimestampFormat;
 import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.operators.ProcessOperator;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
@@ -88,9 +76,6 @@ public class HoodieFlinkStreamer {
     int parallelism = env.getParallelism();
     conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
 
-    StreamWriteOperatorFactory<HoodieRecord> operatorFactory =
-        new StreamWriteOperatorFactory<>(conf);
-
     DataStream<RowData> dataStream = env.addSource(new FlinkKafkaConsumer<>(
         cfg.kafkaTopic,
         new JsonRowDataDeserializationSchema(
@@ -110,51 +95,12 @@ public class HoodieFlinkStreamer {
       }
     }
 
-    DataStream<HoodieRecord> dataStream2 = dataStream
-        .map(RowDataToHoodieFunctions.create(rowType, conf), TypeInformation.of(HoodieRecord.class));
-
-    if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
-      dataStream2 = dataStream2
-          .transform(
-              "index_bootstrap",
-              TypeInformation.of(HoodieRecord.class),
-              new BootstrapOperator<>(conf))
-          .setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(parallelism))
-          .uid("uid_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
-    }
-
-    DataStream<Object> pipeline = dataStream2
-        // Key-by record key, to avoid multiple subtasks write to a bucket at the same time
-        .keyBy(HoodieRecord::getRecordKey)
-        .transform(
-            "bucket_assigner",
-            TypeInformation.of(HoodieRecord.class),
-            new BucketAssignOperator<>(new BucketAssignFunction<>(conf)))
-        .uid("uid_bucket_assigner" + conf.getString(FlinkOptions.TABLE_NAME))
-        .setParallelism(conf.getOptional(FlinkOptions.BUCKET_ASSIGN_TASKS).orElse(parallelism))
-        // shuffle by fileId(bucket id)
-        .keyBy(record -> record.getCurrentLocation().getFileId())
-        .transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
-        .uid("uid_hoodie_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
-        .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
+    DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream, false);
+    DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
     if (StreamerUtil.needsAsyncCompaction(conf)) {
-      pipeline.transform("compact_plan_generate",
-          TypeInformation.of(CompactionPlanEvent.class),
-          new CompactionPlanOperator(conf))
-          .uid("uid_compact_plan_generate")
-          .setParallelism(1) // plan generate must be singleton
-          .rebalance()
-          .transform("compact_task",
-              TypeInformation.of(CompactionCommitEvent.class),
-              new ProcessOperator<>(new CompactFunction(conf)))
-          .setParallelism(conf.getInteger(FlinkOptions.COMPACTION_TASKS))
-          .addSink(new CompactionCommitSink(conf))
-          .name("compact_commit")
-          .setParallelism(1); // compaction commit should be singleton
+      Pipelines.compact(conf, pipeline);
     } else {
-      pipeline.addSink(new CleanFunction<>(conf))
-          .setParallelism(1)
-          .name("clean_commits").uid("uid_clean_commits");
+      Pipelines.clean(conf, pipeline);
     }
 
     env.execute(cfg.targetTableName);
@@ -77,13 +77,10 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
 
         // default parallelism
         int parallelism = dataStream.getExecutionConfig().getParallelism();
-
         // bootstrap
-        final DataStream<HoodieRecord> dataStream1 = Pipelines.bootstrap(conf, rowType, parallelism, dataStream, context.isBounded());
-
+        final DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream, context.isBounded());
         // write pipeline
-        DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, dataStream1);
-
+        DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
         // compaction
         if (StreamerUtil.needsAsyncCompaction(conf)) {
           return Pipelines.compact(conf, pipeline);
@@ -28,19 +28,14 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.sink.bootstrap.BootstrapOperator;
 import org.apache.hudi.sink.compact.CompactFunction;
 import org.apache.hudi.sink.compact.CompactionCommitEvent;
 import org.apache.hudi.sink.compact.CompactionCommitSink;
-import org.apache.hudi.sink.compact.CompactionPlanEvent;
-import org.apache.hudi.sink.compact.CompactionPlanOperator;
 import org.apache.hudi.sink.compact.CompactionPlanSourceFunction;
 import org.apache.hudi.sink.compact.FlinkCompactionConfig;
-import org.apache.hudi.sink.partitioner.BucketAssignFunction;
-import org.apache.hudi.sink.partitioner.BucketAssignOperator;
 import org.apache.hudi.sink.transform.ChainedTransformer;
-import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
 import org.apache.hudi.sink.transform.Transformer;
+import org.apache.hudi.sink.utils.Pipelines;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.CompactionUtil;
@@ -235,12 +230,13 @@ public class StreamWriteITCase extends TestLogger {
 
   @Test
   public void testMergeOnReadWriteWithCompaction() throws Exception {
+    int parallelism = 4;
     Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
     conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
     conf.setString(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
     StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
     execEnv.getConfig().disableObjectReuse();
-    execEnv.setParallelism(4);
+    execEnv.setParallelism(parallelism);
     // set up checkpoint interval
     execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
     execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
@@ -267,49 +263,16 @@ public class StreamWriteITCase extends TestLogger {
     TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
     format.setCharsetName("UTF-8");
 
-    DataStream<HoodieRecord> hoodieDataStream = execEnv
+    DataStream<RowData> dataStream = execEnv
         // use PROCESS_CONTINUOUSLY mode to trigger checkpoint
         .readFile(format, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, typeInfo)
         .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
-        .setParallelism(4)
-        .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class));
+        .setParallelism(parallelism);
 
-    if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
-      hoodieDataStream = hoodieDataStream.transform("index_bootstrap",
-          TypeInformation.of(HoodieRecord.class),
-          new BootstrapOperator<>(conf));
-    }
-
-    DataStream<Object> pipeline = hoodieDataStream
-        // Key-by record key, to avoid multiple subtasks write to a bucket at the same time
-        .keyBy(HoodieRecord::getRecordKey)
-        .transform(
-            "bucket_assigner",
-            TypeInformation.of(HoodieRecord.class),
-            new BucketAssignOperator<>(new BucketAssignFunction<>(conf)))
-        .uid("uid_bucket_assigner")
-        // shuffle by fileId(bucket id)
-        .keyBy(record -> record.getCurrentLocation().getFileId())
-        .transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
-        .uid("uid_hoodie_stream_write");
-
-    pipeline.addSink(new CleanFunction<>(conf))
-        .setParallelism(1)
-        .name("clean_commits").uid("uid_clean_commits");
-
-    pipeline.transform("compact_plan_generate",
-        TypeInformation.of(CompactionPlanEvent.class),
-        new CompactionPlanOperator(conf))
-        .uid("uid_compact_plan_generate")
-        .setParallelism(1) // plan generate must be singleton
-        .rebalance()
-        .transform("compact_task",
-            TypeInformation.of(CompactionCommitEvent.class),
-            new ProcessOperator<>(new CompactFunction(conf)))
-        .addSink(new CompactionCommitSink(conf))
-        .name("compact_commit")
-        .setParallelism(1);
-
+    DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream, false);
+    DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
+    Pipelines.clean(conf, pipeline);
+    Pipelines.compact(conf, pipeline);
     JobClient client = execEnv.executeAsync(execEnv.getStreamGraph(conf.getString(FlinkOptions.TABLE_NAME)));
     if (client.getJobStatus().get() != JobStatus.FAILED) {
      try {
@@ -364,27 +327,9 @@ public class StreamWriteITCase extends TestLogger {
       dataStream = transformer.apply(dataStream);
     }
 
-    DataStream<HoodieRecord> hoodieDataStream = dataStream
-        .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class));
-
-    if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
-      hoodieDataStream = hoodieDataStream.transform("index_bootstrap",
-          TypeInformation.of(HoodieRecord.class),
-          new BootstrapOperator<>(conf));
-    }
-
-    DataStream<Object> pipeline = hoodieDataStream
-        // Key-by record key, to avoid multiple subtasks write to a bucket at the same time
-        .keyBy(HoodieRecord::getRecordKey)
-        .transform(
-            "bucket_assigner",
-            TypeInformation.of(HoodieRecord.class),
-            new BucketAssignOperator<>(new BucketAssignFunction<>(conf)))
-        .uid("uid_bucket_assigner")
-        // shuffle by fileId(bucket id)
-        .keyBy(record -> record.getCurrentLocation().getFileId())
-        .transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
-        .uid("uid_hoodie_stream_write");
-
+    int parallelism = execEnv.getParallelism();
+    DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream, false);
+    DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
     execEnv.addOperator(pipeline.getTransformation());
 
     JobClient client = execEnv.executeAsync(execEnv.getStreamGraph(conf.getString(FlinkOptions.TABLE_NAME)));