Filtering partition paths before performing a list status on all partitions
This commit is contained in:
committed by
vinoth chandar
parent
d1bb804577
commit
169e3f66bb
@@ -181,6 +181,9 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
|
||||
.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(),
|
||||
config.shouldAssumeDatePartitioning());
|
||||
|
||||
// filter the partition paths if needed to reduce list status
|
||||
partitionPaths = config.getCompactionStrategy().filterPartitionPaths(config, partitionPaths);
|
||||
|
||||
TableFileSystemView.RealtimeView fileSystemView = hoodieTable.getRTFileSystemView();
|
||||
log.info("Compaction looking for files to compact in " + partitionPaths + " partitions");
|
||||
List<HoodieCompactionOperation> operations =
|
||||
|
||||
@@ -105,9 +105,19 @@ public abstract class CompactionStrategy implements Serializable {
|
||||
* @param pendingCompactionPlans Pending Compaction Plans for strategy to schedule next compaction plan
|
||||
* @return list of compactions to perform in this run
|
||||
*/
|
||||
protected List<HoodieCompactionOperation> orderAndFilter(HoodieWriteConfig writeConfig,
|
||||
public List<HoodieCompactionOperation> orderAndFilter(HoodieWriteConfig writeConfig,
|
||||
List<HoodieCompactionOperation> operations,
|
||||
List<HoodieCompactionPlan> pendingCompactionPlans) {
|
||||
return operations;
|
||||
}
|
||||
|
||||
/**
|
||||
* Filter the partition paths based on compaction strategy
|
||||
* @param writeConfig
|
||||
* @param allPartitionPaths
|
||||
* @return
|
||||
*/
|
||||
public List<String> filterPartitionPaths(HoodieWriteConfig writeConfig, List<String> allPartitionPaths) {
|
||||
return allPartitionPaths;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -70,4 +70,12 @@ public class DayBasedCompactionStrategy extends CompactionStrategy {
|
||||
.collect(Collectors.toList());
|
||||
return filteredList;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> filterPartitionPaths(HoodieWriteConfig writeConfig, List<String> allPartitionPaths) {
|
||||
List<String> filteredPartitionPaths = allPartitionPaths.stream().map(partition -> partition.replace("/", "-"))
|
||||
.sorted(Comparator.reverseOrder()).map(partitionPath -> partitionPath.replace("-", "/"))
|
||||
.collect(Collectors.toList()).subList(0, writeConfig.getTargetPartitionsPerDayBasedCompaction());
|
||||
return filteredPartitionPaths;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user