diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java index 05fcda6a6..b0f7ada0f 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java +++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java @@ -103,7 +103,7 @@ public class HoodieFlinkStreamer { .uid("uid_bucket_assigner") // shuffle by fileId(bucket id) .keyBy(record -> record.getCurrentLocation().getFileId()) - .transform("hoodie_stream_write", null, operatorFactory) + .transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory) .uid("uid_hoodie_stream_write") .setParallelism(numWriteTask); if (StreamerUtil.needsScheduleCompaction(conf)) { diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java index 82360f496..5256cd3d4 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java @@ -125,7 +125,7 @@ public class StreamWriteITCase extends TestLogger { .uid("uid_bucket_assigner") // shuffle by fileId(bucket id) .keyBy(record -> record.getCurrentLocation().getFileId()) - .transform("hoodie_stream_write", null, operatorFactory) + .transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory) .uid("uid_hoodie_stream_write"); execEnv.addOperator(dataStream.getTransformation());