From 5009138d044b4d859237f0f581aeeb71065dc526 Mon Sep 17 00:00:00 2001 From: RexAn Date: Fri, 18 Feb 2022 21:57:04 +0800 Subject: [PATCH] [HUDI-3438] Avoid getSmallFiles if hoodie.parquet.small.file.limit is 0 (#4823) Co-authored-by: Hui An --- .../org/apache/hudi/config/HoodieCompactionConfig.java | 3 ++- .../hudi/table/action/commit/JavaUpsertPartitioner.java | 7 ++++++- .../hudi/table/action/commit/UpsertPartitioner.java | 7 ++++++- .../deltacommit/SparkUpsertDeltaCommitPartitioner.java | 4 ++++ .../hudi/sink/partitioner/profile/WriteProfile.java | 9 ++++++++- 5 files changed, 26 insertions(+), 4 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java index 0d0984354..65b36fcb6 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java @@ -173,7 +173,8 @@ public class HoodieCompactionConfig extends HoodieConfig { .defaultValue(String.valueOf(104857600)) .withDocumentation("During upsert operation, we opportunistically expand existing small files on storage, instead of writing" + " new files, to keep number of files to an optimum. This config sets the file size limit below which a file on storage " - + " becomes a candidate to be selected as such a `small file`. By default, treat any file <= 100MB as a small file."); + + " becomes a candidate to be selected as such a `small file`. By default, treat any file <= 100MB as a small file." + + " Also note that if this set <= 0, will not try to get small files and directly write new files"); public static final ConfigProperty RECORD_SIZE_ESTIMATION_THRESHOLD = ConfigProperty .key("hoodie.record.size.estimation.threshold") diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPartitioner.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPartitioner.java index 33f59f440..f8aaba529 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPartitioner.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPartitioner.java @@ -135,7 +135,7 @@ public class JavaUpsertPartitioner> implements WorkloadStat pStat = profile.getWorkloadStat(partitionPath); if (pStat.getNumInserts() > 0) { - List smallFiles = partitionSmallFilesMap.get(partitionPath); + List smallFiles = partitionSmallFilesMap.getOrDefault(partitionPath, new ArrayList<>()); this.smallFiles.addAll(smallFiles); LOG.info("For partitionPath : " + partitionPath + " Small Files => " + smallFiles); @@ -205,6 +205,11 @@ public class JavaUpsertPartitioner> implements private Map> getSmallFilesForPartitions(List partitionPaths, HoodieEngineContext context) { Map> partitionSmallFilesMap = new HashMap<>(); + + if (config.getParquetSmallFileLimit() <= 0) { + return partitionSmallFilesMap; + } + if (partitionPaths != null && partitionPaths.size() > 0) { context.setJobStatus(this.getClass().getSimpleName(), "Getting small files from partitions"); partitionSmallFilesMap = context.mapToPair(partitionPaths, diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java index 6729da72d..98f2539c6 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java @@ -165,7 +165,7 @@ public class UpsertPartitioner> extends SparkHo List smallFiles = filterSmallFilesInClustering(partitionPathToPendingClusteringFileGroupsId.getOrDefault(partitionPath, Collections.emptySet()), - partitionSmallFilesMap.get(partitionPath)); + partitionSmallFilesMap.getOrDefault(partitionPath, new ArrayList<>())); this.smallFiles.addAll(smallFiles); @@ -241,6 +241,11 @@ public class UpsertPartitioner> extends SparkHo private Map> getSmallFilesForPartitions(List partitionPaths, HoodieEngineContext context) { JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context); Map> partitionSmallFilesMap = new HashMap<>(); + + if (config.getParquetSmallFileLimit() <= 0) { + return partitionSmallFilesMap; + } + if (partitionPaths != null && partitionPaths.size() > 0) { context.setJobStatus(this.getClass().getSimpleName(), "Getting small files from partitions"); JavaRDD partitionPathRdds = jsc.parallelize(partitionPaths, partitionPaths.size()); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitPartitioner.java index 8dd3146f5..e498019c4 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitPartitioner.java @@ -97,6 +97,10 @@ public class SparkUpsertDeltaCommitPartitioner> .collect(Collectors.toList()); } + if (config.getParquetSmallFileLimit() <= 0) { + return Collections.emptyList(); + } + // If we cannot index log files, then we choose the smallest parquet file in the partition and add inserts to // it. Doing this overtime for a partition, we ensure that we handle small file issues return table.getSliceView() diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java index 84fcd03f0..fdb8152b0 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java @@ -180,7 +180,14 @@ public class WriteProfile { if (smallFilesMap.containsKey(partitionPath)) { return smallFilesMap.get(partitionPath); } - List smallFiles = smallFilesProfile(partitionPath); + + List smallFiles = new ArrayList<>(); + if (config.getParquetSmallFileLimit() <= 0) { + this.smallFilesMap.put(partitionPath, smallFiles); + return smallFiles; + } + + smallFiles = smallFilesProfile(partitionPath); this.smallFilesMap.put(partitionPath, smallFiles); return smallFiles; }