[HUDI-1953] Fix NPE due to not set the output type of the operator (#3023)
Co-authored-by: enter58xuan <enter58xuan@zto.com>
This commit is contained in:
@@ -103,7 +103,7 @@ public class HoodieFlinkStreamer {
|
|||||||
.uid("uid_bucket_assigner")
|
.uid("uid_bucket_assigner")
|
||||||
// shuffle by fileId(bucket id)
|
// shuffle by fileId(bucket id)
|
||||||
.keyBy(record -> record.getCurrentLocation().getFileId())
|
.keyBy(record -> record.getCurrentLocation().getFileId())
|
||||||
.transform("hoodie_stream_write", null, operatorFactory)
|
.transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
|
||||||
.uid("uid_hoodie_stream_write")
|
.uid("uid_hoodie_stream_write")
|
||||||
.setParallelism(numWriteTask);
|
.setParallelism(numWriteTask);
|
||||||
if (StreamerUtil.needsScheduleCompaction(conf)) {
|
if (StreamerUtil.needsScheduleCompaction(conf)) {
|
||||||
|
|||||||
@@ -125,7 +125,7 @@ public class StreamWriteITCase extends TestLogger {
|
|||||||
.uid("uid_bucket_assigner")
|
.uid("uid_bucket_assigner")
|
||||||
// shuffle by fileId(bucket id)
|
// shuffle by fileId(bucket id)
|
||||||
.keyBy(record -> record.getCurrentLocation().getFileId())
|
.keyBy(record -> record.getCurrentLocation().getFileId())
|
||||||
.transform("hoodie_stream_write", null, operatorFactory)
|
.transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
|
||||||
.uid("uid_hoodie_stream_write");
|
.uid("uid_hoodie_stream_write");
|
||||||
execEnv.addOperator(dataStream.getTransformation());
|
execEnv.addOperator(dataStream.getTransformation());
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user