[HUDI-4031] Avoid clustering update handling when no pending replacecommit (#5487)
This commit is contained in:
@@ -112,10 +112,8 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private HoodieData<HoodieRecord<T>> clusteringHandleUpdate(HoodieData<HoodieRecord<T>> inputRecords) {
|
private HoodieData<HoodieRecord<T>> clusteringHandleUpdate(HoodieData<HoodieRecord<T>> inputRecords, Set<HoodieFileGroupId> fileGroupsInPendingClustering) {
|
||||||
context.setJobStatus(this.getClass().getSimpleName(), "Handling updates which are under clustering");
|
context.setJobStatus(this.getClass().getSimpleName(), "Handling updates which are under clustering");
|
||||||
Set<HoodieFileGroupId> fileGroupsInPendingClustering =
|
|
||||||
table.getFileSystemView().getFileGroupsInPendingClustering().map(Pair::getKey).collect(Collectors.toSet());
|
|
||||||
UpdateStrategy<T, HoodieData<HoodieRecord<T>>> updateStrategy = (UpdateStrategy<T, HoodieData<HoodieRecord<T>>>) ReflectionUtils
|
UpdateStrategy<T, HoodieData<HoodieRecord<T>>> updateStrategy = (UpdateStrategy<T, HoodieData<HoodieRecord<T>>>) ReflectionUtils
|
||||||
.loadClass(config.getClusteringUpdatesStrategyClass(), this.context, fileGroupsInPendingClustering);
|
.loadClass(config.getClusteringUpdatesStrategyClass(), this.context, fileGroupsInPendingClustering);
|
||||||
Pair<HoodieData<HoodieRecord<T>>, Set<HoodieFileGroupId>> recordsAndPendingClusteringFileGroups =
|
Pair<HoodieData<HoodieRecord<T>>, Set<HoodieFileGroupId>> recordsAndPendingClusteringFileGroups =
|
||||||
@@ -166,7 +164,9 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
|
|||||||
}
|
}
|
||||||
|
|
||||||
// handle records update with clustering
|
// handle records update with clustering
|
||||||
HoodieData<HoodieRecord<T>> inputRecordsWithClusteringUpdate = clusteringHandleUpdate(inputRecords);
|
Set<HoodieFileGroupId> fileGroupsInPendingClustering =
|
||||||
|
table.getFileSystemView().getFileGroupsInPendingClustering().map(Pair::getKey).collect(Collectors.toSet());
|
||||||
|
HoodieData<HoodieRecord<T>> inputRecordsWithClusteringUpdate = fileGroupsInPendingClustering.isEmpty() ? inputRecords : clusteringHandleUpdate(inputRecords, fileGroupsInPendingClustering);
|
||||||
|
|
||||||
context.setJobStatus(this.getClass().getSimpleName(), "Doing partition and writing data");
|
context.setJobStatus(this.getClass().getSimpleName(), "Doing partition and writing data");
|
||||||
HoodieData<WriteStatus> writeStatuses = mapPartitionsAsRDD(inputRecordsWithClusteringUpdate, partitioner);
|
HoodieData<WriteStatus> writeStatuses = mapPartitionsAsRDD(inputRecordsWithClusteringUpdate, partitioner);
|
||||||
|
|||||||
Reference in New Issue
Block a user