diff --git a/hoodie-hadoop-mr/pom.xml b/hoodie-hadoop-mr/pom.xml
new file mode 100644
index 000000000..59137bf04
--- /dev/null
+++ b/hoodie-hadoop-mr/pom.xml
@@ -0,0 +1,69 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>hoodie</artifactId>
+        <groupId>com.uber.hoodie</groupId>
+        <version>0.2.5-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>hoodie-hadoop-mr</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.uber.hoodie</groupId>
+            <artifactId>hoodie-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.uber.hoodie</groupId>
+            <artifactId>hoodie-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-common</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-auth</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-exec</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-jdbc</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.parquet</groupId>
+            <artifactId>parquet-avro</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieHiveUtil.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieHiveUtil.java
new file mode 100644
index 000000000..ee38cb2d7
--- /dev/null
+++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieHiveUtil.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.hadoop;
+
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+/**
+ * Reads per-table Hoodie consumption settings (scan mode, start commit
+ * timestamp, max commits to pull) out of a Hadoop {@link JobContext}'s
+ * configuration. Property names are built by substituting the table name
+ * into the *_PATTERN format strings below.
+ */
+public class HoodieHiveUtil {
+ public static final Logger LOG =
+ LogManager.getLogger(HoodieHiveUtil.class);
+
+ // Property-name templates; %s is replaced with the table name.
+ public static final String HOODIE_CONSUME_MODE_PATTERN = "hoodie.%s.consume.mode";
+ public static final String HOODIE_START_COMMIT_PATTERN = "hoodie.%s.consume.start.timestamp";
+ public static final String HOODIE_MAX_COMMIT_PATTERN = "hoodie.%s.consume.max.commits";
+ public static final String INCREMENTAL_SCAN_MODE = "INCREMENTAL";
+ public static final String LATEST_SCAN_MODE = "LATEST";
+ public static final String DEFAULT_SCAN_MODE = LATEST_SCAN_MODE;
+ public static final int DEFAULT_MAX_COMMITS = 1;
+ // Sentinel meaning "pull all commits"; translated to Integer.MAX_VALUE below.
+ public static final int MAX_COMMIT_ALL = -1;
+
+ /**
+  * Returns the number of commits to pull for {@code tableName}; defaults to
+  * {@link #DEFAULT_MAX_COMMITS}, and {@link #MAX_COMMIT_ALL} (-1) means all.
+  */
+ public static Integer readMaxCommits(JobContext job, String tableName) {
+ String maxCommitName = String.format(HOODIE_MAX_COMMIT_PATTERN, tableName);
+ int maxCommits = job.getConfiguration().getInt(maxCommitName, DEFAULT_MAX_COMMITS);
+ if (maxCommits == MAX_COMMIT_ALL) {
+ maxCommits = Integer.MAX_VALUE;
+ }
+ LOG.info("Read max commits - " + maxCommits);
+ return maxCommits;
+ }
+
+ /**
+  * Returns the configured incremental-pull start commit timestamp for
+  * {@code tableName}, or null if the property is not set.
+  */
+ public static String readStartCommitTime(JobContext job, String tableName) {
+ String startCommitTimestampName = String.format(HOODIE_START_COMMIT_PATTERN, tableName);
+ LOG.info("Read start commit time - " + job.getConfiguration().get(startCommitTimestampName));
+ return job.getConfiguration().get(startCommitTimestampName);
+ }
+
+ /**
+  * Returns the scan mode (INCREMENTAL or LATEST) configured for
+  * {@code tableName}; defaults to {@link #DEFAULT_SCAN_MODE}.
+  */
+ public static String readMode(JobContext job, String tableName) {
+ String modePropertyName = String.format(HOODIE_CONSUME_MODE_PATTERN, tableName);
+ String mode =job.getConfiguration().get(modePropertyName, DEFAULT_SCAN_MODE);
+ LOG.info(modePropertyName + ": " + mode);
+ return mode;
+ }
+}
diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java
new file mode 100644
index 000000000..5e81c6231
--- /dev/null
+++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java
@@ -0,0 +1,252 @@
+/*
+ * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.hadoop;
+
+import com.uber.hoodie.common.model.HoodieRecord;
+import com.uber.hoodie.common.model.HoodieTableMetadata;
+import com.uber.hoodie.exception.InvalidDatasetException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
+import org.apache.hadoop.hive.ql.io.parquet.read.ParquetFilterPredicateConverter;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Job;
+import parquet.filter2.predicate.FilterPredicate;
+import parquet.filter2.predicate.Operators;
+import parquet.hadoop.ParquetFileReader;
+import parquet.hadoop.metadata.FileMetaData;
+import parquet.hadoop.metadata.ParquetMetadata;
+import parquet.io.api.Binary;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static parquet.filter2.predicate.FilterApi.and;
+import static parquet.filter2.predicate.FilterApi.binaryColumn;
+import static parquet.filter2.predicate.FilterApi.gt;
+
+/**
+ * HoodieInputFormat understands the Hoodie file structure and filters input
+ * files based on the Hoodie consumption mode (LATEST or INCREMENTAL). Paths
+ * that do not correspond to a hoodie dataset are passed through as-is (as
+ * FileInputFormat.listStatus() would do). The JobConf may contain paths from
+ * multiple Hoodie/non-Hoodie datasets.
+ *
+ * NOTE(review): the generic type parameters in this file were stripped by an
+ * HTML sanitizer in the original patch text; they have been reconstructed
+ * from usage and should be confirmed against the upstream sources.
+ */
+public class HoodieInputFormat extends MapredParquetInputFormat
+        implements Configurable {
+    public static final Log LOG = LogFactory.getLog(HoodieInputFormat.class);
+    private Configuration conf;
+
+    /**
+     * Lists input files, filtering each hoodie dataset's files down to the
+     * latest file versions (LATEST mode), or to the versions written by
+     * commits after the configured start commit (INCREMENTAL mode).
+     * Non-hoodie paths (grouped under a null metadata key) pass through.
+     */
+    @Override
+    public FileStatus[] listStatus(JobConf job) throws IOException {
+        // Get all the file status from FileInputFormat and then do the filter
+        FileStatus[] fileStatuses = super.listStatus(job);
+        Map<HoodieTableMetadata, List<FileStatus>> groupedFileStatus = groupFileStatus(fileStatuses);
+        LOG.info("Found a total of " + groupedFileStatus.size() + " groups");
+        List<FileStatus> returns = new ArrayList<FileStatus>();
+        for (Map.Entry<HoodieTableMetadata, List<FileStatus>> entry : groupedFileStatus.entrySet()) {
+            HoodieTableMetadata metadata = entry.getKey();
+            if (metadata == null) {
+                // Add all the paths which are not hoodie specific
+                returns.addAll(entry.getValue());
+                continue;
+            }
+
+            FileStatus[] value = entry.getValue().toArray(new FileStatus[entry.getValue().size()]);
+            LOG.info("Hoodie Metadata initialized with completed commit Ts as :" + metadata);
+            String tableName = metadata.getTableName();
+            String mode = HoodieHiveUtil.readMode(Job.getInstance(job), tableName);
+            if (HoodieHiveUtil.INCREMENTAL_SCAN_MODE.equals(mode)) {
+                // this is of the form commitTs_partition_sequenceNumber
+                String lastIncrementalTs = HoodieHiveUtil.readStartCommitTime(Job.getInstance(job), tableName);
+                // Total number of commits to return in this batch. Set this to -1 to get all the commits.
+                Integer maxCommits = HoodieHiveUtil.readMaxCommits(Job.getInstance(job), tableName);
+                LOG.info("Last Incremental timestamp was set as " + lastIncrementalTs);
+                List<String> commitsToReturn = metadata.findCommitsAfter(lastIncrementalTs, maxCommits);
+                FileStatus[] filteredFiles = metadata.getLatestVersionInRange(value, commitsToReturn);
+                for (FileStatus filteredFile : filteredFiles) {
+                    LOG.info("Processing incremental hoodie file - " + filteredFile.getPath());
+                    returns.add(filteredFile);
+                }
+                LOG.info("Total paths to process after hoodie incremental filter " + filteredFiles.length);
+            } else {
+                // filter files on the latest commit found
+                FileStatus[] filteredFiles = metadata.getLatestVersions(value);
+                LOG.info("Total paths to process after hoodie filter " + filteredFiles.length);
+                for (FileStatus filteredFile : filteredFiles) {
+                    LOG.info("Processing latest hoodie file - " + filteredFile.getPath());
+                    returns.add(filteredFile);
+                }
+            }
+        }
+        return returns.toArray(new FileStatus[returns.size()]);
+    }
+
+    /**
+     * Groups the incoming file statuses by the hoodie dataset they belong to.
+     * Files under a non-hoodie path are grouped under a null key.
+     * This assumes the paths for different tables are grouped together.
+     */
+    private Map<HoodieTableMetadata, List<FileStatus>> groupFileStatus(FileStatus[] fileStatuses)
+            throws IOException {
+        Map<HoodieTableMetadata, List<FileStatus>> grouped = new HashMap<>();
+        HoodieTableMetadata metadata = null;
+        String nonHoodieBasePath = null;
+        for (FileStatus status : fileStatuses) {
+            // Re-resolve metadata only when we cross into a different dataset's path.
+            if ((metadata == null && nonHoodieBasePath == null) || (metadata == null && !status.getPath()
+                    .toString().contains(nonHoodieBasePath)) || (metadata != null && !status.getPath()
+                    .toString().contains(metadata.getBasePath()))) {
+                try {
+                    metadata = getTableMetadata(status.getPath().getParent());
+                    nonHoodieBasePath = null;
+                } catch (InvalidDatasetException e) {
+                    LOG.info("Handling a non-hoodie path " + status.getPath());
+                    metadata = null;
+                    nonHoodieBasePath = status.getPath().getParent().toString();
+                }
+                if (!grouped.containsKey(metadata)) {
+                    grouped.put(metadata, new ArrayList<FileStatus>());
+                }
+            }
+            grouped.get(metadata).add(status);
+        }
+        return grouped;
+    }
+
+    @Override
+    public void setConf(Configuration conf) {
+        this.conf = conf;
+    }
+
+    @Override
+    public Configuration getConf() {
+        return conf;
+    }
+
+    /**
+     * Returns the parquet record reader for the split. Predicate pushdown for
+     * incremental scans is disabled pending fixes (see commented-out block).
+     * NOTE(review): the Void key type was reconstructed from the test's use of
+     * createKey(); confirm against the Hive MapredParquetInputFormat in use.
+     */
+    @Override
+    public RecordReader<Void, ArrayWritable> getRecordReader(final InputSplit split,
+            final JobConf job, final Reporter reporter) throws IOException {
+        // TODO enable automatic predicate pushdown after fixing issues
+        // FileSplit fileSplit = (FileSplit) split;
+        // HoodieTableMetadata metadata = getTableMetadata(fileSplit.getPath().getParent());
+        // String tableName = metadata.getTableName();
+        // String mode = HoodieHiveUtil.readMode(job, tableName);
+        // if (HoodieHiveUtil.INCREMENTAL_SCAN_MODE.equals(mode)) {
+        //     FilterPredicate predicate = constructHoodiePredicate(job, tableName, split);
+        //     LOG.info("Setting parquet predicate push down as " + predicate);
+        //     ParquetInputFormat.setFilterPredicate(job, predicate);
+        //     clearOutExistingPredicate(job);
+        // }
+        return super.getRecordReader(split, job, reporter);
+    }
+
+    /**
+     * Clears out the filter expression (if this is not done, then ParquetReader
+     * will override the FilterPredicate set)
+     *
+     * @param job the job configuration to clear the filter expression from
+     */
+    private void clearOutExistingPredicate(JobConf job) {
+        job.unset(TableScanDesc.FILTER_EXPR_CONF_STR);
+    }
+
+    /**
+     * Constructs the predicate to push down to parquet storage. This creates
+     * the predicate for `hoodie_commit_time` > 'start_commit_time' and ANDs
+     * with the existing predicate if one is present already.
+     *
+     * @param job       the job configuration
+     * @param tableName table whose consume properties are read
+     * @param split     split whose parquet footer supplies the schema
+     * @return the combined filter predicate
+     */
+    private FilterPredicate constructHoodiePredicate(JobConf job, String tableName,
+            InputSplit split) throws IOException {
+        FilterPredicate commitTimePushdown = constructCommitTimePushdownPredicate(job, tableName);
+        LOG.info("Commit time predicate - " + commitTimePushdown.toString());
+        FilterPredicate existingPushdown = constructHQLPushdownPredicate(job, split);
+        LOG.info("Existing predicate - " + existingPushdown);
+
+        FilterPredicate hoodiePredicate;
+        if (existingPushdown != null) {
+            hoodiePredicate = and(existingPushdown, commitTimePushdown);
+        } else {
+            hoodiePredicate = commitTimePushdown;
+        }
+        LOG.info("Hoodie Predicate - " + hoodiePredicate);
+        return hoodiePredicate;
+    }
+
+    /**
+     * Converts the HQL filter expression serialized in the job conf (if any)
+     * into a parquet FilterPredicate, using the split file's parquet schema.
+     * Returns null when no pushdown filter is configured.
+     */
+    private FilterPredicate constructHQLPushdownPredicate(JobConf job, InputSplit split)
+            throws IOException {
+        String serializedPushdown = job.get(TableScanDesc.FILTER_EXPR_CONF_STR);
+        String columnNamesString = job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR);
+        if (serializedPushdown == null || columnNamesString == null || serializedPushdown.isEmpty()
+                || columnNamesString.isEmpty()) {
+            return null;
+        } else {
+            SearchArgument sarg =
+                    SearchArgumentFactory.create(Utilities.deserializeExpression(serializedPushdown));
+            final Path finalPath = ((FileSplit) split).getPath();
+            final ParquetMetadata parquetMetadata = ParquetFileReader.readFooter(job, finalPath);
+            final FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
+            return ParquetFilterPredicateConverter
+                    .toFilterPredicate(sarg, fileMetaData.getSchema());
+        }
+    }
+
+    /**
+     * Builds the predicate `_hoodie_commit_time` > start_commit_time for the
+     * configured incremental start commit of {@code tableName}.
+     */
+    private FilterPredicate constructCommitTimePushdownPredicate(JobConf job, String tableName)
+            throws IOException {
+        String lastIncrementalTs = HoodieHiveUtil.readStartCommitTime(Job.getInstance(job), tableName);
+        Operators.BinaryColumn sequenceColumn =
+                binaryColumn(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
+        FilterPredicate p = gt(sequenceColumn, Binary.fromString(lastIncrementalTs));
+        LOG.info("Setting predicate in InputFormat " + p.toString());
+        return p;
+    }
+
+    /**
+     * Read the table metadata from a data path. This assumes certain hierarchy
+     * of files (basePath/partition=yyyy/mm/dd/file) which should be changed
+     * once a better way is figured out to pass in the hoodie meta directory.
+     *
+     * @param dataPath a partition directory containing data files
+     * @return metadata for the enclosing hoodie dataset
+     * @throws IOException on filesystem errors
+     */
+    private HoodieTableMetadata getTableMetadata(Path dataPath) throws IOException {
+        FileSystem fs = dataPath.getFileSystem(conf);
+        // TODO - remove this hard-coding. Pass this in job conf, somehow. Or read the Table Location
+        Path baseDir = dataPath.getParent().getParent().getParent();
+        LOG.info("Reading hoodie metadata from path " + baseDir.toString());
+        return new HoodieTableMetadata(fs, baseDir.toString());
+    }
+}
diff --git a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/HoodieInputFormatTest.java b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/HoodieInputFormatTest.java
new file mode 100644
index 000000000..3ca847f6a
--- /dev/null
+++ b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/HoodieInputFormatTest.java
@@ -0,0 +1,237 @@
+/*
+ * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.hadoop;
+
+import com.uber.hoodie.common.util.FSUtils;
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.mapred.*;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+public class HoodieInputFormatTest {
+ private HoodieInputFormat inputFormat;
+ private JobConf jobConf;
+
+ @Before public void setUp() {
+ inputFormat = new HoodieInputFormat();
+ jobConf = new JobConf();
+ inputFormat.setConf(jobConf);
+ }
+
+ @Rule public TemporaryFolder basePath = new TemporaryFolder();
+
+ @Test public void testInputFormatLoad() throws IOException {
+ // initial commit
+ File partitionDir = InputFormatTestUtil.prepareDataset(basePath, 10, "100");
+ InputFormatTestUtil.commit(basePath, "100");
+
+ // Add the paths
+ FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
+
+ InputSplit[] inputSplits = inputFormat.getSplits(jobConf, 10);
+ assertEquals(10, inputSplits.length);
+
+ FileStatus[] files = inputFormat.listStatus(jobConf);
+ assertEquals(10, files.length);
+ }
+
+ @Test public void testInputFormatUpdates() throws IOException {
+ // initial commit
+ File partitionDir = InputFormatTestUtil.prepareDataset(basePath, 10, "100");
+ InputFormatTestUtil.commit(basePath, "100");
+
+ // Add the paths
+ FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
+
+ FileStatus[] files = inputFormat.listStatus(jobConf);
+ assertEquals(10, files.length);
+
+ // update files
+ InputFormatTestUtil.simulateUpdates(partitionDir, "100", 5, "200", true);
+ // Before the commit
+ files = inputFormat.listStatus(jobConf);
+ assertEquals(10, files.length);
+ ensureFilesInCommit(
+ "Commit 200 has not been committed. We should not see files from this commit", files,
+ "200", 0);
+ InputFormatTestUtil.commit(basePath, "200");
+ files = inputFormat.listStatus(jobConf);
+ assertEquals(10, files.length);
+ ensureFilesInCommit(
+ "5 files have been updated to commit 200. We should see 5 files from commit 200 and 5 files from 100 commit",
+ files, "200", 5);
+ ensureFilesInCommit(
+ "5 files have been updated to commit 200. We should see 5 files from commit 100 and 5 files from 200 commit",
+ files, "100", 5);
+ }
+
+ @Test public void testIncrementalSimple() throws IOException {
+ // initial commit
+ File partitionDir = InputFormatTestUtil.prepareDataset(basePath, 10, "100");
+ InputFormatTestUtil.commit(basePath, "100");
+
+ // Add the paths
+ FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
+
+ InputFormatTestUtil.setupIncremental(jobConf, "100", 1);
+
+ FileStatus[] files = inputFormat.listStatus(jobConf);
+ assertEquals(
+ "We should exclude commit 100 when returning incremental pull with start commit time as 100",
+ 0, files.length);
+ }
+
+ @Test public void testIncrementalWithMultipleCommits() throws IOException {
+ // initial commit
+ File partitionDir = InputFormatTestUtil.prepareDataset(basePath, 10, "100");
+ InputFormatTestUtil.commit(basePath, "100");
+ // Add the paths
+ FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
+ // update files
+ InputFormatTestUtil.simulateUpdates(partitionDir, "100", 5, "200", false);
+ InputFormatTestUtil.commit(basePath, "200");
+
+ InputFormatTestUtil.simulateUpdates(partitionDir, "100", 4, "300", false);
+ InputFormatTestUtil.commit(basePath, "300");
+
+ InputFormatTestUtil.simulateUpdates(partitionDir, "100", 3, "400", false);
+ InputFormatTestUtil.commit(basePath, "400");
+
+ InputFormatTestUtil.simulateUpdates(partitionDir, "100", 2, "500", false);
+ InputFormatTestUtil.commit(basePath, "500");
+
+ InputFormatTestUtil.simulateUpdates(partitionDir, "100", 1, "600", false);
+ InputFormatTestUtil.commit(basePath, "600");
+
+ InputFormatTestUtil.setupIncremental(jobConf, "100", 1);
+ FileStatus[] files = inputFormat.listStatus(jobConf);
+ assertEquals("Pulling 1 commit from 100, should get us the 5 files committed at 200", 5,
+ files.length);
+ ensureFilesInCommit("Pulling 1 commit from 100, should get us the 5 files committed at 200",
+ files, "200", 5);
+
+ InputFormatTestUtil.setupIncremental(jobConf, "100", 3);
+ files = inputFormat.listStatus(jobConf);
+
+ assertEquals(
+ "Pulling 3 commits from 100, should get us the 3 files from 400 commit, 1 file from 300 commit and 1 file from 200 commit",
+ 5, files.length);
+ ensureFilesInCommit("Pulling 3 commits from 100, should get us the 3 files from 400 commit",
+ files, "400", 3);
+ ensureFilesInCommit("Pulling 3 commits from 100, should get us the 1 files from 300 commit",
+ files, "300", 1);
+ ensureFilesInCommit("Pulling 3 commits from 100, should get us the 1 files from 200 commit",
+ files, "200", 1);
+
+ InputFormatTestUtil.setupIncremental(jobConf, "100", HoodieHiveUtil.MAX_COMMIT_ALL);
+ files = inputFormat.listStatus(jobConf);
+
+ assertEquals(
+ "Pulling all commits from 100, should get us the 1 file from each of 200,300,400,500,400 commits",
+ 5, files.length);
+ ensureFilesInCommit(
+ "Pulling all commits from 100, should get us the 1 files from 600 commit", files, "600",
+ 1);
+ ensureFilesInCommit(
+ "Pulling all commits from 100, should get us the 1 files from 500 commit", files, "500",
+ 1);
+ ensureFilesInCommit(
+ "Pulling all commits from 100, should get us the 1 files from 400 commit", files, "400",
+ 1);
+ ensureFilesInCommit(
+ "Pulling all commits from 100, should get us the 1 files from 300 commit", files, "300",
+ 1);
+ ensureFilesInCommit(
+ "Pulling all commits from 100, should get us the 1 files from 200 commit", files, "200",
+ 1);
+ }
+
+ //TODO enable this after enabling predicate pushdown
+ public void testPredicatePushDown() throws IOException {
+ // initial commit
+ Schema schema = InputFormatTestUtil.readSchema("/sample1.avro");
+ String commit1 = "20160628071126";
+ File partitionDir =
+ InputFormatTestUtil.prepareParquetDataset(basePath, schema, 1, 10, commit1);
+ InputFormatTestUtil.commit(basePath, commit1);
+ // Add the paths
+ FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
+ // check whether we have 10 records at this point
+ ensureRecordsInCommit("We need to have 10 records at this point for commit " + commit1, commit1, 10, 10);
+
+ // update 2 records in the original parquet file and save it as commit 200
+ String commit2 = "20160629193623";
+ InputFormatTestUtil.simulateParquetUpdates(partitionDir, schema, commit1, 10, 2, commit2);
+ InputFormatTestUtil.commit(basePath, commit2);
+
+ InputFormatTestUtil.setupIncremental(jobConf, commit1, 1);
+ // check whether we have 2 records at this point
+ ensureRecordsInCommit(
+ "We need to have 2 records that was modified at commit " + commit2 + " and no more", commit2, 2, 2);
+ // Make sure we have the 10 records if we roll back the stattime
+ InputFormatTestUtil.setupIncremental(jobConf, "0", 2);
+ ensureRecordsInCommit(
+ "We need to have 8 records that was modified at commit " + commit1 + " and no more", commit1, 8, 10);
+ ensureRecordsInCommit(
+ "We need to have 2 records that was modified at commit " + commit2 + " and no more", commit2, 2, 10);
+ }
+
+ private void ensureRecordsInCommit(String msg, String commit,
+ int expectedNumberOfRecordsInCommit, int totalExpected) throws IOException {
+ int actualCount = 0;
+ int totalCount = 0;
+ InputSplit[] splits = inputFormat.getSplits(jobConf, 1);
+ for(InputSplit split:splits) {
+ RecordReader
+ recordReader = inputFormat.getRecordReader(split, jobConf, null);
+ Void key = recordReader.createKey();
+ ArrayWritable writable = recordReader.createValue();
+
+ while(recordReader.next(key, writable)) {
+ // writable returns an array with [field1, field2, _hoodie_commit_time, _hoodie_commit_seqno]
+ // Take the commit time and compare with the one we are interested in
+ if(commit.equals((writable.get()[2]).toString())) {
+ actualCount++;
+ }
+ totalCount++;
+ }
+ }
+ assertEquals(msg, expectedNumberOfRecordsInCommit, actualCount);
+ assertEquals(msg, totalExpected, totalCount);
+ }
+
+ public static void ensureFilesInCommit(String msg, FileStatus[] files, String commit,
+ int expected) {
+ int count = 0;
+ for (FileStatus file : files) {
+ String commitTs = FSUtils.getCommitTime(file.getPath().getName());
+ if (commit.equals(commitTs)) {
+ count++;
+ }
+ }
+ assertEquals(msg, expected, count);
+ }
+}
diff --git a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/InputFormatTestUtil.java b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/InputFormatTestUtil.java
new file mode 100644
index 000000000..d2e5e3edb
--- /dev/null
+++ b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/InputFormatTestUtil.java
@@ -0,0 +1,165 @@
+/*
+ * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.hadoop;
+
+import com.uber.hoodie.common.model.HoodieRecord;
+import com.uber.hoodie.common.model.HoodieTestUtils;
+import com.uber.hoodie.common.util.FSUtils;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Helpers to lay out fake hoodie datasets (empty marker files or real parquet
+ * files) under a JUnit TemporaryFolder, and to configure incremental pulls.
+ */
+public class InputFormatTestUtil {
+ /**
+  * Creates a hoodie base directory with one partition (2016/05/01) containing
+  * {@code numberOfFiles} empty data files named for {@code commitNumber}.
+  * Returns the partition directory.
+  */
+ public static File prepareDataset(TemporaryFolder basePath, int numberOfFiles,
+ String commitNumber) throws IOException {
+ basePath.create();
+ HoodieTestUtils.initializeHoodieDirectory(basePath.getRoot().toString());
+ File partitionPath = basePath.newFolder("2016", "05", "01");
+ for (int i = 0; i < numberOfFiles; i++) {
+ File dataFile =
+ new File(partitionPath, FSUtils.makeDataFileName(commitNumber, 1, "fileid" + i));
+ dataFile.createNewFile();
+ }
+ return partitionPath;
+ }
+
+ /**
+  * Simulates updates: for up to {@code numberOfFilesUpdated} files written by
+  * {@code originalCommit}, creates a new (empty) file version named for
+  * {@code newCommit} with the same file id. When {@code randomize} is true
+  * the files to update are chosen at random.
+  * NOTE(review): the List element types were stripped by an HTML sanitizer in
+  * the original patch text; List&lt;File&gt; reconstructed from listFiles() usage.
+  */
+ public static void simulateUpdates(File directory, final String originalCommit, int numberOfFilesUpdated,
+     String newCommit, boolean randomize) throws IOException {
+     List<File> dataFiles = Arrays.asList(directory.listFiles(new FilenameFilter() {
+         @Override public boolean accept(File dir, String name) {
+             String commitTs = FSUtils.getCommitTime(name);
+             return originalCommit.equals(commitTs);
+         }
+     }));
+     if (randomize) {
+         Collections.shuffle(dataFiles);
+     }
+     List<File> toUpdateList =
+         dataFiles.subList(0, Math.min(numberOfFilesUpdated, dataFiles.size()));
+     for (File file : toUpdateList) {
+         String fileId = FSUtils.getFileId(file.getName());
+         File dataFile = new File(directory, FSUtils.makeDataFileName(newCommit, 1, fileId));
+         dataFile.createNewFile();
+     }
+ }
+
+ /**
+  * Marks {@code commitNumber} as committed by creating the empty
+  * .hoodie/&lt;commit&gt;.commit marker file.
+  */
+ public static void commit(TemporaryFolder basePath, String commitNumber) throws IOException {
+ // create the commit
+ new File(basePath.getRoot().toString() + "/.hoodie/", commitNumber + ".commit").createNewFile();
+ }
+
+ /**
+  * Configures the job for an INCREMENTAL pull of the test table: sets the
+  * consume mode, the start commit timestamp, and the max commits to pull.
+  */
+ public static void setupIncremental(JobConf jobConf, String startCommit, int numberOfCommitsToPull) {
+ String modePropertyName = String.format(HoodieHiveUtil.HOODIE_CONSUME_MODE_PATTERN,
+ HoodieTestUtils.RAW_TRIPS_TEST_NAME);
+ jobConf.set(modePropertyName, HoodieHiveUtil.INCREMENTAL_SCAN_MODE);
+
+ String startCommitTimestampName = String.format(HoodieHiveUtil.HOODIE_START_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
+ jobConf.set(startCommitTimestampName, startCommit);
+
+ String maxCommitPulls = String.format(HoodieHiveUtil.HOODIE_MAX_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
+ jobConf.setInt(maxCommitPulls, numberOfCommitsToPull);
+ }
+
+ /** Parses an Avro schema from a classpath resource (e.g. "/sample1.avro"). */
+ public static Schema readSchema(String location) throws IOException {
+ return new Schema.Parser().parse(InputFormatTestUtil.class.getResourceAsStream(location));
+ }
+
+ /**
+  * Creates a hoodie base directory with one partition (2016/05/01) containing
+  * {@code numberOfFiles} real parquet files, each holding
+  * {@code numberOfRecords} records stamped with {@code commitNumber}.
+  * Returns the partition directory.
+  * NOTE(review): the AvroParquetWriter type parameter was stripped by an HTML
+  * sanitizer in the original patch text; GenericRecord reconstructed from the
+  * write() call below.
+  */
+ public static File prepareParquetDataset(TemporaryFolder basePath, Schema schema, int numberOfFiles, int numberOfRecords,
+     String commitNumber) throws IOException {
+     basePath.create();
+     HoodieTestUtils.initializeHoodieDirectory(basePath.getRoot().toString());
+     File partitionPath = basePath.newFolder("2016", "05", "01");
+     AvroParquetWriter<GenericRecord> parquetWriter;
+     for (int i = 0; i < numberOfFiles; i++) {
+         File dataFile =
+             new File(partitionPath, FSUtils.makeDataFileName(commitNumber, 1, "fileid" + i));
+         parquetWriter = new AvroParquetWriter<GenericRecord>(new Path(dataFile.getAbsolutePath()),
+             schema);
+         try {
+             for (GenericRecord record : generateAvroRecords(schema, numberOfRecords, commitNumber)) {
+                 parquetWriter.write(record);
+             }
+         } finally {
+             parquetWriter.close();
+         }
+     }
+     return partitionPath;
+ }
+
+ // NOTE(review): THIS REGION IS CORRUPTED. An HTML sanitizer stripped every
+ // <...> span from the patch text and, in doing so, deleted whole lines here:
+ // the body of generateAvroRecords and the signature/opening of
+ // simulateParquetUpdates (which the test file calls with
+ // (partitionDir, schema, commit1, 10, 2, commit2)) have been fused into one
+ // garbled span. Recover the original text from the upstream patch before use.
+ private static Iterable extends GenericRecord> generateAvroRecords(Schema schema, int numberOfRecords, String commitTime) {
+ List records = new ArrayList<>(numberOfRecords);
+ // NOTE(review): lines missing between here and the "> 0" below; the
+ // surviving tail belongs to simulateParquetUpdates' per-record update loop.
+ for(int i=0;i 0) {
+ // update this record
+ record.put(HoodieRecord.COMMIT_TIME_METADATA_FIELD, newCommit);
+ String oldSeqNo = (String) record.get(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD);
+ record.put(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD,
+ oldSeqNo.replace(originalCommit, newCommit));
+ numberOfRecordsToUpdate--;
+ }
+ parquetWriter.write(record);
+ }
+ } finally {
+ parquetWriter.close();
+ }
+
+ }
+}
diff --git a/hoodie-hadoop-mr/src/test/resources/sample1.avro b/hoodie-hadoop-mr/src/test/resources/sample1.avro
new file mode 100644
index 000000000..20455fa5a
--- /dev/null
+++ b/hoodie-hadoop-mr/src/test/resources/sample1.avro
@@ -0,0 +1,17 @@
+{
+ "type" : "record",
+ "name" : "testRecord",
+ "fields" : [ {
+ "name" : "field1",
+ "type" : "string"
+ }, {
+ "name" : "field2",
+ "type" : "string"
+ }, {
+ "name" : "_hoodie_commit_time",
+ "type" : "string"
+ }, {
+ "name" : "_hoodie_commit_seqno",
+ "type" : "string"
+ }]
+}
diff --git a/pom.xml b/pom.xml
index 52c19f357..4aabe6def 100644
--- a/pom.xml
+++ b/pom.xml
@@ -26,6 +26,7 @@
hoodie-common
hoodie-client
hoodie-cli
+        <module>hoodie-hadoop-mr</module>
@@ -74,6 +75,7 @@
junit
junit
${junit.version}
+            <scope>test</scope>
@@ -251,12 +253,6 @@
${hive.version}-cdh${cdh.version}
provided
-
-
-
-
-
-
org.apache.hadoop
hadoop-mapreduce-client-core
@@ -381,6 +377,12 @@
1.9.13
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-jdbc</artifactId>
+            <version>1.1.0-cdh5.7.2</version>
+        </dependency>
+