diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index 8bbd4aad8..da962f97e 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -210,6 +210,7 @@ public class StreamerUtil { .withPayloadOrderingField(conf.getString(FlinkOptions.PRECOMBINE_FIELD)) .withPayloadEventTimeField(conf.getString(FlinkOptions.PRECOMBINE_FIELD)) .build()) + .withKeyGenerator(conf.getString(FlinkOptions.KEYGEN_CLASS_NAME)) // needed by TwoToThreeUpgradeHandler .withEmbeddedTimelineServerEnabled(enableEmbeddedTimelineService) .withEmbeddedTimelineServerReuseEnabled(true) // make write client embedded timeline service singleton .withAutoCommit(false)