1
0

[HUDI-3438] Avoid getSmallFiles if hoodie.parquet.small.file.limit is 0 (#4823)

Co-authored-by: Hui An <hui.an@shopee.com>
This commit is contained in:
RexAn
2022-02-18 21:57:04 +08:00
committed by GitHub
parent fba5822ee3
commit 5009138d04
5 changed files with 26 additions and 4 deletions

View File

@@ -173,7 +173,8 @@ public class HoodieCompactionConfig extends HoodieConfig {
.defaultValue(String.valueOf(104857600))
.withDocumentation("During upsert operation, we opportunistically expand existing small files on storage, instead of writing"
+ " new files, to keep number of files to an optimum. This config sets the file size limit below which a file on storage "
+ " becomes a candidate to be selected as such a `small file`. By default, treat any file <= 100MB as a small file.");
+ " becomes a candidate to be selected as such a `small file`. By default, treat any file <= 100MB as a small file."
+ " Also note that if this set <= 0, will not try to get small files and directly write new files");
public static final ConfigProperty<String> RECORD_SIZE_ESTIMATION_THRESHOLD = ConfigProperty
.key("hoodie.record.size.estimation.threshold")

View File

@@ -135,7 +135,7 @@ public class JavaUpsertPartitioner<T extends HoodieRecordPayload<T>> implements
WorkloadStat pStat = profile.getWorkloadStat(partitionPath);
if (pStat.getNumInserts() > 0) {
List<SmallFile> smallFiles = partitionSmallFilesMap.get(partitionPath);
List<SmallFile> smallFiles = partitionSmallFilesMap.getOrDefault(partitionPath, new ArrayList<>());
this.smallFiles.addAll(smallFiles);
LOG.info("For partitionPath : " + partitionPath + " Small Files => " + smallFiles);
@@ -205,6 +205,11 @@ public class JavaUpsertPartitioner<T extends HoodieRecordPayload<T>> implements
private Map<String, List<SmallFile>> getSmallFilesForPartitions(List<String> partitionPaths, HoodieEngineContext context) {
Map<String, List<SmallFile>> partitionSmallFilesMap = new HashMap<>();
if (config.getParquetSmallFileLimit() <= 0) {
return partitionSmallFilesMap;
}
if (partitionPaths != null && partitionPaths.size() > 0) {
context.setJobStatus(this.getClass().getSimpleName(), "Getting small files from partitions");
partitionSmallFilesMap = context.mapToPair(partitionPaths,

View File

@@ -165,7 +165,7 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends SparkHo
List<SmallFile> smallFiles =
filterSmallFilesInClustering(partitionPathToPendingClusteringFileGroupsId.getOrDefault(partitionPath, Collections.emptySet()),
partitionSmallFilesMap.get(partitionPath));
partitionSmallFilesMap.getOrDefault(partitionPath, new ArrayList<>()));
this.smallFiles.addAll(smallFiles);
@@ -241,6 +241,11 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends SparkHo
private Map<String, List<SmallFile>> getSmallFilesForPartitions(List<String> partitionPaths, HoodieEngineContext context) {
JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
Map<String, List<SmallFile>> partitionSmallFilesMap = new HashMap<>();
if (config.getParquetSmallFileLimit() <= 0) {
return partitionSmallFilesMap;
}
if (partitionPaths != null && partitionPaths.size() > 0) {
context.setJobStatus(this.getClass().getSimpleName(), "Getting small files from partitions");
JavaRDD<String> partitionPathRdds = jsc.parallelize(partitionPaths, partitionPaths.size());

View File

@@ -97,6 +97,10 @@ public class SparkUpsertDeltaCommitPartitioner<T extends HoodieRecordPayload<T>>
.collect(Collectors.toList());
}
if (config.getParquetSmallFileLimit() <= 0) {
return Collections.emptyList();
}
// If we cannot index log files, then we choose the smallest parquet file in the partition and add inserts to
// it. Doing this overtime for a partition, we ensure that we handle small file issues
return table.getSliceView()

View File

@@ -180,7 +180,14 @@ public class WriteProfile {
if (smallFilesMap.containsKey(partitionPath)) {
return smallFilesMap.get(partitionPath);
}
List<SmallFile> smallFiles = smallFilesProfile(partitionPath);
List<SmallFile> smallFiles = new ArrayList<>();
if (config.getParquetSmallFileLimit() <= 0) {
this.smallFilesMap.put(partitionPath, smallFiles);
return smallFiles;
}
smallFiles = smallFilesProfile(partitionPath);
this.smallFilesMap.put(partitionPath, smallFiles);
return smallFiles;
}