From 2d2d5c83b1ca33ad413746987c4aa816dd43520d Mon Sep 17 00:00:00 2001 From: satishkotha Date: Fri, 29 Jan 2021 10:27:09 -0800 Subject: [PATCH] [HUDI-1555] Remove isEmpty to improve clustering execution performance (#2502) --- ...SparkRecentDaysClusteringPlanStrategy.java | 12 ++++++--- .../SparkSortAndSizeExecutionStrategy.java | 3 +++ ...ExecuteClusteringCommitActionExecutor.java | 27 ++++++++++++++----- .../commit/BaseSparkCommitActionExecutor.java | 7 ++++- .../functional/TestStructuredStreaming.scala | 23 ++++++++++------ 5 files changed, 54 insertions(+), 18 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkRecentDaysClusteringPlanStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkRecentDaysClusteringPlanStrategy.java index f1382ac3c..ed61c0cc7 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkRecentDaysClusteringPlanStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkRecentDaysClusteringPlanStrategy.java @@ -71,20 +71,26 @@ public class SparkRecentDaysClusteringPlanStrategy buildClusteringGroupsForPartition(String partitionPath, List fileSlices) { List, Integer>> fileSliceGroups = new ArrayList<>(); List currentGroup = new ArrayList<>(); - int totalSizeSoFar = 0; + long totalSizeSoFar = 0; for (FileSlice currentSlice : fileSlices) { // assume each filegroup size is ~= parquet.max.file.size totalSizeSoFar += currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : getWriteConfig().getParquetMaxFileSize(); // check if max size is reached and create new group, if needed. if (totalSizeSoFar >= getWriteConfig().getClusteringMaxBytesInGroup() && !currentGroup.isEmpty()) { - fileSliceGroups.add(Pair.of(currentGroup, getNumberOfOutputFileGroups(totalSizeSoFar, getWriteConfig().getClusteringTargetFileMaxBytes()))); + int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, getWriteConfig().getClusteringTargetFileMaxBytes()); + LOG.info("Adding one clustering group " + totalSizeSoFar + " max bytes: " + + getWriteConfig().getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups); + fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups)); currentGroup = new ArrayList<>(); totalSizeSoFar = 0; } currentGroup.add(currentSlice); } if (!currentGroup.isEmpty()) { - fileSliceGroups.add(Pair.of(currentGroup, getNumberOfOutputFileGroups(totalSizeSoFar, getWriteConfig().getClusteringTargetFileMaxBytes()))); + int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, getWriteConfig().getClusteringTargetFileMaxBytes()); + LOG.info("Adding final clustering group " + totalSizeSoFar + " max bytes: " + + getWriteConfig().getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups); + fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups)); } return fileSliceGroups.stream().map(fileSliceGroup -> HoodieClusteringGroup.newBuilder() diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java index cfbc2ec22..07f9bc14f 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java @@ -26,6 +26,7 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieStorageConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner; import org.apache.hudi.table.BulkInsertPartitioner; @@ -66,10 +67,12 @@ public class SparkSortAndSizeExecutionStrategy> @Override public JavaRDD performClustering(final JavaRDD> inputRecords, final int numOutputGroups, final String instantTime, final Map strategyParams, final Schema schema) { + LOG.info("Starting clustering for a group, parallelism:" + numOutputGroups + " commit:" + instantTime); Properties props = getWriteConfig().getProps(); props.put(HoodieWriteConfig.BULKINSERT_PARALLELISM, String.valueOf(numOutputGroups)); // We are calling another action executor - disable auto commit. Strategy is only expected to write data in new files. props.put(HoodieWriteConfig.HOODIE_AUTO_COMMIT_PROP, Boolean.FALSE.toString()); + props.put(HoodieStorageConfig.PARQUET_FILE_MAX_BYTES, String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes())); HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder().withProps(props).build(); return (JavaRDD) SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig, false, getPartitioner(strategyParams, schema), true, numOutputGroups); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java index 1c8d9022a..9a9109afb 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java @@ -94,12 +94,13 @@ public class SparkExecuteClusteringCommitActionExecutor runClusteringForGroupAsync(inputGroup, clusteringPlan.getStrategy().getStrategyParams())) .map(CompletableFuture::join) .reduce((rdd1, rdd2) -> rdd1.union(rdd2)).orElse(engineContext.emptyRDD()); - if (writeStatusRDD.isEmpty()) { - throw new HoodieClusteringException("Clustering plan produced 0 WriteStatus for " + instantTime + " #groups: " + clusteringPlan.getInputGroups().size()); - } - + HoodieWriteMetadata> writeMetadata = buildWriteMetadata(writeStatusRDD); - updateIndexAndCommitIfNeeded(writeStatusRDD, writeMetadata); + JavaRDD statuses = updateIndex(writeStatusRDD, writeMetadata); + writeMetadata.setWriteStats(statuses.map(WriteStatus::getStat).collect()); + // validate clustering action before committing result + validateWriteResult(writeMetadata); + commitOnAutoCommit(writeMetadata); if (!writeMetadata.getCommitMetadata().isPresent()) { HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(writeStatusRDD.map(WriteStatus::getStat).collect(), writeMetadata.getPartitionToReplaceFileIds(), extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType()); @@ -108,6 +109,21 @@ public class SparkExecuteClusteringCommitActionExecutor> writeMetadata) { + if (writeMetadata.getWriteStatuses().isEmpty()) { + throw new HoodieClusteringException("Clustering plan produced 0 WriteStatus for " + instantTime + + " #groups: " + clusteringPlan.getInputGroups().size() + " expected at least " + + clusteringPlan.getInputGroups().stream().mapToInt(HoodieClusteringGroup::getNumOutputFileGroups).sum() + + " write statuses"); + } + } + /** * Submit job to execute clustering for the group. */ @@ -222,7 +238,6 @@ public class SparkExecuteClusteringCommitActionExecutor> result = new HoodieWriteMetadata<>(); result.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(writeStatusJavaRDD)); result.setWriteStatuses(writeStatusJavaRDD); - result.setWriteStats(writeStatusJavaRDD.map(WriteStatus::getStat).collect()); result.setCommitMetadata(Option.empty()); result.setCommitted(false); return result; diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java index 5a4d79c7e..a99b00162 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java @@ -208,7 +208,7 @@ public abstract class BaseSparkCommitActionExecutor writeStatusRDD, HoodieWriteMetadata result) { + protected JavaRDD updateIndex(JavaRDD writeStatusRDD, HoodieWriteMetadata result) { // cache writeStatusRDD before updating index, so that all actions before this are not triggered again for future // RDD actions that are performed after updating the index. writeStatusRDD = writeStatusRDD.persist(SparkMemoryUtils.getWriteStatusStorageLevel(config.getProps())); @@ -218,6 +218,11 @@ public abstract class BaseSparkCommitActionExecutor writeStatusRDD, HoodieWriteMetadata result) { + updateIndex(writeStatusRDD, result); commitOnAutoCommit(result); } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala index fd768c669..08e1c82dc 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala @@ -20,8 +20,8 @@ package org.apache.hudi.functional import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hudi.common.model.FileSlice import org.apache.hudi.common.table.HoodieTableMetaClient -import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, HoodieTestTable} import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings +import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, HoodieTestTable} import org.apache.hudi.config.{HoodieClusteringConfig, HoodieStorageConfig, HoodieWriteConfig} import org.apache.hudi.exception.TableNotFoundException import org.apache.hudi.testutils.HoodieClientTestBase @@ -243,17 +243,24 @@ class TestStructuredStreaming extends HoodieClientTestBase { val f2 = Future { inputDF1.coalesce(1).write.mode(SaveMode.Append).json(sourcePath) // wait for spark streaming to process one microbatch - val currNumCommits = waitTillAtleastNCommits(fs, destPath, 1, 120, 5) + var currNumCommits = waitTillAtleastNCommits(fs, destPath, 1, 120, 5) assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, destPath, "000")) inputDF2.coalesce(1).write.mode(SaveMode.Append).json(sourcePath) // wait for spark streaming to process second microbatch - waitTillAtleastNCommits(fs, destPath, currNumCommits + 1, 120, 5) - assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size()) - - // check have more than one file group - this.metaClient = new HoodieTableMetaClient(fs.getConf, destPath, true) - assertTrue(getLatestFileGroupsFileId(partitionOfRecords).size > 1) + currNumCommits = waitTillAtleastNCommits(fs, destPath, currNumCommits + 1, 120, 5) + // for inline clustering, clustering may be complete along with 2nd commit + if (HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, destPath).getCompletedReplaceTimeline().countInstants() > 0) { + assertEquals(3, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size()) + // check have at least one file group + this.metaClient = new HoodieTableMetaClient(fs.getConf, destPath, true) + assertTrue(getLatestFileGroupsFileId(partitionOfRecords).size > 0) + } else { + assertEquals(currNumCommits, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size()) + // check have more than one file group + this.metaClient = new HoodieTableMetaClient(fs.getConf, destPath, true) + assertTrue(getLatestFileGroupsFileId(partitionOfRecords).size > 1) + } // check clustering result checkClusteringResult(destPath)