From 1562bb658f8f29f57763eaa6f9bd5a2ed7e80a7c Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Wed, 4 May 2022 19:47:11 +0530 Subject: [PATCH] [HUDI-4031] Avoid clustering update handling when no pending replacecommit (#5487) --- .../action/commit/BaseSparkCommitActionExecutor.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java index ade550897..205da82ac 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java @@ -112,10 +112,8 @@ public abstract class BaseSparkCommitActionExecutor> clusteringHandleUpdate(HoodieData> inputRecords) { + private HoodieData> clusteringHandleUpdate(HoodieData> inputRecords, Set fileGroupsInPendingClustering) { context.setJobStatus(this.getClass().getSimpleName(), "Handling updates which are under clustering"); - Set fileGroupsInPendingClustering = - table.getFileSystemView().getFileGroupsInPendingClustering().map(Pair::getKey).collect(Collectors.toSet()); UpdateStrategy>> updateStrategy = (UpdateStrategy>>) ReflectionUtils .loadClass(config.getClusteringUpdatesStrategyClass(), this.context, fileGroupsInPendingClustering); Pair>, Set> recordsAndPendingClusteringFileGroups = @@ -166,7 +164,9 @@ public abstract class BaseSparkCommitActionExecutor> inputRecordsWithClusteringUpdate = clusteringHandleUpdate(inputRecords); + Set fileGroupsInPendingClustering = + table.getFileSystemView().getFileGroupsInPendingClustering().map(Pair::getKey).collect(Collectors.toSet()); + HoodieData> inputRecordsWithClusteringUpdate = fileGroupsInPendingClustering.isEmpty() ? inputRecords : clusteringHandleUpdate(inputRecords, fileGroupsInPendingClustering); context.setJobStatus(this.getClass().getSimpleName(), "Doing partition and writing data"); HoodieData writeStatuses = mapPartitionsAsRDD(inputRecordsWithClusteringUpdate, partitioner);