1
0

[HUDI-2153] Fix BucketAssignFunction Context NullPointerException

This commit is contained in:
moranyuwen
2021-07-14 17:38:34 +08:00
committed by Danny Chan
parent d024439764
commit 23a4a96eb4

View File

@@ -30,6 +30,7 @@ import org.apache.hudi.sink.compact.CompactionCommitSink;
import org.apache.hudi.sink.compact.CompactionPlanEvent;
import org.apache.hudi.sink.compact.CompactionPlanOperator;
import org.apache.hudi.sink.partitioner.BucketAssignFunction;
import org.apache.hudi.sink.partitioner.BucketAssignOperator;
import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
import org.apache.hudi.sink.transform.Transformer;
import org.apache.hudi.util.AvroSchemaConverter;
@@ -43,7 +44,6 @@ import org.apache.flink.formats.json.TimestampFormat;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.data.RowData;
@@ -125,7 +125,7 @@ public class HoodieFlinkStreamer {
.transform(
"bucket_assigner",
TypeInformation.of(HoodieRecord.class),
new KeyedProcessOperator<>(new BucketAssignFunction<>(conf)))
new BucketAssignOperator<>(new BucketAssignFunction<>(conf)))
.setParallelism(conf.getInteger(FlinkOptions.BUCKET_ASSIGN_TASKS))
.uid("uid_bucket_assigner")
// shuffle by fileId(bucket id)