From 748dcc9aae88d8d8b7d4fb70341c78d02b9ab5fb Mon Sep 17 00:00:00 2001 From: wangxianghu Date: Fri, 22 Jan 2021 13:46:25 +0800 Subject: [PATCH] [MINOR] Remove InstantGeneratorOperator parallelism limit in HoodieFlinkStreamer and update docs (#2471) --- .../java/org/apache/hudi/HoodieFlinkStreamer.java | 3 +-- .../hudi/operator/InstantGenerateOperator.java | 13 ++++++------- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/hudi-flink/src/main/java/org/apache/hudi/HoodieFlinkStreamer.java b/hudi-flink/src/main/java/org/apache/hudi/HoodieFlinkStreamer.java index cef4ddaf7..b65b43c0d 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/HoodieFlinkStreamer.java +++ b/hudi-flink/src/main/java/org/apache/hudi/HoodieFlinkStreamer.java @@ -93,11 +93,10 @@ public class HoodieFlinkStreamer { .name("kafka_to_hudi_record") .uid("kafka_to_hudi_record_uid"); - // InstantGenerateOperator helps to emit globally unique instantTime, it must be executed in one parallelism + // InstantGenerateOperator helps to emit globally unique instantTime inputRecords.transform(InstantGenerateOperator.NAME, TypeInformation.of(HoodieRecord.class), new InstantGenerateOperator()) .name("instant_generator") .uid("instant_generator_id") - .setParallelism(1) // Keyby partition path, to avoid multiple subtasks writing to a partition at the same time .keyBy(HoodieRecord::getPartitionPath) diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java b/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java index 7879243e7..58cad4ef8 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java @@ -57,10 +57,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; /** - * Operator helps to generate globally unique instant, it must be executed in one parallelism. Before generate a new - * instant , {@link InstantGenerateOperator} will always check whether the last instant has completed. if it is - * completed, a new instant will be generated immediately, otherwise, wait and check the state of last instant until - * time out and throw an exception. + * Operator helps to generate globally unique instant. Before generate a new instant {@link InstantGenerateOperator} + * will always check whether the last instant has completed. if it is completed and has records flows in, a new instant + * will be generated immediately, otherwise, wait and check the state of last instant until time out and throw an exception. */ public class InstantGenerateOperator extends AbstractStreamOperator implements OneInputStreamOperator { @@ -128,11 +127,11 @@ public class InstantGenerateOperator extends AbstractStreamOperator