1
0

[HUDI-724] Parallelize getSmallFiles for partitions (#1421)

Co-authored-by: Feichi Feng <feicfeng@amazon.com>
This commit is contained in:
ffcchi
2020-03-30 01:14:38 -06:00
committed by GitHub
parent fa36082554
commit 1f5b0c77d6
6 changed files with 37 additions and 24 deletions

View File

@@ -489,9 +489,9 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
private Partitioner getPartitioner(HoodieTable table, boolean isUpsert, WorkloadProfile profile) { private Partitioner getPartitioner(HoodieTable table, boolean isUpsert, WorkloadProfile profile) {
if (isUpsert) { if (isUpsert) {
return table.getUpsertPartitioner(profile); return table.getUpsertPartitioner(profile, jsc);
} else { } else {
return table.getInsertPartitioner(profile); return table.getInsertPartitioner(profile, jsc);
} }
} }

View File

@@ -81,6 +81,7 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2; import scala.Tuple2;
/** /**
@@ -142,16 +143,16 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
} }
@Override @Override
public Partitioner getUpsertPartitioner(WorkloadProfile profile) { public Partitioner getUpsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc) {
if (profile == null) { if (profile == null) {
throw new HoodieUpsertException("Need workload profile to construct the upsert partitioner."); throw new HoodieUpsertException("Need workload profile to construct the upsert partitioner.");
} }
return new UpsertPartitioner(profile); return new UpsertPartitioner(profile, jsc);
} }
@Override @Override
public Partitioner getInsertPartitioner(WorkloadProfile profile) { public Partitioner getInsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc) {
return getUpsertPartitioner(profile); return getUpsertPartitioner(profile, jsc);
} }
@Override @Override
@@ -573,14 +574,14 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
*/ */
protected HoodieRollingStatMetadata rollingStatMetadata; protected HoodieRollingStatMetadata rollingStatMetadata;
UpsertPartitioner(WorkloadProfile profile) { UpsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc) {
updateLocationToBucket = new HashMap<>(); updateLocationToBucket = new HashMap<>();
partitionPathToInsertBuckets = new HashMap<>(); partitionPathToInsertBuckets = new HashMap<>();
bucketInfoMap = new HashMap<>(); bucketInfoMap = new HashMap<>();
globalStat = profile.getGlobalStat(); globalStat = profile.getGlobalStat();
rollingStatMetadata = getRollingStats(); rollingStatMetadata = getRollingStats();
assignUpdates(profile); assignUpdates(profile);
assignInserts(profile); assignInserts(profile, jsc);
LOG.info("Total Buckets :" + totalBuckets + ", buckets info => " + bucketInfoMap + ", \n" LOG.info("Total Buckets :" + totalBuckets + ", buckets info => " + bucketInfoMap + ", \n"
+ "Partition to insert buckets => " + partitionPathToInsertBuckets + ", \n" + "Partition to insert buckets => " + partitionPathToInsertBuckets + ", \n"
@@ -610,18 +611,24 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
return bucket; return bucket;
} }
private void assignInserts(WorkloadProfile profile) { private void assignInserts(WorkloadProfile profile, JavaSparkContext jsc) {
// for new inserts, compute buckets depending on how many records we have for each partition // for new inserts, compute buckets depending on how many records we have for each partition
Set<String> partitionPaths = profile.getPartitionPaths(); Set<String> partitionPaths = profile.getPartitionPaths();
long averageRecordSize = long averageRecordSize =
averageBytesPerRecord(metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(), averageBytesPerRecord(metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(),
config.getCopyOnWriteRecordSizeEstimate()); config.getCopyOnWriteRecordSizeEstimate());
LOG.info("AvgRecordSize => " + averageRecordSize); LOG.info("AvgRecordSize => " + averageRecordSize);
Map<String, List<SmallFile>> partitionSmallFilesMap =
getSmallFilesForPartitions(new ArrayList<String>(partitionPaths), jsc);
for (String partitionPath : partitionPaths) { for (String partitionPath : partitionPaths) {
WorkloadStat pStat = profile.getWorkloadStat(partitionPath); WorkloadStat pStat = profile.getWorkloadStat(partitionPath);
if (pStat.getNumInserts() > 0) { if (pStat.getNumInserts() > 0) {
List<SmallFile> smallFiles = getSmallFiles(partitionPath); List<SmallFile> smallFiles = partitionSmallFilesMap.get(partitionPath);
this.smallFiles.addAll(smallFiles);
LOG.info("For partitionPath : " + partitionPath + " Small Files => " + smallFiles); LOG.info("For partitionPath : " + partitionPath + " Small Files => " + smallFiles);
long totalUnassignedInserts = pStat.getNumInserts(); long totalUnassignedInserts = pStat.getNumInserts();
@@ -684,6 +691,18 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
} }
} }
private Map<String, List<SmallFile>> getSmallFilesForPartitions(List<String> partitionPaths, JavaSparkContext jsc) {
Map<String, List<SmallFile>> partitionSmallFilesMap = new HashMap<>();
if (partitionPaths != null && partitionPaths.size() > 0) {
JavaRDD<String> partitionPathRdds = jsc.parallelize(partitionPaths, partitionPaths.size());
partitionSmallFilesMap = partitionPathRdds.mapToPair((PairFunction<String, String, List<SmallFile>>)
partitionPath -> new Tuple2<>(partitionPath, getSmallFiles(partitionPath))).collectAsMap();
}
return partitionSmallFilesMap;
}
/** /**
* Returns a list of small files in the given partition path. * Returns a list of small files in the given partition path.
*/ */
@@ -706,8 +725,6 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename)); sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
sf.sizeBytes = file.getFileSize(); sf.sizeBytes = file.getFileSize();
smallFileLocations.add(sf); smallFileLocations.add(sf);
// Update the global small files list
smallFiles.add(sf);
} }
} }
} }

View File

@@ -89,11 +89,11 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
} }
@Override @Override
public Partitioner getUpsertPartitioner(WorkloadProfile profile) { public Partitioner getUpsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc) {
if (profile == null) { if (profile == null) {
throw new HoodieUpsertException("Need workload profile to construct the upsert partitioner."); throw new HoodieUpsertException("Need workload profile to construct the upsert partitioner.");
} }
mergeOnReadUpsertPartitioner = new MergeOnReadUpsertPartitioner(profile); mergeOnReadUpsertPartitioner = new MergeOnReadUpsertPartitioner(profile, jsc);
return mergeOnReadUpsertPartitioner; return mergeOnReadUpsertPartitioner;
} }
@@ -325,8 +325,8 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
*/ */
class MergeOnReadUpsertPartitioner extends HoodieCopyOnWriteTable.UpsertPartitioner { class MergeOnReadUpsertPartitioner extends HoodieCopyOnWriteTable.UpsertPartitioner {
MergeOnReadUpsertPartitioner(WorkloadProfile profile) { MergeOnReadUpsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc) {
super(profile); super(profile, jsc);
} }
@Override @Override
@@ -376,16 +376,12 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename)); sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
sf.sizeBytes = getTotalFileSize(smallFileSlice); sf.sizeBytes = getTotalFileSize(smallFileSlice);
smallFileLocations.add(sf); smallFileLocations.add(sf);
// Update the global small files list
smallFiles.add(sf);
} else { } else {
HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get(); HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get();
sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()), sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
FSUtils.getFileIdFromLogPath(logFile.getPath())); FSUtils.getFileIdFromLogPath(logFile.getPath()));
sf.sizeBytes = getTotalFileSize(smallFileSlice); sf.sizeBytes = getTotalFileSize(smallFileSlice);
smallFileLocations.add(sf); smallFileLocations.add(sf);
// Update the global small files list
smallFiles.add(sf);
} }
} }
} }

View File

@@ -118,12 +118,12 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
/** /**
* Provides a partitioner to perform the upsert operation, based on the workload profile. * Provides a partitioner to perform the upsert operation, based on the workload profile.
*/ */
public abstract Partitioner getUpsertPartitioner(WorkloadProfile profile); public abstract Partitioner getUpsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc);
/** /**
* Provides a partitioner to perform the insert operation, based on the workload profile. * Provides a partitioner to perform the insert operation, based on the workload profile.
*/ */
public abstract Partitioner getInsertPartitioner(WorkloadProfile profile); public abstract Partitioner getInsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc);
/** /**
* Return whether this HoodieTable implementation can benefit from workload profiling. * Return whether this HoodieTable implementation can benefit from workload profiling.

View File

@@ -415,7 +415,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
records.addAll(updateRecords); records.addAll(updateRecords);
WorkloadProfile profile = new WorkloadProfile(jsc.parallelize(records)); WorkloadProfile profile = new WorkloadProfile(jsc.parallelize(records));
HoodieCopyOnWriteTable.UpsertPartitioner partitioner = HoodieCopyOnWriteTable.UpsertPartitioner partitioner =
(HoodieCopyOnWriteTable.UpsertPartitioner) table.getUpsertPartitioner(profile); (HoodieCopyOnWriteTable.UpsertPartitioner) table.getUpsertPartitioner(profile, jsc);
assertEquals("Update record should have gone to the 1 update partition", 0, partitioner.getPartition( assertEquals("Update record should have gone to the 1 update partition", 0, partitioner.getPartition(
new Tuple2<>(updateRecords.get(0).getKey(), Option.ofNullable(updateRecords.get(0).getCurrentLocation())))); new Tuple2<>(updateRecords.get(0).getKey(), Option.ofNullable(updateRecords.get(0).getCurrentLocation()))));
return partitioner; return partitioner;

View File

@@ -1272,7 +1272,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
JavaRDD<HoodieRecord> deleteRDD = jsc.parallelize(fewRecordsForDelete, 1); JavaRDD<HoodieRecord> deleteRDD = jsc.parallelize(fewRecordsForDelete, 1);
// initialize partitioner // initialize partitioner
hoodieTable.getUpsertPartitioner(new WorkloadProfile(deleteRDD)); hoodieTable.getUpsertPartitioner(new WorkloadProfile(deleteRDD), jsc);
final List<List<WriteStatus>> deleteStatus = jsc.parallelize(Arrays.asList(1)).map(x -> { final List<List<WriteStatus>> deleteStatus = jsc.parallelize(Arrays.asList(1)).map(x -> {
return hoodieTable.handleUpdate(newDeleteTime, partitionPath, fileId, fewRecordsForDelete.iterator()); return hoodieTable.handleUpdate(newDeleteTime, partitionPath, fileId, fewRecordsForDelete.iterator());
}).map(x -> (List<WriteStatus>) HoodieClientTestUtils.collectStatuses(x)).collect(); }).map(x -> (List<WriteStatus>) HoodieClientTestUtils.collectStatuses(x)).collect();