1
0

[HUDI-4397] Flink Inline Cluster and Compact plan distribute strategy changed from rebalance to hash to avoid potential multiple threads accessing the same file (#6106)

Co-authored-by: jerryyue <jerryyue@didiglobal.com>
This commit is contained in:
JerryYue-M
2022-07-15 12:21:50 +08:00
committed by GitHub
parent 4898ea52f7
commit b781b31045

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.sink.utils;
import org.apache.hudi.common.model.ClusteringOperation;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.configuration.FlinkOptions;
@@ -65,6 +66,7 @@ import org.apache.flink.table.types.logical.RowType;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
/**
* Utilities to generate all kinds of sub-pipelines.
@@ -357,7 +359,7 @@ public class Pipelines {
*
* <pre>
* /=== | task1 | ===\
* | plan generation | ===> re-balance | commit |
* | plan generation | ===> hash | commit |
* \=== | task2 | ===/
*
* Note: both the compaction plan generation task and commission task are singleton.
@@ -372,7 +374,7 @@ public class Pipelines {
TypeInformation.of(CompactionPlanEvent.class),
new CompactionPlanOperator(conf))
.setParallelism(1) // plan generate must be singleton
.rebalance()
.keyBy(plan -> plan.getOperation().getFileGroupId().getFileId())
.transform("compact_task",
TypeInformation.of(CompactionCommitEvent.class),
new ProcessOperator<>(new CompactFunction(conf)))
@@ -392,7 +394,7 @@ public class Pipelines {
*
* <pre>
* /=== | task1 | ===\
* | plan generation | ===> re-balance | commit |
* | plan generation | ===> hash | commit |
* \=== | task2 | ===/
*
* Note: both the clustering plan generation task and commission task are singleton.
@@ -408,7 +410,9 @@ public class Pipelines {
TypeInformation.of(ClusteringPlanEvent.class),
new ClusteringPlanOperator(conf))
.setParallelism(1) // plan generate must be singleton
.rebalance()
.keyBy(plan -> plan.getClusteringGroupInfo().getOperations()
.stream().map(ClusteringOperation::getFileId)
.collect(Collectors.joining()))
.transform("clustering_task",
TypeInformation.of(ClusteringCommitEvent.class),
new ClusteringOperator(conf, rowType))