Merge pull request #3 from prazanna/master
Adding hoodie-hadoop-mr module to add HoodieInputFormat
This commit is contained in:
69
hoodie-hadoop-mr/pom.xml
Normal file
69
hoodie-hadoop-mr/pom.xml
Normal file
@@ -0,0 +1,69 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<parent>
|
||||
<artifactId>hoodie</artifactId>
|
||||
<groupId>com.uber.hoodie</groupId>
|
||||
<version>0.2.5-SNAPSHOT</version>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<artifactId>hoodie-hadoop-mr</artifactId>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.uber.hoodie</groupId>
|
||||
<artifactId>hoodie-common</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.uber.hoodie</groupId>
|
||||
<artifactId>hoodie-common</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-common</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-mapreduce-client-core</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-mapreduce-client-common</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-auth</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-hdfs</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hive</groupId>
|
||||
<artifactId>hive-exec</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>commons-logging</groupId>
|
||||
<artifactId>commons-logging</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hive</groupId>
|
||||
<artifactId>hive-jdbc</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.parquet</groupId>
|
||||
<artifactId>parquet-avro</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.avro</groupId>
|
||||
<artifactId>avro</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
||||
@@ -0,0 +1,58 @@
|
||||
/*
|
||||
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.uber.hoodie.hadoop;
|
||||
|
||||
import org.apache.hadoop.mapreduce.JobContext;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
public class HoodieHiveUtil {
|
||||
public static final Logger LOG =
|
||||
LogManager.getLogger(HoodieHiveUtil.class);
|
||||
|
||||
public static final String HOODIE_CONSUME_MODE_PATTERN = "hoodie.%s.consume.mode";
|
||||
public static final String HOODIE_START_COMMIT_PATTERN = "hoodie.%s.consume.start.timestamp";
|
||||
public static final String HOODIE_MAX_COMMIT_PATTERN = "hoodie.%s.consume.max.commits";
|
||||
public static final String INCREMENTAL_SCAN_MODE = "INCREMENTAL";
|
||||
public static final String LATEST_SCAN_MODE = "LATEST";
|
||||
public static final String DEFAULT_SCAN_MODE = LATEST_SCAN_MODE;
|
||||
public static final int DEFAULT_MAX_COMMITS = 1;
|
||||
public static final int MAX_COMMIT_ALL = -1;
|
||||
|
||||
public static Integer readMaxCommits(JobContext job, String tableName) {
|
||||
String maxCommitName = String.format(HOODIE_MAX_COMMIT_PATTERN, tableName);
|
||||
int maxCommits = job.getConfiguration().getInt(maxCommitName, DEFAULT_MAX_COMMITS);
|
||||
if (maxCommits == MAX_COMMIT_ALL) {
|
||||
maxCommits = Integer.MAX_VALUE;
|
||||
}
|
||||
LOG.info("Read max commits - " + maxCommits);
|
||||
return maxCommits;
|
||||
}
|
||||
|
||||
public static String readStartCommitTime(JobContext job, String tableName) {
|
||||
String startCommitTimestampName = String.format(HOODIE_START_COMMIT_PATTERN, tableName);
|
||||
LOG.info("Read start commit time - " + job.getConfiguration().get(startCommitTimestampName));
|
||||
return job.getConfiguration().get(startCommitTimestampName);
|
||||
}
|
||||
|
||||
public static String readMode(JobContext job, String tableName) {
|
||||
String modePropertyName = String.format(HOODIE_CONSUME_MODE_PATTERN, tableName);
|
||||
String mode =job.getConfiguration().get(modePropertyName, DEFAULT_SCAN_MODE);
|
||||
LOG.info(modePropertyName + ": " + mode);
|
||||
return mode;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,252 @@
|
||||
/*
|
||||
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.uber.hoodie.hadoop;
|
||||
|
||||
import com.uber.hoodie.common.model.HoodieRecord;
|
||||
import com.uber.hoodie.common.model.HoodieTableMetadata;
|
||||
import com.uber.hoodie.exception.InvalidDatasetException;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configurable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hive.ql.exec.Utilities;
|
||||
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
|
||||
import org.apache.hadoop.hive.ql.io.parquet.read.ParquetFilterPredicateConverter;
|
||||
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
|
||||
import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
|
||||
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
|
||||
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
|
||||
import org.apache.hadoop.io.ArrayWritable;
|
||||
import org.apache.hadoop.mapred.FileSplit;
|
||||
import org.apache.hadoop.mapred.InputSplit;
|
||||
import org.apache.hadoop.mapred.JobConf;
|
||||
import org.apache.hadoop.mapred.RecordReader;
|
||||
import org.apache.hadoop.mapred.Reporter;
|
||||
import org.apache.hadoop.mapreduce.Job;
|
||||
import parquet.filter2.predicate.FilterPredicate;
|
||||
import parquet.filter2.predicate.Operators;
|
||||
import parquet.hadoop.ParquetFileReader;
|
||||
import parquet.hadoop.metadata.FileMetaData;
|
||||
import parquet.hadoop.metadata.ParquetMetadata;
|
||||
import parquet.io.api.Binary;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static parquet.filter2.predicate.FilterApi.and;
|
||||
import static parquet.filter2.predicate.FilterApi.binaryColumn;
|
||||
import static parquet.filter2.predicate.FilterApi.gt;
|
||||
|
||||
/**
|
||||
* HoodieInputFormat which understands the Hoodie File Structure and filters
|
||||
* files based on the Hoodie Mode. If paths that does not correspond to a hoodie dataset
|
||||
* then they are passed in as is (as what FileInputFormat.listStatus() would do).
|
||||
* The JobConf could have paths from multipe Hoodie/Non-Hoodie datasets
|
||||
*/
|
||||
public class HoodieInputFormat extends MapredParquetInputFormat
|
||||
implements Configurable {
|
||||
public static final Log LOG = LogFactory.getLog(HoodieInputFormat.class);
|
||||
private Configuration conf;
|
||||
|
||||
@Override
|
||||
public FileStatus[] listStatus(JobConf job) throws IOException {
|
||||
// Get all the file status from FileInputFormat and then do the filter
|
||||
FileStatus[] fileStatuses = super.listStatus(job);
|
||||
Map<HoodieTableMetadata, List<FileStatus>> groupedFileStatus = groupFileStatus(fileStatuses);
|
||||
LOG.info("Found a total of " + groupedFileStatus.size() + " groups");
|
||||
List<FileStatus> returns = new ArrayList<FileStatus>();
|
||||
for(Map.Entry<HoodieTableMetadata, List<FileStatus>> entry:groupedFileStatus.entrySet()) {
|
||||
HoodieTableMetadata metadata = entry.getKey();
|
||||
if(metadata == null) {
|
||||
// Add all the paths which are not hoodie specific
|
||||
returns.addAll(entry.getValue());
|
||||
continue;
|
||||
}
|
||||
|
||||
FileStatus[] value = entry.getValue().toArray(new FileStatus[entry.getValue().size()]);
|
||||
LOG.info("Hoodie Metadata initialized with completed commit Ts as :" + metadata);
|
||||
String tableName = metadata.getTableName();
|
||||
String mode = HoodieHiveUtil.readMode(Job.getInstance(job), tableName);
|
||||
if (HoodieHiveUtil.INCREMENTAL_SCAN_MODE.equals(mode)) {
|
||||
// this is of the form commitTs_partition_sequenceNumber
|
||||
String lastIncrementalTs = HoodieHiveUtil.readStartCommitTime(Job.getInstance(job), tableName);
|
||||
// Total number of commits to return in this batch. Set this to -1 to get all the commits.
|
||||
Integer maxCommits = HoodieHiveUtil.readMaxCommits(Job.getInstance(job), tableName);
|
||||
LOG.info("Last Incremental timestamp was set as " + lastIncrementalTs);
|
||||
List<String>
|
||||
commitsToReturn = metadata.findCommitsAfter(lastIncrementalTs, maxCommits);
|
||||
FileStatus[] filteredFiles =
|
||||
metadata.getLatestVersionInRange(value, commitsToReturn);
|
||||
for (FileStatus filteredFile : filteredFiles) {
|
||||
LOG.info("Processing incremental hoodie file - " + filteredFile.getPath());
|
||||
returns.add(filteredFile);
|
||||
}
|
||||
LOG.info(
|
||||
"Total paths to process after hoodie incremental filter " + filteredFiles.length);
|
||||
} else {
|
||||
// filter files on the latest commit found
|
||||
FileStatus[] filteredFiles = metadata.getLatestVersions(value);
|
||||
LOG.info("Total paths to process after hoodie filter " + filteredFiles.length);
|
||||
for (FileStatus filteredFile : filteredFiles) {
|
||||
LOG.info("Processing latest hoodie file - " + filteredFile.getPath());
|
||||
returns.add(filteredFile);
|
||||
}
|
||||
}
|
||||
}
|
||||
return returns.toArray(new FileStatus[returns.size()]);
|
||||
|
||||
}
|
||||
|
||||
private Map<HoodieTableMetadata, List<FileStatus>> groupFileStatus(FileStatus[] fileStatuses)
|
||||
throws IOException {
|
||||
// This assumes the paths for different tables are grouped together
|
||||
Map<HoodieTableMetadata, List<FileStatus>> grouped = new HashMap<>();
|
||||
HoodieTableMetadata metadata = null;
|
||||
String nonHoodieBasePath = null;
|
||||
for(FileStatus status:fileStatuses) {
|
||||
if ((metadata == null && nonHoodieBasePath == null) || (metadata == null && !status.getPath().toString()
|
||||
.contains(nonHoodieBasePath)) || (metadata != null && !status.getPath().toString()
|
||||
.contains(metadata.getBasePath()))) {
|
||||
try {
|
||||
metadata = getTableMetadata(status.getPath().getParent());
|
||||
nonHoodieBasePath = null;
|
||||
} catch (InvalidDatasetException e) {
|
||||
LOG.info("Handling a non-hoodie path " + status.getPath());
|
||||
metadata = null;
|
||||
nonHoodieBasePath =
|
||||
status.getPath().getParent().toString();
|
||||
}
|
||||
if(!grouped.containsKey(metadata)) {
|
||||
grouped.put(metadata, new ArrayList<FileStatus>());
|
||||
}
|
||||
}
|
||||
grouped.get(metadata).add(status);
|
||||
}
|
||||
return grouped;
|
||||
}
|
||||
|
||||
public void setConf(Configuration conf) {
|
||||
this.conf = conf;
|
||||
}
|
||||
|
||||
public Configuration getConf() {
|
||||
return conf;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RecordReader<Void, ArrayWritable> getRecordReader(final InputSplit split,
|
||||
final JobConf job, final Reporter reporter) throws IOException {
|
||||
// TODO enable automatic predicate pushdown after fixing issues
|
||||
// FileSplit fileSplit = (FileSplit) split;
|
||||
// HoodieTableMetadata metadata = getTableMetadata(fileSplit.getPath().getParent());
|
||||
// String tableName = metadata.getTableName();
|
||||
// String mode = HoodieHiveUtil.readMode(job, tableName);
|
||||
|
||||
// if (HoodieHiveUtil.INCREMENTAL_SCAN_MODE.equals(mode)) {
|
||||
// FilterPredicate predicate = constructHoodiePredicate(job, tableName, split);
|
||||
// LOG.info("Setting parquet predicate push down as " + predicate);
|
||||
// ParquetInputFormat.setFilterPredicate(job, predicate);
|
||||
//clearOutExistingPredicate(job);
|
||||
// }
|
||||
return super.getRecordReader(split, job, reporter);
|
||||
}
|
||||
|
||||
/**
|
||||
* Clears out the filter expression (if this is not done, then ParquetReader will override the FilterPredicate set)
|
||||
*
|
||||
* @param job
|
||||
*/
|
||||
private void clearOutExistingPredicate(JobConf job) {
|
||||
job.unset(TableScanDesc.FILTER_EXPR_CONF_STR);
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructs the predicate to push down to parquet storage.
|
||||
* This creates the predicate for `hoodie_commit_time` > 'start_commit_time' and ANDs with the existing predicate if one is present already.
|
||||
*
|
||||
* @param job
|
||||
* @param tableName
|
||||
* @return
|
||||
*/
|
||||
private FilterPredicate constructHoodiePredicate(JobConf job, String tableName,
|
||||
InputSplit split) throws IOException {
|
||||
FilterPredicate commitTimePushdown = constructCommitTimePushdownPredicate(job, tableName);
|
||||
LOG.info("Commit time predicate - " + commitTimePushdown.toString());
|
||||
FilterPredicate existingPushdown = constructHQLPushdownPredicate(job, split);
|
||||
LOG.info("Existing predicate - " + existingPushdown);
|
||||
|
||||
FilterPredicate hoodiePredicate;
|
||||
if (existingPushdown != null) {
|
||||
hoodiePredicate = and(existingPushdown, commitTimePushdown);
|
||||
} else {
|
||||
hoodiePredicate = commitTimePushdown;
|
||||
}
|
||||
LOG.info("Hoodie Predicate - " + hoodiePredicate);
|
||||
return hoodiePredicate;
|
||||
}
|
||||
|
||||
private FilterPredicate constructHQLPushdownPredicate(JobConf job, InputSplit split)
|
||||
throws IOException {
|
||||
String serializedPushdown = job.get(TableScanDesc.FILTER_EXPR_CONF_STR);
|
||||
String columnNamesString = job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR);
|
||||
if (serializedPushdown == null || columnNamesString == null || serializedPushdown.isEmpty()
|
||||
|| columnNamesString.isEmpty()) {
|
||||
return null;
|
||||
} else {
|
||||
SearchArgument sarg =
|
||||
SearchArgumentFactory.create(Utilities.deserializeExpression(serializedPushdown));
|
||||
final Path finalPath = ((FileSplit) split).getPath();
|
||||
final ParquetMetadata parquetMetadata = ParquetFileReader.readFooter(job, finalPath);
|
||||
final FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
|
||||
return ParquetFilterPredicateConverter
|
||||
.toFilterPredicate(sarg, fileMetaData.getSchema());
|
||||
}
|
||||
}
|
||||
|
||||
private FilterPredicate constructCommitTimePushdownPredicate(JobConf job, String tableName)
|
||||
throws IOException {
|
||||
String lastIncrementalTs = HoodieHiveUtil.readStartCommitTime(Job.getInstance(job), tableName);
|
||||
Operators.BinaryColumn sequenceColumn =
|
||||
binaryColumn(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
|
||||
FilterPredicate p = gt(sequenceColumn, Binary.fromString(lastIncrementalTs));
|
||||
LOG.info("Setting predicate in InputFormat " + p.toString());
|
||||
return p;
|
||||
}
|
||||
|
||||
/**
|
||||
* Read the table metadata from a data path. This assumes certain hierarchy of files which
|
||||
* should be changed once a better way is figured out to pass in the hoodie meta directory
|
||||
*
|
||||
* @param dataPath
|
||||
* @return
|
||||
* @throws IOException
|
||||
*/
|
||||
private HoodieTableMetadata getTableMetadata(Path dataPath) throws IOException {
|
||||
FileSystem fs = dataPath.getFileSystem(conf);
|
||||
// TODO - remove this hard-coding. Pass this in job conf, somehow. Or read the Table Location
|
||||
Path baseDir = dataPath.getParent().getParent().getParent();
|
||||
LOG.info("Reading hoodie metadata from path " + baseDir.toString());
|
||||
return new HoodieTableMetadata(fs, baseDir.toString());
|
||||
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,237 @@
|
||||
/*
|
||||
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.uber.hoodie.hadoop;
|
||||
|
||||
import com.uber.hoodie.common.util.FSUtils;
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.io.ArrayWritable;
|
||||
import org.apache.hadoop.mapred.*;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
public class HoodieInputFormatTest {
|
||||
private HoodieInputFormat inputFormat;
|
||||
private JobConf jobConf;
|
||||
|
||||
@Before public void setUp() {
|
||||
inputFormat = new HoodieInputFormat();
|
||||
jobConf = new JobConf();
|
||||
inputFormat.setConf(jobConf);
|
||||
}
|
||||
|
||||
@Rule public TemporaryFolder basePath = new TemporaryFolder();
|
||||
|
||||
@Test public void testInputFormatLoad() throws IOException {
|
||||
// initial commit
|
||||
File partitionDir = InputFormatTestUtil.prepareDataset(basePath, 10, "100");
|
||||
InputFormatTestUtil.commit(basePath, "100");
|
||||
|
||||
// Add the paths
|
||||
FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
|
||||
|
||||
InputSplit[] inputSplits = inputFormat.getSplits(jobConf, 10);
|
||||
assertEquals(10, inputSplits.length);
|
||||
|
||||
FileStatus[] files = inputFormat.listStatus(jobConf);
|
||||
assertEquals(10, files.length);
|
||||
}
|
||||
|
||||
@Test public void testInputFormatUpdates() throws IOException {
|
||||
// initial commit
|
||||
File partitionDir = InputFormatTestUtil.prepareDataset(basePath, 10, "100");
|
||||
InputFormatTestUtil.commit(basePath, "100");
|
||||
|
||||
// Add the paths
|
||||
FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
|
||||
|
||||
FileStatus[] files = inputFormat.listStatus(jobConf);
|
||||
assertEquals(10, files.length);
|
||||
|
||||
// update files
|
||||
InputFormatTestUtil.simulateUpdates(partitionDir, "100", 5, "200", true);
|
||||
// Before the commit
|
||||
files = inputFormat.listStatus(jobConf);
|
||||
assertEquals(10, files.length);
|
||||
ensureFilesInCommit(
|
||||
"Commit 200 has not been committed. We should not see files from this commit", files,
|
||||
"200", 0);
|
||||
InputFormatTestUtil.commit(basePath, "200");
|
||||
files = inputFormat.listStatus(jobConf);
|
||||
assertEquals(10, files.length);
|
||||
ensureFilesInCommit(
|
||||
"5 files have been updated to commit 200. We should see 5 files from commit 200 and 5 files from 100 commit",
|
||||
files, "200", 5);
|
||||
ensureFilesInCommit(
|
||||
"5 files have been updated to commit 200. We should see 5 files from commit 100 and 5 files from 200 commit",
|
||||
files, "100", 5);
|
||||
}
|
||||
|
||||
@Test public void testIncrementalSimple() throws IOException {
|
||||
// initial commit
|
||||
File partitionDir = InputFormatTestUtil.prepareDataset(basePath, 10, "100");
|
||||
InputFormatTestUtil.commit(basePath, "100");
|
||||
|
||||
// Add the paths
|
||||
FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
|
||||
|
||||
InputFormatTestUtil.setupIncremental(jobConf, "100", 1);
|
||||
|
||||
FileStatus[] files = inputFormat.listStatus(jobConf);
|
||||
assertEquals(
|
||||
"We should exclude commit 100 when returning incremental pull with start commit time as 100",
|
||||
0, files.length);
|
||||
}
|
||||
|
||||
@Test public void testIncrementalWithMultipleCommits() throws IOException {
|
||||
// initial commit
|
||||
File partitionDir = InputFormatTestUtil.prepareDataset(basePath, 10, "100");
|
||||
InputFormatTestUtil.commit(basePath, "100");
|
||||
// Add the paths
|
||||
FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
|
||||
// update files
|
||||
InputFormatTestUtil.simulateUpdates(partitionDir, "100", 5, "200", false);
|
||||
InputFormatTestUtil.commit(basePath, "200");
|
||||
|
||||
InputFormatTestUtil.simulateUpdates(partitionDir, "100", 4, "300", false);
|
||||
InputFormatTestUtil.commit(basePath, "300");
|
||||
|
||||
InputFormatTestUtil.simulateUpdates(partitionDir, "100", 3, "400", false);
|
||||
InputFormatTestUtil.commit(basePath, "400");
|
||||
|
||||
InputFormatTestUtil.simulateUpdates(partitionDir, "100", 2, "500", false);
|
||||
InputFormatTestUtil.commit(basePath, "500");
|
||||
|
||||
InputFormatTestUtil.simulateUpdates(partitionDir, "100", 1, "600", false);
|
||||
InputFormatTestUtil.commit(basePath, "600");
|
||||
|
||||
InputFormatTestUtil.setupIncremental(jobConf, "100", 1);
|
||||
FileStatus[] files = inputFormat.listStatus(jobConf);
|
||||
assertEquals("Pulling 1 commit from 100, should get us the 5 files committed at 200", 5,
|
||||
files.length);
|
||||
ensureFilesInCommit("Pulling 1 commit from 100, should get us the 5 files committed at 200",
|
||||
files, "200", 5);
|
||||
|
||||
InputFormatTestUtil.setupIncremental(jobConf, "100", 3);
|
||||
files = inputFormat.listStatus(jobConf);
|
||||
|
||||
assertEquals(
|
||||
"Pulling 3 commits from 100, should get us the 3 files from 400 commit, 1 file from 300 commit and 1 file from 200 commit",
|
||||
5, files.length);
|
||||
ensureFilesInCommit("Pulling 3 commits from 100, should get us the 3 files from 400 commit",
|
||||
files, "400", 3);
|
||||
ensureFilesInCommit("Pulling 3 commits from 100, should get us the 1 files from 300 commit",
|
||||
files, "300", 1);
|
||||
ensureFilesInCommit("Pulling 3 commits from 100, should get us the 1 files from 200 commit",
|
||||
files, "200", 1);
|
||||
|
||||
InputFormatTestUtil.setupIncremental(jobConf, "100", HoodieHiveUtil.MAX_COMMIT_ALL);
|
||||
files = inputFormat.listStatus(jobConf);
|
||||
|
||||
assertEquals(
|
||||
"Pulling all commits from 100, should get us the 1 file from each of 200,300,400,500,400 commits",
|
||||
5, files.length);
|
||||
ensureFilesInCommit(
|
||||
"Pulling all commits from 100, should get us the 1 files from 600 commit", files, "600",
|
||||
1);
|
||||
ensureFilesInCommit(
|
||||
"Pulling all commits from 100, should get us the 1 files from 500 commit", files, "500",
|
||||
1);
|
||||
ensureFilesInCommit(
|
||||
"Pulling all commits from 100, should get us the 1 files from 400 commit", files, "400",
|
||||
1);
|
||||
ensureFilesInCommit(
|
||||
"Pulling all commits from 100, should get us the 1 files from 300 commit", files, "300",
|
||||
1);
|
||||
ensureFilesInCommit(
|
||||
"Pulling all commits from 100, should get us the 1 files from 200 commit", files, "200",
|
||||
1);
|
||||
}
|
||||
|
||||
//TODO enable this after enabling predicate pushdown
|
||||
public void testPredicatePushDown() throws IOException {
|
||||
// initial commit
|
||||
Schema schema = InputFormatTestUtil.readSchema("/sample1.avro");
|
||||
String commit1 = "20160628071126";
|
||||
File partitionDir =
|
||||
InputFormatTestUtil.prepareParquetDataset(basePath, schema, 1, 10, commit1);
|
||||
InputFormatTestUtil.commit(basePath, commit1);
|
||||
// Add the paths
|
||||
FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
|
||||
// check whether we have 10 records at this point
|
||||
ensureRecordsInCommit("We need to have 10 records at this point for commit " + commit1, commit1, 10, 10);
|
||||
|
||||
// update 2 records in the original parquet file and save it as commit 200
|
||||
String commit2 = "20160629193623";
|
||||
InputFormatTestUtil.simulateParquetUpdates(partitionDir, schema, commit1, 10, 2, commit2);
|
||||
InputFormatTestUtil.commit(basePath, commit2);
|
||||
|
||||
InputFormatTestUtil.setupIncremental(jobConf, commit1, 1);
|
||||
// check whether we have 2 records at this point
|
||||
ensureRecordsInCommit(
|
||||
"We need to have 2 records that was modified at commit " + commit2 + " and no more", commit2, 2, 2);
|
||||
// Make sure we have the 10 records if we roll back the stattime
|
||||
InputFormatTestUtil.setupIncremental(jobConf, "0", 2);
|
||||
ensureRecordsInCommit(
|
||||
"We need to have 8 records that was modified at commit " + commit1 + " and no more", commit1, 8, 10);
|
||||
ensureRecordsInCommit(
|
||||
"We need to have 2 records that was modified at commit " + commit2 + " and no more", commit2, 2, 10);
|
||||
}
|
||||
|
||||
private void ensureRecordsInCommit(String msg, String commit,
|
||||
int expectedNumberOfRecordsInCommit, int totalExpected) throws IOException {
|
||||
int actualCount = 0;
|
||||
int totalCount = 0;
|
||||
InputSplit[] splits = inputFormat.getSplits(jobConf, 1);
|
||||
for(InputSplit split:splits) {
|
||||
RecordReader<Void, ArrayWritable>
|
||||
recordReader = inputFormat.getRecordReader(split, jobConf, null);
|
||||
Void key = recordReader.createKey();
|
||||
ArrayWritable writable = recordReader.createValue();
|
||||
|
||||
while(recordReader.next(key, writable)) {
|
||||
// writable returns an array with [field1, field2, _hoodie_commit_time, _hoodie_commit_seqno]
|
||||
// Take the commit time and compare with the one we are interested in
|
||||
if(commit.equals((writable.get()[2]).toString())) {
|
||||
actualCount++;
|
||||
}
|
||||
totalCount++;
|
||||
}
|
||||
}
|
||||
assertEquals(msg, expectedNumberOfRecordsInCommit, actualCount);
|
||||
assertEquals(msg, totalExpected, totalCount);
|
||||
}
|
||||
|
||||
public static void ensureFilesInCommit(String msg, FileStatus[] files, String commit,
|
||||
int expected) {
|
||||
int count = 0;
|
||||
for (FileStatus file : files) {
|
||||
String commitTs = FSUtils.getCommitTime(file.getPath().getName());
|
||||
if (commit.equals(commitTs)) {
|
||||
count++;
|
||||
}
|
||||
}
|
||||
assertEquals(msg, expected, count);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,165 @@
|
||||
/*
|
||||
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.uber.hoodie.hadoop;
|
||||
|
||||
import com.uber.hoodie.common.model.HoodieRecord;
|
||||
import com.uber.hoodie.common.model.HoodieTestUtils;
|
||||
import com.uber.hoodie.common.util.FSUtils;
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.avro.generic.GenericRecordBuilder;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.mapred.JobConf;
|
||||
import org.apache.parquet.avro.AvroParquetReader;
|
||||
import org.apache.parquet.avro.AvroParquetWriter;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FilenameFilter;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
public class InputFormatTestUtil {
|
||||
public static File prepareDataset(TemporaryFolder basePath, int numberOfFiles,
|
||||
String commitNumber) throws IOException {
|
||||
basePath.create();
|
||||
HoodieTestUtils.initializeHoodieDirectory(basePath.getRoot().toString());
|
||||
File partitionPath = basePath.newFolder("2016", "05", "01");
|
||||
for (int i = 0; i < numberOfFiles; i++) {
|
||||
File dataFile =
|
||||
new File(partitionPath, FSUtils.makeDataFileName(commitNumber, 1, "fileid" + i));
|
||||
dataFile.createNewFile();
|
||||
}
|
||||
return partitionPath;
|
||||
}
|
||||
|
||||
public static void simulateUpdates(File directory, final String originalCommit, int numberOfFilesUpdated,
|
||||
String newCommit, boolean randomize) throws IOException {
|
||||
List<File> dataFiles = Arrays.asList(directory.listFiles(new FilenameFilter() {
|
||||
@Override public boolean accept(File dir, String name) {
|
||||
String commitTs = FSUtils.getCommitTime(name);
|
||||
return originalCommit.equals(commitTs);
|
||||
}
|
||||
}));
|
||||
if(randomize) {
|
||||
Collections.shuffle(dataFiles);
|
||||
}
|
||||
List<File> toUpdateList =
|
||||
dataFiles.subList(0, Math.min(numberOfFilesUpdated, dataFiles.size()));
|
||||
for (File file : toUpdateList) {
|
||||
String fileId = FSUtils.getFileId(file.getName());
|
||||
File dataFile = new File(directory, FSUtils.makeDataFileName(newCommit, 1, fileId));
|
||||
dataFile.createNewFile();
|
||||
}
|
||||
}
|
||||
|
||||
public static void commit(TemporaryFolder basePath, String commitNumber) throws IOException {
|
||||
// create the commit
|
||||
new File(basePath.getRoot().toString() + "/.hoodie/", commitNumber + ".commit").createNewFile();
|
||||
}
|
||||
|
||||
public static void setupIncremental(JobConf jobConf, String startCommit, int numberOfCommitsToPull) {
|
||||
String modePropertyName = String.format(HoodieHiveUtil.HOODIE_CONSUME_MODE_PATTERN,
|
||||
HoodieTestUtils.RAW_TRIPS_TEST_NAME);
|
||||
jobConf.set(modePropertyName, HoodieHiveUtil.INCREMENTAL_SCAN_MODE);
|
||||
|
||||
String startCommitTimestampName = String.format(HoodieHiveUtil.HOODIE_START_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
|
||||
jobConf.set(startCommitTimestampName, startCommit);
|
||||
|
||||
String maxCommitPulls = String.format(HoodieHiveUtil.HOODIE_MAX_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
|
||||
jobConf.setInt(maxCommitPulls, numberOfCommitsToPull);
|
||||
}
|
||||
|
||||
public static Schema readSchema(String location) throws IOException {
|
||||
return new Schema.Parser().parse(InputFormatTestUtil.class.getResourceAsStream(location));
|
||||
}
|
||||
|
||||
public static File prepareParquetDataset(TemporaryFolder basePath, Schema schema, int numberOfFiles, int numberOfRecords,
|
||||
String commitNumber) throws IOException {
|
||||
basePath.create();
|
||||
HoodieTestUtils.initializeHoodieDirectory(basePath.getRoot().toString());
|
||||
File partitionPath = basePath.newFolder("2016", "05", "01");
|
||||
AvroParquetWriter parquetWriter;
|
||||
for (int i = 0; i < numberOfFiles; i++) {
|
||||
File dataFile =
|
||||
new File(partitionPath, FSUtils.makeDataFileName(commitNumber, 1, "fileid" + i));
|
||||
// dataFile.createNewFile();
|
||||
parquetWriter = new AvroParquetWriter(new Path(dataFile.getAbsolutePath()),
|
||||
schema);
|
||||
try {
|
||||
for (GenericRecord record : generateAvroRecords(schema, numberOfRecords, commitNumber)) {
|
||||
parquetWriter.write(record);
|
||||
}
|
||||
} finally {
|
||||
parquetWriter.close();
|
||||
}
|
||||
}
|
||||
return partitionPath;
|
||||
|
||||
}
|
||||
|
||||
private static Iterable<? extends GenericRecord> generateAvroRecords(Schema schema, int numberOfRecords, String commitTime) {
|
||||
List<GenericRecord> records = new ArrayList<>(numberOfRecords);
|
||||
for(int i=0;i<numberOfRecords;i++) {
|
||||
records.add(generateAvroRecord(schema, i, commitTime));
|
||||
}
|
||||
return records;
|
||||
}
|
||||
|
||||
private static GenericRecord generateAvroRecord(Schema schema, int recordNumber,
|
||||
String commitTime) {
|
||||
return new GenericRecordBuilder(schema).set("field1", "field" + recordNumber)
|
||||
.set("field2", "field" + recordNumber)
|
||||
.set(HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitTime)
|
||||
.set(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, commitTime + "_" + recordNumber).build();
|
||||
}
|
||||
|
||||
public static void simulateParquetUpdates(File directory, Schema schema, String originalCommit,
|
||||
int totalNumberOfRecords, int numberOfRecordsToUpdate,
|
||||
String newCommit) throws IOException {
|
||||
File fileToUpdate = directory.listFiles(new FilenameFilter() {
|
||||
@Override public boolean accept(File dir, String name) {
|
||||
return name.endsWith("parquet");
|
||||
}
|
||||
})[0];
|
||||
String fileId = FSUtils.getFileId(fileToUpdate.getName());
|
||||
File dataFile = new File(directory, FSUtils.makeDataFileName(newCommit, 1, fileId));
|
||||
AvroParquetWriter parquetWriter = new AvroParquetWriter(new Path(dataFile.getAbsolutePath()),
|
||||
schema);
|
||||
try {
|
||||
for (GenericRecord record : generateAvroRecords(schema, totalNumberOfRecords,
|
||||
originalCommit)) {
|
||||
if (numberOfRecordsToUpdate > 0) {
|
||||
// update this record
|
||||
record.put(HoodieRecord.COMMIT_TIME_METADATA_FIELD, newCommit);
|
||||
String oldSeqNo = (String) record.get(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD);
|
||||
record.put(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD,
|
||||
oldSeqNo.replace(originalCommit, newCommit));
|
||||
numberOfRecordsToUpdate--;
|
||||
}
|
||||
parquetWriter.write(record);
|
||||
}
|
||||
} finally {
|
||||
parquetWriter.close();
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
17
hoodie-hadoop-mr/src/test/resources/sample1.avro
Normal file
17
hoodie-hadoop-mr/src/test/resources/sample1.avro
Normal file
@@ -0,0 +1,17 @@
|
||||
{
|
||||
"type" : "record",
|
||||
"name" : "testRecord",
|
||||
"fields" : [ {
|
||||
"name" : "field1",
|
||||
"type" : "string"
|
||||
}, {
|
||||
"name" : "field2",
|
||||
"type" : "string"
|
||||
}, {
|
||||
"name" : "_hoodie_commit_time",
|
||||
"type" : "string"
|
||||
}, {
|
||||
"name" : "_hoodie_commit_seqno",
|
||||
"type" : "string"
|
||||
}]
|
||||
}
|
||||
14
pom.xml
14
pom.xml
@@ -26,6 +26,7 @@
|
||||
<module>hoodie-common</module>
|
||||
<module>hoodie-client</module>
|
||||
<module>hoodie-cli</module>
|
||||
<module>hoodie-hadoop-mr</module>
|
||||
</modules>
|
||||
|
||||
<licenses>
|
||||
@@ -74,6 +75,7 @@
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<version>${junit.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
@@ -251,12 +253,6 @@
|
||||
<version>${hive.version}-cdh${cdh.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<!--<dependency>-->
|
||||
<!--<groupId>org.apache.hive</groupId>-->
|
||||
<!--<artifactId>hive-jdbc</artifactId>-->
|
||||
<!--<version>${hive.version}-cdh${cdh.version}</version>-->
|
||||
<!--<scope>provided</scope>-->
|
||||
<!--</dependency>-->
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-mapreduce-client-core</artifactId>
|
||||
@@ -381,6 +377,12 @@
|
||||
<version>1.9.13</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hive</groupId>
|
||||
<artifactId>hive-jdbc</artifactId>
|
||||
<version>1.1.0-cdh5.7.2</version>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
</dependencyManagement>
|
||||
|
||||
Reference in New Issue
Block a user