[HUDI-1312] [RFC-15] Support for metadata listing for snapshot queries through Hive/SparkSQL (#2366)
Co-authored-by: Ryan Pifer <ryanpife@amazon.com>
This commit is contained in:
@@ -31,7 +31,6 @@ import org.apache.hadoop.mapred.JobConf;
|
||||
import org.apache.hadoop.mapred.RecordReader;
|
||||
import org.apache.hadoop.mapred.Reporter;
|
||||
import org.apache.hadoop.mapreduce.Job;
|
||||
import org.apache.hudi.common.model.HoodieFileFormat;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
@@ -97,13 +96,10 @@ public class HoodieHFileInputFormat extends FileInputFormat<NullWritable, ArrayW
|
||||
// process snapshot queries next.
|
||||
List<Path> snapshotPaths = inputPathHandler.getSnapshotPaths();
|
||||
if (snapshotPaths.size() > 0) {
|
||||
setInputPaths(job, snapshotPaths.toArray(new Path[snapshotPaths.size()]));
|
||||
FileStatus[] fileStatuses = super.listStatus(job);
|
||||
Map<HoodieTableMetaClient, List<FileStatus>> groupedFileStatus =
|
||||
HoodieInputFormatUtils.groupFileStatusForSnapshotPaths(fileStatuses, HoodieFileFormat.HFILE.getFileExtension(),
|
||||
tableMetaClientMap.values());
|
||||
LOG.info("Found a total of " + groupedFileStatus.size() + " groups");
|
||||
for (Map.Entry<HoodieTableMetaClient, List<FileStatus>> entry : groupedFileStatus.entrySet()) {
|
||||
Map<HoodieTableMetaClient, List<Path>> groupedPaths =
|
||||
HoodieInputFormatUtils.groupSnapshotPathsByMetaClient(tableMetaClientMap.values(), snapshotPaths);
|
||||
LOG.info("Found a total of " + groupedPaths.size() + " groups");
|
||||
for (Map.Entry<HoodieTableMetaClient, List<Path>> entry : groupedPaths.entrySet()) {
|
||||
List<FileStatus> result = HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job, entry.getKey(), entry.getValue());
|
||||
if (result != null) {
|
||||
returns.addAll(result);
|
||||
|
||||
@@ -18,7 +18,6 @@
|
||||
|
||||
package org.apache.hudi.hadoop;
|
||||
|
||||
import org.apache.hudi.common.model.HoodieFileFormat;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
|
||||
@@ -109,13 +108,11 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
|
||||
// process snapshot queries next.
|
||||
List<Path> snapshotPaths = inputPathHandler.getSnapshotPaths();
|
||||
if (snapshotPaths.size() > 0) {
|
||||
setInputPaths(job, snapshotPaths.toArray(new Path[snapshotPaths.size()]));
|
||||
FileStatus[] fileStatuses = super.listStatus(job);
|
||||
Map<HoodieTableMetaClient, List<FileStatus>> groupedFileStatus =
|
||||
HoodieInputFormatUtils.groupFileStatusForSnapshotPaths(fileStatuses,
|
||||
HoodieFileFormat.PARQUET.getFileExtension(), tableMetaClientMap.values());
|
||||
LOG.info("Found a total of " + groupedFileStatus.size() + " groups");
|
||||
for (Map.Entry<HoodieTableMetaClient, List<FileStatus>> entry : groupedFileStatus.entrySet()) {
|
||||
Map<HoodieTableMetaClient, List<Path>> groupedPaths =
|
||||
HoodieInputFormatUtils.groupSnapshotPathsByMetaClient(tableMetaClientMap.values(), snapshotPaths);
|
||||
LOG.info("Found a total of " + groupedPaths.size() + " groups");
|
||||
for (Map.Entry<HoodieTableMetaClient, List<Path>> entry : groupedPaths.entrySet()) {
|
||||
HoodieTableMetaClient metaClient = entry.getKey();
|
||||
List<FileStatus> result = HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job, entry.getKey(), entry.getValue());
|
||||
if (result != null) {
|
||||
returns.addAll(result);
|
||||
|
||||
@@ -27,6 +27,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
import org.apache.hudi.common.table.view.FileSystemViewManager;
|
||||
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
|
||||
import org.apache.hudi.common.table.view.TableFileSystemView;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
@@ -62,6 +63,11 @@ import java.util.Set;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_ENABLE_PROP;
|
||||
import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
|
||||
import static org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_VALIDATE_PROP;
|
||||
import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE;
|
||||
|
||||
public class HoodieInputFormatUtils {
|
||||
|
||||
// These positions have to be deterministic across all tables
|
||||
@@ -391,27 +397,48 @@ public class HoodieInputFormatUtils {
|
||||
return grouped;
|
||||
}
|
||||
|
||||
public static Map<HoodieTableMetaClient, List<Path>> groupSnapshotPathsByMetaClient(
|
||||
Collection<HoodieTableMetaClient> metaClientList,
|
||||
List<Path> snapshotPaths
|
||||
) {
|
||||
Map<HoodieTableMetaClient, List<Path>> grouped = new HashMap<>();
|
||||
metaClientList.forEach(metaClient -> grouped.put(metaClient, new ArrayList<>()));
|
||||
for (Path path : snapshotPaths) {
|
||||
// Find meta client associated with the input path
|
||||
metaClientList.stream().filter(metaClient -> path.toString().contains(metaClient.getBasePath()))
|
||||
.forEach(metaClient -> grouped.get(metaClient).add(path));
|
||||
}
|
||||
return grouped;
|
||||
}
|
||||
|
||||
/**
|
||||
* Filters data files for a snapshot queried table.
|
||||
* Filters data files under @param paths for a snapshot queried table.
|
||||
* @param job
|
||||
* @param metadata
|
||||
* @param fileStatuses
|
||||
* @param metaClient
|
||||
* @param paths
|
||||
* @return
|
||||
*/
|
||||
public static List<FileStatus> filterFileStatusForSnapshotMode(
|
||||
JobConf job, HoodieTableMetaClient metadata, List<FileStatus> fileStatuses) throws IOException {
|
||||
FileStatus[] statuses = fileStatuses.toArray(new FileStatus[0]);
|
||||
JobConf job, HoodieTableMetaClient metaClient, List<Path> paths) throws IOException {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Hoodie Metadata initialized with completed commit Ts as :" + metadata);
|
||||
LOG.debug("Hoodie Metadata initialized with completed commit Ts as :" + metaClient);
|
||||
}
|
||||
// Get all commits, delta commits, compactions, as all of them produce a base parquet file today
|
||||
HoodieTimeline timeline = metadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
|
||||
TableFileSystemView.BaseFileOnlyView roView = new HoodieTableFileSystemView(metadata, timeline, statuses);
|
||||
// filter files on the latest commit found
|
||||
List<HoodieBaseFile> filteredFiles = roView.getLatestBaseFiles().collect(Collectors.toList());
|
||||
LOG.info("Total paths to process after hoodie filter " + filteredFiles.size());
|
||||
|
||||
boolean useFileListingFromMetadata = job.getBoolean(METADATA_ENABLE_PROP, DEFAULT_METADATA_ENABLE_FOR_READERS);
|
||||
boolean verifyFileListing = job.getBoolean(METADATA_VALIDATE_PROP, DEFAULT_METADATA_VALIDATE);
|
||||
HoodieTableFileSystemView fsView = FileSystemViewManager.createInMemoryFileSystemView(metaClient,
|
||||
useFileListingFromMetadata, verifyFileListing);
|
||||
|
||||
List<HoodieBaseFile> filteredBaseFiles = new ArrayList<>();
|
||||
for (Path p : paths) {
|
||||
String relativePartitionPath = FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), p);
|
||||
List<HoodieBaseFile> matched = fsView.getLatestBaseFiles(relativePartitionPath).collect(Collectors.toList());
|
||||
filteredBaseFiles.addAll(matched);
|
||||
}
|
||||
|
||||
LOG.info("Total paths to process after hoodie filter " + filteredBaseFiles.size());
|
||||
List<FileStatus> returns = new ArrayList<>();
|
||||
for (HoodieBaseFile filteredFile : filteredFiles) {
|
||||
for (HoodieBaseFile filteredFile : filteredBaseFiles) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Processing latest hoodie file - " + filteredFile.getPath());
|
||||
}
|
||||
|
||||
@@ -26,6 +26,7 @@ import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
import org.apache.hudi.common.table.view.FileSystemViewManager;
|
||||
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
|
||||
import org.apache.hudi.common.util.CollectionUtils;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
@@ -53,6 +54,11 @@ import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_ENABLE_PROP;
|
||||
import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
|
||||
import static org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_VALIDATE_PROP;
|
||||
import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE;
|
||||
|
||||
public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(HoodieRealtimeInputFormatUtils.class);
|
||||
@@ -63,13 +69,25 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
|
||||
// TODO(vc): Should we handle also non-hoodie splits here?
|
||||
Map<Path, HoodieTableMetaClient> partitionsToMetaClient = getTableMetaClientByBasePath(conf, partitionsToParquetSplits.keySet());
|
||||
|
||||
boolean useFileListingFromMetadata = conf.getBoolean(METADATA_ENABLE_PROP, DEFAULT_METADATA_ENABLE_FOR_READERS);
|
||||
boolean verifyFileListing = conf.getBoolean(METADATA_VALIDATE_PROP, DEFAULT_METADATA_VALIDATE);
|
||||
// Create file system cache so metadata table is only instantiated once. Also can benefit normal file listing if
|
||||
// partition path is listed twice so file groups will already be loaded in file system
|
||||
Map<HoodieTableMetaClient, HoodieTableFileSystemView> fsCache = new HashMap<>();
|
||||
// for all unique split parents, obtain all delta files based on delta commit timeline,
|
||||
// grouped on file id
|
||||
List<InputSplit> rtSplits = new ArrayList<>();
|
||||
partitionsToParquetSplits.keySet().forEach(partitionPath -> {
|
||||
// for each partition path obtain the data & log file groupings, then map back to inputsplits
|
||||
HoodieTableMetaClient metaClient = partitionsToMetaClient.get(partitionPath);
|
||||
HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline());
|
||||
if (!fsCache.containsKey(metaClient)) {
|
||||
|
||||
HoodieTableFileSystemView fsView = FileSystemViewManager.createInMemoryFileSystemView(metaClient,
|
||||
useFileListingFromMetadata, verifyFileListing);
|
||||
fsCache.put(metaClient, fsView);
|
||||
}
|
||||
HoodieTableFileSystemView fsView = fsCache.get(metaClient);
|
||||
|
||||
String relPartitionPath = FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), partitionPath);
|
||||
|
||||
try {
|
||||
|
||||
Reference in New Issue
Block a user