1
0

[HUDI-25] Optimize HoodieInputformat.listStatus() for faster Hive incremental queries on Hoodie

Summary:
    - The InputPathHandler class classifies input paths into incremental, non-incremental and non-Hoodie paths.
    - Incremental queries leverage HoodieCommitMetadata to get partitions that are affected and only lists those partitions as opposed to listing all partitions
    - listStatus() processes each category separately
This commit is contained in:
Bhavani Sudha Saktheeswaran
2019-07-17 11:51:49 -07:00
parent 480fc7869d
commit d09eacdc13
6 changed files with 602 additions and 119 deletions

View File

@@ -50,18 +50,10 @@
</dependency>
<!-- Hadoop -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
@@ -91,6 +83,18 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>

View File

@@ -18,6 +18,12 @@
package org.apache.hudi.hadoop;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.log4j.LogManager;
@@ -36,6 +42,7 @@ public class HoodieHiveUtil {
public static final int DEFAULT_MAX_COMMITS = 1;
public static final int MAX_COMMIT_ALL = -1;
public static final int DEFAULT_LEVELS_TO_BASEPATH = 3;
public static final Pattern HOODIE_CONSUME_MODE_PATTERN_STRING = Pattern.compile("hoodie\\.(.*)\\.consume\\.mode");
public static Integer readMaxCommits(JobContext job, String tableName) {
String maxCommitName = String.format(HOODIE_MAX_COMMIT_PATTERN, tableName);
@@ -67,4 +74,22 @@ public class HoodieHiveUtil {
}
return parent;
}
/**
 * Extracts the names of all tables configured for INCREMENTAL scan mode in this job.
 *
 * Scans the job configuration for keys matching {@code hoodie.<table>.consume.mode} and
 * returns the table names whose value is INCREMENTAL.
 *
 * @param job job context whose configuration is scanned
 * @return names of incrementally-queried tables; empty list when none are configured
 */
public static List<String> getIncrementalTableNames(JobContext job) {
  // Find every config key of the form hoodie.<table>.consume.mode
  Map<String, String> tablesModeMap = job.getConfiguration()
      .getValByRegex(HOODIE_CONSUME_MODE_PATTERN_STRING.pattern());
  // Keep only tables in INCREMENTAL mode and pull the table name out of the key.
  // Collectors.toList() never returns null, so no null-guard is needed on the result.
  return tablesModeMap.entrySet().stream()
      .filter(e -> e.getValue().trim().equals(INCREMENTAL_SCAN_MODE))
      .map(e -> {
        Matcher matcher = HOODIE_CONSUME_MODE_PATTERN_STRING.matcher(e.getKey());
        return matcher.find() ? matcher.group(1) : null;
      })
      .filter(s -> s != null)
      .collect(Collectors.toList());
}
}

View File

@@ -18,17 +18,15 @@
package org.apache.hudi.hadoop;
import org.apache.hudi.common.model.HoodieDataFile;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.TableFileSystemView.ReadOptimizedView;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.InvalidTableException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -42,16 +40,18 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieDataFile;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.TableFileSystemView;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* HoodieInputFormat which understands the Hoodie File Structure and filters files based on the Hoodie Mode. If paths
* that does not correspond to a hoodie table then they are passed in as is (as what FileInputFormat.listStatus()
@@ -66,61 +66,191 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
@Override
public FileStatus[] listStatus(JobConf job) throws IOException {
// Get all the file status from FileInputFormat and then do the filter
FileStatus[] fileStatuses = super.listStatus(job);
Map<HoodieTableMetaClient, List<FileStatus>> groupedFileStatus = groupFileStatus(fileStatuses);
LOG.info("Found a total of " + groupedFileStatus.size() + " groups");
// Segregate inputPaths[] to incremental, snapshot and non hoodie paths
List<String> incrementalTables = HoodieHiveUtil.getIncrementalTableNames(Job.getInstance(job));
InputPathHandler inputPathHandler = new InputPathHandler(conf, getInputPaths(job), incrementalTables);
List<FileStatus> returns = new ArrayList<>();
for (Map.Entry<HoodieTableMetaClient, List<FileStatus>> entry : groupedFileStatus.entrySet()) {
HoodieTableMetaClient metadata = entry.getKey();
if (metadata == null) {
// Add all the paths which are not hoodie specific
returns.addAll(entry.getValue());
Map<String, HoodieTableMetaClient> tableMetaClientMap = inputPathHandler.getTableMetaClientMap();
// process incremental pulls first
for (String table : incrementalTables) {
HoodieTableMetaClient metaClient = tableMetaClientMap.get(table);
if (metaClient == null) {
/* This can happen when the INCREMENTAL mode is set for a table but there were no InputPaths
* in the jobConf
*/
continue;
}
FileStatus[] statuses = entry.getValue().toArray(new FileStatus[entry.getValue().size()]);
if (LOG.isDebugEnabled()) {
LOG.debug("Hoodie Metadata initialized with completed commit Ts as :" + metadata);
List<Path> inputPaths = inputPathHandler.getGroupedIncrementalPaths().get(metaClient);
List<FileStatus> result = listStatusForIncrementalMode(job, metaClient, inputPaths);
if (result != null) {
returns.addAll(result);
}
String tableName = metadata.getTableConfig().getTableName();
String mode = HoodieHiveUtil.readMode(Job.getInstance(job), tableName);
// Get all commits, delta commits, compactions, as all of them produce a base parquet file
// today
HoodieTimeline timeline = metadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
ReadOptimizedView roView = new HoodieTableFileSystemView(metadata, timeline, statuses);
}
if (HoodieHiveUtil.INCREMENTAL_SCAN_MODE.equals(mode)) {
// this is of the form commitTs_partition_sequenceNumber
String lastIncrementalTs = HoodieHiveUtil.readStartCommitTime(Job.getInstance(job), tableName);
// Total number of commits to return in this batch. Set this to -1 to get all the commits.
Integer maxCommits = HoodieHiveUtil.readMaxCommits(Job.getInstance(job), tableName);
LOG.info("Last Incremental timestamp was set as " + lastIncrementalTs);
List<String> commitsToReturn = timeline.findInstantsAfter(lastIncrementalTs, maxCommits).getInstants()
.map(HoodieInstant::getTimestamp).collect(Collectors.toList());
List<HoodieDataFile> filteredFiles =
roView.getLatestDataFilesInRange(commitsToReturn).collect(Collectors.toList());
for (HoodieDataFile filteredFile : filteredFiles) {
LOG.info("Processing incremental hoodie file - " + filteredFile.getPath());
filteredFile = checkFileStatus(filteredFile);
returns.add(filteredFile.getFileStatus());
}
LOG.info("Total paths to process after hoodie incremental filter " + filteredFiles.size());
} else {
// filter files on the latest commit found
List<HoodieDataFile> filteredFiles = roView.getLatestDataFiles().collect(Collectors.toList());
LOG.info("Total paths to process after hoodie filter " + filteredFiles.size());
for (HoodieDataFile filteredFile : filteredFiles) {
if (LOG.isDebugEnabled()) {
LOG.debug("Processing latest hoodie file - " + filteredFile.getPath());
}
filteredFile = checkFileStatus(filteredFile);
returns.add(filteredFile.getFileStatus());
// process non hoodie Paths next.
List<Path> nonHoodiePaths = inputPathHandler.getNonHoodieInputPaths();
if (nonHoodiePaths.size() > 0) {
setInputPaths(job, nonHoodiePaths.toArray(new Path[nonHoodiePaths.size()]));
FileStatus[] fileStatuses = super.listStatus(job);
for (FileStatus fileStatus: fileStatuses) {
returns.add(fileStatus);
}
}
// process snapshot queries next.
List<Path> snapshotPaths = inputPathHandler.getSnapshotPaths();
if (snapshotPaths.size() > 0) {
setInputPaths(job, snapshotPaths.toArray(new Path[snapshotPaths.size()]));
FileStatus[] fileStatuses = super.listStatus(job);
Map<HoodieTableMetaClient, List<FileStatus>> groupedFileStatus =
groupFileStatusForSnapshotPaths(fileStatuses, tableMetaClientMap.values());
LOG.info("Found a total of " + groupedFileStatus.size() + " groups");
for (Map.Entry<HoodieTableMetaClient, List<FileStatus>> entry : groupedFileStatus.entrySet()) {
List<FileStatus> result = filterFileStatusForSnapshotMode(entry.getKey(), entry.getValue());
if (result != null) {
returns.addAll(result);
}
}
}
return returns.toArray(new FileStatus[returns.size()]);
}
/**
 * Achieves listStatus functionality for an incrementally queried table. Instead of listing all
 * partitions and then filtering based on the commits of interest, this logic first extracts the
 * partitions touched by the desired commits and then lists only those partitions.
 *
 * @param job job conf; its input paths are mutated to only the affected partitions before listing
 * @param tableMetaClient meta client of the incrementally queried table
 * @param inputPaths original input paths (from the caller) that belong to this table
 * @return statuses of the latest data files within the commit range, or null when the desired
 *         commits touch no partition under the given input paths
 * @throws IOException on failure to read commit metadata or to list files
 */
private List<FileStatus> listStatusForIncrementalMode(
    JobConf job, HoodieTableMetaClient tableMetaClient, List<Path> inputPaths) throws IOException {
  String tableName = tableMetaClient.getTableConfig().getTableName();
  HoodieTimeline timeline = tableMetaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
  // Commit timestamp after which incremental changes are requested.
  String lastIncrementalTs = HoodieHiveUtil.readStartCommitTime(Job.getInstance(job), tableName);
  // Total number of commits to return in this batch. Set this to -1 to get all the commits.
  Integer maxCommits = HoodieHiveUtil.readMaxCommits(Job.getInstance(job), tableName);
  LOG.info("Last Incremental timestamp was set as " + lastIncrementalTs);
  List<HoodieInstant> commitsToCheck = timeline.findInstantsAfter(lastIncrementalTs, maxCommits)
      .getInstants().collect(Collectors.toList());
  // Extract partitions touched by the commitsToCheck
  Set<String> partitionsToList = new HashSet<>();
  for (int i = 0; i < commitsToCheck.size(); i++) {
    HoodieInstant commit = commitsToCheck.get(i);
    HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get(),
        HoodieCommitMetadata.class);
    partitionsToList.addAll(commitMetadata.getPartitionToWriteStats().keySet());
  }
  if (partitionsToList.isEmpty()) {
    // No partition was written by the commits of interest — nothing to list.
    return null;
  }
  String incrementalInputPaths = partitionsToList.stream()
      .map(s -> tableMetaClient.getBasePath() + Path.SEPARATOR + s)
      .filter(s -> {
        /*
         * Ensure to return only results from the original input path that has incremental changes
         * This check is needed for the following corner case - When the caller invokes
         * HoodieInputFormat.listStatus multiple times (with small batches of Hive partitions each
         * time. Ex. Hive fetch task calls listStatus for every partition once) we do not want to
         * accidentally return all incremental changes for the entire table in every listStatus()
         * call. This will create redundant splits. Instead we only want to return the incremental
         * changes (if so any) in that batch of input paths.
         *
         * NOTE on Hive queries that are executed using Fetch task:
         * Since Fetch tasks invoke InputFormat.listStatus() per partition, Hoodie metadata can be
         * listed in every such listStatus() call. In order to avoid this, it might be useful to
         * disable fetch tasks using the hive session property for incremental queries:
         * `set hive.fetch.task.conversion=none;`
         * This would ensure Map Reduce execution is chosen for a Hive query, which combines
         * partitions (comma separated) and calls InputFormat.listStatus() only once with all
         * those partitions.
         */
        for (Path path : inputPaths) {
          if (path.toString().contains(s)) {
            return true;
          }
        }
        return false;
      })
      .collect(Collectors.joining(","));
  if (incrementalInputPaths == null || incrementalInputPaths.isEmpty()) {
    // The touched partitions are outside this batch of input paths.
    return null;
  }
  // Mutate the JobConf to set the input paths to only partitions touched by incremental pull.
  setInputPaths(job, incrementalInputPaths);
  FileStatus[] fileStatuses = super.listStatus(job);
  TableFileSystemView.ReadOptimizedView roView = new HoodieTableFileSystemView(tableMetaClient, timeline,
      fileStatuses);
  // Only keep latest data files whose commit falls inside the requested commit range.
  List<String> commitsList = commitsToCheck.stream().map(s -> s.getTimestamp()).collect(Collectors.toList());
  List<HoodieDataFile> filteredFiles = roView.getLatestDataFilesInRange(commitsList).collect(Collectors.toList());
  List<FileStatus> returns = new ArrayList<>();
  for (HoodieDataFile filteredFile : filteredFiles) {
    LOG.debug("Processing incremental hoodie file - " + filteredFile.getPath());
    filteredFile = checkFileStatus(filteredFile);
    returns.add(filteredFile.getFileStatus());
  }
  LOG.info("Total paths to process after hoodie incremental filter " + filteredFiles.size());
  return returns;
}
/**
 * Takes in a list of filesStatus and a list of table metadatas. Groups the files status list
 * based on given table metadata.
 *
 * @param fileStatuses listed statuses of the snapshot input paths (non-parquet entries are skipped)
 * @param metaClientList meta clients of all Hoodie tables known to this job
 * @return mapping from each table's meta client to the file statuses under its base path
 * @throws IOException declared for call-site symmetry; no I/O is performed here
 */
private Map<HoodieTableMetaClient, List<FileStatus>> groupFileStatusForSnapshotPaths(
    FileStatus[] fileStatuses, Collection<HoodieTableMetaClient> metaClientList) throws IOException {
  // This assumes the paths for different tables are grouped together
  Map<HoodieTableMetaClient, List<FileStatus>> grouped = new HashMap<>();
  // Meta client matched by the previous status; avoids rescanning metaClientList while
  // consecutive statuses belong to the same table.
  HoodieTableMetaClient metadata = null;
  for (FileStatus status : fileStatuses) {
    Path inputPath = status.getPath();
    if (!inputPath.getName().endsWith(".parquet")) {
      //FIXME(vc): skip non parquet files for now. This wont be needed once log file name start
      // with "."
      continue;
    }
    if ((metadata == null) || (!inputPath.toString().contains(metadata.getBasePath()))) {
      for (HoodieTableMetaClient metaClient : metaClientList) {
        if (inputPath.toString().contains(metaClient.getBasePath())) {
          metadata = metaClient;
          if (!grouped.containsKey(metadata)) {
            grouped.put(metadata, new ArrayList<>());
          }
          break;
        }
      }
    }
    // NOTE(review): assumes every snapshot status falls under some known meta client's base
    // path; if none matches, `metadata` is stale or null here — confirm callers guarantee this.
    grouped.get(metadata).add(status);
  }
  return grouped;
}
/**
 * Filters a snapshot-queried table's file statuses down to the data files of the latest
 * completed commit, refreshing each file's status before returning it.
 */
private List<FileStatus> filterFileStatusForSnapshotMode(
    HoodieTableMetaClient metadata, List<FileStatus> fileStatuses) throws IOException {
  FileStatus[] statusArray = fileStatuses.toArray(new FileStatus[fileStatuses.size()]);
  if (LOG.isDebugEnabled()) {
    LOG.debug("Hoodie Metadata initialized with completed commit Ts as :" + metadata);
  }
  // All commits, delta commits and compactions produce a base parquet file today.
  HoodieTimeline completedTimeline =
      metadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
  TableFileSystemView.ReadOptimizedView fsView =
      new HoodieTableFileSystemView(metadata, completedTimeline, statusArray);
  // Keep only the latest data file per file group.
  List<HoodieDataFile> latestFiles = fsView.getLatestDataFiles().collect(Collectors.toList());
  LOG.info("Total paths to process after hoodie filter " + latestFiles.size());
  List<FileStatus> filteredStatuses = new ArrayList<>();
  for (HoodieDataFile dataFile : latestFiles) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Processing latest hoodie file - " + dataFile.getPath());
    }
    HoodieDataFile refreshed = checkFileStatus(dataFile);
    filteredStatuses.add(refreshed.getFileStatus());
  }
  return filteredStatuses;
}
/**
@@ -142,38 +272,6 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
}
}
/**
 * Groups the given file statuses by the Hoodie table they belong to; statuses under
 * non-Hoodie paths are grouped under a null key.
 *
 * @param fileStatuses statuses listed from the job's input paths (non-parquet entries skipped)
 * @return map of table meta client (or null for non-Hoodie paths) to its file statuses
 * @throws IOException on failure to resolve a path's FileSystem
 */
private Map<HoodieTableMetaClient, List<FileStatus>> groupFileStatus(FileStatus[] fileStatuses) throws IOException {
  // This assumes the paths for different tables are grouped together
  Map<HoodieTableMetaClient, List<FileStatus>> grouped = new HashMap<>();
  // State carried across iterations: the last matched table (or last non-Hoodie parent dir),
  // so consecutive statuses from the same directory skip the meta-client lookup.
  HoodieTableMetaClient metadata = null;
  String nonHoodieBasePath = null;
  for (FileStatus status : fileStatuses) {
    if (!status.getPath().getName().endsWith(".parquet")) {
      // FIXME(vc): skip non parquet files for now. This wont be needed once log file name start
      // with "."
      continue;
    }
    if ((metadata == null && nonHoodieBasePath == null)
        || (metadata == null && !status.getPath().toString().contains(nonHoodieBasePath))
        || (metadata != null && !status.getPath().toString().contains(metadata.getBasePath()))) {
      try {
        metadata = getTableMetaClient(status.getPath().getFileSystem(conf), status.getPath().getParent());
        nonHoodieBasePath = null;
      } catch (TableNotFoundException | InvalidTableException e) {
        // Not under a Hoodie table — bucket under the null key.
        LOG.info("Handling a non-hoodie path " + status.getPath());
        metadata = null;
        nonHoodieBasePath = status.getPath().getParent().toString();
      }
      if (!grouped.containsKey(metadata)) {
        grouped.put(metadata, new ArrayList<>());
      }
    }
    grouped.get(metadata).add(status);
  }
  return grouped;
}
@Override
public void setConf(Configuration conf) {
this.conf = conf;
}

View File

@@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.hadoop;
import static org.apache.hudi.hadoop.HoodieParquetInputFormat.getTableMetaClient;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.exception.InvalidTableException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
/**
 * InputPathHandler takes in a set of input paths and incremental tables list. Then, classifies the
 * input paths to incremental, snapshot paths and non-hoodie paths. This is then accessed later to
 * mutate the JobConf before processing incremental mode queries and snapshot queries.
 */
public class InputPathHandler {

  public static final Logger LOG = LogManager.getLogger(InputPathHandler.class);

  private final Configuration conf;
  // tablename to metadata mapping for all Hoodie tables(both incremental & snapshot)
  private final Map<String, HoodieTableMetaClient> tableMetaClientMap;
  // incremental input paths grouped per table meta client
  private final Map<HoodieTableMetaClient, List<Path>> groupedIncrementalPaths;
  // input paths of Hoodie tables queried in snapshot mode
  private final List<Path> snapshotPaths;
  // input paths that do not belong to any Hoodie table
  private final List<Path> nonHoodieInputPaths;

  /**
   * Classifies the given input paths eagerly; results are exposed via the getters below.
   */
  InputPathHandler(Configuration conf, Path[] inputPaths, List<String> incrementalTables) throws IOException {
    this.conf = conf;
    tableMetaClientMap = new HashMap<>();
    snapshotPaths = new ArrayList<>();
    nonHoodieInputPaths = new ArrayList<>();
    groupedIncrementalPaths = new HashMap<>();
    parseInputPaths(inputPaths, incrementalTables);
  }

  /**
   * Takes in the original InputPaths and classifies each of them into incremental, snapshot and
   * non-hoodie InputPaths. The logic is as follows:
   *
   * 1. Check if an inputPath starts with the same basepath as any of the metadata basepaths we know
   *    1a. If yes, this belongs to a Hoodie table that we already know about. Simply classify this
   *        as incremental or snapshot - We can get the table name of this inputPath from the
   *        metadata. Then based on the list of incrementalTables, we can classify this inputPath.
   *    1b. If no, this could be a new Hoodie Table we haven't seen yet or a non-Hoodie Input Path.
   *        Try creating the HoodieTableMetadataClient.
   *        - If it succeeds, further classify as incremental or snapshot as described in step
   *          1a above.
   *        - If TableNotFoundException/InvalidTableException is caught, this is a
   *          non-Hoodie inputPath
   * @param inputPaths - InputPaths from the original jobConf that was passed to HoodieInputFormat
   * @param incrementalTables - List of all incremental tables extracted from the config
   *                          `hoodie.&lt;table-name&gt;.consume.mode=INCREMENTAL`
   * @throws IOException
   */
  private void parseInputPaths(Path[] inputPaths, List<String> incrementalTables)
      throws IOException {
    for (Path inputPath : inputPaths) {
      boolean basePathKnown = false;
      for (HoodieTableMetaClient metaClient : tableMetaClientMap.values()) {
        // NOTE(review): substring containment rather than a path-prefix check — a base path that
        // is a string prefix of another table's base path could misclassify; confirm base paths
        // are disjoint in practice.
        if (inputPath.toString().contains(metaClient.getBasePath())) {
          // We already know the base path for this inputPath.
          basePathKnown = true;
          // Check if this is for a snapshot query
          String tableName = metaClient.getTableConfig().getTableName();
          tagAsIncrementalOrSnapshot(inputPath, tableName, metaClient, incrementalTables);
          break;
        }
      }
      if (!basePathKnown) {
        // This path is for a table that we dont know about yet.
        HoodieTableMetaClient metaClient;
        try {
          metaClient = getTableMetaClient(inputPath.getFileSystem(conf), inputPath);
          String tableName = metaClient.getTableConfig().getTableName();
          tableMetaClientMap.put(tableName, metaClient);
          tagAsIncrementalOrSnapshot(inputPath, tableName, metaClient, incrementalTables);
        } catch (TableNotFoundException | InvalidTableException e) {
          // This is a non Hoodie inputPath
          LOG.info("Handling a non-hoodie path " + inputPath);
          nonHoodieInputPaths.add(inputPath);
        }
      }
    }
  }

  /** Buckets an input path of a known Hoodie table as incremental or snapshot. */
  private void tagAsIncrementalOrSnapshot(Path inputPath, String tableName,
      HoodieTableMetaClient metaClient, List<String> incrementalTables) {
    if (!incrementalTables.contains(tableName)) {
      snapshotPaths.add(inputPath);
    } else {
      // Group incremental Paths belonging to same table.
      if (!groupedIncrementalPaths.containsKey(metaClient)) {
        groupedIncrementalPaths.put(metaClient, new ArrayList<>());
      }
      groupedIncrementalPaths.get(metaClient).add(inputPath);
    }
  }

  public Map<HoodieTableMetaClient, List<Path>> getGroupedIncrementalPaths() {
    return groupedIncrementalPaths;
  }

  public Map<String, HoodieTableMetaClient> getTableMetaClientMap() {
    return tableMetaClientMap;
  }

  public List<Path> getSnapshotPaths() {
    return snapshotPaths;
  }

  public List<Path> getNonHoodieInputPaths() {
    return nonHoodieInputPaths;
  }
}

View File

@@ -0,0 +1,181 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.hadoop;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hudi.common.minicluster.HdfsTestService;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
/**
 * Tests InputPathHandler's classification of input paths into incremental, snapshot and
 * non-Hoodie groups, using three Hoodie tables and one plain directory on a MiniDFS cluster.
 */
public class InputPathHandlerTest {

  // Incremental Table
  public static final String RAW_TRIPS_TEST_NAME = "raw_trips";
  public static final String MODEL_TRIPS_TEST_NAME = "model_trips";
  // snapshot Table
  public static final String ETL_TRIPS_TEST_NAME = "etl_trips";
  // non Hoodie table
  public static final String TRIPS_STATS_TEST_NAME = "trips_stats";

  private static MiniDFSCluster dfsCluster;
  private static DistributedFileSystem dfs;
  private static HdfsTestService hdfsTestService;
  private static InputPathHandler inputPathHandler;
  // Base paths of the four test tables, created under a temporary folder.
  private static String basePathTable1 = null;
  private static String basePathTable2 = null;
  private static String basePathTable3 = null;
  private static String basePathTable4 = null; // non hoodie Path
  // Expected classification of the generated partition paths, populated by initTables().
  private static List<String> incrementalTables;
  private static List<Path> incrementalPaths;
  private static List<Path> snapshotPaths;
  private static List<Path> nonHoodiePaths;
  private static List<Path> inputPaths;

  /** Starts a MiniDFS cluster (once) and lays out the test tables and partitions. */
  @BeforeClass
  public static void setUpDFS() throws IOException {
    // Need to closeAll to clear FileSystem.Cache, required because DFS and LocalFS used in the
    // same JVM
    FileSystem.closeAll();
    if (hdfsTestService == null) {
      hdfsTestService = new HdfsTestService();
      dfsCluster = hdfsTestService.start(true);
      // Create a temp folder as the base path
      dfs = dfsCluster.getFileSystem();
    }
    inputPaths = new ArrayList<>();
    incrementalPaths = new ArrayList<>();
    snapshotPaths = new ArrayList<>();
    nonHoodiePaths = new ArrayList<>();
    initTables();
  }

  /** Shuts the MiniDFS cluster down and clears the FileSystem cache again. */
  @AfterClass
  public static void cleanUp() throws Exception {
    if (hdfsTestService != null) {
      hdfsTestService.stop();
      dfsCluster.shutdown();
      dfsCluster = null;
      dfs = null;
      hdfsTestService = null;
    }
    // Need to closeAll to clear FileSystem.Cache, required because DFS and LocalFS used in the
    // same JVM
    FileSystem.closeAll();
  }

  /**
   * Creates two MERGE_ON_READ tables (incremental), one COPY_ON_WRITE table (snapshot) and one
   * plain directory (non-Hoodie), each with five date partitions, and records which group every
   * partition path is expected to land in.
   */
  static void initTables() throws IOException {
    // Create a temp folder as the base path
    TemporaryFolder parentFolder = new TemporaryFolder();
    parentFolder.create();
    basePathTable1 = parentFolder.newFolder(RAW_TRIPS_TEST_NAME).getAbsolutePath();
    basePathTable2 = parentFolder.newFolder(MODEL_TRIPS_TEST_NAME).getAbsolutePath();
    basePathTable3 = parentFolder.newFolder(ETL_TRIPS_TEST_NAME).getAbsolutePath();
    basePathTable4 = parentFolder.newFolder(TRIPS_STATS_TEST_NAME).getAbsolutePath();
    dfs.mkdirs(new Path(basePathTable1));
    initTableType(dfs.getConf(), basePathTable1, RAW_TRIPS_TEST_NAME, HoodieTableType.MERGE_ON_READ);
    incrementalPaths.addAll(generatePartitions(dfs, basePathTable1));
    dfs.mkdirs(new Path(basePathTable2));
    initTableType(dfs.getConf(), basePathTable2, MODEL_TRIPS_TEST_NAME, HoodieTableType.MERGE_ON_READ);
    incrementalPaths.addAll(generatePartitions(dfs, basePathTable2));
    dfs.mkdirs(new Path(basePathTable3));
    initTableType(dfs.getConf(), basePathTable3, ETL_TRIPS_TEST_NAME, HoodieTableType.COPY_ON_WRITE);
    snapshotPaths.addAll(generatePartitions(dfs, basePathTable3));
    // Table 4 gets partitions but no Hoodie metadata, so it must classify as non-Hoodie.
    dfs.mkdirs(new Path(basePathTable4));
    nonHoodiePaths.addAll(generatePartitions(dfs, basePathTable4));
    inputPaths.addAll(incrementalPaths);
    inputPaths.addAll(snapshotPaths);
    inputPaths.addAll(nonHoodiePaths);
    incrementalTables = new ArrayList<>();
    incrementalTables.add(RAW_TRIPS_TEST_NAME);
    incrementalTables.add(MODEL_TRIPS_TEST_NAME);
  }

  /** Initializes Hoodie table metadata (.hoodie folder) of the given type at basePath. */
  static HoodieTableMetaClient initTableType(Configuration hadoopConf, String basePath,
      String tableName, HoodieTableType tableType) throws IOException {
    Properties properties = new Properties();
    properties.setProperty(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, tableName);
    properties.setProperty(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, tableType.name());
    properties.setProperty(HoodieTableConfig.HOODIE_PAYLOAD_CLASS_PROP_NAME, HoodieAvroPayload.class.getName());
    return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, properties);
  }

  /** Creates five yyyy/MM/dd partition directories under basePath and returns their paths. */
  static List<Path> generatePartitions(DistributedFileSystem dfs, String basePath)
      throws IOException {
    List<Path> paths = new ArrayList<>();
    paths.add(new Path(basePath + Path.SEPARATOR + "2019/05/21"));
    paths.add(new Path(basePath + Path.SEPARATOR + "2019/05/22"));
    paths.add(new Path(basePath + Path.SEPARATOR + "2019/05/23"));
    paths.add(new Path(basePath + Path.SEPARATOR + "2019/05/24"));
    paths.add(new Path(basePath + Path.SEPARATOR + "2019/05/25"));
    for (Path path: paths) {
      dfs.mkdirs(path);
    }
    return paths;
  }

  /** Verifies each group reported by InputPathHandler matches the expected partition sets. */
  @Test
  public void testInputPathHandler() throws IOException {
    inputPathHandler = new InputPathHandler(dfs.getConf(), inputPaths.toArray(
        new Path[inputPaths.size()]), incrementalTables);
    List<Path> actualPaths = inputPathHandler.getGroupedIncrementalPaths().values().stream()
        .flatMap(List::stream).collect(Collectors.toList());
    assertTrue(actualComparesToExpected(actualPaths, incrementalPaths));
    actualPaths = inputPathHandler.getSnapshotPaths();
    assertTrue(actualComparesToExpected(actualPaths, snapshotPaths));
    actualPaths = inputPathHandler.getNonHoodieInputPaths();
    assertTrue(actualComparesToExpected(actualPaths, nonHoodiePaths));
  }

  /** Order-insensitive equality check: same size and every actual path is expected. */
  private boolean actualComparesToExpected(List<Path> actualPaths, List<Path> expectedPaths) {
    if (actualPaths.size() != expectedPaths.size()) {
      return false;
    }
    for (Path path: actualPaths) {
      if (!expectedPaths.contains(path)) {
        return false;
      }
    }
    return true;
  }
}

View File

@@ -18,8 +18,14 @@
package org.apache.hudi.hadoop;
import org.apache.hudi.common.util.FSUtils;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.io.ArrayWritable;
@@ -28,17 +34,17 @@ import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.util.FSUtils;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import static org.junit.Assert.assertEquals;
public class TestHoodieInputFormat {
public class TestHoodieParquetInputFormat {
private HoodieParquetInputFormat inputFormat;
private JobConf jobConf;
@@ -100,7 +106,7 @@ public class TestHoodieInputFormat {
public void testIncrementalSimple() throws IOException {
// initial commit
File partitionDir = InputFormatTestUtil.prepareTable(basePath, 10, "100");
InputFormatTestUtil.commit(basePath, "100");
createCommitFile(basePath, "100", "2016/05/01");
// Add the paths
FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
@@ -112,28 +118,42 @@ public class TestHoodieInputFormat {
files.length);
}
/**
 * Writes a fake {@code <commitNumber>.commit} file under the table's .hoodie folder, with
 * generated write stats attributed to the given partition, so incremental listing can
 * discover which partition the commit touched.
 *
 * @param basePath temp folder holding the test table
 * @param commitNumber commit timestamp to create
 * @param partitionPath partition the fake write stats are attributed to
 * @throws IOException on failure to create or write the commit file
 */
private void createCommitFile(TemporaryFolder basePath, String commitNumber, String partitionPath)
    throws IOException {
  List<HoodieWriteStat> writeStats = HoodieTestUtils.generateFakeHoodieWriteStat(1);
  HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
  writeStats.forEach(stat -> commitMetadata.addWriteStat(partitionPath, stat));
  File file = new File(basePath.getRoot().toString() + "/.hoodie/", commitNumber + ".commit");
  file.createNewFile();
  // try-with-resources guarantees the stream is closed even if the write throws.
  try (FileOutputStream fileOutputStream = new FileOutputStream(file)) {
    fileOutputStream.write(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8));
    fileOutputStream.flush();
  }
}
@Test
public void testIncrementalWithMultipleCommits() throws IOException {
// initial commit
File partitionDir = InputFormatTestUtil.prepareTable(basePath, 10, "100");
InputFormatTestUtil.commit(basePath, "100");
createCommitFile(basePath, "100", "2016/05/01");
// Add the paths
FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
// update files
InputFormatTestUtil.simulateUpdates(partitionDir, "100", 5, "200", false);
InputFormatTestUtil.commit(basePath, "200");
createCommitFile(basePath, "200", "2016/05/01");
InputFormatTestUtil.simulateUpdates(partitionDir, "100", 4, "300", false);
InputFormatTestUtil.commit(basePath, "300");
createCommitFile(basePath, "300", "2016/05/01");
InputFormatTestUtil.simulateUpdates(partitionDir, "100", 3, "400", false);
InputFormatTestUtil.commit(basePath, "400");
createCommitFile(basePath, "400", "2016/05/01");
InputFormatTestUtil.simulateUpdates(partitionDir, "100", 2, "500", false);
InputFormatTestUtil.commit(basePath, "500");
createCommitFile(basePath, "500", "2016/05/01");
InputFormatTestUtil.simulateUpdates(partitionDir, "100", 1, "600", false);
InputFormatTestUtil.commit(basePath, "600");
createCommitFile(basePath, "600", "2016/05/01");
InputFormatTestUtil.setupIncremental(jobConf, "100", 1);
FileStatus[] files = inputFormat.listStatus(jobConf);
@@ -190,8 +210,24 @@ public class TestHoodieInputFormat {
2, 10);
}
private void ensureRecordsInCommit(String msg, String commit, int expectedNumberOfRecordsInCommit, int totalExpected)
throws IOException {
/**
 * Verifies getIncrementalTableNames returns exactly the tables whose consume mode is
 * INCREMENTAL, and excludes tables left in the default (snapshot) mode.
 */
@Test
public void testGetIncrementalTableNames() throws IOException {
  String[] expectedincrTables = {"db1.raw_trips", "db2.model_trips"};
  JobConf conf = new JobConf();
  String incrementalMode1 = String.format(HoodieHiveUtil.HOODIE_CONSUME_MODE_PATTERN, expectedincrTables[0]);
  conf.set(incrementalMode1, HoodieHiveUtil.INCREMENTAL_SCAN_MODE);
  String incrementalMode2 = String.format(HoodieHiveUtil.HOODIE_CONSUME_MODE_PATTERN, expectedincrTables[1]);
  conf.set(incrementalMode2, HoodieHiveUtil.INCREMENTAL_SCAN_MODE);
  // A table in default scan mode must NOT be reported as incremental.
  String defaultmode = String.format(HoodieHiveUtil.HOODIE_CONSUME_MODE_PATTERN, "db3.first_trips");
  conf.set(defaultmode, HoodieHiveUtil.DEFAULT_SCAN_MODE);
  List<String> actualincrTables = HoodieHiveUtil.getIncrementalTableNames(Job.getInstance(conf));
  // Exactly the two INCREMENTAL tables should be returned, nothing more.
  assertEquals(expectedincrTables.length, actualincrTables.size());
  for (int i = 0; i < expectedincrTables.length; i++) {
    assertTrue(actualincrTables.contains(expectedincrTables[i]));
  }
}
private void ensureRecordsInCommit(String msg, String commit, int expectedNumberOfRecordsInCommit,
int totalExpected) throws IOException {
int actualCount = 0;
int totalCount = 0;
InputSplit[] splits = inputFormat.getSplits(jobConf, 1);