From 299958650916eecc243f20bf9362ed70073d30db Mon Sep 17 00:00:00 2001 From: satishkotha Date: Mon, 26 Apr 2021 23:35:01 -0700 Subject: [PATCH] [HUDI-1690] use jsc union instead of rdd union (#2872) --- ...ExecuteClusteringCommitActionExecutor.java | 22 ++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java index ed965dfb0..9f6e39afc 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java @@ -66,6 +66,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; +import java.util.stream.Stream; public class SparkExecuteClusteringCommitActionExecutor> extends BaseSparkCommitActionExecutor { @@ -90,10 +91,12 @@ public class SparkExecuteClusteringCommitActionExecutor writeStatusRDD = clusteringPlan.getInputGroups().stream() + Stream> writeStatusRDDStream = clusteringPlan.getInputGroups().stream() .map(inputGroup -> runClusteringForGroupAsync(inputGroup, clusteringPlan.getStrategy().getStrategyParams())) - .map(CompletableFuture::join) - .reduce((rdd1, rdd2) -> rdd1.union(rdd2)).orElse(engineContext.emptyRDD()); + .map(CompletableFuture::join); + + JavaRDD[] writeStatuses = convertStreamToArray(writeStatusRDDStream); + JavaRDD writeStatusRDD = engineContext.union(writeStatuses); HoodieWriteMetadata> writeMetadata = buildWriteMetadata(writeStatusRDD); JavaRDD statuses = updateIndex(writeStatusRDD, writeMetadata); @@ -109,6 +112,19 @@ public class SparkExecuteClusteringCommitActionExecutor[] convertStreamToArray(Stream> writeStatusRDDStream) { + Object[] writeStatusObjects = writeStatusRDDStream.toArray(Object[]::new); + JavaRDD[] writeStatusRDDArray = new JavaRDD[writeStatusObjects.length]; + for (int i = 0; i < writeStatusObjects.length; i++) { + writeStatusRDDArray[i] = (JavaRDD) writeStatusObjects[i]; + } + return writeStatusRDDArray; + } + /** * Validate actions taken by clustering. In the first implementation, we validate at least one new file is written. * But we can extend this to add more validation. E.g. number of records read = number of records written etc.