diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
index bd485634d..05fcda6a6 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
@@ -22,6 +22,11 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.sink.CleanFunction;
 import org.apache.hudi.sink.StreamWriteOperatorFactory;
+import org.apache.hudi.sink.compact.CompactionPlanOperator;
+import org.apache.hudi.sink.compact.CompactionPlanEvent;
+import org.apache.hudi.sink.compact.CompactionCommitEvent;
+import org.apache.hudi.sink.compact.CompactFunction;
+import org.apache.hudi.sink.compact.CompactionCommitSink;
 import org.apache.hudi.sink.partitioner.BucketAssignFunction;
 import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
 import org.apache.hudi.util.AvroSchemaConverter;
@@ -33,6 +38,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
 import org.apache.flink.formats.json.TimestampFormat;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
@@ -76,7 +82,7 @@ public class HoodieFlinkStreamer {
 
     StreamWriteOperatorFactory<HoodieRecord> operatorFactory = new StreamWriteOperatorFactory<>(conf);
 
-    env.addSource(new FlinkKafkaConsumer<>(
+    DataStream<Object> pipeline = env.addSource(new FlinkKafkaConsumer<>(
         cfg.kafkaTopic,
         new JsonRowDataDeserializationSchema(
             rowType,
@@ -99,11 +105,26 @@ public class HoodieFlinkStreamer {
         .keyBy(record -> record.getCurrentLocation().getFileId())
         .transform("hoodie_stream_write", null, operatorFactory)
         .uid("uid_hoodie_stream_write")
-        .setParallelism(numWriteTask)
-        .addSink(new CleanFunction<>(conf))
-        .setParallelism(1)
-        .name("clean_commits")
-        .uid("uid_clean_commits");
+        .setParallelism(numWriteTask);
+    if (StreamerUtil.needsScheduleCompaction(conf)) {
+      pipeline.transform("compact_plan_generate",
+          TypeInformation.of(CompactionPlanEvent.class),
+          new CompactionPlanOperator(conf))
+          .uid("uid_compact_plan_generate")
+          .setParallelism(1) // plan generate must be singleton
+          .keyBy(event -> event.getOperation().hashCode())
+          .transform("compact_task",
+              TypeInformation.of(CompactionCommitEvent.class),
+              new KeyedProcessOperator<>(new CompactFunction(conf)))
+          .setParallelism(conf.getInteger(FlinkOptions.COMPACTION_TASKS))
+          .addSink(new CompactionCommitSink(conf))
+          .name("compact_commit")
+          .setParallelism(1); // compaction commit should be singleton
+    } else {
+      pipeline.addSink(new CleanFunction<>(conf))
+          .setParallelism(1)
+          .name("clean_commits").uid("uid_clean_commits");
+    }
 
     env.execute(cfg.targetTableName);
   }
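The structural move in this patch is to stop terminating the operator chain inline and instead capture the write pipeline in a `DataStream` variable, so the tail of the job graph can differ based on configuration: either the compaction chain (plan generation, keyed compact tasks, singleton commit sink) or the plain clean-commits sink. The sketch below shows the same pattern in a self-contained vanilla Flink job; `needsCompaction`, the operator names, and the stand-in map/print stages are all hypothetical and only mirror the shape of the patch, not Hudi's actual operators.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ConditionalTailSketch {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Hypothetical flag standing in for StreamerUtil.needsScheduleCompaction(conf).
    boolean needsCompaction = args.length > 0 && Boolean.parseBoolean(args[0]);

    // Capture the shared upstream chain in a variable, as the patch does with
    // the write pipeline, so the tail of the graph can vary per configuration.
    DataStream<String> pipeline = env
        .fromElements("a", "b", "c")
        .map(s -> s.toUpperCase())
        .name("upstream_write");

    if (needsCompaction) {
      // Stand-in for the compact_plan_generate -> compact_task -> compact_commit chain.
      pipeline
          .map(s -> "compacted:" + s)
          .name("compact_task")
          .print()
          .name("compact_commit")
          .setParallelism(1); // commit-like stages stay singleton, as in the patch
    } else {
      // Stand-in for the clean_commits sink branch.
      pipeline.print().name("clean_commits").setParallelism(1);
    }

    env.execute("conditional-tail-sketch");
  }
}
```

Note that only one branch is ever added to the job graph; the decision is made at graph-construction time, not at runtime, which is why the patch can keep both sinks at parallelism 1 without coordination between them.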
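The whole compaction branch is gated on `StreamerUtil.needsScheduleCompaction(conf)`, which the diff calls but does not show. A plausible reading is that compaction is only scheduled for MERGE_ON_READ tables with async compaction enabled; the predicate below is a guess at that logic under this assumption, using the `FlinkOptions.TABLE_TYPE` and `FlinkOptions.COMPACTION_ASYNC_ENABLED` options from hudi-flink, and is not the actual `StreamerUtil` implementation.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.hudi.configuration.FlinkOptions;

import java.util.Locale;

public class CompactionGateSketch {

  // Hypothetical re-implementation: schedule compaction only for
  // MERGE_ON_READ tables with async compaction switched on. The real
  // StreamerUtil.needsScheduleCompaction may check more conditions.
  public static boolean needsScheduleCompaction(Configuration conf) {
    return conf.getString(FlinkOptions.TABLE_TYPE)
        .toUpperCase(Locale.ROOT)
        .equals(FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
        && conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED);
  }
}
```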