From 23a4a96eb416c13d8b1f921cc51286f93d61d9b3 Mon Sep 17 00:00:00 2001 From: moranyuwen <798537634@qq.com> Date: Wed, 14 Jul 2021 17:38:34 +0800 Subject: [PATCH] [HUDI-2153] Fix BucketAssignFunction Context NullPointerException --- .../java/org/apache/hudi/streamer/HoodieFlinkStreamer.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java index 20cd833af..60f481d29 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java +++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java @@ -30,6 +30,7 @@ import org.apache.hudi.sink.compact.CompactionCommitSink; import org.apache.hudi.sink.compact.CompactionPlanEvent; import org.apache.hudi.sink.compact.CompactionPlanOperator; import org.apache.hudi.sink.partitioner.BucketAssignFunction; +import org.apache.hudi.sink.partitioner.BucketAssignOperator; import org.apache.hudi.sink.transform.RowDataToHoodieFunction; import org.apache.hudi.sink.transform.Transformer; import org.apache.hudi.util.AvroSchemaConverter; @@ -43,7 +44,6 @@ import org.apache.flink.formats.json.TimestampFormat; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.operators.KeyedProcessOperator; import org.apache.flink.streaming.api.operators.ProcessOperator; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.table.data.RowData; @@ -125,7 +125,7 @@ public class HoodieFlinkStreamer { .transform( "bucket_assigner", TypeInformation.of(HoodieRecord.class), - new KeyedProcessOperator<>(new BucketAssignFunction<>(conf))) + new BucketAssignOperator<>(new BucketAssignFunction<>(conf))) .setParallelism(conf.getInteger(FlinkOptions.BUCKET_ASSIGN_TASKS)) .uid("uid_bucket_assigner") // shuffle by fileId(bucket id)