From 169e3f66bbdc2a8c4163e8966461b1d891e0ca11 Mon Sep 17 00:00:00 2001 From: Nishith Agarwal Date: Fri, 28 Dec 2018 10:24:23 -0800 Subject: [PATCH] Filtering partition paths before performing a list status on all partitions --- .../io/compact/HoodieRealtimeTableCompactor.java | 3 +++ .../io/compact/strategy/CompactionStrategy.java | 12 +++++++++++- .../compact/strategy/DayBasedCompactionStrategy.java | 8 ++++++++ 3 files changed, 22 insertions(+), 1 deletion(-) diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java index ae74a58f3..549034e3b 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java @@ -181,6 +181,9 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor { .getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(), config.shouldAssumeDatePartitioning()); + // filter the partition paths if needed to reduce list status + partitionPaths = config.getCompactionStrategy().filterPartitionPaths(config, partitionPaths); + TableFileSystemView.RealtimeView fileSystemView = hoodieTable.getRTFileSystemView(); log.info("Compaction looking for files to compact in " + partitionPaths + " partitions"); List operations = diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/CompactionStrategy.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/CompactionStrategy.java index d7444818c..3efc532a4 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/CompactionStrategy.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/CompactionStrategy.java @@ -105,9 +105,19 @@ public abstract class CompactionStrategy implements Serializable { * @param pendingCompactionPlans Pending Compaction Plans for strategy to schedule next compaction plan * @return list of compactions to perform in this run */ - protected List orderAndFilter(HoodieWriteConfig writeConfig, + public List orderAndFilter(HoodieWriteConfig writeConfig, List operations, List pendingCompactionPlans) { return operations; } + + /** + * Filter the partition paths based on compaction strategy + * @param writeConfig + * @param allPartitionPaths + * @return + */ + public List filterPartitionPaths(HoodieWriteConfig writeConfig, List allPartitionPaths) { + return allPartitionPaths; + } } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/DayBasedCompactionStrategy.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/DayBasedCompactionStrategy.java index 3360feb78..ee4d9a5ac 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/DayBasedCompactionStrategy.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/DayBasedCompactionStrategy.java @@ -70,4 +70,12 @@ public class DayBasedCompactionStrategy extends CompactionStrategy { .collect(Collectors.toList()); return filteredList; } + + @Override + public List filterPartitionPaths(HoodieWriteConfig writeConfig, List allPartitionPaths) { + List filteredPartitionPaths = allPartitionPaths.stream().map(partition -> partition.replace("/", "-")) + .sorted(Comparator.reverseOrder()).map(partitionPath -> partitionPath.replace("-", "/")) + .collect(Collectors.toList()).subList(0, writeConfig.getTargetPartitionsPerDayBasedCompaction()); + return filteredPartitionPaths; + } } \ No newline at end of file