From 9850e90e2e0fa602a5346911810bd65c5f3e0ea8 Mon Sep 17 00:00:00 2001
From: mikewu
Date: Fri, 27 Aug 2021 10:14:04 +0800
Subject: [PATCH] [HUDI-2229] Refact HoodieFlinkStreamer to reuse the pipeline of HoodieTableSink (#3495)

Co-authored-by: mikewu
---
 .../hudi/streamer/HoodieFlinkStreamer.java  | 64 ++-------------
 .../apache/hudi/table/HoodieTableSink.java  |  7 +-
 .../apache/hudi/sink/StreamWriteITCase.java | 79 +++----------------
 3 files changed, 19 insertions(+), 131 deletions(-)

diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
index 3d3f804af..077633ee9 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
@@ -21,30 +21,18 @@ package org.apache.hudi.streamer;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.sink.CleanFunction;
-import org.apache.hudi.sink.StreamWriteOperatorFactory;
-import org.apache.hudi.sink.bootstrap.BootstrapOperator;
-import org.apache.hudi.sink.compact.CompactFunction;
-import org.apache.hudi.sink.compact.CompactionCommitEvent;
-import org.apache.hudi.sink.compact.CompactionCommitSink;
-import org.apache.hudi.sink.compact.CompactionPlanEvent;
-import org.apache.hudi.sink.compact.CompactionPlanOperator;
-import org.apache.hudi.sink.partitioner.BucketAssignFunction;
-import org.apache.hudi.sink.partitioner.BucketAssignOperator;
-import org.apache.hudi.sink.transform.RowDataToHoodieFunctions;
 import org.apache.hudi.sink.transform.Transformer;
+import org.apache.hudi.sink.utils.Pipelines;
 import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.StreamerUtil;
 
 import com.beust.jcommander.JCommander;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.formats.common.TimestampFormat;
 import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.operators.ProcessOperator;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
@@ -88,9 +76,6 @@ public class HoodieFlinkStreamer {
     int parallelism = env.getParallelism();
     conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
 
-    StreamWriteOperatorFactory<HoodieRecord> operatorFactory =
-        new StreamWriteOperatorFactory<>(conf);
-
     DataStream<RowData> dataStream = env.addSource(new FlinkKafkaConsumer<>(
         cfg.kafkaTopic,
         new JsonRowDataDeserializationSchema(
@@ -110,51 +95,12 @@ public class HoodieFlinkStreamer {
       }
     }
 
-    DataStream<HoodieRecord> dataStream2 = dataStream
-        .map(RowDataToHoodieFunctions.create(rowType, conf), TypeInformation.of(HoodieRecord.class));
-
-    if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
-      dataStream2 = dataStream2
-          .transform(
-              "index_bootstrap",
-              TypeInformation.of(HoodieRecord.class),
-              new BootstrapOperator<>(conf))
-          .setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(parallelism))
-          .uid("uid_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
-    }
-
-    DataStream<Object> pipeline = dataStream2
-        // Key-by record key, to avoid multiple subtasks write to a bucket at the same time
-        .keyBy(HoodieRecord::getRecordKey)
-        .transform(
-            "bucket_assigner",
-            TypeInformation.of(HoodieRecord.class),
-            new BucketAssignOperator<>(new BucketAssignFunction<>(conf)))
-        .uid("uid_bucket_assigner" + conf.getString(FlinkOptions.TABLE_NAME))
-        .setParallelism(conf.getOptional(FlinkOptions.BUCKET_ASSIGN_TASKS).orElse(parallelism))
-        // shuffle by fileId(bucket id)
-        .keyBy(record -> record.getCurrentLocation().getFileId())
-        .transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
-        .uid("uid_hoodie_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
-        .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
+    DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream, false);
+    DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
     if (StreamerUtil.needsAsyncCompaction(conf)) {
-      pipeline.transform("compact_plan_generate",
-          TypeInformation.of(CompactionPlanEvent.class),
-          new CompactionPlanOperator(conf))
-          .uid("uid_compact_plan_generate")
-          .setParallelism(1) // plan generate must be singleton
-          .rebalance()
-          .transform("compact_task",
-              TypeInformation.of(CompactionCommitEvent.class),
-              new ProcessOperator<>(new CompactFunction(conf)))
-          .setParallelism(conf.getInteger(FlinkOptions.COMPACTION_TASKS))
-          .addSink(new CompactionCommitSink(conf))
-          .name("compact_commit")
-          .setParallelism(1); // compaction commit should be singleton
+      Pipelines.compact(conf, pipeline);
     } else {
-      pipeline.addSink(new CleanFunction<>(conf))
-          .setParallelism(1)
-          .name("clean_commits").uid("uid_clean_commits");
+      Pipelines.clean(conf, pipeline);
     }
 
     env.execute(cfg.targetTableName);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
index feab7b7be..2ced22aaf 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
@@ -77,13 +77,10 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
       // default parallelism
       int parallelism = dataStream.getExecutionConfig().getParallelism();
 
-      // bootstrap
-      final DataStream<HoodieRecord> dataStream1 = Pipelines.bootstrap(conf, rowType, parallelism, dataStream, context.isBounded());
-
+      final DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream, context.isBounded());
       // write pipeline
-      DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, dataStream1);
-
+      DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
       // compaction
       if (StreamerUtil.needsAsyncCompaction(conf)) {
         return Pipelines.compact(conf, pipeline);
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
index a40a01069..659e0225e 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
@@ -28,19 +28,14 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.sink.bootstrap.BootstrapOperator;
 import org.apache.hudi.sink.compact.CompactFunction;
 import org.apache.hudi.sink.compact.CompactionCommitEvent;
 import org.apache.hudi.sink.compact.CompactionCommitSink;
-import org.apache.hudi.sink.compact.CompactionPlanEvent;
-import org.apache.hudi.sink.compact.CompactionPlanOperator;
 import org.apache.hudi.sink.compact.CompactionPlanSourceFunction;
 import org.apache.hudi.sink.compact.FlinkCompactionConfig;
-import org.apache.hudi.sink.partitioner.BucketAssignFunction;
-import org.apache.hudi.sink.partitioner.BucketAssignOperator;
 import org.apache.hudi.sink.transform.ChainedTransformer;
-import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
 import org.apache.hudi.sink.transform.Transformer;
+import org.apache.hudi.sink.utils.Pipelines;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.CompactionUtil;
@@ -235,12 +230,13 @@ public class StreamWriteITCase extends TestLogger {
 
   @Test
   public void testMergeOnReadWriteWithCompaction() throws Exception {
+    int parallelism = 4;
     Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
     conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
     conf.setString(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
     StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
     execEnv.getConfig().disableObjectReuse();
-    execEnv.setParallelism(4);
+    execEnv.setParallelism(parallelism);
     // set up checkpoint interval
     execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
     execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
@@ -267,49 +263,16 @@ public class StreamWriteITCase extends TestLogger {
     TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
     format.setCharsetName("UTF-8");
 
-    DataStream<HoodieRecord> hoodieDataStream = execEnv
+    DataStream<RowData> dataStream = execEnv
         // use PROCESS_CONTINUOUSLY mode to trigger checkpoint
         .readFile(format, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, typeInfo)
         .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
-        .setParallelism(4)
-        .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class));
-
-    if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
-      hoodieDataStream = hoodieDataStream.transform("index_bootstrap",
-          TypeInformation.of(HoodieRecord.class),
-          new BootstrapOperator<>(conf));
-    }
-
-    DataStream<Object> pipeline = hoodieDataStream
-        // Key-by record key, to avoid multiple subtasks write to a bucket at the same time
-        .keyBy(HoodieRecord::getRecordKey)
-        .transform(
-            "bucket_assigner",
-            TypeInformation.of(HoodieRecord.class),
-            new BucketAssignOperator<>(new BucketAssignFunction<>(conf)))
-        .uid("uid_bucket_assigner")
-        // shuffle by fileId(bucket id)
-        .keyBy(record -> record.getCurrentLocation().getFileId())
-        .transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
-        .uid("uid_hoodie_stream_write");
-
-    pipeline.addSink(new CleanFunction<>(conf))
-        .setParallelism(1)
-        .name("clean_commits").uid("uid_clean_commits");
-
-    pipeline.transform("compact_plan_generate",
-        TypeInformation.of(CompactionPlanEvent.class),
-        new CompactionPlanOperator(conf))
-        .uid("uid_compact_plan_generate")
-        .setParallelism(1) // plan generate must be singleton
-        .rebalance()
-        .transform("compact_task",
-            TypeInformation.of(CompactionCommitEvent.class),
-            new ProcessOperator<>(new CompactFunction(conf)))
-        .addSink(new CompactionCommitSink(conf))
-        .name("compact_commit")
-        .setParallelism(1);
+        .setParallelism(parallelism);
+    DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream, false);
+    DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
+    Pipelines.clean(conf, pipeline);
+    Pipelines.compact(conf, pipeline);
 
     JobClient client = execEnv.executeAsync(execEnv.getStreamGraph(conf.getString(FlinkOptions.TABLE_NAME)));
     if (client.getJobStatus().get() != JobStatus.FAILED) {
       try {
@@ -364,27 +327,9 @@ public class StreamWriteITCase extends TestLogger {
      dataStream = transformer.apply(dataStream);
     }
 
-    DataStream<HoodieRecord> hoodieDataStream = dataStream
-        .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class));
-
-    if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
-      hoodieDataStream = hoodieDataStream.transform("index_bootstrap",
-          TypeInformation.of(HoodieRecord.class),
-          new BootstrapOperator<>(conf));
-    }
-
-    DataStream<Object> pipeline = hoodieDataStream
-        // Key-by record key, to avoid multiple subtasks write to a bucket at the same time
-        .keyBy(HoodieRecord::getRecordKey)
-        .transform(
-            "bucket_assigner",
-            TypeInformation.of(HoodieRecord.class),
-            new BucketAssignOperator<>(new BucketAssignFunction<>(conf)))
-        .uid("uid_bucket_assigner")
-        // shuffle by fileId(bucket id)
-        .keyBy(record -> record.getCurrentLocation().getFileId())
-        .transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
-        .uid("uid_hoodie_stream_write");
+    int parallelism = execEnv.getParallelism();
+    DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream, false);
+    DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
 
     execEnv.addOperator(pipeline.getTransformation());
     JobClient client = execEnv.executeAsync(execEnv.getStreamGraph(conf.getString(FlinkOptions.TABLE_NAME)));