[HUDI-1967] Fix the NPE for MOR Hive rt table query (#3032)
The HoodieInputFormatUtils.getTableMetaClientByBasePath method returns a map keyed by table base path, while HoodieRealtimeInputFormatUtils queries it with the partition path, which caused the NPE.
This commit is contained in:
@@ -283,24 +283,17 @@ public class HoodieInputFormatUtils {
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract HoodieTableMetaClient by base path.
|
||||
* @param conf
|
||||
* @param partitions
|
||||
* @return
|
||||
* Extract HoodieTableMetaClient by partition path.
|
||||
* @param conf The hadoop conf
|
||||
* @param partitions The partitions
|
||||
* @return partition path to table meta client mapping
|
||||
*/
|
||||
public static Map<Path, HoodieTableMetaClient> getTableMetaClientByBasePath(Configuration conf, Set<Path> partitions) {
|
||||
Map<String, HoodieTableMetaClient> metaClientMap = new HashMap<>();
|
||||
public static Map<Path, HoodieTableMetaClient> getTableMetaClientByPartitionPath(Configuration conf, Set<Path> partitions) {
|
||||
Map<Path, HoodieTableMetaClient> metaClientMap = new HashMap<>();
|
||||
return partitions.stream().collect(Collectors.toMap(Function.identity(), p -> {
|
||||
// Get meta client if this path is the base path.
|
||||
Option<String> matchingBasePath = Option.fromJavaOptional(
|
||||
metaClientMap.keySet().stream().filter(basePath -> p.toString().startsWith(basePath)).findFirst());
|
||||
if (matchingBasePath.isPresent()) {
|
||||
return metaClientMap.get(matchingBasePath.get());
|
||||
}
|
||||
|
||||
try {
|
||||
HoodieTableMetaClient metaClient = getTableMetaClientForBasePath(p.getFileSystem(conf), p);
|
||||
metaClientMap.put(metaClient.getBasePath(), metaClient);
|
||||
metaClientMap.put(p, metaClient);
|
||||
return metaClient;
|
||||
} catch (IOException e) {
|
||||
throw new HoodieIOException("Error creating hoodie meta client against : " + p, e);
|
||||
|
||||
@@ -65,7 +65,7 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
|
||||
Map<Path, List<FileSplit>> partitionsToParquetSplits =
|
||||
fileSplits.collect(Collectors.groupingBy(split -> split.getPath().getParent()));
|
||||
// TODO(vc): Should we handle also non-hoodie splits here?
|
||||
Map<Path, HoodieTableMetaClient> partitionsToMetaClient = getTableMetaClientByBasePath(conf, partitionsToParquetSplits.keySet());
|
||||
Map<Path, HoodieTableMetaClient> partitionsToMetaClient = getTableMetaClientByPartitionPath(conf, partitionsToParquetSplits.keySet());
|
||||
|
||||
// Create file system cache so metadata table is only instantiated once. Also can benefit normal file listing if
|
||||
// partition path is listed twice so file groups will already be loaded in file system
|
||||
@@ -141,7 +141,7 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
|
||||
Map<Path, List<HoodieBaseFile>> partitionsToParquetSplits =
|
||||
fileStatuses.stream().collect(Collectors.groupingBy(file -> file.getFileStatus().getPath().getParent()));
|
||||
// TODO(vc): Should we handle also non-hoodie splits here?
|
||||
Map<Path, HoodieTableMetaClient> partitionsToMetaClient = getTableMetaClientByBasePath(conf, partitionsToParquetSplits.keySet());
|
||||
Map<Path, HoodieTableMetaClient> partitionsToMetaClient = getTableMetaClientByPartitionPath(conf, partitionsToParquetSplits.keySet());
|
||||
|
||||
// for all unique split parents, obtain all delta files based on delta commit timeline,
|
||||
// grouped on file id
|
||||
|
||||
Reference in New Issue
Block a user