
[HUDI-371] Supporting hive combine input format for realtime tables (#1503)

This commit is contained in:
n3nash
2020-04-20 20:40:06 -07:00
committed by GitHub
parent 2a2f31d919
commit 332072bc6d
15 changed files with 1042 additions and 369 deletions

View File

@@ -18,7 +18,11 @@
package org.apache.hudi.hadoop.hive;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.realtime.HoodieCombineRealtimeRecordReader;
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
import org.apache.hadoop.conf.Configuration;
@@ -28,11 +32,13 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.common.StringInternUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.CombineHiveRecordReader;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
import org.apache.hadoop.hive.ql.io.IOContextMap;
import org.apache.hadoop.hive.ql.io.IOPrepareCache;
import org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat;
import org.apache.hadoop.hive.ql.log.PerfLogger;
@@ -45,9 +51,11 @@ import org.apache.hadoop.hive.shims.HadoopShims.CombineFileInputFormatShim;
import org.apache.hadoop.hive.shims.HadoopShimsSecure;
import org.apache.hadoop.hive.shims.HadoopShimsSecure.InputSplitShim;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
@@ -67,6 +75,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -99,250 +108,6 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
private static final int MAX_CHECK_NONCOMBINABLE_THREAD_NUM = 50;
private static final int DEFAULT_NUM_PATH_PER_THREAD = 100;
private class CheckNonCombinablePathCallable implements Callable<Set<Integer>> {
private final Path[] paths;
private final int start;
private final int length;
private final JobConf conf;
public CheckNonCombinablePathCallable(Path[] paths, int start, int length, JobConf conf) {
this.paths = paths;
this.start = start;
this.length = length;
this.conf = conf;
}
@Override
public Set<Integer> call() throws Exception {
Set<Integer> nonCombinablePathIndices = new HashSet<>();
for (int i = 0; i < length; i++) {
PartitionDesc part = HiveFileFormatUtils.getPartitionDescFromPathRecursively(pathToPartitionInfo,
paths[i + start], IOPrepareCache.get().allocatePartitionDescMap());
// Use HiveInputFormat if any of the paths is not splittable
Class<? extends InputFormat> inputFormatClass = part.getInputFileFormatClass();
InputFormat<WritableComparable, Writable> inputFormat = getInputFormatFromCache(inputFormatClass, conf);
if (inputFormat instanceof AvoidSplitCombination
&& ((AvoidSplitCombination) inputFormat).shouldSkipCombine(paths[i + start], conf)) {
if (LOG.isDebugEnabled()) {
LOG.debug("The path [" + paths[i + start] + "] is being parked for HiveInputFormat.getSplits");
}
nonCombinablePathIndices.add(i + start);
}
}
return nonCombinablePathIndices;
}
}
/**
* CombineHiveInputSplit encapsulates an InputSplit with its corresponding inputFormatClassName. A
* CombineHiveInputSplit comprises multiple chunks from different files. Since they belong to a single directory,
* there is a single input format for all the chunks.
*/
public static class CombineHiveInputSplit extends InputSplitShim {
private String inputFormatClassName;
private CombineFileSplit inputSplitShim;
private Map<Path, PartitionDesc> pathToPartitionInfo;
public CombineHiveInputSplit() throws IOException {
this(ShimLoader.getHadoopShims().getCombineFileInputFormat().getInputSplitShim());
}
public CombineHiveInputSplit(CombineFileSplit inputSplitShim) throws IOException {
this(inputSplitShim.getJob(), inputSplitShim);
}
public CombineHiveInputSplit(JobConf job, CombineFileSplit inputSplitShim) throws IOException {
this(job, inputSplitShim, null);
}
public CombineHiveInputSplit(JobConf job, CombineFileSplit inputSplitShim,
Map<Path, PartitionDesc> pathToPartitionInfo) throws IOException {
this.inputSplitShim = inputSplitShim;
this.pathToPartitionInfo = pathToPartitionInfo;
if (job != null) {
if (this.pathToPartitionInfo == null) {
this.pathToPartitionInfo = Utilities.getMapWork(job).getPathToPartitionInfo();
}
// extract all the inputFormatClass names for each chunk in the
// CombinedSplit.
Path[] ipaths = inputSplitShim.getPaths();
if (ipaths.length > 0) {
PartitionDesc part = HiveFileFormatUtils.getPartitionDescFromPathRecursively(this.pathToPartitionInfo,
ipaths[0], IOPrepareCache.get().getPartitionDescMap());
inputFormatClassName = part.getInputFileFormatClass().getName();
}
}
}
public CombineFileSplit getInputSplitShim() {
return inputSplitShim;
}
/**
* Returns the inputFormat class name for the i-th chunk.
*/
public String inputFormatClassName() {
return inputFormatClassName;
}
public void setInputFormatClassName(String inputFormatClassName) {
this.inputFormatClassName = inputFormatClassName;
}
@Override
public JobConf getJob() {
return inputSplitShim.getJob();
}
@Override
public long getLength() {
return inputSplitShim.getLength();
}
/**
* Returns an array containing the start offsets of the files in the split.
*/
@Override
public long[] getStartOffsets() {
return inputSplitShim.getStartOffsets();
}
/**
* Returns an array containing the lengths of the files in the split.
*/
@Override
public long[] getLengths() {
return inputSplitShim.getLengths();
}
/**
* Returns the start offset of the i<sup>th</sup> Path.
*/
@Override
public long getOffset(int i) {
return inputSplitShim.getOffset(i);
}
/**
* Returns the length of the i<sup>th</sup> Path.
*/
@Override
public long getLength(int i) {
return inputSplitShim.getLength(i);
}
/**
* Returns the number of Paths in the split.
*/
@Override
public int getNumPaths() {
return inputSplitShim.getNumPaths();
}
/**
* Returns the i<sup>th</sup> Path.
*/
@Override
public Path getPath(int i) {
return inputSplitShim.getPath(i);
}
/**
* Returns all the Paths in the split.
*/
@Override
public Path[] getPaths() {
return inputSplitShim.getPaths();
}
/**
* Returns all the Paths where this input-split resides.
*/
@Override
public String[] getLocations() throws IOException {
return inputSplitShim.getLocations();
}
/**
* Prints this object as a string.
*/
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(inputSplitShim.toString());
sb.append("InputFormatClass: " + inputFormatClassName);
sb.append("\n");
return sb.toString();
}
/**
* Writable interface.
*/
@Override
public void readFields(DataInput in) throws IOException {
inputSplitShim.readFields(in);
inputFormatClassName = in.readUTF();
}
/**
* Writable interface.
*/
@Override
public void write(DataOutput out) throws IOException {
inputSplitShim.write(out);
if (inputFormatClassName == null) {
if (pathToPartitionInfo == null) {
pathToPartitionInfo = Utilities.getMapWork(getJob()).getPathToPartitionInfo();
}
// extract all the inputFormatClass names for each chunk in the
// CombinedSplit.
PartitionDesc part = HiveFileFormatUtils.getPartitionDescFromPathRecursively(pathToPartitionInfo,
inputSplitShim.getPath(0), IOPrepareCache.get().getPartitionDescMap());
// create a new InputFormat instance if this is the first time to see
// this class
inputFormatClassName = part.getInputFileFormatClass().getName();
}
out.writeUTF(inputFormatClassName);
}
}
// Splits are not shared across different partitions with different input formats.
// For example, 2 partitions (1 sequencefile and 1 rcfile) will have 2 different splits
private static class CombinePathInputFormat {
private final List<Operator<? extends OperatorDesc>> opList;
private final String inputFormatClassName;
private final String deserializerClassName;
public CombinePathInputFormat(List<Operator<? extends OperatorDesc>> opList, String inputFormatClassName,
String deserializerClassName) {
this.opList = opList;
this.inputFormatClassName = inputFormatClassName;
this.deserializerClassName = deserializerClassName;
}
@Override
public boolean equals(Object o) {
if (o instanceof CombinePathInputFormat) {
CombinePathInputFormat mObj = (CombinePathInputFormat) o;
return (opList.equals(mObj.opList)) && (inputFormatClassName.equals(mObj.inputFormatClassName))
&& (Objects.equals(deserializerClassName, mObj.deserializerClassName));
}
return false;
}
@Override
public int hashCode() {
return (opList == null) ? 0 : opList.hashCode();
}
}
/**
* Create Hive splits based on CombineFileSplit.
*/
@@ -391,6 +156,16 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
LOG.info("Setting hoodie filter and realtime input format");
combine.setHoodieFilter(true);
combine.setRealTime(true);
if (job.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "").isEmpty()) {
List<String> partitions = new ArrayList<>(part.getPartSpec().keySet());
if (!partitions.isEmpty()) {
String partitionStr = String.join(",", partitions);
LOG.info("Setting Partitions in jobConf - Partition Keys for Path : " + path + " is :" + partitionStr);
job.set(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, partitionStr);
} else {
job.set(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "");
}
}
}
String deserializerClassName = null;
try {
@@ -472,7 +247,16 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
}
for (CombineFileSplit is : iss) {
CombineHiveInputSplit csplit = new CombineHiveInputSplit(job, is, pathToPartitionInfo);
final InputSplit csplit;
if (combine.isRealTime) {
if (is instanceof HoodieCombineRealtimeHiveSplit) {
csplit = is;
} else {
csplit = new HoodieCombineRealtimeHiveSplit(job, is, pathToPartitionInfo);
}
} else {
csplit = new CombineHiveInputSplit(job, is, pathToPartitionInfo);
}
result.add(csplit);
}
@@ -727,8 +511,244 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
pushProjectionsAndFilters(job, inputFormatClass, hsplit.getPath(0));
return ShimLoader.getHadoopShims().getCombineFileInputFormat().getRecordReader(job, (CombineFileSplit) split,
reporter, CombineHiveRecordReader.class);
if (inputFormatClass.getName().equals(HoodieParquetInputFormat.class.getName())) {
return ShimLoader.getHadoopShims().getCombineFileInputFormat().getRecordReader(job, (CombineFileSplit) split,
reporter, CombineHiveRecordReader.class);
} else if (inputFormatClass.getName().equals(HoodieParquetRealtimeInputFormat.class.getName())) {
HoodieCombineFileInputFormatShim shims = new HoodieCombineFileInputFormatShim();
IOContextMap.get(job).setInputPath(((CombineHiveInputSplit) split).getPath(0));
return shims.getRecordReader(job, ((CombineHiveInputSplit) split).getInputSplitShim(),
reporter, CombineHiveRecordReader.class);
} else {
throw new HoodieException("Unexpected input format : " + inputFormatClassName);
}
}
/**
* This is a marker interface that is used to identify the formats where combine split generation is not applicable.
*/
public interface AvoidSplitCombination {
boolean shouldSkipCombine(Path path, Configuration conf) throws IOException;
}
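// Illustrative sketch, not part of this commit: how an input format opts out of
// combining through the AvoidSplitCombination marker above. The class name and
// the "hypothetical.skip.combine" flag are assumptions for illustration only.
public static class HypotheticalNonCombinableInputFormat extends HoodieParquetInputFormat
implements AvoidSplitCombination {
@Override
public boolean shouldSkipCombine(Path path, Configuration conf) throws IOException {
// Paths flagged this way are routed to HiveInputFormat.getSplits instead of being combined.
return conf.getBoolean("hypothetical.skip.combine", false);
}
}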
/**
* CombineHiveInputSplit encapsulates an InputSplit with its corresponding inputFormatClassName. A
* CombineHiveInputSplit comprises multiple chunks from different files. Since they belong to a single directory,
* there is a single input format for all the chunks.
*/
public static class CombineHiveInputSplit extends InputSplitShim {
private String inputFormatClassName;
protected CombineFileSplit inputSplitShim;
private Map<Path, PartitionDesc> pathToPartitionInfo;
public CombineHiveInputSplit() throws IOException {
this(ShimLoader.getHadoopShims().getCombineFileInputFormat().getInputSplitShim());
}
public CombineHiveInputSplit(CombineFileSplit inputSplitShim) throws IOException {
this(inputSplitShim.getJob(), inputSplitShim);
}
public CombineHiveInputSplit(JobConf job, CombineFileSplit inputSplitShim) throws IOException {
this(job, inputSplitShim, null);
}
public CombineHiveInputSplit(JobConf job, CombineFileSplit inputSplitShim,
Map<Path, PartitionDesc> pathToPartitionInfo) throws IOException {
this.inputSplitShim = inputSplitShim;
this.pathToPartitionInfo = pathToPartitionInfo;
if (job != null) {
if (this.pathToPartitionInfo == null) {
this.pathToPartitionInfo = Utilities.getMapWork(job).getPathToPartitionInfo();
}
// extract all the inputFormatClass names for each chunk in the
// CombinedSplit.
Path[] ipaths = inputSplitShim.getPaths();
if (ipaths.length > 0) {
PartitionDesc part = HiveFileFormatUtils.getPartitionDescFromPathRecursively(this.pathToPartitionInfo,
ipaths[0], IOPrepareCache.get().getPartitionDescMap());
inputFormatClassName = part.getInputFileFormatClass().getName();
}
}
}
public CombineFileSplit getInputSplitShim() {
return inputSplitShim;
}
/**
* Returns the inputFormat class name for the i-th chunk.
*/
public String inputFormatClassName() {
return inputFormatClassName;
}
public void setInputFormatClassName(String inputFormatClassName) {
this.inputFormatClassName = inputFormatClassName;
}
@Override
public JobConf getJob() {
return inputSplitShim.getJob();
}
@Override
public long getLength() {
return inputSplitShim.getLength();
}
/**
* Returns an array containing the start offsets of the files in the split.
*/
@Override
public long[] getStartOffsets() {
return inputSplitShim.getStartOffsets();
}
/**
* Returns an array containing the lengths of the files in the split.
*/
@Override
public long[] getLengths() {
return inputSplitShim.getLengths();
}
/**
* Returns the start offset of the i<sup>th</sup> Path.
*/
@Override
public long getOffset(int i) {
return inputSplitShim.getOffset(i);
}
/**
* Returns the length of the i<sup>th</sup> Path.
*/
@Override
public long getLength(int i) {
return inputSplitShim.getLength(i);
}
/**
* Returns the number of Paths in the split.
*/
@Override
public int getNumPaths() {
return inputSplitShim.getNumPaths();
}
/**
* Returns the i<sup>th</sup> Path.
*/
@Override
public Path getPath(int i) {
return inputSplitShim.getPath(i);
}
/**
* Returns all the Paths in the split.
*/
@Override
public Path[] getPaths() {
return inputSplitShim.getPaths();
}
/**
* Returns all the Paths where this input-split resides.
*/
@Override
public String[] getLocations() throws IOException {
return inputSplitShim.getLocations();
}
/**
* Prints this object as a string.
*/
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(inputSplitShim.toString());
sb.append("InputFormatClass: " + inputFormatClassName);
sb.append("\n");
return sb.toString();
}
/**
* Writable interface.
*/
@Override
public void readFields(DataInput in) throws IOException {
inputFormatClassName = Text.readString(in);
if (HoodieParquetRealtimeInputFormat.class.getName().equals(inputFormatClassName)) {
String inputShimClassName = Text.readString(in);
inputSplitShim = ReflectionUtils.loadClass(inputShimClassName);
inputSplitShim.readFields(in);
} else {
inputSplitShim.readFields(in);
}
}
/**
* Writable interface.
*/
@Override
public void write(DataOutput out) throws IOException {
if (inputFormatClassName == null) {
if (pathToPartitionInfo == null) {
pathToPartitionInfo = Utilities.getMapWork(getJob()).getPathToPartitionInfo();
}
// extract all the inputFormatClass names for each chunk in the
// CombinedSplit.
PartitionDesc part = HiveFileFormatUtils.getPartitionDescFromPathRecursively(pathToPartitionInfo,
inputSplitShim.getPath(0), IOPrepareCache.get().getPartitionDescMap());
// create a new InputFormat instance if this is the first time to see
// this class
inputFormatClassName = part.getInputFileFormatClass().getName();
}
Text.writeString(out, inputFormatClassName);
if (HoodieParquetRealtimeInputFormat.class.getName().equals(inputFormatClassName)) {
// Write Shim Class Name
Text.writeString(out, inputSplitShim.getClass().getName());
}
inputSplitShim.write(out);
}
}
// Splits are not shared across different partitions with different input formats.
// For example, 2 partitions (1 sequencefile and 1 rcfile) will have 2 different splits
private static class CombinePathInputFormat {
private final List<Operator<? extends OperatorDesc>> opList;
private final String inputFormatClassName;
private final String deserializerClassName;
public CombinePathInputFormat(List<Operator<? extends OperatorDesc>> opList, String inputFormatClassName,
String deserializerClassName) {
this.opList = opList;
this.inputFormatClassName = inputFormatClassName;
this.deserializerClassName = deserializerClassName;
}
@Override
public boolean equals(Object o) {
if (o instanceof CombinePathInputFormat) {
CombinePathInputFormat mObj = (CombinePathInputFormat) o;
return (opList.equals(mObj.opList)) && (inputFormatClassName.equals(mObj.inputFormatClassName))
&& (deserializerClassName == null ? (mObj.deserializerClassName == null)
: deserializerClassName.equals(mObj.deserializerClassName));
}
return false;
}
@Override
public int hashCode() {
return (opList == null) ? 0 : opList.hashCode();
}
}
static class CombineFilter implements PathFilter {
@@ -775,14 +795,6 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
}
}
/**
* This is a marker interface that is used to identify the formats where combine split generation is not applicable.
*/
public interface AvoidSplitCombination {
boolean shouldSkipCombine(Path path, Configuration conf) throws IOException;
}
/**
* **MOD** This is the implementation of CombineFileInputFormat which is a copy of
* org.apache.hadoop.hive.shims.HadoopShimsSecure.CombineFileInputFormatShim with changes in listStatus.
@@ -793,7 +805,8 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
private boolean hoodieFilter = false;
private boolean isRealTime = false;
public HoodieCombineFileInputFormatShim() {}
public HoodieCombineFileInputFormatShim() {
}
@Override
public Path[] getInputPathsShim(JobConf conf) {
@@ -840,6 +853,7 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
@Override
public CombineFileSplit[] getSplits(JobConf job, int numSplits) throws IOException {
long minSize = job.getLong(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.SPLIT_MINSIZE, 0L);
long maxSize = job.getLong(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.SPLIT_MAXSIZE, minSize);
if (job.getLong("mapreduce.input.fileinputformat.split.minsize.per.node", 0L) == 0L) {
super.setMinSplitSizeNode(minSize);
}
@@ -851,19 +865,48 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
if (job.getLong(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.SPLIT_MAXSIZE, 0L) == 0L) {
super.setMaxSplitSize(minSize);
}
LOG.info("mapreduce.input.fileinputformat.split.minsize=" + minSize
+ ", mapreduce.input.fileinputformat.split.maxsize=" + maxSize);
InputSplit[] splits = super.getSplits(job, numSplits);
List<InputSplitShim> inputSplitShims = new ArrayList<>();
for (InputSplit inputSplit : splits) {
CombineFileSplit split = (CombineFileSplit) inputSplit;
if (split.getPaths().length > 0) {
inputSplitShims.add(new InputSplitShim(job, split.getPaths(), split.getStartOffsets(),
split.getLengths(), split.getLocations()));
if (isRealTime) {
job.set("hudi.hive.realtime", "true");
InputSplit[] splits;
if (hoodieFilter) {
HoodieParquetInputFormat input = new HoodieParquetRealtimeInputFormat();
input.setConf(job);
splits = input.getSplits(job, numSplits);
} else {
splits = super.getSplits(job, numSplits);
}
}
ArrayList<CombineFileSplit> combineFileSplits = new ArrayList<>();
HoodieCombineRealtimeFileSplit.Builder builder = new HoodieCombineRealtimeFileSplit.Builder();
int counter = 0;
for (int pos = 0; pos < splits.length; pos++) {
if (counter == maxSize - 1 || pos == splits.length - 1) {
builder.addSplit((FileSplit)splits[pos]);
combineFileSplits.add(builder.build(job));
builder = new HoodieCombineRealtimeFileSplit.Builder();
counter = 0;
} else if (counter < maxSize) {
counter++;
builder.addSplit((FileSplit)splits[pos]);
}
}
return combineFileSplits.toArray(new CombineFileSplit[combineFileSplits.size()]);
} else {
InputSplit[] splits = super.getSplits(job, numSplits);
ArrayList inputSplitShims = new ArrayList();
return inputSplitShims.toArray(new HadoopShimsSecure.InputSplitShim[inputSplitShims.size()]);
for (int pos = 0; pos < splits.length; ++pos) {
CombineFileSplit split = (CombineFileSplit) splits[pos];
if (split.getPaths().length > 0) {
inputSplitShims.add(new HadoopShimsSecure.InputSplitShim(job, split.getPaths(), split.getStartOffsets(),
split.getLengths(), split.getLocations()));
}
}
return (CombineFileSplit[]) inputSplitShims
.toArray(new HadoopShimsSecure.InputSplitShim[inputSplitShims.size()]);
}
}
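// Illustrative sketch, not part of this commit: the batching performed by the
// realtime branch of getSplits above, extracted standalone. Method and variable
// names are assumptions; at most `groupSize` file splits land in each batch,
// with a trailing partial batch flushed for the remainder.
private static List<List<FileSplit>> batchSplits(FileSplit[] splits, int groupSize) {
List<List<FileSplit>> batches = new ArrayList<>();
List<FileSplit> current = new ArrayList<>();
for (FileSplit split : splits) {
current.add(split);
if (current.size() == groupSize) {
batches.add(current);
current = new ArrayList<>();
}
}
if (!current.isEmpty()) {
batches.add(current); // mirrors the pos == splits.length - 1 flush above
}
return batches;
}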
@Override
@@ -874,6 +917,16 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
@Override
public RecordReader getRecordReader(JobConf job, CombineFileSplit split, Reporter reporter,
Class<RecordReader<K, V>> rrClass) throws IOException {
isRealTime = Boolean.valueOf(job.get("hudi.hive.realtime", "false"));
if (isRealTime) {
List<RecordReader> recordReaders = new LinkedList<>();
ValidationUtils.checkArgument(split instanceof HoodieCombineRealtimeFileSplit, "Only "
+ HoodieCombineRealtimeFileSplit.class.getName() + " allowed, found " + split.getClass().getName());
for (InputSplit inputSplit : ((HoodieCombineRealtimeFileSplit) split).getRealtimeFileSplits()) {
recordReaders.add(new HoodieParquetRealtimeInputFormat().getRecordReader(inputSplit, job, reporter));
}
return new HoodieCombineRealtimeRecordReader(job, split, recordReaders);
}
return new HadoopShimsSecure.CombineFileRecordReader(job, split, reporter, rrClass);
}
@@ -885,4 +938,39 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
isRealTime = realTime;
}
}
private class CheckNonCombinablePathCallable implements Callable<Set<Integer>> {
private final Path[] paths;
private final int start;
private final int length;
private final JobConf conf;
public CheckNonCombinablePathCallable(Path[] paths, int start, int length, JobConf conf) {
this.paths = paths;
this.start = start;
this.length = length;
this.conf = conf;
}
@Override
public Set<Integer> call() throws Exception {
Set<Integer> nonCombinablePathIndices = new HashSet<Integer>();
for (int i = 0; i < length; i++) {
PartitionDesc part = HiveFileFormatUtils.getPartitionDescFromPathRecursively(pathToPartitionInfo,
paths[i + start], IOPrepareCache.get().allocatePartitionDescMap());
// Use HiveInputFormat if any of the paths is not splittable
Class<? extends InputFormat> inputFormatClass = part.getInputFileFormatClass();
InputFormat<WritableComparable, Writable> inputFormat = getInputFormatFromCache(inputFormatClass, conf);
if (inputFormat instanceof AvoidSplitCombination
&& ((AvoidSplitCombination) inputFormat).shouldSkipCombine(paths[i + start], conf)) {
if (LOG.isDebugEnabled()) {
LOG.debug("The path [" + paths[i + start] + "] is being parked for HiveInputFormat.getSplits");
}
nonCombinablePathIndices.add(i + start);
}
}
return nonCombinablePathIndices;
}
}
}
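The write/readFields pair above uses a tag-then-payload layout: the input format class name is written first, and for HoodieParquetRealtimeInputFormat the concrete CombineFileSplit subclass name follows, so readFields can reflectively instantiate the matching split before handing it the remaining bytes. A minimal standalone sketch of that pattern, with illustrative method names (only ReflectionUtils.loadClass and the Text helpers come from the code above):

// Sketch of the tag-then-payload Writable pattern used by CombineHiveInputSplit.
static void writeTagged(DataOutput out, Writable payload) throws IOException {
Text.writeString(out, payload.getClass().getName()); // tag: concrete class name
payload.write(out);                                  // payload: class-specific bytes
}

static Writable readTagged(DataInput in) throws IOException {
String className = Text.readString(in);
// loadClass instantiates the named class via its no-arg constructor,
// as readFields does for the realtime split shim above.
Writable payload = ReflectionUtils.loadClass(className);
payload.readFields(in);
return payload;
}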

View File

@@ -0,0 +1,169 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.hadoop.hive;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.collection.ArrayUtils;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.CombineFileSplit;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
/**
* Represents a CombineFileSplit for realtime tables.
*/
public class HoodieCombineRealtimeFileSplit extends CombineFileSplit {
// These are instances of HoodieRealtimeFileSplit
List<FileSplit> realtimeFileSplits = new ArrayList<>();
public HoodieCombineRealtimeFileSplit() {
}
public HoodieCombineRealtimeFileSplit(JobConf jobConf, List<FileSplit> realtimeFileSplits) {
super(jobConf, realtimeFileSplits.stream().map(p ->
((HoodieRealtimeFileSplit) p).getPath()).collect(Collectors.toList()).toArray(new
Path[realtimeFileSplits.size()]),
ArrayUtils.toPrimitive(realtimeFileSplits.stream().map(p -> ((HoodieRealtimeFileSplit) p).getStart())
.collect(Collectors.toList()).toArray(new Long[realtimeFileSplits.size()])),
ArrayUtils.toPrimitive(realtimeFileSplits.stream().map(p -> ((HoodieRealtimeFileSplit) p).getLength())
.collect(Collectors.toList()).toArray(new Long[realtimeFileSplits.size()])),
realtimeFileSplits.stream().map(p -> {
try {
return Arrays.asList(p.getLocations());
} catch (Exception e) {
throw new RuntimeException(e);
}
}).flatMap(List::stream).collect(Collectors.toList()).toArray(new
String[realtimeFileSplits.size()]));
this.realtimeFileSplits = realtimeFileSplits;
}
public List<FileSplit> getRealtimeFileSplits() {
return realtimeFileSplits;
}
@Override
public String toString() {
return "HoodieCombineRealtimeFileSplit{"
+ "realtimeFileSplits=" + realtimeFileSplits
+ '}';
}
/**
* Writable interface.
*/
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(realtimeFileSplits.size());
for (InputSplit inputSplit: realtimeFileSplits) {
Text.writeString(out, inputSplit.getClass().getName());
inputSplit.write(out);
}
}
public void readFields(DataInput in) throws IOException {
int listLength = in.readInt();
realtimeFileSplits = new ArrayList<>(listLength);
for (int i = 0; i < listLength; i++) {
String inputClassName = Text.readString(in);
HoodieRealtimeFileSplit inputSplit = ReflectionUtils.loadClass(inputClassName);
inputSplit.readFields(in);
realtimeFileSplits.add(inputSplit);
}
}
public long getLength() {
return realtimeFileSplits.size();
}
/** Returns an array containing the start offsets of the files in the split. */
public long[] getStartOffsets() {
return realtimeFileSplits.stream().mapToLong(x -> 0L).toArray();
}
/** Returns an array containing the lengths of the files in the split. */
public long[] getLengths() {
return realtimeFileSplits.stream().mapToLong(FileSplit::getLength).toArray();
}
/** Returns the start offset of the i<sup>th</sup> Path. */
public long getOffset(int i) {
return 0;
}
/** Returns the length of the i<sup>th</sup> Path. */
public long getLength(int i) {
return realtimeFileSplits.get(i).getLength();
}
/** Returns the number of Paths in the split. */
public int getNumPaths() {
return realtimeFileSplits.size();
}
/** Returns the i<sup>th</sup> Path. */
public Path getPath(int i) {
return realtimeFileSplits.get(i).getPath();
}
/** Returns all the Paths in the split. */
public Path[] getPaths() {
return realtimeFileSplits.stream().map(x -> x.getPath()).toArray(Path[]::new);
}
/** Returns all the Paths where this input-split resides. */
public String[] getLocations() throws IOException {
return realtimeFileSplits.stream().flatMap(x -> {
try {
return Arrays.stream(x.getLocations());
} catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e);
}
}).toArray(String[]::new);
}
public static class Builder {
// These are instances of HoodieRealtimeFileSplit
public List<FileSplit> fileSplits = new ArrayList<>();
public void addSplit(FileSplit split) {
fileSplits.add(split);
}
public HoodieCombineRealtimeFileSplit build(JobConf conf) {
return new HoodieCombineRealtimeFileSplit(conf, fileSplits);
}
}
}
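A short usage sketch of the Builder above, combining one batch of realtime file splits the way HoodieCombineFileInputFormatShim.getSplits does (jobConf and splitsForOneGroup are assumed to exist):

// Sketch: fold a batch of HoodieRealtimeFileSplit instances into one combined split.
HoodieCombineRealtimeFileSplit.Builder builder = new HoodieCombineRealtimeFileSplit.Builder();
for (FileSplit split : splitsForOneGroup) { // each element is a HoodieRealtimeFileSplit
builder.addSplit(split);
}
HoodieCombineRealtimeFileSplit combined = builder.build(jobConf);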

View File

@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.hadoop.hive;
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.CombineFileSplit;
import org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat.CombineHiveInputSplit;
/**
* Represents a CombineHiveInputSplit for realtime tables.
*/
public class HoodieCombineRealtimeHiveSplit extends CombineHiveInputSplit {
public HoodieCombineRealtimeHiveSplit() throws IOException {
super(ShimLoader.getHadoopShims().getCombineFileInputFormat().getInputSplitShim());
}
public HoodieCombineRealtimeHiveSplit(JobConf jobConf, CombineFileSplit
combineFileSplit, Map<Path, PartitionDesc> map)
throws IOException {
super(jobConf, combineFileSplit, map);
}
}

View File

@@ -360,7 +360,9 @@ public abstract class AbstractRealtimeRecordReader {
private Schema constructHiveOrderedSchema(Schema writerSchema, Map<String, Field> schemaFieldsMap) {
// Get all column names of hive table
String hiveColumnString = jobConf.get(hive_metastoreConstants.META_TABLE_COLUMNS);
LOG.info("Hive Columns : " + hiveColumnString);
String[] hiveColumns = hiveColumnString.split(",");
LOG.info("Hive Columns : " + hiveColumnString);
List<Field> hiveSchemaFields = new ArrayList<>();
for (String columnName : hiveColumns) {
@@ -378,6 +380,7 @@ public abstract class AbstractRealtimeRecordReader {
Schema hiveSchema = Schema.createRecord(writerSchema.getName(), writerSchema.getDoc(), writerSchema.getNamespace(),
writerSchema.isError());
hiveSchema.setFields(hiveSchemaFields);
LOG.info("HIVE Schema is :" + hiveSchema.toString(true));
return hiveSchema;
}
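For context, the method this hunk touches reorders the writer schema's fields into Hive's column order from META_TABLE_COLUMNS. A hedged sketch of that reordering, assuming an Avro 1.9+ Schema.Field constructor and a schemaFieldsMap keyed by lower-cased column name (both assumptions, inferred from the visible code):

// Sketch: rebuild the record schema with fields ordered by the Hive column list.
String hiveColumnString = jobConf.get(hive_metastoreConstants.META_TABLE_COLUMNS);
List<Schema.Field> hiveSchemaFields = new ArrayList<>();
for (String columnName : hiveColumnString.split(",")) {
Schema.Field field = schemaFieldsMap.get(columnName.toLowerCase());
if (field != null) {
// Avro fields cannot be reused across schemas, so each one is copied.
hiveSchemaFields.add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal()));
}
}
Schema hiveSchema = Schema.createRecord(writerSchema.getName(), writerSchema.getDoc(),
writerSchema.getNamespace(), writerSchema.isError());
hiveSchema.setFields(hiveSchemaFields);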

View File

@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.hadoop.realtime;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.hadoop.hive.HoodieCombineRealtimeFileSplit;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.lib.CombineFileSplit;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
/**
* Allows reading multiple realtime file splits grouped together by CombineInputFormat.
*/
public class HoodieCombineRealtimeRecordReader implements RecordReader<NullWritable, ArrayWritable> {
private static final transient Logger LOG = LogManager.getLogger(HoodieCombineRealtimeRecordReader.class);
// RecordReaders for each split
List<HoodieRealtimeRecordReader> recordReaders = new LinkedList<>();
// Points to the currently iterating record reader
HoodieRealtimeRecordReader currentRecordReader;
public HoodieCombineRealtimeRecordReader(JobConf jobConf, CombineFileSplit split,
List<RecordReader> readers) {
try {
ValidationUtils.checkArgument(((HoodieCombineRealtimeFileSplit) split).getRealtimeFileSplits().size() == readers
.size(), "Num Splits does not match number of unique RecordReaders!");
for (InputSplit rtSplit : ((HoodieCombineRealtimeFileSplit) split).getRealtimeFileSplits()) {
LOG.info("Creating new RealtimeRecordReader for split");
recordReaders.add(
new HoodieRealtimeRecordReader((HoodieRealtimeFileSplit) rtSplit, jobConf, readers.remove(0)));
}
currentRecordReader = recordReaders.remove(0);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public boolean next(NullWritable key, ArrayWritable value) throws IOException {
if (this.currentRecordReader.next(key, value)) {
LOG.info("Reading from record reader");
LOG.info(AbstractRealtimeRecordReader.arrayWritableToString(value));
return true;
} else if (recordReaders.size() > 0) {
this.currentRecordReader.close();
this.currentRecordReader = recordReaders.remove(0);
return this.currentRecordReader.next(key, value);
} else {
return false;
}
}
@Override
public NullWritable createKey() {
return this.currentRecordReader.createKey();
}
@Override
public ArrayWritable createValue() {
return this.currentRecordReader.createValue();
}
@Override
public long getPos() throws IOException {
return this.currentRecordReader.getPos();
}
@Override
public void close() throws IOException {
this.currentRecordReader.close();
}
@Override
public float getProgress() throws IOException {
return this.currentRecordReader.getProgress();
}
}
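A brief usage sketch for the combined reader above: it is driven like any mapred RecordReader, and next() advances to the following split's reader when the current one is exhausted (jobConf, combineSplit, and underlyingReaders are assumed inputs):

// Sketch: drain every record across all grouped realtime splits.
HoodieCombineRealtimeRecordReader reader =
new HoodieCombineRealtimeRecordReader(jobConf, combineSplit, underlyingReaders);
NullWritable key = reader.createKey();
ArrayWritable value = reader.createValue();
while (reader.next(key, value)) {
// process value here
}
reader.close();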

View File

@@ -262,4 +262,4 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
public Configuration getConf() {
return conf;
}
}
}

View File

@@ -66,6 +66,7 @@ public class HoodieRealtimeRecordReader implements RecordReader<NullWritable, Ar
LOG.info("Enabling un-merged reading of realtime records");
return new RealtimeUnmergedRecordReader(split, jobConf, realReader);
}
LOG.info("Enabling merged reading of realtime records for split " + split);
return new RealtimeCompactedRecordReader(split, jobConf, realReader);
} catch (IOException ex) {
LOG.error("Got exception when constructing record reader", ex);

View File

@@ -18,6 +18,15 @@
package org.apache.hudi.hadoop.realtime;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
import org.apache.hudi.common.util.DefaultSizeEstimator;
@@ -29,17 +38,6 @@ import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer;
import org.apache.hudi.hadoop.RecordReaderValueIterator;
import org.apache.hudi.hadoop.SafeParquetRecordReaderWrapper;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
class RealtimeUnmergedRecordReader extends AbstractRealtimeRecordReader
implements RecordReader<NullWritable, ArrayWritable> {
@@ -84,7 +82,7 @@ class RealtimeUnmergedRecordReader extends AbstractRealtimeRecordReader
GenericRecord rec = (GenericRecord) record.getData().getInsertValue(getReaderSchema()).get();
ArrayWritable aWritable = (ArrayWritable) avroToArrayWritable(rec, getWriterSchema());
this.executor.getQueue().insertRecord(aWritable);
});
});
// Start reading and buffering
this.executor.startProducers();
}