diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java index 7467628e0..e49d012aa 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java @@ -283,24 +283,17 @@ public class HoodieInputFormatUtils { } /** - * Extract HoodieTableMetaClient by base path. - * @param conf - * @param partitions - * @return + * Extract HoodieTableMetaClient by partition path. + * @param conf The hadoop conf + * @param partitions The partitions + * @return partition path to table meta client mapping */ - public static Map getTableMetaClientByBasePath(Configuration conf, Set partitions) { - Map metaClientMap = new HashMap<>(); + public static Map getTableMetaClientByPartitionPath(Configuration conf, Set partitions) { + Map metaClientMap = new HashMap<>(); return partitions.stream().collect(Collectors.toMap(Function.identity(), p -> { - // Get meta client if this path is the base path. - Option matchingBasePath = Option.fromJavaOptional( - metaClientMap.keySet().stream().filter(basePath -> p.toString().startsWith(basePath)).findFirst()); - if (matchingBasePath.isPresent()) { - return metaClientMap.get(matchingBasePath.get()); - } - try { HoodieTableMetaClient metaClient = getTableMetaClientForBasePath(p.getFileSystem(conf), p); - metaClientMap.put(metaClient.getBasePath(), metaClient); + metaClientMap.put(p, metaClient); return metaClient; } catch (IOException e) { throw new HoodieIOException("Error creating hoodie meta client against : " + p, e); diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java index db8de64fe..a7fbf66f9 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java @@ -65,7 +65,7 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils { Map> partitionsToParquetSplits = fileSplits.collect(Collectors.groupingBy(split -> split.getPath().getParent())); // TODO(vc): Should we handle also non-hoodie splits here? - Map partitionsToMetaClient = getTableMetaClientByBasePath(conf, partitionsToParquetSplits.keySet()); + Map partitionsToMetaClient = getTableMetaClientByPartitionPath(conf, partitionsToParquetSplits.keySet()); // Create file system cache so metadata table is only instantiated once. Also can benefit normal file listing if // partition path is listed twice so file groups will already be loaded in file system @@ -141,7 +141,7 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils { Map> partitionsToParquetSplits = fileStatuses.stream().collect(Collectors.groupingBy(file -> file.getFileStatus().getPath().getParent())); // TODO(vc): Should we handle also non-hoodie splits here? - Map partitionsToMetaClient = getTableMetaClientByBasePath(conf, partitionsToParquetSplits.keySet()); + Map partitionsToMetaClient = getTableMetaClientByPartitionPath(conf, partitionsToParquetSplits.keySet()); // for all unique split parents, obtain all delta files based on delta commit timeline, // grouped on file id