[HUDI-2550] Expand File-Group candidates list for appending for MOR tables (#3986)
@@ -354,6 +354,12 @@ public class HoodieWriteConfig extends HoodieConfig {
      .withDocumentation("When enabled, we allow duplicate keys even if inserts are routed to merge with an existing file (for ensuring file sizing)."
          + " This is only relevant for insert operation, since upsert, delete operations will ensure unique key constraints are maintained.");

  public static final ConfigProperty<Integer> MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT = ConfigProperty
      .key("hoodie.merge.small.file.group.candidates.limit")
      .defaultValue(1)
      .withDocumentation("Limits number of file groups, whose base file satisfies small-file limit, to consider for appending records during upsert operation. "
          + "Only applicable to MOR tables");

  public static final ConfigProperty<Integer> CLIENT_HEARTBEAT_INTERVAL_IN_MS = ConfigProperty
      .key("hoodie.client.heartbeat.interval_in_ms")
      .defaultValue(60 * 1000)
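Note (not part of the diff itself): the default of 1 keeps the previous behavior, where at most one small file group per partition is picked as an append target. Raising hoodie.merge.small.file.group.candidates.limit lets the delta-commit upsert partitioner spread inserts across several small file groups of a MOR table; the value is read back through the getSmallFileGroupCandidatesLimit() accessor added below.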
@@ -1035,6 +1041,10 @@ public class HoodieWriteConfig extends HoodieConfig {
    return getBoolean(MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE);
  }

  public int getSmallFileGroupCandidatesLimit() {
    return getInt(MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT);
  }

  public EngineType getEngineType() {
    return engineType;
  }

@@ -2116,6 +2126,11 @@ public class HoodieWriteConfig extends HoodieConfig {
      return this;
    }

    public Builder withMergeSmallFileGroupCandidatesLimit(int limit) {
      writeConfig.setValue(MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT, String.valueOf(limit));
      return this;
    }

    public Builder withHeartbeatIntervalInMs(Integer heartbeatIntervalInMs) {
      writeConfig.setValue(CLIENT_HEARTBEAT_INTERVAL_IN_MS, String.valueOf(heartbeatIntervalInMs));
      return this;
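For orientation, a minimal usage sketch (not taken from the commit) of wiring the new limit through the builder; only withMergeSmallFileGroupCandidatesLimit comes from this change, and the table path and schema are placeholders that a real job would supply along with its other options:

// Hypothetical usage sketch; schemaJson and the table path are placeholders.
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
    .withPath("/tmp/hoodie_mor_table")
    .withSchema(schemaJson)
    .withMergeSmallFileGroupCandidatesLimit(3) // consider up to 3 small file groups per partition
    .build();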
@@ -288,6 +288,10 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends Partiti
    return smallFileLocations;
  }

  public List<BucketInfo> getBucketInfos() {
    return Collections.unmodifiableList(new ArrayList<>(bucketInfoMap.values()));
  }

  public BucketInfo getBucketInfo(int bucketNumber) {
    return bucketInfoMap.get(bucketNumber);
  }
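These accessors expose the partitioner's bucket assignments in a read-only form, mainly as a hook for tests and tooling; a small sketch of reading them back, assuming a partitioner instance built as in the new test further down:

// Hypothetical inspection of the assignments; `partitioner` is assumed to be an UpsertPartitioner
// (or subclass) already constructed against a workload profile, as in the test below.
for (BucketInfo bucketInfo : partitioner.getBucketInfos()) {
  // each BucketInfo carries the bucket type, the target file id and the partition path
  System.out.println(bucketInfo);
}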
@@ -26,15 +26,16 @@ import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;

import org.apache.hudi.table.action.commit.SmallFile;
import org.apache.hudi.table.action.commit.UpsertPartitioner;

import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
@@ -51,68 +52,68 @@ public class SparkUpsertDeltaCommitPartitioner<T extends HoodieRecordPayload<T>>

  @Override
  protected List<SmallFile> getSmallFiles(String partitionPath) {

    // smallFiles only for partitionPath
    List<SmallFile> smallFileLocations = new ArrayList<>();

    // Init here since this class (and member variables) might not have been initialized
    HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();

    // Find out all eligible small file slices
    if (!commitTimeline.empty()) {
      HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
      // find smallest file in partition and append to it
      List<FileSlice> allSmallFileSlices = new ArrayList<>();
      // If we cannot index log files, then we choose the smallest parquet file in the partition and add inserts to
      // it. Doing this overtime for a partition, we ensure that we handle small file issues
      if (!table.getIndex().canIndexLogFiles()) {
        // TODO : choose last N small files since there can be multiple small files written to a single partition
        // by different spark partitions in a single batch
        Option<FileSlice> smallFileSlice = Option.fromJavaOptional(table.getSliceView()
            .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), false)
            .filter(
                fileSlice -> fileSlice.getLogFiles().count() < 1 && fileSlice.getBaseFile().get().getFileSize() < config
                    .getParquetSmallFileLimit())
            .min((FileSlice left, FileSlice right) ->
                left.getBaseFile().get().getFileSize() < right.getBaseFile().get().getFileSize() ? -1 : 1));
        if (smallFileSlice.isPresent()) {
          allSmallFileSlices.add(smallFileSlice.get());
        }
    if (commitTimeline.empty()) {
      return Collections.emptyList();
    }

    HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();

    // Find out all eligible small file slices, looking for
    // smallest file in the partition to append to
    List<FileSlice> smallFileSlicesCandidates = getSmallFileCandidates(partitionPath, latestCommitTime);
    List<SmallFile> smallFileLocations = new ArrayList<>();

    // Create SmallFiles from the eligible file slices
    for (FileSlice smallFileSlice : smallFileSlicesCandidates) {
      SmallFile sf = new SmallFile();
      if (smallFileSlice.getBaseFile().isPresent()) {
        // TODO : Move logic of file name, file id, base commit time handling inside file slice
        String filename = smallFileSlice.getBaseFile().get().getFileName();
        sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
        sf.sizeBytes = getTotalFileSize(smallFileSlice);
        smallFileLocations.add(sf);
      } else {
        // If we can index log files, we can add more inserts to log files for fileIds NOT including those under
        // pending compaction
        List<FileSlice> allFileSlices =
            table.getSliceView().getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), false)
                .collect(Collectors.toList());
        for (FileSlice fileSlice : allFileSlices) {
          if (isSmallFile(fileSlice)) {
            allSmallFileSlices.add(fileSlice);
          }
        }
      }
      // Create SmallFiles from the eligible file slices
      for (FileSlice smallFileSlice : allSmallFileSlices) {
        SmallFile sf = new SmallFile();
        if (smallFileSlice.getBaseFile().isPresent()) {
          // TODO : Move logic of file name, file id, base commit time handling inside file slice
          String filename = smallFileSlice.getBaseFile().get().getFileName();
          sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
          sf.sizeBytes = getTotalFileSize(smallFileSlice);
          smallFileLocations.add(sf);
        } else {
        HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get();
        sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
            FSUtils.getFileIdFromLogPath(logFile.getPath()));
        sf.sizeBytes = getTotalFileSize(smallFileSlice);
        smallFileLocations.add(sf);
      }
          HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get();
          sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
              FSUtils.getFileIdFromLogPath(logFile.getPath()));
          sf.sizeBytes = getTotalFileSize(smallFileSlice);
          smallFileLocations.add(sf);
        }
      }
    }
    return smallFileLocations;
  }

  @Nonnull
  private List<FileSlice> getSmallFileCandidates(String partitionPath, HoodieInstant latestCommitInstant) {
    // If we can index log files, we can add more inserts to log files for fileIds NOT including those under
    // pending compaction
    if (table.getIndex().canIndexLogFiles()) {
      return table.getSliceView()
          .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitInstant.getTimestamp(), false)
          .filter(this::isSmallFile)
          .collect(Collectors.toList());
    }

    // If we cannot index log files, then we choose the smallest parquet file in the partition and add inserts to
    // it. Doing this overtime for a partition, we ensure that we handle small file issues
    return table.getSliceView()
        .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitInstant.getTimestamp(), false)
        .filter(
            fileSlice ->
                // NOTE: We can not pad slices with existing log-files w/o compacting these,
                // hence skipping
                fileSlice.getLogFiles().count() < 1
                    && fileSlice.getBaseFile().get().getFileSize() < config.getParquetSmallFileLimit())
        .sorted(Comparator.comparing(fileSlice -> fileSlice.getBaseFile().get().getFileSize()))
        .limit(config.getSmallFileGroupCandidatesLimit())
        .collect(Collectors.toList());
  }

  public List<String> getSmallFileIds() {
    return (List<String>) smallFiles.stream().map(smallFile -> ((SmallFile) smallFile).location.getFileId())
    return smallFiles.stream().map(smallFile -> smallFile.location.getFileId())
        .collect(Collectors.toList());
  }

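In short, getSmallFileCandidates now centralizes the small-file lookup: when the index can look up records in log files, every small file slice in the partition qualifies; otherwise only slices with no log files and a base file below the parquet small-file limit qualify, sorted by base file size ascending and truncated to the configured candidate limit. As a worked example of that cap: with a limit of 2 and base files of 300KB, 600KB and 900KB against a 1MB small-file limit, only the 300KB and 600KB file groups would be offered as append targets.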
@@ -132,8 +133,12 @@ public class SparkUpsertDeltaCommitPartitioner<T extends HoodieRecordPayload<T>>

  // TODO (NA) : Make this static part of utility
  public long convertLogFilesSizeToExpectedParquetSize(List<HoodieLogFile> hoodieLogFiles) {
    long totalSizeOfLogFiles = hoodieLogFiles.stream().map(HoodieLogFile::getFileSize)
        .filter(size -> size > 0).reduce(Long::sum).orElse(0L);
    long totalSizeOfLogFiles =
        hoodieLogFiles.stream()
            .map(HoodieLogFile::getFileSize)
            .filter(size -> size > 0)
            .reduce(Long::sum)
            .orElse(0L);
    // Here we assume that if there is no base parquet file, all log files contain only inserts.
    // We can then just get the parquet equivalent size of these log files, compare that with
    // {@link config.getParquetMaxFileSize()} and decide if there is scope to insert more rows
@@ -56,6 +56,7 @@ import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;

@@ -336,7 +337,6 @@ public class TestUpsertPartitioner extends HoodieClientTestBase {
    HoodieCompactionPlan plan = CompactionTestUtils.createCompactionPlan(metaClient, "001", "002", 1, true, false);
    FileCreateUtils.createRequestedCompactionCommit(basePath, "002", plan);
    // Simulate one more commit so that inflight compaction is considered when building file groups in file system view
    //
    FileCreateUtils.createBaseFile(basePath, testPartitionPath, "003", "2", 1);
    FileCreateUtils.createCommit(basePath, "003");
@@ -434,6 +434,49 @@ public class TestUpsertPartitioner extends HoodieClientTestBase {
        "Insert should be assigned to fg1");
  }

  @Test
  public void testUpsertPartitionerWithSmallFileHandlingPickingMultipleCandidates() throws Exception {
    final String partitionPath = DEFAULT_PARTITION_PATHS[0];

    HoodieWriteConfig config =
        makeHoodieClientConfigBuilder()
            .withMergeSmallFileGroupCandidatesLimit(3)
            .withStorageConfig(
                HoodieStorageConfig.newBuilder()
                    .parquetMaxFileSize(2048)
                    .build()
            )
            .build();

    // Bootstrap base files ("small-file targets")
    FileCreateUtils.createBaseFile(basePath, partitionPath, "002", "fg-1", 1024);
    FileCreateUtils.createBaseFile(basePath, partitionPath, "002", "fg-2", 1024);
    FileCreateUtils.createBaseFile(basePath, partitionPath, "002", "fg-3", 1024);

    FileCreateUtils.createCommit(basePath, "002");

    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[] {partitionPath});
    // Default estimated record size will be 1024 based on last file group created.
    // Only 1 record can be added to small file
    WorkloadProfile profile =
        new WorkloadProfile(buildProfile(jsc.parallelize(dataGenerator.generateInserts("003", 3))));

    HoodieTableMetaClient reloadedMetaClient = HoodieTableMetaClient.reload(this.metaClient);

    HoodieSparkTable<?> table = HoodieSparkTable.create(config, context, reloadedMetaClient);

    SparkUpsertDeltaCommitPartitioner<?> partitioner = new SparkUpsertDeltaCommitPartitioner<>(profile, context, table, config);

    assertEquals(3, partitioner.numPartitions());
    assertEquals(
        Arrays.asList(
            new BucketInfo(BucketType.UPDATE, "fg-1", partitionPath),
            new BucketInfo(BucketType.UPDATE, "fg-2", partitionPath),
            new BucketInfo(BucketType.UPDATE, "fg-3", partitionPath)
        ),
        partitioner.getBucketInfos());
  }

  private HoodieWriteConfig.Builder makeHoodieClientConfigBuilder() {
    // Prepare the AvroParquetIO
    return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(SCHEMA.toString());
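Taken together, the new test pins down the intended behavior of the expanded candidate list: with the limit raised to 3, three 1024-byte base files under a 2048-byte max file size, and three generated inserts, the partitioner produces three UPDATE buckets targeting fg-1, fg-2 and fg-3 instead of routing every insert into a single small file group.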