diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java index 5d2ec768a..a0d1867f9 100644 --- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java @@ -489,9 +489,9 @@ public class HoodieWriteClient extends AbstractHo private Partitioner getPartitioner(HoodieTable table, boolean isUpsert, WorkloadProfile profile) { if (isUpsert) { - return table.getUpsertPartitioner(profile); + return table.getUpsertPartitioner(profile, jsc); } else { - return table.getInsertPartitioner(profile); + return table.getInsertPartitioner(profile, jsc); } } diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java index 874c3e82d..de43900b9 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java @@ -81,6 +81,7 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import org.apache.spark.api.java.function.PairFunction; import scala.Tuple2; /** @@ -142,16 +143,16 @@ public class HoodieCopyOnWriteTable extends Hoodi } @Override - public Partitioner getUpsertPartitioner(WorkloadProfile profile) { + public Partitioner getUpsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc) { if (profile == null) { throw new HoodieUpsertException("Need workload profile to construct the upsert partitioner."); } - return new UpsertPartitioner(profile); + return new UpsertPartitioner(profile, jsc); } @Override - public Partitioner getInsertPartitioner(WorkloadProfile profile) { - return getUpsertPartitioner(profile); + public Partitioner getInsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc) { + return getUpsertPartitioner(profile, jsc); } @Override @@ -573,14 +574,14 @@ public class HoodieCopyOnWriteTable extends Hoodi */ protected HoodieRollingStatMetadata rollingStatMetadata; - UpsertPartitioner(WorkloadProfile profile) { + UpsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc) { updateLocationToBucket = new HashMap<>(); partitionPathToInsertBuckets = new HashMap<>(); bucketInfoMap = new HashMap<>(); globalStat = profile.getGlobalStat(); rollingStatMetadata = getRollingStats(); assignUpdates(profile); - assignInserts(profile); + assignInserts(profile, jsc); LOG.info("Total Buckets :" + totalBuckets + ", buckets info => " + bucketInfoMap + ", \n" + "Partition to insert buckets => " + partitionPathToInsertBuckets + ", \n" @@ -610,18 +611,24 @@ public class HoodieCopyOnWriteTable extends Hoodi return bucket; } - private void assignInserts(WorkloadProfile profile) { + private void assignInserts(WorkloadProfile profile, JavaSparkContext jsc) { // for new inserts, compute buckets depending on how many records we have for each partition Set partitionPaths = profile.getPartitionPaths(); long averageRecordSize = averageBytesPerRecord(metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(), config.getCopyOnWriteRecordSizeEstimate()); LOG.info("AvgRecordSize => " + averageRecordSize); + + Map> partitionSmallFilesMap = + getSmallFilesForPartitions(new ArrayList(partitionPaths), jsc); + for (String partitionPath : partitionPaths) { WorkloadStat pStat = profile.getWorkloadStat(partitionPath); if (pStat.getNumInserts() > 0) { - List smallFiles = getSmallFiles(partitionPath); + List smallFiles = partitionSmallFilesMap.get(partitionPath); + this.smallFiles.addAll(smallFiles); + LOG.info("For partitionPath : " + partitionPath + " Small Files => " + smallFiles); long totalUnassignedInserts = pStat.getNumInserts(); @@ -684,6 +691,18 @@ public class HoodieCopyOnWriteTable extends Hoodi } } + private Map> getSmallFilesForPartitions(List partitionPaths, JavaSparkContext jsc) { + + Map> partitionSmallFilesMap = new HashMap<>(); + if (partitionPaths != null && partitionPaths.size() > 0) { + JavaRDD partitionPathRdds = jsc.parallelize(partitionPaths, partitionPaths.size()); + partitionSmallFilesMap = partitionPathRdds.mapToPair((PairFunction>) + partitionPath -> new Tuple2<>(partitionPath, getSmallFiles(partitionPath))).collectAsMap(); + } + + return partitionSmallFilesMap; + } + /** * Returns a list of small files in the given partition path. */ @@ -706,8 +725,6 @@ public class HoodieCopyOnWriteTable extends Hoodi sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename)); sf.sizeBytes = file.getFileSize(); smallFileLocations.add(sf); - // Update the global small files list - smallFiles.add(sf); } } } diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java index 6cb604c66..7e96eb687 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java @@ -89,11 +89,11 @@ public class HoodieMergeOnReadTable extends Hoodi } @Override - public Partitioner getUpsertPartitioner(WorkloadProfile profile) { + public Partitioner getUpsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc) { if (profile == null) { throw new HoodieUpsertException("Need workload profile to construct the upsert partitioner."); } - mergeOnReadUpsertPartitioner = new MergeOnReadUpsertPartitioner(profile); + mergeOnReadUpsertPartitioner = new MergeOnReadUpsertPartitioner(profile, jsc); return mergeOnReadUpsertPartitioner; } @@ -325,8 +325,8 @@ public class HoodieMergeOnReadTable extends Hoodi */ class MergeOnReadUpsertPartitioner extends HoodieCopyOnWriteTable.UpsertPartitioner { - MergeOnReadUpsertPartitioner(WorkloadProfile profile) { - super(profile); + MergeOnReadUpsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc) { + super(profile, jsc); } @Override @@ -376,16 +376,12 @@ public class HoodieMergeOnReadTable extends Hoodi sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename)); sf.sizeBytes = getTotalFileSize(smallFileSlice); smallFileLocations.add(sf); - // Update the global small files list - smallFiles.add(sf); } else { HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get(); sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()), FSUtils.getFileIdFromLogPath(logFile.getPath())); sf.sizeBytes = getTotalFileSize(smallFileSlice); smallFileLocations.add(sf); - // Update the global small files list - smallFiles.add(sf); } } } diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java index 143086f4d..e3c134c10 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -118,12 +118,12 @@ public abstract class HoodieTable implements Seri /** * Provides a partitioner to perform the upsert operation, based on the workload profile. */ - public abstract Partitioner getUpsertPartitioner(WorkloadProfile profile); + public abstract Partitioner getUpsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc); /** * Provides a partitioner to perform the insert operation, based on the workload profile. */ - public abstract Partitioner getInsertPartitioner(WorkloadProfile profile); + public abstract Partitioner getInsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc); /** * Return whether this HoodieTable implementation can benefit from workload profiling. diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java index f02afddb4..f670b868f 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java @@ -415,7 +415,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness { records.addAll(updateRecords); WorkloadProfile profile = new WorkloadProfile(jsc.parallelize(records)); HoodieCopyOnWriteTable.UpsertPartitioner partitioner = - (HoodieCopyOnWriteTable.UpsertPartitioner) table.getUpsertPartitioner(profile); + (HoodieCopyOnWriteTable.UpsertPartitioner) table.getUpsertPartitioner(profile, jsc); assertEquals("Update record should have gone to the 1 update partition", 0, partitioner.getPartition( new Tuple2<>(updateRecords.get(0).getKey(), Option.ofNullable(updateRecords.get(0).getCurrentLocation())))); return partitioner; diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java index 966cc5210..b48ad3f16 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java @@ -1272,7 +1272,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { JavaRDD deleteRDD = jsc.parallelize(fewRecordsForDelete, 1); // initialize partitioner - hoodieTable.getUpsertPartitioner(new WorkloadProfile(deleteRDD)); + hoodieTable.getUpsertPartitioner(new WorkloadProfile(deleteRDD), jsc); final List> deleteStatus = jsc.parallelize(Arrays.asList(1)).map(x -> { return hoodieTable.handleUpdate(newDeleteTime, partitionPath, fileId, fewRecordsForDelete.iterator()); }).map(x -> (List) HoodieClientTestUtils.collectStatuses(x)).collect();