1
0

Adding hoodie-hadoop-mr module to add HoodieInputFormat

This commit is contained in:
Prasanna Rajaperumal
2016-12-16 19:29:53 -08:00
parent 8e80c8d2ea
commit 61200b1207
7 changed files with 806 additions and 6 deletions

View File

@@ -0,0 +1,58 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.hadoop;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
public class HoodieHiveUtil {
public static final Logger LOG =
LogManager.getLogger(HoodieHiveUtil.class);
public static final String HOODIE_CONSUME_MODE_PATTERN = "hoodie.%s.consume.mode";
public static final String HOODIE_START_COMMIT_PATTERN = "hoodie.%s.consume.start.timestamp";
public static final String HOODIE_MAX_COMMIT_PATTERN = "hoodie.%s.consume.max.commits";
public static final String INCREMENTAL_SCAN_MODE = "INCREMENTAL";
public static final String LATEST_SCAN_MODE = "LATEST";
public static final String DEFAULT_SCAN_MODE = LATEST_SCAN_MODE;
public static final int DEFAULT_MAX_COMMITS = 1;
public static final int MAX_COMMIT_ALL = -1;
public static Integer readMaxCommits(JobContext job, String tableName) {
String maxCommitName = String.format(HOODIE_MAX_COMMIT_PATTERN, tableName);
int maxCommits = job.getConfiguration().getInt(maxCommitName, DEFAULT_MAX_COMMITS);
if (maxCommits == MAX_COMMIT_ALL) {
maxCommits = Integer.MAX_VALUE;
}
LOG.info("Read max commits - " + maxCommits);
return maxCommits;
}
public static String readStartCommitTime(JobContext job, String tableName) {
String startCommitTimestampName = String.format(HOODIE_START_COMMIT_PATTERN, tableName);
LOG.info("Read start commit time - " + job.getConfiguration().get(startCommitTimestampName));
return job.getConfiguration().get(startCommitTimestampName);
}
public static String readMode(JobContext job, String tableName) {
String modePropertyName = String.format(HOODIE_CONSUME_MODE_PATTERN, tableName);
String mode =job.getConfiguration().get(modePropertyName, DEFAULT_SCAN_MODE);
LOG.info(modePropertyName + ": " + mode);
return mode;
}
}

View File

@@ -0,0 +1,252 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.hadoop;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieTableMetadata;
import com.uber.hoodie.exception.InvalidDatasetException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
import org.apache.hadoop.hive.ql.io.parquet.read.ParquetFilterPredicateConverter;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
import parquet.filter2.predicate.FilterPredicate;
import parquet.filter2.predicate.Operators;
import parquet.hadoop.ParquetFileReader;
import parquet.hadoop.metadata.FileMetaData;
import parquet.hadoop.metadata.ParquetMetadata;
import parquet.io.api.Binary;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static parquet.filter2.predicate.FilterApi.and;
import static parquet.filter2.predicate.FilterApi.binaryColumn;
import static parquet.filter2.predicate.FilterApi.gt;
/**
* HoodieInputFormat which understands the Hoodie File Structure and filters
* files based on the Hoodie Mode. If paths that does not correspond to a hoodie dataset
* then they are passed in as is (as what FileInputFormat.listStatus() would do).
* The JobConf could have paths from multipe Hoodie/Non-Hoodie datasets
*/
public class HoodieInputFormat extends MapredParquetInputFormat
implements Configurable {
public static final Log LOG = LogFactory.getLog(HoodieInputFormat.class);
private Configuration conf;
@Override
public FileStatus[] listStatus(JobConf job) throws IOException {
// Get all the file status from FileInputFormat and then do the filter
FileStatus[] fileStatuses = super.listStatus(job);
Map<HoodieTableMetadata, List<FileStatus>> groupedFileStatus = groupFileStatus(fileStatuses);
LOG.info("Found a total of " + groupedFileStatus.size() + " groups");
List<FileStatus> returns = new ArrayList<FileStatus>();
for(Map.Entry<HoodieTableMetadata, List<FileStatus>> entry:groupedFileStatus.entrySet()) {
HoodieTableMetadata metadata = entry.getKey();
if(metadata == null) {
// Add all the paths which are not hoodie specific
returns.addAll(entry.getValue());
continue;
}
FileStatus[] value = entry.getValue().toArray(new FileStatus[entry.getValue().size()]);
LOG.info("Hoodie Metadata initialized with completed commit Ts as :" + metadata);
String tableName = metadata.getTableName();
String mode = HoodieHiveUtil.readMode(Job.getInstance(job), tableName);
if (HoodieHiveUtil.INCREMENTAL_SCAN_MODE.equals(mode)) {
// this is of the form commitTs_partition_sequenceNumber
String lastIncrementalTs = HoodieHiveUtil.readStartCommitTime(Job.getInstance(job), tableName);
// Total number of commits to return in this batch. Set this to -1 to get all the commits.
Integer maxCommits = HoodieHiveUtil.readMaxCommits(Job.getInstance(job), tableName);
LOG.info("Last Incremental timestamp was set as " + lastIncrementalTs);
List<String>
commitsToReturn = metadata.findCommitsAfter(lastIncrementalTs, maxCommits);
FileStatus[] filteredFiles =
metadata.getLatestVersionInRange(value, commitsToReturn);
for (FileStatus filteredFile : filteredFiles) {
LOG.info("Processing incremental hoodie file - " + filteredFile.getPath());
returns.add(filteredFile);
}
LOG.info(
"Total paths to process after hoodie incremental filter " + filteredFiles.length);
} else {
// filter files on the latest commit found
FileStatus[] filteredFiles = metadata.getLatestVersions(value);
LOG.info("Total paths to process after hoodie filter " + filteredFiles.length);
for (FileStatus filteredFile : filteredFiles) {
LOG.info("Processing latest hoodie file - " + filteredFile.getPath());
returns.add(filteredFile);
}
}
}
return returns.toArray(new FileStatus[returns.size()]);
}
private Map<HoodieTableMetadata, List<FileStatus>> groupFileStatus(FileStatus[] fileStatuses)
throws IOException {
// This assumes the paths for different tables are grouped together
Map<HoodieTableMetadata, List<FileStatus>> grouped = new HashMap<>();
HoodieTableMetadata metadata = null;
String nonHoodieBasePath = null;
for(FileStatus status:fileStatuses) {
if ((metadata == null && nonHoodieBasePath == null) || (metadata == null && !status.getPath().toString()
.contains(nonHoodieBasePath)) || (metadata != null && !status.getPath().toString()
.contains(metadata.getBasePath()))) {
try {
metadata = getTableMetadata(status.getPath().getParent());
nonHoodieBasePath = null;
} catch (InvalidDatasetException e) {
LOG.info("Handling a non-hoodie path " + status.getPath());
metadata = null;
nonHoodieBasePath =
status.getPath().getParent().toString();
}
if(!grouped.containsKey(metadata)) {
grouped.put(metadata, new ArrayList<FileStatus>());
}
}
grouped.get(metadata).add(status);
}
return grouped;
}
public void setConf(Configuration conf) {
this.conf = conf;
}
public Configuration getConf() {
return conf;
}
@Override
public RecordReader<Void, ArrayWritable> getRecordReader(final InputSplit split,
final JobConf job, final Reporter reporter) throws IOException {
// TODO enable automatic predicate pushdown after fixing issues
// FileSplit fileSplit = (FileSplit) split;
// HoodieTableMetadata metadata = getTableMetadata(fileSplit.getPath().getParent());
// String tableName = metadata.getTableName();
// String mode = HoodieHiveUtil.readMode(job, tableName);
// if (HoodieHiveUtil.INCREMENTAL_SCAN_MODE.equals(mode)) {
// FilterPredicate predicate = constructHoodiePredicate(job, tableName, split);
// LOG.info("Setting parquet predicate push down as " + predicate);
// ParquetInputFormat.setFilterPredicate(job, predicate);
//clearOutExistingPredicate(job);
// }
return super.getRecordReader(split, job, reporter);
}
/**
* Clears out the filter expression (if this is not done, then ParquetReader will override the FilterPredicate set)
*
* @param job
*/
private void clearOutExistingPredicate(JobConf job) {
job.unset(TableScanDesc.FILTER_EXPR_CONF_STR);
}
/**
* Constructs the predicate to push down to parquet storage.
* This creates the predicate for `hoodie_commit_time` > 'start_commit_time' and ANDs with the existing predicate if one is present already.
*
* @param job
* @param tableName
* @return
*/
private FilterPredicate constructHoodiePredicate(JobConf job, String tableName,
InputSplit split) throws IOException {
FilterPredicate commitTimePushdown = constructCommitTimePushdownPredicate(job, tableName);
LOG.info("Commit time predicate - " + commitTimePushdown.toString());
FilterPredicate existingPushdown = constructHQLPushdownPredicate(job, split);
LOG.info("Existing predicate - " + existingPushdown);
FilterPredicate hoodiePredicate;
if (existingPushdown != null) {
hoodiePredicate = and(existingPushdown, commitTimePushdown);
} else {
hoodiePredicate = commitTimePushdown;
}
LOG.info("Hoodie Predicate - " + hoodiePredicate);
return hoodiePredicate;
}
private FilterPredicate constructHQLPushdownPredicate(JobConf job, InputSplit split)
throws IOException {
String serializedPushdown = job.get(TableScanDesc.FILTER_EXPR_CONF_STR);
String columnNamesString = job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR);
if (serializedPushdown == null || columnNamesString == null || serializedPushdown.isEmpty()
|| columnNamesString.isEmpty()) {
return null;
} else {
SearchArgument sarg =
SearchArgumentFactory.create(Utilities.deserializeExpression(serializedPushdown));
final Path finalPath = ((FileSplit) split).getPath();
final ParquetMetadata parquetMetadata = ParquetFileReader.readFooter(job, finalPath);
final FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
return ParquetFilterPredicateConverter
.toFilterPredicate(sarg, fileMetaData.getSchema());
}
}
private FilterPredicate constructCommitTimePushdownPredicate(JobConf job, String tableName)
throws IOException {
String lastIncrementalTs = HoodieHiveUtil.readStartCommitTime(Job.getInstance(job), tableName);
Operators.BinaryColumn sequenceColumn =
binaryColumn(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
FilterPredicate p = gt(sequenceColumn, Binary.fromString(lastIncrementalTs));
LOG.info("Setting predicate in InputFormat " + p.toString());
return p;
}
/**
* Read the table metadata from a data path. This assumes certain hierarchy of files which
* should be changed once a better way is figured out to pass in the hoodie meta directory
*
* @param dataPath
* @return
* @throws IOException
*/
private HoodieTableMetadata getTableMetadata(Path dataPath) throws IOException {
FileSystem fs = dataPath.getFileSystem(conf);
// TODO - remove this hard-coding. Pass this in job conf, somehow. Or read the Table Location
Path baseDir = dataPath.getParent().getParent().getParent();
LOG.info("Reading hoodie metadata from path " + baseDir.toString());
return new HoodieTableMetadata(fs, baseDir.toString());
}
}