From 86007e9a13341e1181f940c5f6e5f6dba8eed755 Mon Sep 17 00:00:00 2001 From: taylorliao <726830328@qq.com> Date: Thu, 3 Jun 2021 14:20:57 +0800 Subject: [PATCH] [HUDI-1953] Fix NPE due to not set the output type of the operator (#3023) Co-authored-by: enter58xuan --- .../main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java | 2 +- .../src/test/java/org/apache/hudi/sink/StreamWriteITCase.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java index 05fcda6a6..b0f7ada0f 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java +++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java @@ -103,7 +103,7 @@ public class HoodieFlinkStreamer { .uid("uid_bucket_assigner") // shuffle by fileId(bucket id) .keyBy(record -> record.getCurrentLocation().getFileId()) - .transform("hoodie_stream_write", null, operatorFactory) + .transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory) .uid("uid_hoodie_stream_write") .setParallelism(numWriteTask); if (StreamerUtil.needsScheduleCompaction(conf)) { diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java index 82360f496..5256cd3d4 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java @@ -125,7 +125,7 @@ public class StreamWriteITCase extends TestLogger { .uid("uid_bucket_assigner") // shuffle by fileId(bucket id) .keyBy(record -> record.getCurrentLocation().getFileId()) - .transform("hoodie_stream_write", null, operatorFactory) + .transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory) .uid("uid_hoodie_stream_write"); execEnv.addOperator(dataStream.getTransformation());