From b781b310452f117dd11c2cd5322b0cb4bea5c423 Mon Sep 17 00:00:00 2001 From: JerryYue-M <272614347@qq.com> Date: Fri, 15 Jul 2022 12:21:50 +0800 Subject: [PATCH] [HUDI-4397] Flink Inline Cluster and Compact plan distribute strategy changed from rebalance to hash to avoid potential multiple threads accessing the same file (#6106) Co-authored-by: jerryyue --- .../java/org/apache/hudi/sink/utils/Pipelines.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java index 43d476cf2..87a655198 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java @@ -18,6 +18,7 @@ package org.apache.hudi.sink.utils; +import org.apache.hudi.common.model.ClusteringOperation; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.configuration.FlinkOptions; @@ -65,6 +66,7 @@ import org.apache.flink.table.types.logical.RowType; import java.util.HashMap; import java.util.Map; +import java.util.stream.Collectors; /** * Utilities to generate all kinds of sub-pipelines. @@ -357,7 +359,7 @@ public class Pipelines { * *
    *                                           /=== | task1 | ===\
-   *      | plan generation | ===> re-balance                      | commit |
+   *      | plan generation | ===> hash                            | commit |
    *                                           \=== | task2 | ===/
    *
    *      Note: both the compaction plan generation task and commission task are singleton.
@@ -372,7 +374,7 @@ public class Pipelines {
             TypeInformation.of(CompactionPlanEvent.class),
             new CompactionPlanOperator(conf))
         .setParallelism(1) // plan generate must be singleton
-        .rebalance()
+        .keyBy(plan -> plan.getOperation().getFileGroupId().getFileId())
         .transform("compact_task",
             TypeInformation.of(CompactionCommitEvent.class),
             new ProcessOperator<>(new CompactFunction(conf)))
@@ -392,7 +394,7 @@ public class Pipelines {
    *
    * 
    *                                           /=== | task1 | ===\
-   *      | plan generation | ===> re-balance                      | commit |
+   *      | plan generation | ===> hash                            | commit |
    *                                           \=== | task2 | ===/
    *
    *      Note: both the clustering plan generation task and commission task are singleton.
@@ -408,7 +410,9 @@ public class Pipelines {
             TypeInformation.of(ClusteringPlanEvent.class),
             new ClusteringPlanOperator(conf))
         .setParallelism(1) // plan generate must be singleton
-        .rebalance()
+        .keyBy(plan -> plan.getClusteringGroupInfo().getOperations()
+          .stream().map(ClusteringOperation::getFileId)
+          .collect(Collectors.joining(",")))
         .transform("clustering_task",
             TypeInformation.of(ClusteringCommitEvent.class),
             new ClusteringOperator(conf, rowType))