diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
index 20cd833af..60f481d29 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
@@ -30,6 +30,7 @@ import org.apache.hudi.sink.compact.CompactionCommitSink;
 import org.apache.hudi.sink.compact.CompactionPlanEvent;
 import org.apache.hudi.sink.compact.CompactionPlanOperator;
 import org.apache.hudi.sink.partitioner.BucketAssignFunction;
+import org.apache.hudi.sink.partitioner.BucketAssignOperator;
 import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
 import org.apache.hudi.sink.transform.Transformer;
 import org.apache.hudi.util.AvroSchemaConverter;
@@ -43,7 +44,6 @@ import org.apache.flink.formats.json.TimestampFormat;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
 import org.apache.flink.streaming.api.operators.ProcessOperator;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
 import org.apache.flink.table.data.RowData;
@@ -125,7 +125,7 @@ public class HoodieFlinkStreamer {
         .transform(
             "bucket_assigner",
             TypeInformation.of(HoodieRecord.class),
-            new KeyedProcessOperator<>(new BucketAssignFunction<>(conf)))
+            new BucketAssignOperator<>(new BucketAssignFunction<>(conf)))
         .setParallelism(conf.getInteger(FlinkOptions.BUCKET_ASSIGN_TASKS))
         .uid("uid_bucket_assigner")
         // shuffle by fileId(bucket id)