diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java index 92dda123f..4824c757c 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java @@ -35,6 +35,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.RewriteAvroPayload; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; +import org.apache.hudi.common.util.FutureUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.Pair; @@ -88,16 +89,17 @@ public abstract class MultipleSparkJobExecutionStrategy> performClustering(final HoodieClusteringPlan clusteringPlan, final Schema schema, final String instantTime) { - // execute clustering for each group async and collect WriteStatus JavaSparkContext engineContext = HoodieSparkEngineContext.getSparkContext(getEngineContext()); // execute clustering for each group async and collect WriteStatus - Stream> writeStatusRDDStream = clusteringPlan.getInputGroups().stream() + Stream> writeStatusRDDStream = FutureUtils.allOf( + clusteringPlan.getInputGroups().stream() .map(inputGroup -> runClusteringForGroupAsync(inputGroup, clusteringPlan.getStrategy().getStrategyParams(), Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false), instantTime)) - .map(CompletableFuture::join); - + .collect(Collectors.toList())) + .join() + .stream(); JavaRDD[] writeStatuses = convertStreamToArray(writeStatusRDDStream); JavaRDD writeStatusRDD 
/**
 * Utility methods for working with {@link CompletableFuture}s.
 */
public class FutureUtils {

  private FutureUtils() {
    // Static utility class; no instances.
  }

  /**
   * Similar to {@link CompletableFuture#allOf(CompletableFuture[])}, but returns a future
   * bearing the results of all the given futures, in the same order as the input list.
   *
   * <p>If any input future completes exceptionally, the returned future completes
   * exceptionally as well (standard {@code CompletableFuture.allOf} semantics).
   *
   * @param futures futures to await; must be non-null (NOTE(review): the original declared
   *                {@code @Nonnull} from JSR-305; replaced with an explicit runtime check
   *                to avoid the third-party annotation dependency)
   * @param <T>     result type of each future
   * @return a future completing with the list of all the futures' results
   */
  public static <T> CompletableFuture<List<T>> allOf(List<CompletableFuture<T>> futures) {
    Objects.requireNonNull(futures, "futures");
    return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
        .thenApply(ignored ->
            futures.stream()
                // NOTE: This join wouldn't block, since all the
                //       futures are completed at this point.
                .map(CompletableFuture::join)
                .collect(Collectors.toList()));
  }
}