[HUDI-1690] use jsc union instead of rdd union (#2872)
@@ -66,6 +66,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 public class SparkExecuteClusteringCommitActionExecutor<T extends HoodieRecordPayload<T>>
     extends BaseSparkCommitActionExecutor<T> {
@@ -90,10 +91,12 @@ public class SparkExecuteClusteringCommitActionExecutor<T extends HoodieRecordPa
 
     JavaSparkContext engineContext = HoodieSparkEngineContext.getSparkContext(context);
     // execute clustering for each group async and collect WriteStatus
-    JavaRDD<WriteStatus> writeStatusRDD = clusteringPlan.getInputGroups().stream()
+    Stream<JavaRDD<WriteStatus>> writeStatusRDDStream = clusteringPlan.getInputGroups().stream()
         .map(inputGroup -> runClusteringForGroupAsync(inputGroup, clusteringPlan.getStrategy().getStrategyParams()))
-        .map(CompletableFuture::join)
-        .reduce((rdd1, rdd2) -> rdd1.union(rdd2)).orElse(engineContext.emptyRDD());
+        .map(CompletableFuture::join);
+
+    JavaRDD<WriteStatus>[] writeStatuses = convertStreamToArray(writeStatusRDDStream);
+    JavaRDD<WriteStatus> writeStatusRDD = engineContext.union(writeStatuses);
 
     HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata = buildWriteMetadata(writeStatusRDD);
     JavaRDD<WriteStatus> statuses = updateIndex(writeStatusRDD, writeMetadata);
@@ -109,6 +112,19 @@ public class SparkExecuteClusteringCommitActionExecutor<T extends HoodieRecordPa
     return writeMetadata;
   }
 
+  /**
+   * Stream to array conversion with generic type is not straightforward.
+   * Implement a utility method to abstract high level logic. This needs to be improved in future
+   */
+  private JavaRDD<WriteStatus>[] convertStreamToArray(Stream<JavaRDD<WriteStatus>> writeStatusRDDStream) {
+    Object[] writeStatusObjects = writeStatusRDDStream.toArray(Object[]::new);
+    JavaRDD<WriteStatus>[] writeStatusRDDArray = new JavaRDD[writeStatusObjects.length];
+    for (int i = 0; i < writeStatusObjects.length; i++) {
+      writeStatusRDDArray[i] = (JavaRDD<WriteStatus>) writeStatusObjects[i];
+    }
+    return writeStatusRDDArray;
+  }
+
   /**
    * Validate actions taken by clustering. In the first implementation, we validate at least one new file is written.
    * But we can extend this to add more validation. E.g. number of records read = number of records written etc.
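
Why engineContext.union rather than the pairwise reduce: each JavaRDD.union call wraps its two inputs in another UnionRDD, so folding over many clustering groups builds a lineage that grows one level per group, while JavaSparkContext.union produces a single flat UnionRDD over all inputs. The commit message does not spell this rationale out, so treat it as the usual motivation for this pattern rather than a quote from the author. Below is a minimal, self-contained sketch contrasting the two shapes; the local master and the toy integer RDDs are assumptions for illustration only.

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class UnionComparison {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("union-comparison").setMaster("local[2]");
    try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
      // Stand-ins for the per-group WriteStatus RDDs produced by clustering.
      JavaRDD<Integer> a = jsc.parallelize(Arrays.asList(1, 2));
      JavaRDD<Integer> b = jsc.parallelize(Arrays.asList(3, 4));
      JavaRDD<Integer> c = jsc.parallelize(Arrays.asList(5, 6));

      // Before the change: pairwise union nests a UnionRDD per step.
      JavaRDD<Integer> chained = a.union(b).union(c);

      // After the change: one flat UnionRDD over all inputs (varargs),
      // mirroring engineContext.union(writeStatuses) in the diff above.
      JavaRDD<Integer> flat = jsc.union(a, b, c);

      // toDebugString shows the difference in lineage depth.
      System.out.println(chained.toDebugString());
      System.out.println(flat.toDebugString());
    }
  }
}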
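On the new helper itself: the Javadoc flags generic stream-to-array conversion as something to improve later. For comparison only, and not what the commit does, Stream.toArray with a raw-typed generator plus a single unchecked cast achieves the same result in one line; a sketch under that assumption:

import java.util.stream.Stream;
import org.apache.spark.api.java.JavaRDD;

public final class RddArrays {
  private RddArrays() {
  }

  // Converts a stream of typed JavaRDDs to an array via Stream.toArray with a
  // raw JavaRDD[] generator; one unchecked cast restores the element type.
  @SuppressWarnings("unchecked")
  public static <T> JavaRDD<T>[] toRddArray(Stream<JavaRDD<T>> rdds) {
    return (JavaRDD<T>[]) rdds.toArray(JavaRDD[]::new);
  }
}

Both approaches incur the same unavoidable unchecked cast, since Java cannot create a generic array; the committed loop version just makes each step explicit.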