From a066865bd6e1bbdae145324d70fbfb8b5b69b2ab Mon Sep 17 00:00:00 2001 From: n3nash Date: Sat, 3 Aug 2019 08:44:01 -0700 Subject: [PATCH] - Adding HoodieCombineHiveInputFormat for COW tables (#811) - Combine input format helps to reduce large scans into smaller ones by combining map tasks - Implementation to support Hive 2.x and above --- .../hive/HoodieCombineHiveInputFormat.java | 933 ++++++++++++++++++ 1 file changed, 933 insertions(+) create mode 100644 hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/hive/HoodieCombineHiveInputFormat.java diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/hive/HoodieCombineHiveInputFormat.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/hive/HoodieCombineHiveInputFormat.java new file mode 100644 index 000000000..5f2fb47d3 --- /dev/null +++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/hive/HoodieCombineHiveInputFormat.java @@ -0,0 +1,933 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.hadoop.hive; + +import com.google.common.annotations.VisibleForTesting; +import com.uber.hoodie.hadoop.HoodieInputFormat; +import com.uber.hoodie.hadoop.realtime.HoodieRealtimeInputFormat; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hive.common.StringInternUtils; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.io.CombineHiveRecordReader; +import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; +import org.apache.hadoop.hive.ql.io.HiveInputFormat; +import org.apache.hadoop.hive.ql.io.IOPrepareCache; +import org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat; +import org.apache.hadoop.hive.ql.log.PerfLogger; +import org.apache.hadoop.hive.ql.parse.SplitSample; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.plan.PartitionDesc; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.shims.HadoopShims.CombineFileInputFormatShim; +import org.apache.hadoop.hive.shims.HadoopShimsSecure; +import org.apache.hadoop.hive.shims.HadoopShimsSecure.InputSplitShim; +import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.lib.CombineFileInputFormat; +import org.apache.hadoop.mapred.lib.CombineFileSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This is just a copy of the org.apache.hadoop.hive.ql.io.CombineHiveInputFormat from Hive 2.x + * Search for **MOD** to see minor modifications to support custom inputformat in CombineHiveInputFormat. + * See https://issues.apache.org/jira/browse/HIVE-9771 + *

+ *

+ * CombineHiveInputFormat is a parameterized InputFormat which looks at the path + * name and determine the correct InputFormat for that path name from + * mapredPlan.pathToPartitionInfo(). It can be used to read files with different + * input format in the same map-reduce job. + * + * NOTE : This class is implemented to work with Hive 2.x + + */ +public class HoodieCombineHiveInputFormat + extends HiveInputFormat { + + private static final String CLASS_NAME = HoodieCombineHiveInputFormat.class.getName(); + public static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME); + + // max number of threads we can use to check non-combinable paths + private static final int MAX_CHECK_NONCOMBINABLE_THREAD_NUM = 50; + private static final int DEFAULT_NUM_PATH_PER_THREAD = 100; + + private class CheckNonCombinablePathCallable implements Callable> { + + private final Path[] paths; + private final int start; + private final int length; + private final JobConf conf; + + public CheckNonCombinablePathCallable(Path[] paths, int start, int length, JobConf conf) { + this.paths = paths; + this.start = start; + this.length = length; + this.conf = conf; + } + + @Override + public Set call() throws Exception { + Set nonCombinablePathIndices = new HashSet(); + for (int i = 0; i < length; i++) { + PartitionDesc part = + HiveFileFormatUtils.getPartitionDescFromPathRecursively( + pathToPartitionInfo, paths[i + start], + IOPrepareCache.get().allocatePartitionDescMap()); + // Use HiveInputFormat if any of the paths is not splittable + Class inputFormatClass = part.getInputFileFormatClass(); + InputFormat inputFormat = + getInputFormatFromCache(inputFormatClass, conf); + if (inputFormat instanceof AvoidSplitCombination + && ((AvoidSplitCombination) inputFormat).shouldSkipCombine(paths[i + start], conf)) { + if (LOG.isDebugEnabled()) { + LOG.debug("The path [" + paths[i + start] + + "] is being parked for HiveInputFormat.getSplits"); + } + nonCombinablePathIndices.add(i + start); + } + } + return nonCombinablePathIndices; + } + } + + /** + * CombineHiveInputSplit encapsulates an InputSplit with its corresponding + * inputFormatClassName. A CombineHiveInputSplit comprises of multiple chunks + * from different files. Since, they belong to a single directory, there is a + * single inputformat for all the chunks. + */ + public static class CombineHiveInputSplit extends InputSplitShim { + + private String inputFormatClassName; + private CombineFileSplit inputSplitShim; + private Map pathToPartitionInfo; + + public CombineHiveInputSplit() throws IOException { + this(ShimLoader.getHadoopShims().getCombineFileInputFormat() + .getInputSplitShim()); + } + + public CombineHiveInputSplit(CombineFileSplit inputSplitShim) throws IOException { + this(inputSplitShim.getJob(), inputSplitShim); + } + + public CombineHiveInputSplit(JobConf job, CombineFileSplit inputSplitShim) + throws IOException { + this(job, inputSplitShim, null); + } + + public CombineHiveInputSplit(JobConf job, CombineFileSplit inputSplitShim, + Map pathToPartitionInfo) throws IOException { + this.inputSplitShim = inputSplitShim; + this.pathToPartitionInfo = pathToPartitionInfo; + if (job != null) { + if (this.pathToPartitionInfo == null) { + this.pathToPartitionInfo = Utilities.getMapWork(job).getPathToPartitionInfo(); + } + + // extract all the inputFormatClass names for each chunk in the + // CombinedSplit. + Path[] ipaths = inputSplitShim.getPaths(); + if (ipaths.length > 0) { + PartitionDesc part = HiveFileFormatUtils + .getPartitionDescFromPathRecursively(this.pathToPartitionInfo, + ipaths[0], IOPrepareCache.get().getPartitionDescMap()); + inputFormatClassName = part.getInputFileFormatClass().getName(); + } + } + } + + public CombineFileSplit getInputSplitShim() { + return inputSplitShim; + } + + /** + * Returns the inputFormat class name for the i-th chunk. + */ + public String inputFormatClassName() { + return inputFormatClassName; + } + + public void setInputFormatClassName(String inputFormatClassName) { + this.inputFormatClassName = inputFormatClassName; + } + + @Override + public JobConf getJob() { + return inputSplitShim.getJob(); + } + + @Override + public long getLength() { + return inputSplitShim.getLength(); + } + + /** + * Returns an array containing the startoffsets of the files in the split. + */ + @Override + public long[] getStartOffsets() { + return inputSplitShim.getStartOffsets(); + } + + /** + * Returns an array containing the lengths of the files in the split. + */ + @Override + public long[] getLengths() { + return inputSplitShim.getLengths(); + } + + /** + * Returns the start offset of the ith Path. + */ + @Override + public long getOffset(int i) { + return inputSplitShim.getOffset(i); + } + + /** + * Returns the length of the ith Path. + */ + @Override + public long getLength(int i) { + return inputSplitShim.getLength(i); + } + + /** + * Returns the number of Paths in the split. + */ + @Override + public int getNumPaths() { + return inputSplitShim.getNumPaths(); + } + + /** + * Returns the ith Path. + */ + @Override + public Path getPath(int i) { + return inputSplitShim.getPath(i); + } + + /** + * Returns all the Paths in the split. + */ + @Override + public Path[] getPaths() { + return inputSplitShim.getPaths(); + } + + /** + * Returns all the Paths where this input-split resides. + */ + @Override + public String[] getLocations() throws IOException { + return inputSplitShim.getLocations(); + } + + /** + * Prints this obejct as a string. + */ + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(inputSplitShim.toString()); + sb.append("InputFormatClass: " + inputFormatClassName); + sb.append("\n"); + return sb.toString(); + } + + /** + * Writable interface. + */ + @Override + public void readFields(DataInput in) throws IOException { + inputSplitShim.readFields(in); + inputFormatClassName = in.readUTF(); + } + + /** + * Writable interface. + */ + @Override + public void write(DataOutput out) throws IOException { + inputSplitShim.write(out); + if (inputFormatClassName == null) { + if (pathToPartitionInfo == null) { + pathToPartitionInfo = Utilities.getMapWork(getJob()).getPathToPartitionInfo(); + } + + // extract all the inputFormatClass names for each chunk in the + // CombinedSplit. + PartitionDesc part = + HiveFileFormatUtils.getPartitionDescFromPathRecursively(pathToPartitionInfo, + inputSplitShim.getPath(0), IOPrepareCache.get().getPartitionDescMap()); + + // create a new InputFormat instance if this is the first time to see + // this class + inputFormatClassName = part.getInputFileFormatClass().getName(); + } + + out.writeUTF(inputFormatClassName); + } + } + + // Splits are not shared across different partitions with different input formats. + // For example, 2 partitions (1 sequencefile and 1 rcfile) will have 2 different splits + private static class CombinePathInputFormat { + + private final List> opList; + private final String inputFormatClassName; + private final String deserializerClassName; + + public CombinePathInputFormat(List> opList, + String inputFormatClassName, + String deserializerClassName) { + this.opList = opList; + this.inputFormatClassName = inputFormatClassName; + this.deserializerClassName = deserializerClassName; + } + + @Override + public boolean equals(Object o) { + if (o instanceof CombinePathInputFormat) { + CombinePathInputFormat mObj = (CombinePathInputFormat) o; + return (opList.equals(mObj.opList)) + && (inputFormatClassName.equals(mObj.inputFormatClassName)) + && (deserializerClassName == null ? (mObj.deserializerClassName == null) : + deserializerClassName.equals(mObj.deserializerClassName)); + } + return false; + } + + @Override + public int hashCode() { + return (opList == null) ? 0 : opList.hashCode(); + } + } + + /** + * Create Hive splits based on CombineFileSplit. + */ + private InputSplit[] getCombineSplits(JobConf job, int numSplits, + Map pathToPartitionInfo) + throws IOException { + init(job); + Map> pathToAliases = mrwork.getPathToAliases(); + Map> aliasToWork = + mrwork.getAliasToWork(); + /** MOD - Initialize a custom combine input format shim that will call listStatus on the custom inputFormat **/ + HoodieCombineHiveInputFormat.HoodieCombineFileInputFormatShim + combine = new HoodieCombineHiveInputFormat.HoodieCombineFileInputFormatShim(); + + InputSplit[] splits = null; + if (combine == null) { + splits = super.getSplits(job, numSplits); + return splits; + } + + if (combine.getInputPathsShim(job).length == 0) { + throw new IOException("No input paths specified in job"); + } + ArrayList result = new ArrayList(); + + // combine splits only from same tables and same partitions. Do not combine splits from multiple + // tables or multiple partitions. + Path[] paths = StringInternUtils.internUriStringsInPathArray(combine.getInputPathsShim(job)); + + List inpDirs = new ArrayList(); + List inpFiles = new ArrayList(); + Map poolMap = + new HashMap(); + Set poolSet = new HashSet(); + + for (Path path : paths) { + PartitionDesc part = HiveFileFormatUtils.getPartitionDescFromPathRecursively( + pathToPartitionInfo, path, IOPrepareCache.get().allocatePartitionDescMap()); + TableDesc tableDesc = part.getTableDesc(); + if ((tableDesc != null) && tableDesc.isNonNative()) { + return super.getSplits(job, numSplits); + } + + // Use HiveInputFormat if any of the paths is not splittable + Class inputFormatClass = part.getInputFileFormatClass(); + String inputFormatClassName = inputFormatClass.getName(); + InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job); + LOG.info("Input Format => " + inputFormatClass.getName()); + // **MOD** Set the hoodie filter in the combine + if (inputFormatClass.getName().equals(HoodieInputFormat.class.getName())) { + combine.setHoodieFilter(true); + } else if (inputFormatClass.getName().equals(HoodieRealtimeInputFormat.class.getName())) { + LOG.info("Setting hoodie filter and realtime input format"); + combine.setHoodieFilter(true); + combine.setRealTime(true); + } + String deserializerClassName = null; + try { + deserializerClassName = part.getDeserializer(job).getClass().getName(); + } catch (Exception e) { + // ignore + } + FileSystem inpFs = path.getFileSystem(job); + + //don't combine if inputformat is a SymlinkTextInputFormat + if (inputFormat instanceof SymlinkTextInputFormat) { + splits = super.getSplits(job, numSplits); + return splits; + } + + Path filterPath = path; + + // Does a pool exist for this path already + CombineFilter f = null; + List> opList = null; + + if (!mrwork.isMapperCannotSpanPartns()) { + //if mapper can span partitions, make sure a splits does not contain multiple + // opList + inputFormatClassName + deserializerClassName combination + // This is done using the Map of CombinePathInputFormat to PathFilter + + opList = HiveFileFormatUtils.doGetWorksFromPath( + pathToAliases, aliasToWork, filterPath); + CombinePathInputFormat combinePathInputFormat = + new CombinePathInputFormat(opList, inputFormatClassName, deserializerClassName); + f = poolMap.get(combinePathInputFormat); + if (f == null) { + f = new CombineFilter(filterPath); + LOG.info("CombineHiveInputSplit creating pool for " + + path + "; using filter path " + filterPath); + combine.createPool(job, f); + poolMap.put(combinePathInputFormat, f); + } else { + LOG.info("CombineHiveInputSplit: pool is already created for " + + path + "; using filter path " + filterPath); + f.addPath(filterPath); + } + } else { + // In the case of tablesample, the input paths are pointing to files rather than directories. + // We need to get the parent directory as the filtering path so that all files in the same + // parent directory will be grouped into one pool but not files from different parent + // directories. This guarantees that a split will combine all files in the same partition + // but won't cross multiple partitions if the user has asked so. + if (!path.getFileSystem(job).getFileStatus(path).isDir()) { // path is not directory + filterPath = path.getParent(); + inpFiles.add(path); + poolSet.add(filterPath); + } else { + inpDirs.add(path); + } + } + } + + // Processing directories + List iss = new ArrayList(); + if (!mrwork.isMapperCannotSpanPartns()) { + //mapper can span partitions + //combine into as few as one split, subject to the PathFilters set + // using combine.createPool. + iss = Arrays.asList(combine.getSplits(job, 1)); + } else { + for (Path path : inpDirs) { + processPaths(job, combine, iss, path); + } + + if (inpFiles.size() > 0) { + // Processing files + for (Path filterPath : poolSet) { + combine.createPool(job, new CombineFilter(filterPath)); + } + processPaths(job, combine, iss, inpFiles.toArray(new Path[0])); + } + } + + if (mrwork.getNameToSplitSample() != null && !mrwork.getNameToSplitSample().isEmpty()) { + iss = sampleSplits(iss); + } + + for (CombineFileSplit is : iss) { + CombineHiveInputSplit csplit = new CombineHiveInputSplit(job, is, pathToPartitionInfo); + result.add(csplit); + } + + LOG.info("number of splits " + result.size()); + return result.toArray(new CombineHiveInputSplit[result.size()]); + } + + /** + * Gets all the path indices that should not be combined + */ + @VisibleForTesting + public Set getNonCombinablePathIndices(JobConf job, Path[] paths, int numThreads) + throws ExecutionException, InterruptedException { + LOG.info("Total number of paths: " + paths.length + + ", launching " + numThreads + " threads to check non-combinable ones."); + int numPathPerThread = (int) Math.ceil((double) paths.length / numThreads); + + ExecutorService executor = Executors.newFixedThreadPool(numThreads); + List>> futureList = new ArrayList>>(numThreads); + try { + for (int i = 0; i < numThreads; i++) { + int start = i * numPathPerThread; + int length = i != numThreads - 1 ? numPathPerThread : paths.length - start; + futureList.add(executor.submit( + new CheckNonCombinablePathCallable(paths, start, length, job))); + } + Set nonCombinablePathIndices = new HashSet(); + for (Future> future : futureList) { + nonCombinablePathIndices.addAll(future.get()); + } + return nonCombinablePathIndices; + } finally { + executor.shutdownNow(); + } + } + + /** + * Create Hive splits based on CombineFileSplit. + */ + @Override + public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { + PerfLogger perfLogger = SessionState.getPerfLogger(); + perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.GET_SPLITS); + init(job); + + ArrayList result = new ArrayList(); + + Path[] paths = getInputPaths(job); + + List nonCombinablePaths = new ArrayList(paths.length / 2); + List combinablePaths = new ArrayList(paths.length / 2); + + int numThreads = Math.min(MAX_CHECK_NONCOMBINABLE_THREAD_NUM, + (int) Math.ceil((double) paths.length / DEFAULT_NUM_PATH_PER_THREAD)); + + // This check is necessary because for Spark branch, the result array from + // getInputPaths() above could be empty, and therefore numThreads could be 0. + // In that case, Executors.newFixedThreadPool will fail. + if (numThreads > 0) { + try { + Set nonCombinablePathIndices = getNonCombinablePathIndices(job, paths, numThreads); + for (int i = 0; i < paths.length; i++) { + if (nonCombinablePathIndices.contains(i)) { + nonCombinablePaths.add(paths[i]); + } else { + combinablePaths.add(paths[i]); + } + } + } catch (Exception e) { + LOG.error("Error checking non-combinable path", e); + perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS); + throw new IOException(e); + } + } + + // Store the previous value for the path specification + String oldPaths = job.get(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR); + if (LOG.isDebugEnabled()) { + LOG.debug("The received input paths are: [" + oldPaths + + "] against the property " + + org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR); + } + + // Process the normal splits + if (nonCombinablePaths.size() > 0) { + FileInputFormat.setInputPaths(job, + nonCombinablePaths.toArray(new Path[nonCombinablePaths.size()])); + InputSplit[] splits = super.getSplits(job, numSplits); + for (InputSplit split : splits) { + result.add(split); + } + } + + // Process the combine splits + if (combinablePaths.size() > 0) { + FileInputFormat.setInputPaths(job, + combinablePaths.toArray(new Path[combinablePaths.size()])); + Map pathToPartitionInfo = this.pathToPartitionInfo != null + ? this.pathToPartitionInfo : Utilities.getMapWork(job).getPathToPartitionInfo(); + InputSplit[] splits = getCombineSplits(job, numSplits, pathToPartitionInfo); + for (InputSplit split : splits) { + result.add(split); + } + } + + // Restore the old path information back + // This is just to prevent incompatibilities with previous versions Hive + // if some application depends on the original value being set. + if (oldPaths != null) { + job.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR, oldPaths); + } + + // clear work from ThreadLocal after splits generated in case of thread is reused in pool. + Utilities.clearWorkMapForConf(job); + + LOG.info("Number of all splits " + result.size()); + perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS); + return result.toArray(new InputSplit[result.size()]); + } + + private void processPaths(JobConf job, CombineFileInputFormatShim combine, + List iss, Path... path) throws IOException { + JobConf currJob = new JobConf(job); + FileInputFormat.setInputPaths(currJob, path); + iss.addAll(Arrays.asList(combine.getSplits(currJob, 1))); + } + + /** + * MOD - Just added this for visibility + **/ + Path[] getInputPaths(JobConf job) throws IOException { + Path[] dirs = FileInputFormat.getInputPaths(job); + if (dirs.length == 0) { + // on tez we're avoiding to duplicate the file info in FileInputFormat. + if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) { + try { + List paths = Utilities.getInputPathsTez(job, mrwork); + dirs = paths.toArray(new Path[paths.size()]); + } catch (Exception e) { + throw new IOException("Could not create input files", e); + } + } else { + throw new IOException("No input paths specified in job"); + } + } + return dirs; + } + + /** + * This function is used to sample inputs for clauses like "TABLESAMPLE(1 PERCENT)" + *

+ * First, splits are grouped by alias they are for. If one split serves more than one + * alias or not for any sampled alias, we just directly add it to returned list. + * Then we find a list of exclusive splits for every alias to be sampled. + * For each alias, we start from position of seedNumber%totalNumber, and keep add + * splits until the total size hits percentage. + * + * @return the sampled splits + */ + private List sampleSplits(List splits) { + HashMap nameToSamples = mrwork.getNameToSplitSample(); + List retLists = new ArrayList(); + Map> aliasToSplitList = + new HashMap>(); + Map> pathToAliases = mrwork.getPathToAliases(); + Map> pathToAliasesNoScheme = removeScheme(pathToAliases); + + // Populate list of exclusive splits for every sampled alias + // + for (CombineFileSplit split : splits) { + String alias = null; + for (Path path : split.getPaths()) { + boolean schemeless = path.toUri().getScheme() == null; + List l = HiveFileFormatUtils.doGetAliasesFromPath( + schemeless ? pathToAliasesNoScheme : pathToAliases, path); + // a path for a split unqualified the split from being sampled if: + // 1. it serves more than one alias + // 2. the alias it serves is not sampled + // 3. it serves different alias than another path for the same split + if (l.size() != 1 || !nameToSamples.containsKey(l.get(0)) + || (alias != null && l.get(0) != alias)) { + alias = null; + break; + } + alias = l.get(0); + } + + if (alias != null) { + // split exclusively serves alias, which needs to be sampled + // add it to the split list of the alias. + if (!aliasToSplitList.containsKey(alias)) { + aliasToSplitList.put(alias, new ArrayList()); + } + aliasToSplitList.get(alias).add(split); + } else { + // The split doesn't exclusively serve one alias + retLists.add(split); + } + } + + // for every sampled alias, we figure out splits to be sampled and add + // them to return list + // + for (Map.Entry> entry : aliasToSplitList.entrySet()) { + ArrayList splitList = entry.getValue(); + long totalSize = 0; + for (CombineFileSplit split : splitList) { + totalSize += split.getLength(); + } + + SplitSample splitSample = nameToSamples.get(entry.getKey()); + + long targetSize = splitSample.getTargetSize(totalSize); + int startIndex = splitSample.getSeedNum() % splitList.size(); + long size = 0; + for (int i = 0; i < splitList.size(); i++) { + CombineFileSplit split = splitList.get((startIndex + i) % splitList.size()); + retLists.add(split); + long splitgLength = split.getLength(); + if (size + splitgLength >= targetSize) { + LOG.info("Sample alias " + entry.getValue() + " using " + (i + 1) + "splits"); + if (size + splitgLength > targetSize) { + ((InputSplitShim) split).shrinkSplit(targetSize - size); + } + break; + } + size += splitgLength; + } + + } + + return retLists; + } + + Map> removeScheme(Map> pathToAliases) { + Map> result = new HashMap<>(); + for (Map.Entry> entry : pathToAliases.entrySet()) { + Path newKey = Path.getPathWithoutSchemeAndAuthority(entry.getKey()); + StringInternUtils.internUriStringsInPath(newKey); + result.put(newKey, entry.getValue()); + } + return result; + } + + /** + * Create a generic Hive RecordReader than can iterate over all chunks in a + * CombinedFileSplit. + */ + @Override + public RecordReader getRecordReader(InputSplit split, JobConf job, + Reporter reporter) throws IOException { + if (!(split instanceof CombineHiveInputSplit)) { + return super.getRecordReader(split, job, reporter); + } + + CombineHiveInputSplit hsplit = (CombineHiveInputSplit) split; + + String inputFormatClassName = null; + Class inputFormatClass = null; + try { + inputFormatClassName = hsplit.inputFormatClassName(); + inputFormatClass = job.getClassByName(inputFormatClassName); + } catch (Exception e) { + throw new IOException("cannot find class " + inputFormatClassName); + } + + pushProjectionsAndFilters(job, inputFormatClass, hsplit.getPath(0)); + + return ShimLoader.getHadoopShims().getCombineFileInputFormat() + .getRecordReader(job, + (CombineFileSplit) split, reporter, + CombineHiveRecordReader.class); + } + + static class CombineFilter implements PathFilter { + + private final Set pStrings = new HashSet(); + + // store a path prefix in this TestFilter + // PRECONDITION: p should always be a directory + public CombineFilter(Path p) { + // we need to keep the path part only because the Hadoop CombineFileInputFormat will + // pass the path part only to accept(). + // Trailing the path with a separator to prevent partial matching. + addPath(p); + } + + public void addPath(Path p) { + String pString = p.toUri().getPath(); + pStrings.add(pString); + } + + // returns true if the specified path matches the prefix stored + // in this TestFilter. + @Override + public boolean accept(Path path) { + boolean find = false; + while (path != null && !find) { + if (pStrings.contains(path.toUri().getPath())) { + find = true; + break; + } + path = path.getParent(); + } + return find; + } + + @Override + public String toString() { + StringBuilder s = new StringBuilder(); + s.append("PathFilter: "); + for (String pString : pStrings) { + s.append(pString + " "); + } + return s.toString(); + } + } + + /** + * This is a marker interface that is used to identify the formats where + * combine split generation is not applicable + */ + public interface AvoidSplitCombination { + + boolean shouldSkipCombine(Path path, Configuration conf) throws IOException; + } + + /** + * **MOD** this is the implementation of CombineFileInputFormat which is a copy of + * org.apache.hadoop.hive.shims.HadoopShimsSecure.CombineFileInputFormatShim + * with changes in listStatus + */ + public static class HoodieCombineFileInputFormatShim extends CombineFileInputFormat + implements org.apache.hadoop.hive.shims.HadoopShims.CombineFileInputFormatShim { + + private boolean hoodieFilter = false; + private boolean isRealTime = false; + + public HoodieCombineFileInputFormatShim() { + } + + public Path[] getInputPathsShim(JobConf conf) { + try { + return FileInputFormat.getInputPaths(conf); + } catch (Exception var3) { + throw new RuntimeException(var3); + } + } + + public void createPool(JobConf conf, PathFilter... filters) { + super.createPool(conf, filters); + } + + @Override + public RecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter) + throws IOException { + throw new IOException("CombineFileInputFormat.getRecordReader not needed."); + } + + protected List listStatus(JobContext job) throws IOException { + LOG.info("Listing status in HoodieCombineHiveInputFormat.HoodieCombineFileInputFormatShim"); + List result; + if (hoodieFilter) { + HoodieInputFormat input; + if (isRealTime) { + LOG.info("Using HoodieRealtimeInputFormat"); + input = new HoodieRealtimeInputFormat(); + } else { + LOG.info("Using HoodieInputFormat"); + input = new HoodieInputFormat(); + } + input.setConf(job.getConfiguration()); + result = new ArrayList( + Arrays.asList(input.listStatus(new JobConf(job.getConfiguration())))); + } else { + result = super.listStatus(job); + } + + Iterator it = result.iterator(); + + while (it.hasNext()) { + FileStatus stat = (FileStatus) it.next(); + if (!stat.isFile()) { + it.remove(); + } + } + return result; + } + + public CombineFileSplit[] getSplits(JobConf job, int numSplits) throws IOException { + long minSize = job.getLong(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.SPLIT_MINSIZE, 0L); + if (job.getLong("mapreduce.input.fileinputformat.split.minsize.per.node", 0L) == 0L) { + super.setMinSplitSizeNode(minSize); + } + + if (job.getLong("mapreduce.input.fileinputformat.split.minsize.per.rack", 0L) == 0L) { + super.setMinSplitSizeRack(minSize); + } + + if (job.getLong(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.SPLIT_MAXSIZE, 0L) == 0L) { + super.setMaxSplitSize(minSize); + } + + InputSplit[] splits = super.getSplits(job, numSplits); + ArrayList inputSplitShims = new ArrayList(); + + for (int pos = 0; pos < splits.length; ++pos) { + CombineFileSplit split = (CombineFileSplit) splits[pos]; + if (split.getPaths().length > 0) { + inputSplitShims.add( + new HadoopShimsSecure.InputSplitShim(job, split.getPaths(), split.getStartOffsets(), + split.getLengths(), split.getLocations())); + } + } + + return (CombineFileSplit[]) inputSplitShims + .toArray(new HadoopShimsSecure.InputSplitShim[inputSplitShims.size()]); + } + + public HadoopShimsSecure.InputSplitShim getInputSplitShim() throws IOException { + return new HadoopShimsSecure.InputSplitShim(); + } + + public RecordReader getRecordReader(JobConf job, CombineFileSplit split, Reporter reporter, + Class> rrClass) throws IOException { + return new HadoopShimsSecure.CombineFileRecordReader(job, split, reporter, rrClass); + } + + public void setHoodieFilter(boolean hoodieFilter) { + this.hoodieFilter = hoodieFilter; + } + + public void setRealTime(boolean realTime) { + isRealTime = realTime; + } + } +} \ No newline at end of file