From 61200b1207abdec60420771b0ae367a7c6466720 Mon Sep 17 00:00:00 2001 From: Prasanna Rajaperumal Date: Fri, 16 Dec 2016 19:29:53 -0800 Subject: [PATCH] Adding hoodie-hadoop-mr module to add HoodieInputFormat --- hoodie-hadoop-mr/pom.xml | 69 +++++ .../uber/hoodie/hadoop/HoodieHiveUtil.java | 58 ++++ .../uber/hoodie/hadoop/HoodieInputFormat.java | 252 ++++++++++++++++++ .../hoodie/hadoop/HoodieInputFormatTest.java | 237 ++++++++++++++++ .../hoodie/hadoop/InputFormatTestUtil.java | 165 ++++++++++++ .../src/test/resources/sample1.avro | 17 ++ pom.xml | 14 +- 7 files changed, 806 insertions(+), 6 deletions(-) create mode 100644 hoodie-hadoop-mr/pom.xml create mode 100644 hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieHiveUtil.java create mode 100644 hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java create mode 100644 hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/HoodieInputFormatTest.java create mode 100644 hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/InputFormatTestUtil.java create mode 100644 hoodie-hadoop-mr/src/test/resources/sample1.avro diff --git a/hoodie-hadoop-mr/pom.xml b/hoodie-hadoop-mr/pom.xml new file mode 100644 index 000000000..59137bf04 --- /dev/null +++ b/hoodie-hadoop-mr/pom.xml @@ -0,0 +1,69 @@ + + + + hoodie + com.uber.hoodie + 0.2.5-SNAPSHOT + + 4.0.0 + + hoodie-hadoop-mr + + + + com.uber.hoodie + hoodie-common + ${project.version} + + + com.uber.hoodie + hoodie-common + ${project.version} + test-jar + test + + + + org.apache.hadoop + hadoop-common + + + org.apache.hadoop + hadoop-mapreduce-client-core + + + org.apache.hadoop + hadoop-mapreduce-client-common + + + org.apache.hadoop + hadoop-auth + + + org.apache.hadoop + hadoop-hdfs + + + org.apache.hive + hive-exec + + + commons-logging + commons-logging + + + org.apache.hive + hive-jdbc + + + org.apache.parquet + parquet-avro + + + org.apache.avro + avro + + + diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieHiveUtil.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieHiveUtil.java new file mode 100644 index 000000000..ee38cb2d7 --- /dev/null +++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieHiveUtil.java @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.hadoop; + +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +public class HoodieHiveUtil { + public static final Logger LOG = + LogManager.getLogger(HoodieHiveUtil.class); + + public static final String HOODIE_CONSUME_MODE_PATTERN = "hoodie.%s.consume.mode"; + public static final String HOODIE_START_COMMIT_PATTERN = "hoodie.%s.consume.start.timestamp"; + public static final String HOODIE_MAX_COMMIT_PATTERN = "hoodie.%s.consume.max.commits"; + public static final String INCREMENTAL_SCAN_MODE = "INCREMENTAL"; + public static final String LATEST_SCAN_MODE = "LATEST"; + public static final String DEFAULT_SCAN_MODE = LATEST_SCAN_MODE; + public static final int DEFAULT_MAX_COMMITS = 1; + public static final int MAX_COMMIT_ALL = -1; + + public static Integer readMaxCommits(JobContext job, String tableName) { + String maxCommitName = String.format(HOODIE_MAX_COMMIT_PATTERN, tableName); + int maxCommits = job.getConfiguration().getInt(maxCommitName, DEFAULT_MAX_COMMITS); + if (maxCommits == MAX_COMMIT_ALL) { + maxCommits = Integer.MAX_VALUE; + } + LOG.info("Read max commits - " + maxCommits); + return maxCommits; + } + + public static String readStartCommitTime(JobContext job, String tableName) { + String startCommitTimestampName = String.format(HOODIE_START_COMMIT_PATTERN, tableName); + LOG.info("Read start commit time - " + job.getConfiguration().get(startCommitTimestampName)); + return job.getConfiguration().get(startCommitTimestampName); + } + + public static String readMode(JobContext job, String tableName) { + String modePropertyName = String.format(HOODIE_CONSUME_MODE_PATTERN, tableName); + String mode =job.getConfiguration().get(modePropertyName, DEFAULT_SCAN_MODE); + LOG.info(modePropertyName + ": " + mode); + return mode; + } +} diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java new file mode 100644 index 000000000..5e81c6231 --- /dev/null +++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java @@ -0,0 +1,252 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.hadoop; + +import com.uber.hoodie.common.model.HoodieRecord; +import com.uber.hoodie.common.model.HoodieTableMetadata; +import com.uber.hoodie.exception.InvalidDatasetException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat; +import org.apache.hadoop.hive.ql.io.parquet.read.ParquetFilterPredicateConverter; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory; +import org.apache.hadoop.hive.ql.plan.TableScanDesc; +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapreduce.Job; +import parquet.filter2.predicate.FilterPredicate; +import parquet.filter2.predicate.Operators; +import parquet.hadoop.ParquetFileReader; +import parquet.hadoop.metadata.FileMetaData; +import parquet.hadoop.metadata.ParquetMetadata; +import parquet.io.api.Binary; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static parquet.filter2.predicate.FilterApi.and; +import static parquet.filter2.predicate.FilterApi.binaryColumn; +import static parquet.filter2.predicate.FilterApi.gt; + +/** + * HoodieInputFormat which understands the Hoodie File Structure and filters + * files based on the Hoodie Mode. If paths that does not correspond to a hoodie dataset + * then they are passed in as is (as what FileInputFormat.listStatus() would do). + * The JobConf could have paths from multipe Hoodie/Non-Hoodie datasets + */ +public class HoodieInputFormat extends MapredParquetInputFormat + implements Configurable { + public static final Log LOG = LogFactory.getLog(HoodieInputFormat.class); + private Configuration conf; + + @Override + public FileStatus[] listStatus(JobConf job) throws IOException { + // Get all the file status from FileInputFormat and then do the filter + FileStatus[] fileStatuses = super.listStatus(job); + Map> groupedFileStatus = groupFileStatus(fileStatuses); + LOG.info("Found a total of " + groupedFileStatus.size() + " groups"); + List returns = new ArrayList(); + for(Map.Entry> entry:groupedFileStatus.entrySet()) { + HoodieTableMetadata metadata = entry.getKey(); + if(metadata == null) { + // Add all the paths which are not hoodie specific + returns.addAll(entry.getValue()); + continue; + } + + FileStatus[] value = entry.getValue().toArray(new FileStatus[entry.getValue().size()]); + LOG.info("Hoodie Metadata initialized with completed commit Ts as :" + metadata); + String tableName = metadata.getTableName(); + String mode = HoodieHiveUtil.readMode(Job.getInstance(job), tableName); + if (HoodieHiveUtil.INCREMENTAL_SCAN_MODE.equals(mode)) { + // this is of the form commitTs_partition_sequenceNumber + String lastIncrementalTs = HoodieHiveUtil.readStartCommitTime(Job.getInstance(job), tableName); + // Total number of commits to return in this batch. Set this to -1 to get all the commits. + Integer maxCommits = HoodieHiveUtil.readMaxCommits(Job.getInstance(job), tableName); + LOG.info("Last Incremental timestamp was set as " + lastIncrementalTs); + List + commitsToReturn = metadata.findCommitsAfter(lastIncrementalTs, maxCommits); + FileStatus[] filteredFiles = + metadata.getLatestVersionInRange(value, commitsToReturn); + for (FileStatus filteredFile : filteredFiles) { + LOG.info("Processing incremental hoodie file - " + filteredFile.getPath()); + returns.add(filteredFile); + } + LOG.info( + "Total paths to process after hoodie incremental filter " + filteredFiles.length); + } else { + // filter files on the latest commit found + FileStatus[] filteredFiles = metadata.getLatestVersions(value); + LOG.info("Total paths to process after hoodie filter " + filteredFiles.length); + for (FileStatus filteredFile : filteredFiles) { + LOG.info("Processing latest hoodie file - " + filteredFile.getPath()); + returns.add(filteredFile); + } + } + } + return returns.toArray(new FileStatus[returns.size()]); + + } + + private Map> groupFileStatus(FileStatus[] fileStatuses) + throws IOException { + // This assumes the paths for different tables are grouped together + Map> grouped = new HashMap<>(); + HoodieTableMetadata metadata = null; + String nonHoodieBasePath = null; + for(FileStatus status:fileStatuses) { + if ((metadata == null && nonHoodieBasePath == null) || (metadata == null && !status.getPath().toString() + .contains(nonHoodieBasePath)) || (metadata != null && !status.getPath().toString() + .contains(metadata.getBasePath()))) { + try { + metadata = getTableMetadata(status.getPath().getParent()); + nonHoodieBasePath = null; + } catch (InvalidDatasetException e) { + LOG.info("Handling a non-hoodie path " + status.getPath()); + metadata = null; + nonHoodieBasePath = + status.getPath().getParent().toString(); + } + if(!grouped.containsKey(metadata)) { + grouped.put(metadata, new ArrayList()); + } + } + grouped.get(metadata).add(status); + } + return grouped; + } + + public void setConf(Configuration conf) { + this.conf = conf; + } + + public Configuration getConf() { + return conf; + } + + @Override + public RecordReader getRecordReader(final InputSplit split, + final JobConf job, final Reporter reporter) throws IOException { + // TODO enable automatic predicate pushdown after fixing issues +// FileSplit fileSplit = (FileSplit) split; +// HoodieTableMetadata metadata = getTableMetadata(fileSplit.getPath().getParent()); +// String tableName = metadata.getTableName(); +// String mode = HoodieHiveUtil.readMode(job, tableName); + +// if (HoodieHiveUtil.INCREMENTAL_SCAN_MODE.equals(mode)) { +// FilterPredicate predicate = constructHoodiePredicate(job, tableName, split); +// LOG.info("Setting parquet predicate push down as " + predicate); +// ParquetInputFormat.setFilterPredicate(job, predicate); + //clearOutExistingPredicate(job); +// } + return super.getRecordReader(split, job, reporter); + } + + /** + * Clears out the filter expression (if this is not done, then ParquetReader will override the FilterPredicate set) + * + * @param job + */ + private void clearOutExistingPredicate(JobConf job) { + job.unset(TableScanDesc.FILTER_EXPR_CONF_STR); + } + + /** + * Constructs the predicate to push down to parquet storage. + * This creates the predicate for `hoodie_commit_time` > 'start_commit_time' and ANDs with the existing predicate if one is present already. + * + * @param job + * @param tableName + * @return + */ + private FilterPredicate constructHoodiePredicate(JobConf job, String tableName, + InputSplit split) throws IOException { + FilterPredicate commitTimePushdown = constructCommitTimePushdownPredicate(job, tableName); + LOG.info("Commit time predicate - " + commitTimePushdown.toString()); + FilterPredicate existingPushdown = constructHQLPushdownPredicate(job, split); + LOG.info("Existing predicate - " + existingPushdown); + + FilterPredicate hoodiePredicate; + if (existingPushdown != null) { + hoodiePredicate = and(existingPushdown, commitTimePushdown); + } else { + hoodiePredicate = commitTimePushdown; + } + LOG.info("Hoodie Predicate - " + hoodiePredicate); + return hoodiePredicate; + } + + private FilterPredicate constructHQLPushdownPredicate(JobConf job, InputSplit split) + throws IOException { + String serializedPushdown = job.get(TableScanDesc.FILTER_EXPR_CONF_STR); + String columnNamesString = job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR); + if (serializedPushdown == null || columnNamesString == null || serializedPushdown.isEmpty() + || columnNamesString.isEmpty()) { + return null; + } else { + SearchArgument sarg = + SearchArgumentFactory.create(Utilities.deserializeExpression(serializedPushdown)); + final Path finalPath = ((FileSplit) split).getPath(); + final ParquetMetadata parquetMetadata = ParquetFileReader.readFooter(job, finalPath); + final FileMetaData fileMetaData = parquetMetadata.getFileMetaData(); + return ParquetFilterPredicateConverter + .toFilterPredicate(sarg, fileMetaData.getSchema()); + } + } + + private FilterPredicate constructCommitTimePushdownPredicate(JobConf job, String tableName) + throws IOException { + String lastIncrementalTs = HoodieHiveUtil.readStartCommitTime(Job.getInstance(job), tableName); + Operators.BinaryColumn sequenceColumn = + binaryColumn(HoodieRecord.COMMIT_TIME_METADATA_FIELD); + FilterPredicate p = gt(sequenceColumn, Binary.fromString(lastIncrementalTs)); + LOG.info("Setting predicate in InputFormat " + p.toString()); + return p; + } + + /** + * Read the table metadata from a data path. This assumes certain hierarchy of files which + * should be changed once a better way is figured out to pass in the hoodie meta directory + * + * @param dataPath + * @return + * @throws IOException + */ + private HoodieTableMetadata getTableMetadata(Path dataPath) throws IOException { + FileSystem fs = dataPath.getFileSystem(conf); + // TODO - remove this hard-coding. Pass this in job conf, somehow. Or read the Table Location + Path baseDir = dataPath.getParent().getParent().getParent(); + LOG.info("Reading hoodie metadata from path " + baseDir.toString()); + return new HoodieTableMetadata(fs, baseDir.toString()); + + } +} diff --git a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/HoodieInputFormatTest.java b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/HoodieInputFormatTest.java new file mode 100644 index 000000000..3ca847f6a --- /dev/null +++ b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/HoodieInputFormatTest.java @@ -0,0 +1,237 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.hadoop; + +import com.uber.hoodie.common.util.FSUtils; +import org.apache.avro.Schema; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.mapred.*; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; + +import static org.junit.Assert.assertEquals; + +public class HoodieInputFormatTest { + private HoodieInputFormat inputFormat; + private JobConf jobConf; + + @Before public void setUp() { + inputFormat = new HoodieInputFormat(); + jobConf = new JobConf(); + inputFormat.setConf(jobConf); + } + + @Rule public TemporaryFolder basePath = new TemporaryFolder(); + + @Test public void testInputFormatLoad() throws IOException { + // initial commit + File partitionDir = InputFormatTestUtil.prepareDataset(basePath, 10, "100"); + InputFormatTestUtil.commit(basePath, "100"); + + // Add the paths + FileInputFormat.setInputPaths(jobConf, partitionDir.getPath()); + + InputSplit[] inputSplits = inputFormat.getSplits(jobConf, 10); + assertEquals(10, inputSplits.length); + + FileStatus[] files = inputFormat.listStatus(jobConf); + assertEquals(10, files.length); + } + + @Test public void testInputFormatUpdates() throws IOException { + // initial commit + File partitionDir = InputFormatTestUtil.prepareDataset(basePath, 10, "100"); + InputFormatTestUtil.commit(basePath, "100"); + + // Add the paths + FileInputFormat.setInputPaths(jobConf, partitionDir.getPath()); + + FileStatus[] files = inputFormat.listStatus(jobConf); + assertEquals(10, files.length); + + // update files + InputFormatTestUtil.simulateUpdates(partitionDir, "100", 5, "200", true); + // Before the commit + files = inputFormat.listStatus(jobConf); + assertEquals(10, files.length); + ensureFilesInCommit( + "Commit 200 has not been committed. We should not see files from this commit", files, + "200", 0); + InputFormatTestUtil.commit(basePath, "200"); + files = inputFormat.listStatus(jobConf); + assertEquals(10, files.length); + ensureFilesInCommit( + "5 files have been updated to commit 200. We should see 5 files from commit 200 and 5 files from 100 commit", + files, "200", 5); + ensureFilesInCommit( + "5 files have been updated to commit 200. We should see 5 files from commit 100 and 5 files from 200 commit", + files, "100", 5); + } + + @Test public void testIncrementalSimple() throws IOException { + // initial commit + File partitionDir = InputFormatTestUtil.prepareDataset(basePath, 10, "100"); + InputFormatTestUtil.commit(basePath, "100"); + + // Add the paths + FileInputFormat.setInputPaths(jobConf, partitionDir.getPath()); + + InputFormatTestUtil.setupIncremental(jobConf, "100", 1); + + FileStatus[] files = inputFormat.listStatus(jobConf); + assertEquals( + "We should exclude commit 100 when returning incremental pull with start commit time as 100", + 0, files.length); + } + + @Test public void testIncrementalWithMultipleCommits() throws IOException { + // initial commit + File partitionDir = InputFormatTestUtil.prepareDataset(basePath, 10, "100"); + InputFormatTestUtil.commit(basePath, "100"); + // Add the paths + FileInputFormat.setInputPaths(jobConf, partitionDir.getPath()); + // update files + InputFormatTestUtil.simulateUpdates(partitionDir, "100", 5, "200", false); + InputFormatTestUtil.commit(basePath, "200"); + + InputFormatTestUtil.simulateUpdates(partitionDir, "100", 4, "300", false); + InputFormatTestUtil.commit(basePath, "300"); + + InputFormatTestUtil.simulateUpdates(partitionDir, "100", 3, "400", false); + InputFormatTestUtil.commit(basePath, "400"); + + InputFormatTestUtil.simulateUpdates(partitionDir, "100", 2, "500", false); + InputFormatTestUtil.commit(basePath, "500"); + + InputFormatTestUtil.simulateUpdates(partitionDir, "100", 1, "600", false); + InputFormatTestUtil.commit(basePath, "600"); + + InputFormatTestUtil.setupIncremental(jobConf, "100", 1); + FileStatus[] files = inputFormat.listStatus(jobConf); + assertEquals("Pulling 1 commit from 100, should get us the 5 files committed at 200", 5, + files.length); + ensureFilesInCommit("Pulling 1 commit from 100, should get us the 5 files committed at 200", + files, "200", 5); + + InputFormatTestUtil.setupIncremental(jobConf, "100", 3); + files = inputFormat.listStatus(jobConf); + + assertEquals( + "Pulling 3 commits from 100, should get us the 3 files from 400 commit, 1 file from 300 commit and 1 file from 200 commit", + 5, files.length); + ensureFilesInCommit("Pulling 3 commits from 100, should get us the 3 files from 400 commit", + files, "400", 3); + ensureFilesInCommit("Pulling 3 commits from 100, should get us the 1 files from 300 commit", + files, "300", 1); + ensureFilesInCommit("Pulling 3 commits from 100, should get us the 1 files from 200 commit", + files, "200", 1); + + InputFormatTestUtil.setupIncremental(jobConf, "100", HoodieHiveUtil.MAX_COMMIT_ALL); + files = inputFormat.listStatus(jobConf); + + assertEquals( + "Pulling all commits from 100, should get us the 1 file from each of 200,300,400,500,400 commits", + 5, files.length); + ensureFilesInCommit( + "Pulling all commits from 100, should get us the 1 files from 600 commit", files, "600", + 1); + ensureFilesInCommit( + "Pulling all commits from 100, should get us the 1 files from 500 commit", files, "500", + 1); + ensureFilesInCommit( + "Pulling all commits from 100, should get us the 1 files from 400 commit", files, "400", + 1); + ensureFilesInCommit( + "Pulling all commits from 100, should get us the 1 files from 300 commit", files, "300", + 1); + ensureFilesInCommit( + "Pulling all commits from 100, should get us the 1 files from 200 commit", files, "200", + 1); + } + + //TODO enable this after enabling predicate pushdown + public void testPredicatePushDown() throws IOException { + // initial commit + Schema schema = InputFormatTestUtil.readSchema("/sample1.avro"); + String commit1 = "20160628071126"; + File partitionDir = + InputFormatTestUtil.prepareParquetDataset(basePath, schema, 1, 10, commit1); + InputFormatTestUtil.commit(basePath, commit1); + // Add the paths + FileInputFormat.setInputPaths(jobConf, partitionDir.getPath()); + // check whether we have 10 records at this point + ensureRecordsInCommit("We need to have 10 records at this point for commit " + commit1, commit1, 10, 10); + + // update 2 records in the original parquet file and save it as commit 200 + String commit2 = "20160629193623"; + InputFormatTestUtil.simulateParquetUpdates(partitionDir, schema, commit1, 10, 2, commit2); + InputFormatTestUtil.commit(basePath, commit2); + + InputFormatTestUtil.setupIncremental(jobConf, commit1, 1); + // check whether we have 2 records at this point + ensureRecordsInCommit( + "We need to have 2 records that was modified at commit " + commit2 + " and no more", commit2, 2, 2); + // Make sure we have the 10 records if we roll back the stattime + InputFormatTestUtil.setupIncremental(jobConf, "0", 2); + ensureRecordsInCommit( + "We need to have 8 records that was modified at commit " + commit1 + " and no more", commit1, 8, 10); + ensureRecordsInCommit( + "We need to have 2 records that was modified at commit " + commit2 + " and no more", commit2, 2, 10); + } + + private void ensureRecordsInCommit(String msg, String commit, + int expectedNumberOfRecordsInCommit, int totalExpected) throws IOException { + int actualCount = 0; + int totalCount = 0; + InputSplit[] splits = inputFormat.getSplits(jobConf, 1); + for(InputSplit split:splits) { + RecordReader + recordReader = inputFormat.getRecordReader(split, jobConf, null); + Void key = recordReader.createKey(); + ArrayWritable writable = recordReader.createValue(); + + while(recordReader.next(key, writable)) { + // writable returns an array with [field1, field2, _hoodie_commit_time, _hoodie_commit_seqno] + // Take the commit time and compare with the one we are interested in + if(commit.equals((writable.get()[2]).toString())) { + actualCount++; + } + totalCount++; + } + } + assertEquals(msg, expectedNumberOfRecordsInCommit, actualCount); + assertEquals(msg, totalExpected, totalCount); + } + + public static void ensureFilesInCommit(String msg, FileStatus[] files, String commit, + int expected) { + int count = 0; + for (FileStatus file : files) { + String commitTs = FSUtils.getCommitTime(file.getPath().getName()); + if (commit.equals(commitTs)) { + count++; + } + } + assertEquals(msg, expected, count); + } +} diff --git a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/InputFormatTestUtil.java b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/InputFormatTestUtil.java new file mode 100644 index 000000000..d2e5e3edb --- /dev/null +++ b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/InputFormatTestUtil.java @@ -0,0 +1,165 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.hadoop; + +import com.uber.hoodie.common.model.HoodieRecord; +import com.uber.hoodie.common.model.HoodieTestUtils; +import com.uber.hoodie.common.util.FSUtils; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.GenericRecordBuilder; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobConf; +import org.apache.parquet.avro.AvroParquetReader; +import org.apache.parquet.avro.AvroParquetWriter; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +public class InputFormatTestUtil { + public static File prepareDataset(TemporaryFolder basePath, int numberOfFiles, + String commitNumber) throws IOException { + basePath.create(); + HoodieTestUtils.initializeHoodieDirectory(basePath.getRoot().toString()); + File partitionPath = basePath.newFolder("2016", "05", "01"); + for (int i = 0; i < numberOfFiles; i++) { + File dataFile = + new File(partitionPath, FSUtils.makeDataFileName(commitNumber, 1, "fileid" + i)); + dataFile.createNewFile(); + } + return partitionPath; + } + + public static void simulateUpdates(File directory, final String originalCommit, int numberOfFilesUpdated, + String newCommit, boolean randomize) throws IOException { + List dataFiles = Arrays.asList(directory.listFiles(new FilenameFilter() { + @Override public boolean accept(File dir, String name) { + String commitTs = FSUtils.getCommitTime(name); + return originalCommit.equals(commitTs); + } + })); + if(randomize) { + Collections.shuffle(dataFiles); + } + List toUpdateList = + dataFiles.subList(0, Math.min(numberOfFilesUpdated, dataFiles.size())); + for (File file : toUpdateList) { + String fileId = FSUtils.getFileId(file.getName()); + File dataFile = new File(directory, FSUtils.makeDataFileName(newCommit, 1, fileId)); + dataFile.createNewFile(); + } + } + + public static void commit(TemporaryFolder basePath, String commitNumber) throws IOException { + // create the commit + new File(basePath.getRoot().toString() + "/.hoodie/", commitNumber + ".commit").createNewFile(); + } + + public static void setupIncremental(JobConf jobConf, String startCommit, int numberOfCommitsToPull) { + String modePropertyName = String.format(HoodieHiveUtil.HOODIE_CONSUME_MODE_PATTERN, + HoodieTestUtils.RAW_TRIPS_TEST_NAME); + jobConf.set(modePropertyName, HoodieHiveUtil.INCREMENTAL_SCAN_MODE); + + String startCommitTimestampName = String.format(HoodieHiveUtil.HOODIE_START_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME); + jobConf.set(startCommitTimestampName, startCommit); + + String maxCommitPulls = String.format(HoodieHiveUtil.HOODIE_MAX_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME); + jobConf.setInt(maxCommitPulls, numberOfCommitsToPull); + } + + public static Schema readSchema(String location) throws IOException { + return new Schema.Parser().parse(InputFormatTestUtil.class.getResourceAsStream(location)); + } + + public static File prepareParquetDataset(TemporaryFolder basePath, Schema schema, int numberOfFiles, int numberOfRecords, + String commitNumber) throws IOException { + basePath.create(); + HoodieTestUtils.initializeHoodieDirectory(basePath.getRoot().toString()); + File partitionPath = basePath.newFolder("2016", "05", "01"); + AvroParquetWriter parquetWriter; + for (int i = 0; i < numberOfFiles; i++) { + File dataFile = + new File(partitionPath, FSUtils.makeDataFileName(commitNumber, 1, "fileid" + i)); + // dataFile.createNewFile(); + parquetWriter = new AvroParquetWriter(new Path(dataFile.getAbsolutePath()), + schema); + try { + for (GenericRecord record : generateAvroRecords(schema, numberOfRecords, commitNumber)) { + parquetWriter.write(record); + } + } finally { + parquetWriter.close(); + } + } + return partitionPath; + + } + + private static Iterable generateAvroRecords(Schema schema, int numberOfRecords, String commitTime) { + List records = new ArrayList<>(numberOfRecords); + for(int i=0;i 0) { + // update this record + record.put(HoodieRecord.COMMIT_TIME_METADATA_FIELD, newCommit); + String oldSeqNo = (String) record.get(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD); + record.put(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, + oldSeqNo.replace(originalCommit, newCommit)); + numberOfRecordsToUpdate--; + } + parquetWriter.write(record); + } + } finally { + parquetWriter.close(); + } + + } +} diff --git a/hoodie-hadoop-mr/src/test/resources/sample1.avro b/hoodie-hadoop-mr/src/test/resources/sample1.avro new file mode 100644 index 000000000..20455fa5a --- /dev/null +++ b/hoodie-hadoop-mr/src/test/resources/sample1.avro @@ -0,0 +1,17 @@ +{ + "type" : "record", + "name" : "testRecord", + "fields" : [ { + "name" : "field1", + "type" : "string" + }, { + "name" : "field2", + "type" : "string" + }, { + "name" : "_hoodie_commit_time", + "type" : "string" + }, { + "name" : "_hoodie_commit_seqno", + "type" : "string" + }] +} diff --git a/pom.xml b/pom.xml index 52c19f357..4aabe6def 100644 --- a/pom.xml +++ b/pom.xml @@ -26,6 +26,7 @@ hoodie-common hoodie-client hoodie-cli + hoodie-hadoop-mr @@ -74,6 +75,7 @@ junit junit ${junit.version} + test @@ -251,12 +253,6 @@ ${hive.version}-cdh${cdh.version} provided - - - - - - org.apache.hadoop hadoop-mapreduce-client-core @@ -381,6 +377,12 @@ 1.9.13 + + org.apache.hive + hive-jdbc + 1.1.0-cdh5.7.2 + +