diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java index 43d476cf2..87a655198 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java @@ -18,6 +18,7 @@ package org.apache.hudi.sink.utils; +import org.apache.hudi.common.model.ClusteringOperation; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.configuration.FlinkOptions; @@ -65,6 +66,7 @@ import org.apache.flink.table.types.logical.RowType; import java.util.HashMap; import java.util.Map; +import java.util.stream.Collectors; /** * Utilities to generate all kinds of sub-pipelines. @@ -357,7 +359,7 @@ public class Pipelines { * *
* /=== | task1 | ===\
- * | plan generation | ===> re-balance | commit |
+ * | plan generation | ===> hash | commit |
* \=== | task2 | ===/
*
* Note: both the compaction plan generation task and commission task are singleton.
@@ -372,7 +374,7 @@ public class Pipelines {
TypeInformation.of(CompactionPlanEvent.class),
new CompactionPlanOperator(conf))
.setParallelism(1) // plan generate must be singleton
- .rebalance()
+ .keyBy(plan -> plan.getOperation().getFileGroupId().getFileId())
.transform("compact_task",
TypeInformation.of(CompactionCommitEvent.class),
new ProcessOperator<>(new CompactFunction(conf)))
@@ -392,7 +394,7 @@ public class Pipelines {
*
*
* /=== | task1 | ===\
- * | plan generation | ===> re-balance | commit |
+ * | plan generation | ===> hash | commit |
* \=== | task2 | ===/
*
* Note: both the clustering plan generation task and commission task are singleton.
@@ -408,7 +410,9 @@ public class Pipelines {
TypeInformation.of(ClusteringPlanEvent.class),
new ClusteringPlanOperator(conf))
.setParallelism(1) // plan generate must be singleton
- .rebalance()
+ .keyBy(plan -> plan.getClusteringGroupInfo().getOperations()
+ .stream().map(ClusteringOperation::getFileId)
+ .collect(Collectors.joining()))
.transform("clustering_task",
TypeInformation.of(ClusteringCommitEvent.class),
new ClusteringOperator(conf, rowType))