diff --git a/hudi-flink/src/main/java/org/apache/hudi/HoodieFlinkStreamer.java b/hudi-flink/src/main/java/org/apache/hudi/HoodieFlinkStreamer.java index cef4ddaf7..b65b43c0d 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/HoodieFlinkStreamer.java +++ b/hudi-flink/src/main/java/org/apache/hudi/HoodieFlinkStreamer.java @@ -93,11 +93,10 @@ public class HoodieFlinkStreamer { .name("kafka_to_hudi_record") .uid("kafka_to_hudi_record_uid"); - // InstantGenerateOperator helps to emit globally unique instantTime, it must be executed in one parallelism + // InstantGenerateOperator helps to emit globally unique instantTime inputRecords.transform(InstantGenerateOperator.NAME, TypeInformation.of(HoodieRecord.class), new InstantGenerateOperator()) .name("instant_generator") .uid("instant_generator_id") - .setParallelism(1) // Keyby partition path, to avoid multiple subtasks writing to a partition at the same time .keyBy(HoodieRecord::getPartitionPath) diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java b/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java index 7879243e7..58cad4ef8 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java @@ -57,10 +57,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; /** - * Operator helps to generate globally unique instant, it must be executed in one parallelism. Before generate a new - * instant , {@link InstantGenerateOperator} will always check whether the last instant has completed. if it is - * completed, a new instant will be generated immediately, otherwise, wait and check the state of last instant until - * time out and throw an exception. + * Operator helps to generate globally unique instant. Before generate a new instant {@link InstantGenerateOperator} + * will always check whether the last instant has completed. if it is completed and has records flows in, a new instant + * will be generated immediately, otherwise, wait and check the state of last instant until time out and throw an exception. */ public class InstantGenerateOperator extends AbstractStreamOperator implements OneInputStreamOperator { @@ -128,11 +127,11 @@ public class InstantGenerateOperator extends AbstractStreamOperator