[HUDI-1555] Remove isEmpty to improve clustering execution performance (#2502)
This commit is contained in:
@@ -71,20 +71,26 @@ public class SparkRecentDaysClusteringPlanStrategy<T extends HoodieRecordPayload
|
||||
protected Stream<HoodieClusteringGroup> buildClusteringGroupsForPartition(String partitionPath, List<FileSlice> fileSlices) {
|
||||
List<Pair<List<FileSlice>, Integer>> fileSliceGroups = new ArrayList<>();
|
||||
List<FileSlice> currentGroup = new ArrayList<>();
|
||||
int totalSizeSoFar = 0;
|
||||
long totalSizeSoFar = 0;
|
||||
for (FileSlice currentSlice : fileSlices) {
|
||||
// assume each filegroup size is ~= parquet.max.file.size
|
||||
totalSizeSoFar += currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : getWriteConfig().getParquetMaxFileSize();
|
||||
// check if max size is reached and create new group, if needed.
|
||||
if (totalSizeSoFar >= getWriteConfig().getClusteringMaxBytesInGroup() && !currentGroup.isEmpty()) {
|
||||
fileSliceGroups.add(Pair.of(currentGroup, getNumberOfOutputFileGroups(totalSizeSoFar, getWriteConfig().getClusteringTargetFileMaxBytes())));
|
||||
int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, getWriteConfig().getClusteringTargetFileMaxBytes());
|
||||
LOG.info("Adding one clustering group " + totalSizeSoFar + " max bytes: "
|
||||
+ getWriteConfig().getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups);
|
||||
fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
|
||||
currentGroup = new ArrayList<>();
|
||||
totalSizeSoFar = 0;
|
||||
}
|
||||
currentGroup.add(currentSlice);
|
||||
}
|
||||
if (!currentGroup.isEmpty()) {
|
||||
fileSliceGroups.add(Pair.of(currentGroup, getNumberOfOutputFileGroups(totalSizeSoFar, getWriteConfig().getClusteringTargetFileMaxBytes())));
|
||||
int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, getWriteConfig().getClusteringTargetFileMaxBytes());
|
||||
LOG.info("Adding final clustering group " + totalSizeSoFar + " max bytes: "
|
||||
+ getWriteConfig().getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups);
|
||||
fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
|
||||
}
|
||||
|
||||
return fileSliceGroups.stream().map(fileSliceGroup -> HoodieClusteringGroup.newBuilder()
|
||||
|
||||
@@ -26,6 +26,7 @@ import org.apache.hudi.common.model.HoodieKey;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.config.HoodieStorageConfig;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner;
|
||||
import org.apache.hudi.table.BulkInsertPartitioner;
|
||||
@@ -66,10 +67,12 @@ public class SparkSortAndSizeExecutionStrategy<T extends HoodieRecordPayload<T>>
|
||||
@Override
|
||||
public JavaRDD<WriteStatus> performClustering(final JavaRDD<HoodieRecord<T>> inputRecords, final int numOutputGroups,
|
||||
final String instantTime, final Map<String, String> strategyParams, final Schema schema) {
|
||||
LOG.info("Starting clustering for a group, parallelism:" + numOutputGroups + " commit:" + instantTime);
|
||||
Properties props = getWriteConfig().getProps();
|
||||
props.put(HoodieWriteConfig.BULKINSERT_PARALLELISM, String.valueOf(numOutputGroups));
|
||||
// We are calling another action executor - disable auto commit. Strategy is only expected to write data in new files.
|
||||
props.put(HoodieWriteConfig.HOODIE_AUTO_COMMIT_PROP, Boolean.FALSE.toString());
|
||||
props.put(HoodieStorageConfig.PARQUET_FILE_MAX_BYTES, String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));
|
||||
HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder().withProps(props).build();
|
||||
return (JavaRDD<WriteStatus>) SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig,
|
||||
false, getPartitioner(strategyParams, schema), true, numOutputGroups);
|
||||
|
||||
@@ -94,12 +94,13 @@ public class SparkExecuteClusteringCommitActionExecutor<T extends HoodieRecordPa
|
||||
.map(inputGroup -> runClusteringForGroupAsync(inputGroup, clusteringPlan.getStrategy().getStrategyParams()))
|
||||
.map(CompletableFuture::join)
|
||||
.reduce((rdd1, rdd2) -> rdd1.union(rdd2)).orElse(engineContext.emptyRDD());
|
||||
if (writeStatusRDD.isEmpty()) {
|
||||
throw new HoodieClusteringException("Clustering plan produced 0 WriteStatus for " + instantTime + " #groups: " + clusteringPlan.getInputGroups().size());
|
||||
}
|
||||
|
||||
|
||||
HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata = buildWriteMetadata(writeStatusRDD);
|
||||
updateIndexAndCommitIfNeeded(writeStatusRDD, writeMetadata);
|
||||
JavaRDD<WriteStatus> statuses = updateIndex(writeStatusRDD, writeMetadata);
|
||||
writeMetadata.setWriteStats(statuses.map(WriteStatus::getStat).collect());
|
||||
// validate clustering action before committing result
|
||||
validateWriteResult(writeMetadata);
|
||||
commitOnAutoCommit(writeMetadata);
|
||||
if (!writeMetadata.getCommitMetadata().isPresent()) {
|
||||
HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(writeStatusRDD.map(WriteStatus::getStat).collect(), writeMetadata.getPartitionToReplaceFileIds(),
|
||||
extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType());
|
||||
@@ -108,6 +109,21 @@ public class SparkExecuteClusteringCommitActionExecutor<T extends HoodieRecordPa
|
||||
return writeMetadata;
|
||||
}
|
||||
|
||||
/**
|
||||
* Validate actions taken by clustering. In the first implementation, we validate at least one new file is written.
|
||||
* But we can extend this to add more validation. E.g. number of records read = number of records written etc.
|
||||
*
|
||||
* We can also make these validations in BaseCommitActionExecutor to reuse pre-commit hooks for multiple actions.
|
||||
*/
|
||||
private void validateWriteResult(HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata) {
|
||||
if (writeMetadata.getWriteStatuses().isEmpty()) {
|
||||
throw new HoodieClusteringException("Clustering plan produced 0 WriteStatus for " + instantTime
|
||||
+ " #groups: " + clusteringPlan.getInputGroups().size() + " expected at least "
|
||||
+ clusteringPlan.getInputGroups().stream().mapToInt(HoodieClusteringGroup::getNumOutputFileGroups).sum()
|
||||
+ " write statuses");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Submit job to execute clustering for the group.
|
||||
*/
|
||||
@@ -222,7 +238,6 @@ public class SparkExecuteClusteringCommitActionExecutor<T extends HoodieRecordPa
|
||||
HoodieWriteMetadata<JavaRDD<WriteStatus>> result = new HoodieWriteMetadata<>();
|
||||
result.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(writeStatusJavaRDD));
|
||||
result.setWriteStatuses(writeStatusJavaRDD);
|
||||
result.setWriteStats(writeStatusJavaRDD.map(WriteStatus::getStat).collect());
|
||||
result.setCommitMetadata(Option.empty());
|
||||
result.setCommitted(false);
|
||||
return result;
|
||||
|
||||
@@ -208,7 +208,7 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
|
||||
return partitionedRDD.map(Tuple2::_2);
|
||||
}
|
||||
|
||||
protected void updateIndexAndCommitIfNeeded(JavaRDD<WriteStatus> writeStatusRDD, HoodieWriteMetadata result) {
|
||||
protected JavaRDD<WriteStatus> updateIndex(JavaRDD<WriteStatus> writeStatusRDD, HoodieWriteMetadata result) {
|
||||
// cache writeStatusRDD before updating index, so that all actions before this are not triggered again for future
|
||||
// RDD actions that are performed after updating the index.
|
||||
writeStatusRDD = writeStatusRDD.persist(SparkMemoryUtils.getWriteStatusStorageLevel(config.getProps()));
|
||||
@@ -218,6 +218,11 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
|
||||
result.setIndexUpdateDuration(Duration.between(indexStartTime, Instant.now()));
|
||||
result.setWriteStatuses(statuses);
|
||||
result.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(statuses));
|
||||
return statuses;
|
||||
}
|
||||
|
||||
protected void updateIndexAndCommitIfNeeded(JavaRDD<WriteStatus> writeStatusRDD, HoodieWriteMetadata result) {
|
||||
updateIndex(writeStatusRDD, result);
|
||||
commitOnAutoCommit(result);
|
||||
}
|
||||
|
||||
|
||||
@@ -20,8 +20,8 @@ package org.apache.hudi.functional
|
||||
import org.apache.hadoop.fs.{FileSystem, Path}
|
||||
import org.apache.hudi.common.model.FileSlice
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient
|
||||
import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, HoodieTestTable}
|
||||
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
|
||||
import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, HoodieTestTable}
|
||||
import org.apache.hudi.config.{HoodieClusteringConfig, HoodieStorageConfig, HoodieWriteConfig}
|
||||
import org.apache.hudi.exception.TableNotFoundException
|
||||
import org.apache.hudi.testutils.HoodieClientTestBase
|
||||
@@ -243,17 +243,24 @@ class TestStructuredStreaming extends HoodieClientTestBase {
|
||||
val f2 = Future {
|
||||
inputDF1.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
|
||||
// wait for spark streaming to process one microbatch
|
||||
val currNumCommits = waitTillAtleastNCommits(fs, destPath, 1, 120, 5)
|
||||
var currNumCommits = waitTillAtleastNCommits(fs, destPath, 1, 120, 5)
|
||||
assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, destPath, "000"))
|
||||
|
||||
inputDF2.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
|
||||
// wait for spark streaming to process second microbatch
|
||||
waitTillAtleastNCommits(fs, destPath, currNumCommits + 1, 120, 5)
|
||||
assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size())
|
||||
|
||||
// check have more than one file group
|
||||
this.metaClient = new HoodieTableMetaClient(fs.getConf, destPath, true)
|
||||
assertTrue(getLatestFileGroupsFileId(partitionOfRecords).size > 1)
|
||||
currNumCommits = waitTillAtleastNCommits(fs, destPath, currNumCommits + 1, 120, 5)
|
||||
// for inline clustering, clustering may be complete along with 2nd commit
|
||||
if (HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, destPath).getCompletedReplaceTimeline().countInstants() > 0) {
|
||||
assertEquals(3, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size())
|
||||
// check have at least one file group
|
||||
this.metaClient = new HoodieTableMetaClient(fs.getConf, destPath, true)
|
||||
assertTrue(getLatestFileGroupsFileId(partitionOfRecords).size > 0)
|
||||
} else {
|
||||
assertEquals(currNumCommits, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size())
|
||||
// check have more than one file group
|
||||
this.metaClient = new HoodieTableMetaClient(fs.getConf, destPath, true)
|
||||
assertTrue(getLatestFileGroupsFileId(partitionOfRecords).size > 1)
|
||||
}
|
||||
|
||||
// check clustering result
|
||||
checkClusteringResult(destPath)
|
||||
|
||||
Reference in New Issue
Block a user