[HUDI-1927] Improve HoodieFlinkStreamer (#3019)

Co-authored-by: enter58xuan <enter58xuan@zto.com>
Author: taylorliao
Date: 2021-06-01 18:35:14 +08:00
Committed by: GitHub
Parent: 83b0301c1a
Commit: 83c31e356f

HoodieFlinkStreamer.java

@@ -22,6 +22,11 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.sink.CleanFunction;
 import org.apache.hudi.sink.StreamWriteOperatorFactory;
+import org.apache.hudi.sink.compact.CompactionPlanOperator;
+import org.apache.hudi.sink.compact.CompactionPlanEvent;
+import org.apache.hudi.sink.compact.CompactionCommitEvent;
+import org.apache.hudi.sink.compact.CompactFunction;
+import org.apache.hudi.sink.compact.CompactionCommitSink;
 import org.apache.hudi.sink.partitioner.BucketAssignFunction;
 import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
 import org.apache.hudi.util.AvroSchemaConverter;
@@ -33,6 +38,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
 import org.apache.flink.formats.json.TimestampFormat;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
@@ -76,7 +82,7 @@ public class HoodieFlinkStreamer {
     StreamWriteOperatorFactory<HoodieRecord> operatorFactory =
         new StreamWriteOperatorFactory<>(conf);
 
-    env.addSource(new FlinkKafkaConsumer<>(
+    DataStream<Object> pipeline = env.addSource(new FlinkKafkaConsumer<>(
         cfg.kafkaTopic,
         new JsonRowDataDeserializationSchema(
             rowType,
@@ -99,11 +105,26 @@ public class HoodieFlinkStreamer {
         .keyBy(record -> record.getCurrentLocation().getFileId())
         .transform("hoodie_stream_write", null, operatorFactory)
         .uid("uid_hoodie_stream_write")
-        .setParallelism(numWriteTask)
-        .addSink(new CleanFunction<>(conf))
-        .setParallelism(1)
-        .name("clean_commits")
-        .uid("uid_clean_commits");
+        .setParallelism(numWriteTask);
+    if (StreamerUtil.needsScheduleCompaction(conf)) {
+      pipeline.transform("compact_plan_generate",
+          TypeInformation.of(CompactionPlanEvent.class),
+          new CompactionPlanOperator(conf))
+          .uid("uid_compact_plan_generate")
+          .setParallelism(1) // plan generate must be singleton
+          .keyBy(event -> event.getOperation().hashCode())
+          .transform("compact_task",
+              TypeInformation.of(CompactionCommitEvent.class),
+              new KeyedProcessOperator<>(new CompactFunction(conf)))
+          .setParallelism(conf.getInteger(FlinkOptions.COMPACTION_TASKS))
+          .addSink(new CompactionCommitSink(conf))
+          .name("compact_commit")
+          .setParallelism(1); // compaction commit should be singleton
+    } else {
+      pipeline.addSink(new CleanFunction<>(conf))
+          .setParallelism(1)
+          .name("clean_commits").uid("uid_clean_commits");
+    }
 
     env.execute(cfg.targetTableName);
   }
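
The patch keeps a handle to the write stage (the new pipeline variable) and then attaches exactly one tail: the async compaction sub-pipeline when StreamerUtil.needsScheduleCompaction(conf) says the table schedules compaction, otherwise the original clean/commit sink. Below is a minimal standalone sketch of that branching pattern; it is not Hudi code, and ConditionalSinkSketch, the boolean flag, and the map stage are hypothetical stand-ins for the compaction operators.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;

// Minimal sketch (not Hudi code) of the conditional wiring this patch introduces:
// hold the main stage in a variable, then attach exactly one tail based on config.
public class ConditionalSinkSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Stand-in for StreamerUtil.needsScheduleCompaction(conf).
    boolean scheduleCompaction = args.length > 0 && Boolean.parseBoolean(args[0]);

    DataStream<String> pipeline = env.fromElements("a", "b", "c");

    if (scheduleCompaction) {
      // Hypothetical stage standing in for compact_plan_generate / compact_task.
      pipeline.map(String::toUpperCase)
          .addSink(new DiscardingSink<>())
          .name("compact_commit")
          .setParallelism(1); // the commit sink stays a singleton, as in the patch
    } else {
      pipeline.addSink(new DiscardingSink<>())
          .name("clean_commits")
          .setParallelism(1);
    }
    env.execute("conditional-sink-sketch");
  }
}

Note the parallelism choices in the real patch: keying by event.getOperation().hashCode() lets compact_task fan out to FlinkOptions.COMPACTION_TASKS parallel subtasks, while the plan generator and the commit sink both stay at parallelism 1, so scheduling and committing remain single-writer.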