diff --git a/hudi-hadoop-mr/pom.xml b/hudi-hadoop-mr/pom.xml
index 525aa0aae..0682d4ce9 100644
--- a/hudi-hadoop-mr/pom.xml
+++ b/hudi-hadoop-mr/pom.xml
@@ -50,18 +50,10 @@
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-common</artifactId>
-    </dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-auth</artifactId>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-hdfs</artifactId>
-    </dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-mapreduce-client-core</artifactId>
@@ -91,6 +83,18 @@
test-jar
test
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
junit
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveUtil.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveUtil.java
index 1db8c541d..17fbf4eec 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveUtil.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveUtil.java
@@ -18,6 +18,12 @@
package org.apache.hudi.hadoop;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.log4j.LogManager;
@@ -36,6 +42,7 @@ public class HoodieHiveUtil {
public static final int DEFAULT_MAX_COMMITS = 1;
public static final int MAX_COMMIT_ALL = -1;
public static final int DEFAULT_LEVELS_TO_BASEPATH = 3;
+ public static final Pattern HOODIE_CONSUME_MODE_PATTERN_STRING = Pattern.compile("hoodie\\.(.*)\\.consume\\.mode");
public static Integer readMaxCommits(JobContext job, String tableName) {
String maxCommitName = String.format(HOODIE_MAX_COMMIT_PATTERN, tableName);
@@ -67,4 +74,22 @@ public class HoodieHiveUtil {
}
return parent;
}
+
+ public static List<String> getIncrementalTableNames(JobContext job) {
+ Map<String, String> tablesModeMap = job.getConfiguration()
+ .getValByRegex(HOODIE_CONSUME_MODE_PATTERN_STRING.pattern());
+ List<String> result = tablesModeMap.entrySet().stream().map(s -> {
+ if (s.getValue().trim().equals(INCREMENTAL_SCAN_MODE)) {
+ Matcher matcher = HOODIE_CONSUME_MODE_PATTERN_STRING.matcher(s.getKey());
+ return (!matcher.find() ? null : matcher.group(1));
+ }
+ return null;
+ }).filter(s -> s != null)
+ .collect(Collectors.toList());
+ if (result == null) {
+ // Returns an empty list instead of null.
+ result = new ArrayList<>();
+ }
+ return result;
+ }
}
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
index a9750054a..3d91f5da1 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
@@ -18,17 +18,15 @@
package org.apache.hudi.hadoop;
-import org.apache.hudi.common.model.HoodieDataFile;
-import org.apache.hudi.common.model.HoodiePartitionMetadata;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.HoodieTimeline;
-import org.apache.hudi.common.table.TableFileSystemView.ReadOptimizedView;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
-import org.apache.hudi.exception.TableNotFoundException;
-import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.InvalidTableException;
-
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -42,16 +40,18 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieDataFile;
+import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTimeline;
+import org.apache.hudi.common.table.TableFileSystemView;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.exception.HoodieIOException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
/**
* HoodieInputFormat which understands the Hoodie File Structure and filters files based on the Hoodie Mode. If paths
* that does not correspond to a hoodie table then they are passed in as is (as what FileInputFormat.listStatus()
@@ -66,61 +66,191 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
@Override
public FileStatus[] listStatus(JobConf job) throws IOException {
- // Get all the file status from FileInputFormat and then do the filter
- FileStatus[] fileStatuses = super.listStatus(job);
- Map<HoodieTableMetaClient, List<FileStatus>> groupedFileStatus = groupFileStatus(fileStatuses);
- LOG.info("Found a total of " + groupedFileStatus.size() + " groups");
+ // Segregate inputPaths[] to incremental, snapshot and non hoodie paths
+ List<String> incrementalTables = HoodieHiveUtil.getIncrementalTableNames(Job.getInstance(job));
+ InputPathHandler inputPathHandler = new InputPathHandler(conf, getInputPaths(job), incrementalTables);
List<FileStatus> returns = new ArrayList<>();
- for (Map.Entry<HoodieTableMetaClient, List<FileStatus>> entry : groupedFileStatus.entrySet()) {
- HoodieTableMetaClient metadata = entry.getKey();
- if (metadata == null) {
- // Add all the paths which are not hoodie specific
- returns.addAll(entry.getValue());
+
+ Map<String, HoodieTableMetaClient> tableMetaClientMap = inputPathHandler.getTableMetaClientMap();
+ // process incremental pulls first
+ for (String table : incrementalTables) {
+ HoodieTableMetaClient metaClient = tableMetaClientMap.get(table);
+ if (metaClient == null) {
+ /* This can happen when the INCREMENTAL mode is set for a table but there were no InputPaths
+ * in the jobConf
+ */
continue;
}
-
- FileStatus[] statuses = entry.getValue().toArray(new FileStatus[entry.getValue().size()]);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Hoodie Metadata initialized with completed commit Ts as :" + metadata);
+ List<Path> inputPaths = inputPathHandler.getGroupedIncrementalPaths().get(metaClient);
+ List<FileStatus> result = listStatusForIncrementalMode(job, metaClient, inputPaths);
+ if (result != null) {
+ returns.addAll(result);
}
- String tableName = metadata.getTableConfig().getTableName();
- String mode = HoodieHiveUtil.readMode(Job.getInstance(job), tableName);
- // Get all commits, delta commits, compactions, as all of them produce a base parquet file
- // today
- HoodieTimeline timeline = metadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
- ReadOptimizedView roView = new HoodieTableFileSystemView(metadata, timeline, statuses);
+ }
- if (HoodieHiveUtil.INCREMENTAL_SCAN_MODE.equals(mode)) {
- // this is of the form commitTs_partition_sequenceNumber
- String lastIncrementalTs = HoodieHiveUtil.readStartCommitTime(Job.getInstance(job), tableName);
- // Total number of commits to return in this batch. Set this to -1 to get all the commits.
- Integer maxCommits = HoodieHiveUtil.readMaxCommits(Job.getInstance(job), tableName);
- LOG.info("Last Incremental timestamp was set as " + lastIncrementalTs);
- List<String> commitsToReturn = timeline.findInstantsAfter(lastIncrementalTs, maxCommits).getInstants()
- .map(HoodieInstant::getTimestamp).collect(Collectors.toList());
- List<HoodieDataFile> filteredFiles =
- roView.getLatestDataFilesInRange(commitsToReturn).collect(Collectors.toList());
- for (HoodieDataFile filteredFile : filteredFiles) {
- LOG.info("Processing incremental hoodie file - " + filteredFile.getPath());
- filteredFile = checkFileStatus(filteredFile);
- returns.add(filteredFile.getFileStatus());
- }
- LOG.info("Total paths to process after hoodie incremental filter " + filteredFiles.size());
- } else {
- // filter files on the latest commit found
- List<HoodieDataFile> filteredFiles = roView.getLatestDataFiles().collect(Collectors.toList());
- LOG.info("Total paths to process after hoodie filter " + filteredFiles.size());
- for (HoodieDataFile filteredFile : filteredFiles) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Processing latest hoodie file - " + filteredFile.getPath());
- }
- filteredFile = checkFileStatus(filteredFile);
- returns.add(filteredFile.getFileStatus());
+ // process non hoodie Paths next.
+ List<Path> nonHoodiePaths = inputPathHandler.getNonHoodieInputPaths();
+ if (nonHoodiePaths.size() > 0) {
+ setInputPaths(job, nonHoodiePaths.toArray(new Path[nonHoodiePaths.size()]));
+ FileStatus[] fileStatuses = super.listStatus(job);
+ for (FileStatus fileStatus: fileStatuses) {
+ returns.add(fileStatus);
+ }
+ }
+
+ // process snapshot queries next.
+ List<Path> snapshotPaths = inputPathHandler.getSnapshotPaths();
+ if (snapshotPaths.size() > 0) {
+ setInputPaths(job, snapshotPaths.toArray(new Path[snapshotPaths.size()]));
+ FileStatus[] fileStatuses = super.listStatus(job);
+ Map<HoodieTableMetaClient, List<FileStatus>> groupedFileStatus =
+ groupFileStatusForSnapshotPaths(fileStatuses, tableMetaClientMap.values());
+ LOG.info("Found a total of " + groupedFileStatus.size() + " groups");
+ for (Map.Entry<HoodieTableMetaClient, List<FileStatus>> entry : groupedFileStatus.entrySet()) {
+ List<FileStatus> result = filterFileStatusForSnapshotMode(entry.getKey(), entry.getValue());
+ if (result != null) {
+ returns.addAll(result);
}
}
}
return returns.toArray(new FileStatus[returns.size()]);
+ }
+ /**
+ * Achieves listStatus functionality for an incrementally queried table. Instead of listing all
+ * partitions and then filtering based on the commits of interest, this logic first extracts the
+ * partitions touched by the desired commits and then lists only those partitions.
+ */
+ private List<FileStatus> listStatusForIncrementalMode(
+ JobConf job, HoodieTableMetaClient tableMetaClient, List<Path> inputPaths) throws IOException {
+ String tableName = tableMetaClient.getTableConfig().getTableName();
+ HoodieTimeline timeline = tableMetaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+ String lastIncrementalTs = HoodieHiveUtil.readStartCommitTime(Job.getInstance(job), tableName);
+ // Total number of commits to return in this batch. Set this to -1 to get all the commits.
+ Integer maxCommits = HoodieHiveUtil.readMaxCommits(Job.getInstance(job), tableName);
+ LOG.info("Last Incremental timestamp was set as " + lastIncrementalTs);
+ List<HoodieInstant> commitsToCheck = timeline.findInstantsAfter(lastIncrementalTs, maxCommits)
+ .getInstants().collect(Collectors.toList());
+ // Extract partitions touched by the commitsToCheck
+ Set<String> partitionsToList = new HashSet<>();
+ for (int i = 0; i < commitsToCheck.size(); i++) {
+ HoodieInstant commit = commitsToCheck.get(i);
+ HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get(),
+ HoodieCommitMetadata.class);
+ partitionsToList.addAll(commitMetadata.getPartitionToWriteStats().keySet());
+ }
+ if (partitionsToList.isEmpty()) {
+ return null;
+ }
+ String incrementalInputPaths = partitionsToList.stream()
+ .map(s -> tableMetaClient.getBasePath() + Path.SEPARATOR + s)
+ .filter(s -> {
+ /*
+ * Ensure to return only results from the original input path that has incremental changes
+ * This check is needed for the following corner case - When the caller invokes
+ * HoodieInputFormat.listStatus multiple times (with small batches of Hive partitions each
+ * time. Ex. Hive fetch task calls listStatus for every partition once) we do not want to
+ * accidentally return all incremental changes for the entire table in every listStatus()
+ * call. This will create redundant splits. Instead we only want to return the incremental
+ * changes (if so any) in that batch of input paths.
+ *
+ * NOTE on Hive queries that are executed using Fetch task:
+ * Since Fetch tasks invoke InputFormat.listStatus() per partition, Hoodie metadata can be
+ * listed in every such listStatus() call. In order to avoid this, it might be useful to
+ * disable fetch tasks using the hive session property for incremental queries:
+ * `set hive.fetch.task.conversion=none;`
+ * This would ensure Map Reduce execution is chosen for a Hive query, which combines
+ * partitions (comma separated) and calls InputFormat.listStatus() only once with all
+ * those partitions.
+ */
+ for (Path path : inputPaths) {
+ if (path.toString().contains(s)) {
+ return true;
+ }
+ }
+ return false;
+ })
+ .collect(Collectors.joining(","));
+ if (incrementalInputPaths == null || incrementalInputPaths.isEmpty()) {
+ return null;
+ }
+ // Mutate the JobConf to set the input paths to only partitions touched by incremental pull.
+ setInputPaths(job, incrementalInputPaths);
+ FileStatus[] fileStatuses = super.listStatus(job);
+ TableFileSystemView.ReadOptimizedView roView = new HoodieTableFileSystemView(tableMetaClient, timeline,
+ fileStatuses);
+ List<String> commitsList = commitsToCheck.stream().map(s -> s.getTimestamp()).collect(Collectors.toList());
+ List<HoodieDataFile> filteredFiles = roView.getLatestDataFilesInRange(commitsList).collect(Collectors.toList());
+ List<FileStatus> returns = new ArrayList<>();
+ for (HoodieDataFile filteredFile : filteredFiles) {
+ LOG.debug("Processing incremental hoodie file - " + filteredFile.getPath());
+ filteredFile = checkFileStatus(filteredFile);
+ returns.add(filteredFile.getFileStatus());
+ }
+ LOG.info("Total paths to process after hoodie incremental filter " + filteredFiles.size());
+ return returns;
+ }
+
+ /**
+ * Takes in a list of filesStatus and a list of table metadatas. Groups the files status list
+ * based on given table metadata.
+ * @param fileStatuses
+ * @param metaClientList
+ * @return
+ * @throws IOException
+ */
+ private Map<HoodieTableMetaClient, List<FileStatus>> groupFileStatusForSnapshotPaths(
+ FileStatus[] fileStatuses, Collection<HoodieTableMetaClient> metaClientList) throws IOException {
+ // This assumes the paths for different tables are grouped together
+ Map<HoodieTableMetaClient, List<FileStatus>> grouped = new HashMap<>();
+ HoodieTableMetaClient metadata = null;
+ for (FileStatus status : fileStatuses) {
+ Path inputPath = status.getPath();
+ if (!inputPath.getName().endsWith(".parquet")) {
+ //FIXME(vc): skip non parquet files for now. This wont be needed once log file name start
+ // with "."
+ continue;
+ }
+ if ((metadata == null) || (!inputPath.toString().contains(metadata.getBasePath()))) {
+ for (HoodieTableMetaClient metaClient : metaClientList) {
+ if (inputPath.toString().contains(metaClient.getBasePath())) {
+ metadata = metaClient;
+ if (!grouped.containsKey(metadata)) {
+ grouped.put(metadata, new ArrayList<>());
+ }
+ break;
+ }
+ }
+ }
+ grouped.get(metadata).add(status);
+ }
+ return grouped;
+ }
+
+ /**
+ * Filters data files for a snapshot queried table.
+ */
+ private List<FileStatus> filterFileStatusForSnapshotMode(
+ HoodieTableMetaClient metadata, List<FileStatus> fileStatuses) throws IOException {
+ FileStatus[] statuses = fileStatuses.toArray(new FileStatus[fileStatuses.size()]);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Hoodie Metadata initialized with completed commit Ts as :" + metadata);
+ }
+ // Get all commits, delta commits, compactions, as all of them produce a base parquet file today
+ HoodieTimeline timeline = metadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+ TableFileSystemView.ReadOptimizedView roView = new HoodieTableFileSystemView(metadata, timeline, statuses);
+ // filter files on the latest commit found
+ List<HoodieDataFile> filteredFiles = roView.getLatestDataFiles().collect(Collectors.toList());
+ LOG.info("Total paths to process after hoodie filter " + filteredFiles.size());
+ List<FileStatus> returns = new ArrayList<>();
+ for (HoodieDataFile filteredFile : filteredFiles) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Processing latest hoodie file - " + filteredFile.getPath());
+ }
+ filteredFile = checkFileStatus(filteredFile);
+ returns.add(filteredFile.getFileStatus());
+ }
+ return returns;
}
/**
@@ -142,38 +272,6 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
}
}
- private Map<HoodieTableMetaClient, List<FileStatus>> groupFileStatus(FileStatus[] fileStatuses) throws IOException {
- // This assumes the paths for different tables are grouped together
- Map<HoodieTableMetaClient, List<FileStatus>> grouped = new HashMap<>();
- HoodieTableMetaClient metadata = null;
- String nonHoodieBasePath = null;
- for (FileStatus status : fileStatuses) {
- if (!status.getPath().getName().endsWith(".parquet")) {
- // FIXME(vc): skip non parquet files for now. This wont be needed once log file name start
- // with "."
- continue;
- }
- if ((metadata == null && nonHoodieBasePath == null)
- || (metadata == null && !status.getPath().toString().contains(nonHoodieBasePath))
- || (metadata != null && !status.getPath().toString().contains(metadata.getBasePath()))) {
- try {
- metadata = getTableMetaClient(status.getPath().getFileSystem(conf), status.getPath().getParent());
- nonHoodieBasePath = null;
- } catch (TableNotFoundException | InvalidTableException e) {
- LOG.info("Handling a non-hoodie path " + status.getPath());
- metadata = null;
- nonHoodieBasePath = status.getPath().getParent().toString();
- }
- if (!grouped.containsKey(metadata)) {
- grouped.put(metadata, new ArrayList<>());
- }
- }
- grouped.get(metadata).add(status);
- }
- return grouped;
- }
-
- @Override
public void setConf(Configuration conf) {
this.conf = conf;
}
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java
new file mode 100644
index 000000000..ae3081d30
--- /dev/null
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import static org.apache.hudi.hadoop.HoodieParquetInputFormat.getTableMetaClient;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.exception.TableNotFoundException;
+import org.apache.hudi.exception.InvalidTableException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+/**
+ * InputPathHandler takes in a set of input paths and incremental tables list. Then, classifies the
+ * input paths to incremental, snapshot paths and non-hoodie paths. This is then accessed later to
+ * mutate the JobConf before processing incremental mode queries and snapshot queries.
+ */
+public class InputPathHandler {
+
+ public static final Logger LOG = LogManager.getLogger(InputPathHandler.class);
+
+ private final Configuration conf;
+ // tablename to metadata mapping for all Hoodie tables(both incremental & snapshot)
+ private final Map<String, HoodieTableMetaClient> tableMetaClientMap;
+ private final Map<HoodieTableMetaClient, List<Path>> groupedIncrementalPaths;
+ private final List<Path> snapshotPaths;
+ private final List<Path> nonHoodieInputPaths;
+
+ InputPathHandler(Configuration conf, Path[] inputPaths, List<String> incrementalTables) throws IOException {
+ this.conf = conf;
+ tableMetaClientMap = new HashMap<>();
+ snapshotPaths = new ArrayList<>();
+ nonHoodieInputPaths = new ArrayList<>();
+ groupedIncrementalPaths = new HashMap<>();
+ parseInputPaths(inputPaths, incrementalTables);
+ }
+
+ /**
+ * Takes in the original InputPaths and classifies each of them into incremental, snapshot and
+ * non-hoodie InputPaths. The logic is as follows:
+ *
+ * 1. Check if an inputPath starts with the same basepath as any of the metadata basepaths we know
+ * 1a. If yes, this belongs to a Hoodie table that we already know about. Simply classify this
+ * as incremental or snapshot - We can get the table name of this inputPath from the
+ * metadata. Then based on the list of incrementalTables, we can classify this inputPath.
+ * 1b. If no, this could be a new Hoodie Table we haven't seen yet or a non-Hoodie Input Path.
+ * Try creating the HoodieTableMetadataClient.
+ * - If it succeeds, further classify as incremental or snapshot as described in step
+ * 1a above.
+ * - If TableNotFoundException/InvalidTableException is caught, this is a
+ * non-Hoodie inputPath
+ * @param inputPaths - InputPaths from the original jobConf that was passed to HoodieInputFormat
+ * @param incrementalTables - List of all incremental tables extracted from the config
+ * `hoodie.<table-name>.consume.mode=INCREMENTAL`
+ * @throws IOException
+ */
+ private void parseInputPaths(Path[] inputPaths, List<String> incrementalTables)
+ throws IOException {
+ for (Path inputPath : inputPaths) {
+ boolean basePathKnown = false;
+ for (HoodieTableMetaClient metaClient : tableMetaClientMap.values()) {
+ if (inputPath.toString().contains(metaClient.getBasePath())) {
+ // We already know the base path for this inputPath.
+ basePathKnown = true;
+ // Check if this is for a snapshot query
+ String tableName = metaClient.getTableConfig().getTableName();
+ tagAsIncrementalOrSnapshot(inputPath, tableName, metaClient, incrementalTables);
+ break;
+ }
+ }
+ if (!basePathKnown) {
+ // This path is for a table that we dont know about yet.
+ HoodieTableMetaClient metaClient;
+ try {
+ metaClient = getTableMetaClient(inputPath.getFileSystem(conf), inputPath);
+ String tableName = metaClient.getTableConfig().getTableName();
+ tableMetaClientMap.put(tableName, metaClient);
+ tagAsIncrementalOrSnapshot(inputPath, tableName, metaClient, incrementalTables);
+ } catch (TableNotFoundException | InvalidTableException e) {
+ // This is a non Hoodie inputPath
+ LOG.info("Handling a non-hoodie path " + inputPath);
+ nonHoodieInputPaths.add(inputPath);
+ }
+ }
+ }
+ }
+
+ private void tagAsIncrementalOrSnapshot(Path inputPath, String tableName,
+ HoodieTableMetaClient metaClient, List<String> incrementalTables) {
+ if (!incrementalTables.contains(tableName)) {
+ snapshotPaths.add(inputPath);
+ } else {
+ // Group incremental Paths belonging to same table.
+ if (!groupedIncrementalPaths.containsKey(metaClient)) {
+ groupedIncrementalPaths.put(metaClient, new ArrayList<>());
+ }
+ groupedIncrementalPaths.get(metaClient).add(inputPath);
+ }
+ }
+
+ public Map<HoodieTableMetaClient, List<Path>> getGroupedIncrementalPaths() {
+ return groupedIncrementalPaths;
+ }
+
+ public Map<String, HoodieTableMetaClient> getTableMetaClientMap() {
+ return tableMetaClientMap;
+ }
+
+ public List<Path> getSnapshotPaths() {
+ return snapshotPaths;
+ }
+
+ public List<Path> getNonHoodieInputPaths() {
+ return nonHoodieInputPaths;
+ }
+}
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/InputPathHandlerTest.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/InputPathHandlerTest.java
new file mode 100644
index 000000000..307c18203
--- /dev/null
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/InputPathHandlerTest.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hudi.common.minicluster.HdfsTestService;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class InputPathHandlerTest {
+
+ // Incremental Table
+ public static final String RAW_TRIPS_TEST_NAME = "raw_trips";
+ public static final String MODEL_TRIPS_TEST_NAME = "model_trips";
+
+ // snapshot Table
+ public static final String ETL_TRIPS_TEST_NAME = "etl_trips";
+
+ // non Hoodie table
+ public static final String TRIPS_STATS_TEST_NAME = "trips_stats";
+
+ private static MiniDFSCluster dfsCluster;
+ private static DistributedFileSystem dfs;
+ private static HdfsTestService hdfsTestService;
+ private static InputPathHandler inputPathHandler;
+ private static String basePathTable1 = null;
+ private static String basePathTable2 = null;
+ private static String basePathTable3 = null;
+ private static String basePathTable4 = null; // non hoodie Path
+ private static List<String> incrementalTables;
+ private static List<Path> incrementalPaths;
+ private static List<Path> snapshotPaths;
+ private static List<Path> nonHoodiePaths;
+ private static List<Path> inputPaths;
+
+ @BeforeClass
+ public static void setUpDFS() throws IOException {
+ // Need to closeAll to clear FileSystem.Cache, required because DFS and LocalFS used in the
+ // same JVM
+ FileSystem.closeAll();
+ if (hdfsTestService == null) {
+ hdfsTestService = new HdfsTestService();
+ dfsCluster = hdfsTestService.start(true);
+ // Create a temp folder as the base path
+ dfs = dfsCluster.getFileSystem();
+ }
+ inputPaths = new ArrayList<>();
+ incrementalPaths = new ArrayList<>();
+ snapshotPaths = new ArrayList<>();
+ nonHoodiePaths = new ArrayList<>();
+ initTables();
+ }
+
+ @AfterClass
+ public static void cleanUp() throws Exception {
+ if (hdfsTestService != null) {
+ hdfsTestService.stop();
+ dfsCluster.shutdown();
+ dfsCluster = null;
+ dfs = null;
+ hdfsTestService = null;
+ }
+ // Need to closeAll to clear FileSystem.Cache, required because DFS and LocalFS used in the
+ // same JVM
+ FileSystem.closeAll();
+ }
+
+ static void initTables() throws IOException {
+ // Create a temp folder as the base path
+ TemporaryFolder parentFolder = new TemporaryFolder();
+ parentFolder.create();
+ basePathTable1 = parentFolder.newFolder(RAW_TRIPS_TEST_NAME).getAbsolutePath();
+ basePathTable2 = parentFolder.newFolder(MODEL_TRIPS_TEST_NAME).getAbsolutePath();
+ basePathTable3 = parentFolder.newFolder(ETL_TRIPS_TEST_NAME).getAbsolutePath();
+ basePathTable4 = parentFolder.newFolder(TRIPS_STATS_TEST_NAME).getAbsolutePath();
+
+ dfs.mkdirs(new Path(basePathTable1));
+ initTableType(dfs.getConf(), basePathTable1, RAW_TRIPS_TEST_NAME, HoodieTableType.MERGE_ON_READ);
+ incrementalPaths.addAll(generatePartitions(dfs, basePathTable1));
+
+ dfs.mkdirs(new Path(basePathTable2));
+ initTableType(dfs.getConf(), basePathTable2, MODEL_TRIPS_TEST_NAME, HoodieTableType.MERGE_ON_READ);
+ incrementalPaths.addAll(generatePartitions(dfs, basePathTable2));
+
+ dfs.mkdirs(new Path(basePathTable3));
+ initTableType(dfs.getConf(), basePathTable3, ETL_TRIPS_TEST_NAME, HoodieTableType.COPY_ON_WRITE);
+ snapshotPaths.addAll(generatePartitions(dfs, basePathTable3));
+
+ dfs.mkdirs(new Path(basePathTable4));
+ nonHoodiePaths.addAll(generatePartitions(dfs, basePathTable4));
+
+ inputPaths.addAll(incrementalPaths);
+ inputPaths.addAll(snapshotPaths);
+ inputPaths.addAll(nonHoodiePaths);
+
+ incrementalTables = new ArrayList<>();
+ incrementalTables.add(RAW_TRIPS_TEST_NAME);
+ incrementalTables.add(MODEL_TRIPS_TEST_NAME);
+ }
+
+ static HoodieTableMetaClient initTableType(Configuration hadoopConf, String basePath,
+ String tableName, HoodieTableType tableType) throws IOException {
+ Properties properties = new Properties();
+ properties.setProperty(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, tableName);
+ properties.setProperty(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, tableType.name());
+ properties.setProperty(HoodieTableConfig.HOODIE_PAYLOAD_CLASS_PROP_NAME, HoodieAvroPayload.class.getName());
+ return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, properties);
+ }
+
+ static List<Path> generatePartitions(DistributedFileSystem dfs, String basePath)
+ throws IOException {
+ List<Path> paths = new ArrayList<>();
+ paths.add(new Path(basePath + Path.SEPARATOR + "2019/05/21"));
+ paths.add(new Path(basePath + Path.SEPARATOR + "2019/05/22"));
+ paths.add(new Path(basePath + Path.SEPARATOR + "2019/05/23"));
+ paths.add(new Path(basePath + Path.SEPARATOR + "2019/05/24"));
+ paths.add(new Path(basePath + Path.SEPARATOR + "2019/05/25"));
+ for (Path path: paths) {
+ dfs.mkdirs(path);
+ }
+ return paths;
+ }
+
+ @Test
+ public void testInputPathHandler() throws IOException {
+ inputPathHandler = new InputPathHandler(dfs.getConf(), inputPaths.toArray(
+ new Path[inputPaths.size()]), incrementalTables);
+ List<Path> actualPaths = inputPathHandler.getGroupedIncrementalPaths().values().stream()
+ .flatMap(List::stream).collect(Collectors.toList());
+ assertTrue(actualComparesToExpected(actualPaths, incrementalPaths));
+ actualPaths = inputPathHandler.getSnapshotPaths();
+ assertTrue(actualComparesToExpected(actualPaths, snapshotPaths));
+ actualPaths = inputPathHandler.getNonHoodieInputPaths();
+ assertTrue(actualComparesToExpected(actualPaths, nonHoodiePaths));
+ }
+
+ private boolean actualComparesToExpected(List<Path> actualPaths, List<Path> expectedPaths) {
+ if (actualPaths.size() != expectedPaths.size()) {
+ return false;
+ }
+ for (Path path: actualPaths) {
+ if (!expectedPaths.contains(path)) {
+ return false;
+ }
+ }
+ return true;
+ }
+}
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
similarity index 79%
rename from hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieInputFormat.java
rename to hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
index 6eecc57a5..ed501e700 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieInputFormat.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
@@ -18,8 +18,14 @@
package org.apache.hudi.hadoop;
-import org.apache.hudi.common.util.FSUtils;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.io.ArrayWritable;
@@ -28,17 +34,17 @@ import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieTestUtils;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.util.FSUtils;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
-import java.io.File;
-import java.io.IOException;
-
-import static org.junit.Assert.assertEquals;
-
-public class TestHoodieInputFormat {
+public class TestHoodieParquetInputFormat {
private HoodieParquetInputFormat inputFormat;
private JobConf jobConf;
@@ -100,7 +106,7 @@ public class TestHoodieInputFormat {
public void testIncrementalSimple() throws IOException {
// initial commit
File partitionDir = InputFormatTestUtil.prepareTable(basePath, 10, "100");
- InputFormatTestUtil.commit(basePath, "100");
+ createCommitFile(basePath, "100", "2016/05/01");
// Add the paths
FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
@@ -112,28 +118,42 @@ public class TestHoodieInputFormat {
files.length);
}
+  private void createCommitFile(TemporaryFolder basePath, String commitNumber, String partitionPath)
+      throws IOException {
+    // Fake commit metadata: one generated write-stat attributed to the given partition.
+    List<HoodieWriteStat> writeStats = HoodieTestUtils.generateFakeHoodieWriteStat(1);
+    HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+    writeStats.forEach(stat -> commitMetadata.addWriteStat(partitionPath, stat));
+    File file = new File(basePath.getRoot().toString() + "/.hoodie/", commitNumber + ".commit");
+    // try-with-resources closes the stream even if the write throws; opening it creates the file.
+    try (FileOutputStream fileOutputStream = new FileOutputStream(file)) {
+      fileOutputStream.write(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8));
+    }
+  }
+
@Test
public void testIncrementalWithMultipleCommits() throws IOException {
// initial commit
File partitionDir = InputFormatTestUtil.prepareTable(basePath, 10, "100");
- InputFormatTestUtil.commit(basePath, "100");
+ createCommitFile(basePath, "100", "2016/05/01");
+
// Add the paths
FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
// update files
InputFormatTestUtil.simulateUpdates(partitionDir, "100", 5, "200", false);
- InputFormatTestUtil.commit(basePath, "200");
+ createCommitFile(basePath, "200", "2016/05/01");
InputFormatTestUtil.simulateUpdates(partitionDir, "100", 4, "300", false);
- InputFormatTestUtil.commit(basePath, "300");
+ createCommitFile(basePath, "300", "2016/05/01");
InputFormatTestUtil.simulateUpdates(partitionDir, "100", 3, "400", false);
- InputFormatTestUtil.commit(basePath, "400");
+ createCommitFile(basePath, "400", "2016/05/01");
InputFormatTestUtil.simulateUpdates(partitionDir, "100", 2, "500", false);
- InputFormatTestUtil.commit(basePath, "500");
+ createCommitFile(basePath, "500", "2016/05/01");
InputFormatTestUtil.simulateUpdates(partitionDir, "100", 1, "600", false);
- InputFormatTestUtil.commit(basePath, "600");
+ createCommitFile(basePath, "600", "2016/05/01");
InputFormatTestUtil.setupIncremental(jobConf, "100", 1);
FileStatus[] files = inputFormat.listStatus(jobConf);
@@ -190,8 +210,24 @@ public class TestHoodieInputFormat {
2, 10);
}
- private void ensureRecordsInCommit(String msg, String commit, int expectedNumberOfRecordsInCommit, int totalExpected)
- throws IOException {
+  @Test
+  public void testGetIncrementalTableNames() throws IOException {
+    String[] expectedIncrTables = {"db1.raw_trips", "db2.model_trips"};
+    JobConf conf = new JobConf();
+    String incrementalMode1 = String.format(HoodieHiveUtil.HOODIE_CONSUME_MODE_PATTERN, expectedIncrTables[0]);
+    conf.set(incrementalMode1, HoodieHiveUtil.INCREMENTAL_SCAN_MODE);
+    String incrementalMode2 = String.format(HoodieHiveUtil.HOODIE_CONSUME_MODE_PATTERN, expectedIncrTables[1]);
+    conf.set(incrementalMode2, HoodieHiveUtil.INCREMENTAL_SCAN_MODE);
+    // A table left in the default (snapshot) scan mode must not be reported as incremental.
+    String defaultMode = String.format(HoodieHiveUtil.HOODIE_CONSUME_MODE_PATTERN, "db3.first_trips");
+    conf.set(defaultMode, HoodieHiveUtil.DEFAULT_SCAN_MODE);
+    List<String> actualIncrTables = HoodieHiveUtil.getIncrementalTableNames(Job.getInstance(conf));
+    for (String expectedTable : expectedIncrTables) {
+      assertTrue(actualIncrTables.contains(expectedTable));
+    }
+  }
+
+ private void ensureRecordsInCommit(String msg, String commit, int expectedNumberOfRecordsInCommit,
+ int totalExpected) throws IOException {
int actualCount = 0;
int totalCount = 0;
InputSplit[] splits = inputFormat.getSplits(jobConf, 1);