diff --git a/hudi-client/src/test/java/org/apache/hudi/common/HoodieMergeOnReadTestUtils.java b/hudi-client/src/test/java/org/apache/hudi/common/HoodieMergeOnReadTestUtils.java index 24430fbc0..1a65a46a2 100644 --- a/hudi-client/src/test/java/org/apache/hudi/common/HoodieMergeOnReadTestUtils.java +++ b/hudi-client/src/test/java/org/apache/hudi/common/HoodieMergeOnReadTestUtils.java @@ -50,7 +50,7 @@ public class HoodieMergeOnReadTestUtils { } public static List getRecordsUsingInputFormat(List inputPaths, String basePath, - Configuration conf) { + Configuration conf) { JobConf jobConf = new JobConf(conf); return getRecordsUsingInputFormat(inputPaths, basePath, jobConf, new HoodieParquetRealtimeInputFormat()); } @@ -125,4 +125,4 @@ public class HoodieMergeOnReadTestUtils { jobConf.set("mapreduce.input.fileinputformat.inputdir", inputPath); jobConf.set("map.input.dir", inputPath); } -} +} \ No newline at end of file diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ArrayUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ArrayUtils.java new file mode 100644 index 000000000..cc76c9d3d --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ArrayUtils.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.util.collection; + +/** + *

<p>Operations on arrays, primitive arrays (like {@code int[]}) and + * primitive wrapper arrays (like {@code Integer[]}).</p> + * + * <p>This class tries to handle {@code null} input gracefully. + * An exception will not be thrown for a {@code null} + * array input. However, an Object array that contains a {@code null} + * element may throw an exception. Each method documents its behaviour.</p>
+ * + * NOTE : Adapted from org.apache.commons.lang3.ArrayUtils + */ +public class ArrayUtils { + + /** + * An empty immutable {@code long} array. + */ + public static final long[] EMPTY_LONG_ARRAY = new long[0]; + + // Long array converters + // ---------------------------------------------------------------------- + /** + *

<p>Converts an array of object Longs to primitives.</p> + * + * <p>This method returns {@code null} for a {@code null} input array.</p>
+ * + * @param array a {@code Long} array, may be {@code null} + * @return a {@code long} array, {@code null} if null array input + * @throws NullPointerException if array content is {@code null} + */ + public static long[] toPrimitive(Long[] array) { + if (array == null) { + return null; + } else if (array.length == 0) { + return EMPTY_LONG_ARRAY; + } + final long[] result = new long[array.length]; + for (int i = 0; i < array.length; i++) { + result[i] = array[i].longValue(); + } + return result; + } +} diff --git a/hudi-hadoop-mr/pom.xml b/hudi-hadoop-mr/pom.xml index a77932ceb..8d21bdaff 100644 --- a/hudi-hadoop-mr/pom.xml +++ b/hudi-hadoop-mr/pom.xml @@ -150,4 +150,4 @@ - + \ No newline at end of file diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java index 27a8318c5..9f024e9d2 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java @@ -18,7 +18,11 @@ package org.apache.hudi.hadoop.hive; +import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.hadoop.HoodieParquetInputFormat; +import org.apache.hudi.hadoop.realtime.HoodieCombineRealtimeRecordReader; import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat; import org.apache.hadoop.conf.Configuration; @@ -28,11 +32,13 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.common.StringInternUtils; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.io.CombineHiveRecordReader; import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; import org.apache.hadoop.hive.ql.io.HiveInputFormat; +import org.apache.hadoop.hive.ql.io.IOContextMap; import org.apache.hadoop.hive.ql.io.IOPrepareCache; import org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat; import org.apache.hadoop.hive.ql.log.PerfLogger; @@ -45,9 +51,11 @@ import org.apache.hadoop.hive.shims.HadoopShims.CombineFileInputFormatShim; import org.apache.hadoop.hive.shims.HadoopShimsSecure; import org.apache.hadoop.hive.shims.HadoopShimsSecure.InputSplitShim; import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; @@ -67,6 +75,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Objects; @@ -99,250 +108,6 @@ public class HoodieCombineHiveInputFormat> { - - private final Path[] paths; - private final int start; - private final int length; - private final JobConf conf; - - public CheckNonCombinablePathCallable(Path[] paths, int start, int length, JobConf conf) { - this.paths = paths; - this.start = start; - this.length = length; - this.conf = conf; - } 
- - @Override - public Set call() throws Exception { - Set nonCombinablePathIndices = new HashSet<>(); - for (int i = 0; i < length; i++) { - PartitionDesc part = HiveFileFormatUtils.getPartitionDescFromPathRecursively(pathToPartitionInfo, - paths[i + start], IOPrepareCache.get().allocatePartitionDescMap()); - // Use HiveInputFormat if any of the paths is not splittable - Class inputFormatClass = part.getInputFileFormatClass(); - InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, conf); - if (inputFormat instanceof AvoidSplitCombination - && ((AvoidSplitCombination) inputFormat).shouldSkipCombine(paths[i + start], conf)) { - if (LOG.isDebugEnabled()) { - LOG.debug("The path [" + paths[i + start] + "] is being parked for HiveInputFormat.getSplits"); - } - nonCombinablePathIndices.add(i + start); - } - } - return nonCombinablePathIndices; - } - } - - /** - * CombineHiveInputSplit encapsulates an InputSplit with its corresponding inputFormatClassName. A - * CombineHiveInputSplit comprises of multiple chunks from different files. Since, they belong to a single directory, - * there is a single inputformat for all the chunks. - */ - public static class CombineHiveInputSplit extends InputSplitShim { - - private String inputFormatClassName; - private CombineFileSplit inputSplitShim; - private Map pathToPartitionInfo; - - public CombineHiveInputSplit() throws IOException { - this(ShimLoader.getHadoopShims().getCombineFileInputFormat().getInputSplitShim()); - } - - public CombineHiveInputSplit(CombineFileSplit inputSplitShim) throws IOException { - this(inputSplitShim.getJob(), inputSplitShim); - } - - public CombineHiveInputSplit(JobConf job, CombineFileSplit inputSplitShim) throws IOException { - this(job, inputSplitShim, null); - } - - public CombineHiveInputSplit(JobConf job, CombineFileSplit inputSplitShim, - Map pathToPartitionInfo) throws IOException { - this.inputSplitShim = inputSplitShim; - this.pathToPartitionInfo = pathToPartitionInfo; - if (job != null) { - if (this.pathToPartitionInfo == null) { - this.pathToPartitionInfo = Utilities.getMapWork(job).getPathToPartitionInfo(); - } - - // extract all the inputFormatClass names for each chunk in the - // CombinedSplit. - Path[] ipaths = inputSplitShim.getPaths(); - if (ipaths.length > 0) { - PartitionDesc part = HiveFileFormatUtils.getPartitionDescFromPathRecursively(this.pathToPartitionInfo, - ipaths[0], IOPrepareCache.get().getPartitionDescMap()); - inputFormatClassName = part.getInputFileFormatClass().getName(); - } - } - } - - public CombineFileSplit getInputSplitShim() { - return inputSplitShim; - } - - /** - * Returns the inputFormat class name for the i-th chunk. - */ - public String inputFormatClassName() { - return inputFormatClassName; - } - - public void setInputFormatClassName(String inputFormatClassName) { - this.inputFormatClassName = inputFormatClassName; - } - - @Override - public JobConf getJob() { - return inputSplitShim.getJob(); - } - - @Override - public long getLength() { - return inputSplitShim.getLength(); - } - - /** - * Returns an array containing the startoffsets of the files in the split. - */ - @Override - public long[] getStartOffsets() { - return inputSplitShim.getStartOffsets(); - } - - /** - * Returns an array containing the lengths of the files in the split. - */ - @Override - public long[] getLengths() { - return inputSplitShim.getLengths(); - } - - /** - * Returns the start offset of the ith Path. 
- */ - @Override - public long getOffset(int i) { - return inputSplitShim.getOffset(i); - } - - /** - * Returns the length of the ith Path. - */ - @Override - public long getLength(int i) { - return inputSplitShim.getLength(i); - } - - /** - * Returns the number of Paths in the split. - */ - @Override - public int getNumPaths() { - return inputSplitShim.getNumPaths(); - } - - /** - * Returns the ith Path. - */ - @Override - public Path getPath(int i) { - return inputSplitShim.getPath(i); - } - - /** - * Returns all the Paths in the split. - */ - @Override - public Path[] getPaths() { - return inputSplitShim.getPaths(); - } - - /** - * Returns all the Paths where this input-split resides. - */ - @Override - public String[] getLocations() throws IOException { - return inputSplitShim.getLocations(); - } - - /** - * Prints this obejct as a string. - */ - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append(inputSplitShim.toString()); - sb.append("InputFormatClass: " + inputFormatClassName); - sb.append("\n"); - return sb.toString(); - } - - /** - * Writable interface. - */ - @Override - public void readFields(DataInput in) throws IOException { - inputSplitShim.readFields(in); - inputFormatClassName = in.readUTF(); - } - - /** - * Writable interface. - */ - @Override - public void write(DataOutput out) throws IOException { - inputSplitShim.write(out); - if (inputFormatClassName == null) { - if (pathToPartitionInfo == null) { - pathToPartitionInfo = Utilities.getMapWork(getJob()).getPathToPartitionInfo(); - } - - // extract all the inputFormatClass names for each chunk in the - // CombinedSplit. - PartitionDesc part = HiveFileFormatUtils.getPartitionDescFromPathRecursively(pathToPartitionInfo, - inputSplitShim.getPath(0), IOPrepareCache.get().getPartitionDescMap()); - - // create a new InputFormat instance if this is the first time to see - // this class - inputFormatClassName = part.getInputFileFormatClass().getName(); - } - - out.writeUTF(inputFormatClassName); - } - } - - // Splits are not shared across different partitions with different input formats. - // For example, 2 partitions (1 sequencefile and 1 rcfile) will have 2 different splits - private static class CombinePathInputFormat { - - private final List> opList; - private final String inputFormatClassName; - private final String deserializerClassName; - - public CombinePathInputFormat(List> opList, String inputFormatClassName, - String deserializerClassName) { - this.opList = opList; - this.inputFormatClassName = inputFormatClassName; - this.deserializerClassName = deserializerClassName; - } - - @Override - public boolean equals(Object o) { - if (o instanceof CombinePathInputFormat) { - CombinePathInputFormat mObj = (CombinePathInputFormat) o; - return (opList.equals(mObj.opList)) && (inputFormatClassName.equals(mObj.inputFormatClassName)) - && (Objects.equals(deserializerClassName, mObj.deserializerClassName)); - } - return false; - } - - @Override - public int hashCode() { - return (opList == null) ? 0 : opList.hashCode(); - } - } - /** * Create Hive splits based on CombineFileSplit. 
*/ @@ -391,6 +156,16 @@ public class HoodieCombineHiveInputFormat partitions = new ArrayList<>(part.getPartSpec().keySet()); + if (!partitions.isEmpty()) { + String partitionStr = String.join(",", partitions); + LOG.info("Setting Partitions in jobConf - Partition Keys for Path : " + path + " is :" + partitionStr); + job.set(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, partitionStr); + } else { + job.set(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, ""); + } + } } String deserializerClassName = null; try { @@ -472,7 +247,16 @@ public class HoodieCombineHiveInputFormat pathToPartitionInfo; + + public CombineHiveInputSplit() throws IOException { + this(ShimLoader.getHadoopShims().getCombineFileInputFormat().getInputSplitShim()); + } + + public CombineHiveInputSplit(CombineFileSplit inputSplitShim) throws IOException { + this(inputSplitShim.getJob(), inputSplitShim); + } + + public CombineHiveInputSplit(JobConf job, CombineFileSplit inputSplitShim) throws IOException { + this(job, inputSplitShim, null); + } + + public CombineHiveInputSplit(JobConf job, CombineFileSplit inputSplitShim, + Map pathToPartitionInfo) throws IOException { + this.inputSplitShim = inputSplitShim; + this.pathToPartitionInfo = pathToPartitionInfo; + if (job != null) { + if (this.pathToPartitionInfo == null) { + this.pathToPartitionInfo = Utilities.getMapWork(job).getPathToPartitionInfo(); + } + + // extract all the inputFormatClass names for each chunk in the + // CombinedSplit. + Path[] ipaths = inputSplitShim.getPaths(); + if (ipaths.length > 0) { + PartitionDesc part = HiveFileFormatUtils.getPartitionDescFromPathRecursively(this.pathToPartitionInfo, + ipaths[0], IOPrepareCache.get().getPartitionDescMap()); + inputFormatClassName = part.getInputFileFormatClass().getName(); + } + } + } + + public CombineFileSplit getInputSplitShim() { + return inputSplitShim; + } + + /** + * Returns the inputFormat class name for the i-th chunk. + */ + public String inputFormatClassName() { + return inputFormatClassName; + } + + public void setInputFormatClassName(String inputFormatClassName) { + this.inputFormatClassName = inputFormatClassName; + } + + @Override + public JobConf getJob() { + return inputSplitShim.getJob(); + } + + @Override + public long getLength() { + return inputSplitShim.getLength(); + } + + /** + * Returns an array containing the startoffsets of the files in the split. + */ + @Override + public long[] getStartOffsets() { + return inputSplitShim.getStartOffsets(); + } + + /** + * Returns an array containing the lengths of the files in the split. + */ + @Override + public long[] getLengths() { + return inputSplitShim.getLengths(); + } + + /** + * Returns the start offset of the ith Path. + */ + @Override + public long getOffset(int i) { + return inputSplitShim.getOffset(i); + } + + /** + * Returns the length of the ith Path. + */ + @Override + public long getLength(int i) { + return inputSplitShim.getLength(i); + } + + /** + * Returns the number of Paths in the split. + */ + @Override + public int getNumPaths() { + return inputSplitShim.getNumPaths(); + } + + /** + * Returns the ith Path. + */ + @Override + public Path getPath(int i) { + return inputSplitShim.getPath(i); + } + + /** + * Returns all the Paths in the split. + */ + @Override + public Path[] getPaths() { + return inputSplitShim.getPaths(); + } + + /** + * Returns all the Paths where this input-split resides. 
+ */ + @Override + public String[] getLocations() throws IOException { + return inputSplitShim.getLocations(); + } + + /** + * Prints this obejct as a string. + */ + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(inputSplitShim.toString()); + sb.append("InputFormatClass: " + inputFormatClassName); + sb.append("\n"); + return sb.toString(); + } + + /** + * Writable interface. + */ + @Override + public void readFields(DataInput in) throws IOException { + inputFormatClassName = Text.readString(in); + if (HoodieParquetRealtimeInputFormat.class.getName().equals(inputFormatClassName)) { + String inputShimClassName = Text.readString(in); + inputSplitShim = ReflectionUtils.loadClass(inputShimClassName); + inputSplitShim.readFields(in); + } else { + inputSplitShim.readFields(in); + } + } + + /** + * Writable interface. + */ + @Override + public void write(DataOutput out) throws IOException { + if (inputFormatClassName == null) { + if (pathToPartitionInfo == null) { + pathToPartitionInfo = Utilities.getMapWork(getJob()).getPathToPartitionInfo(); + } + + // extract all the inputFormatClass names for each chunk in the + // CombinedSplit. + PartitionDesc part = HiveFileFormatUtils.getPartitionDescFromPathRecursively(pathToPartitionInfo, + inputSplitShim.getPath(0), IOPrepareCache.get().getPartitionDescMap()); + + // create a new InputFormat instance if this is the first time to see + // this class + inputFormatClassName = part.getInputFileFormatClass().getName(); + } + Text.writeString(out, inputFormatClassName); + if (HoodieParquetRealtimeInputFormat.class.getName().equals(inputFormatClassName)) { + // Write Shim Class Name + Text.writeString(out, inputSplitShim.getClass().getName()); + } + inputSplitShim.write(out); + } + } + + // Splits are not shared across different partitions with different input formats. + // For example, 2 partitions (1 sequencefile and 1 rcfile) will have 2 different splits + private static class CombinePathInputFormat { + + private final List> opList; + private final String inputFormatClassName; + private final String deserializerClassName; + + public CombinePathInputFormat(List> opList, String inputFormatClassName, + String deserializerClassName) { + this.opList = opList; + this.inputFormatClassName = inputFormatClassName; + this.deserializerClassName = deserializerClassName; + } + + @Override + public boolean equals(Object o) { + if (o instanceof CombinePathInputFormat) { + CombinePathInputFormat mObj = (CombinePathInputFormat) o; + return (opList.equals(mObj.opList)) && (inputFormatClassName.equals(mObj.inputFormatClassName)) + && (deserializerClassName == null ? (mObj.deserializerClassName == null) + : deserializerClassName.equals(mObj.deserializerClassName)); + } + return false; + } + + @Override + public int hashCode() { + return (opList == null) ? 
0 : opList.hashCode(); + } } static class CombineFilter implements PathFilter { @@ -775,14 +795,6 @@ public class HoodieCombineHiveInputFormat combineFileSplits = new ArrayList<>(); + HoodieCombineRealtimeFileSplit.Builder builder = new HoodieCombineRealtimeFileSplit.Builder(); + int counter = 0; + for (int pos = 0; pos < splits.length; pos++) { + if (counter == maxSize - 1 || pos == splits.length - 1) { + builder.addSplit((FileSplit)splits[pos]); + combineFileSplits.add(builder.build(job)); + builder = new HoodieCombineRealtimeFileSplit.Builder(); + counter = 0; + } else if (counter < maxSize) { + counter++; + builder.addSplit((FileSplit)splits[pos]); + } + } + return combineFileSplits.toArray(new CombineFileSplit[combineFileSplits.size()]); + } else { + InputSplit[] splits = super.getSplits(job, numSplits); + ArrayList inputSplitShims = new ArrayList(); - return inputSplitShims.toArray(new HadoopShimsSecure.InputSplitShim[inputSplitShims.size()]); + for (int pos = 0; pos < splits.length; ++pos) { + CombineFileSplit split = (CombineFileSplit) splits[pos]; + if (split.getPaths().length > 0) { + inputSplitShims.add(new HadoopShimsSecure.InputSplitShim(job, split.getPaths(), split.getStartOffsets(), + split.getLengths(), split.getLocations())); + } + } + return (CombineFileSplit[]) inputSplitShims + .toArray(new HadoopShimsSecure.InputSplitShim[inputSplitShims.size()]); + } } @Override @@ -874,6 +917,16 @@ public class HoodieCombineHiveInputFormat> rrClass) throws IOException { + isRealTime = Boolean.valueOf(job.get("hudi.hive.realtime", "false")); + if (isRealTime) { + List recordReaders = new LinkedList<>(); + ValidationUtils.checkArgument(split instanceof HoodieCombineRealtimeFileSplit, "Only " + + HoodieCombineRealtimeFileSplit.class.getName() + " allowed, found " + split.getClass().getName()); + for (InputSplit inputSplit : ((HoodieCombineRealtimeFileSplit) split).getRealtimeFileSplits()) { + recordReaders.add(new HoodieParquetRealtimeInputFormat().getRecordReader(inputSplit, job, reporter)); + } + return new HoodieCombineRealtimeRecordReader(job, split, recordReaders); + } return new HadoopShimsSecure.CombineFileRecordReader(job, split, reporter, rrClass); } @@ -885,4 +938,39 @@ public class HoodieCombineHiveInputFormat> { + + private final Path[] paths; + private final int start; + private final int length; + private final JobConf conf; + + public CheckNonCombinablePathCallable(Path[] paths, int start, int length, JobConf conf) { + this.paths = paths; + this.start = start; + this.length = length; + this.conf = conf; + } + + @Override + public Set call() throws Exception { + Set nonCombinablePathIndices = new HashSet(); + for (int i = 0; i < length; i++) { + PartitionDesc part = HiveFileFormatUtils.getPartitionDescFromPathRecursively(pathToPartitionInfo, + paths[i + start], IOPrepareCache.get().allocatePartitionDescMap()); + // Use HiveInputFormat if any of the paths is not splittable + Class inputFormatClass = part.getInputFileFormatClass(); + InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, conf); + if (inputFormat instanceof AvoidSplitCombination + && ((AvoidSplitCombination) inputFormat).shouldSkipCombine(paths[i + start], conf)) { + if (LOG.isDebugEnabled()) { + LOG.debug("The path [" + paths[i + start] + "] is being parked for HiveInputFormat.getSplits"); + } + nonCombinablePathIndices.add(i + start); + } + } + return nonCombinablePathIndices; + } + } } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineRealtimeFileSplit.java 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineRealtimeFileSplit.java new file mode 100644 index 000000000..a30aa178f --- /dev/null +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineRealtimeFileSplit.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hadoop.hive; + +import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.common.util.collection.ArrayUtils; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.lib.CombineFileSplit; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Represents a CombineFileSplit for realtime tables. + */ +public class HoodieCombineRealtimeFileSplit extends CombineFileSplit { + + // These are instances of HoodieRealtimeSplits + List realtimeFileSplits = new ArrayList<>(); + + public HoodieCombineRealtimeFileSplit() { + } + + public HoodieCombineRealtimeFileSplit(JobConf jobConf, List realtimeFileSplits) { + super(jobConf, realtimeFileSplits.stream().map(p -> + ((HoodieRealtimeFileSplit) p).getPath()).collect(Collectors.toList()).toArray(new + Path[realtimeFileSplits.size()]), + ArrayUtils.toPrimitive(realtimeFileSplits.stream().map(p -> ((HoodieRealtimeFileSplit) p).getStart()) + .collect(Collectors.toList()).toArray(new Long[realtimeFileSplits.size()])), + ArrayUtils.toPrimitive(realtimeFileSplits.stream().map(p -> ((HoodieRealtimeFileSplit) p).getLength()) + .collect(Collectors.toList()).toArray(new Long[realtimeFileSplits.size()])), + realtimeFileSplits.stream().map(p -> { + try { + return Arrays.asList(p.getLocations()); + } catch (Exception e) { + throw new RuntimeException(e); + } + }).flatMap(List::stream).collect(Collectors.toList()).toArray(new + String[realtimeFileSplits.size()])); + this.realtimeFileSplits = realtimeFileSplits; + + } + + public List getRealtimeFileSplits() { + return realtimeFileSplits; + } + + @Override + public String toString() { + return "HoodieCombineRealtimeFileSplit{" + + "realtimeFileSplits=" + realtimeFileSplits + + '}'; + } + + /** + * Writable interface. 
+ */ + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(realtimeFileSplits.size()); + for (InputSplit inputSplit: realtimeFileSplits) { + Text.writeString(out, inputSplit.getClass().getName()); + inputSplit.write(out); + } + } + + public void readFields(DataInput in) throws IOException { + int listLength = in.readInt(); + realtimeFileSplits = new ArrayList<>(listLength); + for (int i = 0; i < listLength; i++) { + String inputClassName = Text.readString(in); + HoodieRealtimeFileSplit inputSplit = ReflectionUtils.loadClass(inputClassName); + inputSplit.readFields(in); + realtimeFileSplits.add(inputSplit); + } + } + + public long getLength() { + return realtimeFileSplits.size(); + } + + /** Returns an array containing the start offsets of the files in the split. */ + public long[] getStartOffsets() { + return realtimeFileSplits.stream().mapToLong(x -> 0L).toArray(); + } + + /** Returns an array containing the lengths of the files in the split. */ + public long[] getLengths() { + return realtimeFileSplits.stream().mapToLong(FileSplit::getLength).toArray(); + } + + /** Returns the start offset of the ith Path. */ + public long getOffset(int i) { + return 0; + } + + /** Returns the length of the ith Path. */ + public long getLength(int i) { + return realtimeFileSplits.get(i).getLength(); + } + + /** Returns the number of Paths in the split. */ + public int getNumPaths() { + return realtimeFileSplits.size(); + } + + /** Returns the ith Path. */ + public Path getPath(int i) { + return realtimeFileSplits.get(i).getPath(); + } + + /** Returns all the Paths in the split. */ + public Path[] getPaths() { + return realtimeFileSplits.stream().map(x -> x.getPath()).toArray(Path[]::new); + } + + /** Returns all the Paths where this input-split resides. */ + public String[] getLocations() throws IOException { + return realtimeFileSplits.stream().flatMap(x -> { + try { + return Arrays.stream(x.getLocations()); + } catch (IOException e) { + throw new HoodieIOException(e.getMessage(), e); + } + }).toArray(String[]::new); + } + + public static class Builder { + + // These are instances of HoodieRealtimeSplits + public List fileSplits = new ArrayList<>(); + + public void addSplit(FileSplit split) { + fileSplits.add(split); + } + + public HoodieCombineRealtimeFileSplit build(JobConf conf) { + return new HoodieCombineRealtimeFileSplit(conf, fileSplits); + } + } +} diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineRealtimeHiveSplit.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineRealtimeHiveSplit.java new file mode 100644 index 000000000..c29e51b87 --- /dev/null +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineRealtimeHiveSplit.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hadoop.hive; + +import java.io.IOException; +import java.util.Map; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.plan.PartitionDesc; +import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.lib.CombineFileSplit; +import org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat.CombineHiveInputSplit; + +/** + * Represents a CombineHiveInputSplit for realtime tables. + */ +public class HoodieCombineRealtimeHiveSplit extends CombineHiveInputSplit { + + public HoodieCombineRealtimeHiveSplit() throws IOException { + super(ShimLoader.getHadoopShims().getCombineFileInputFormat().getInputSplitShim()); + } + + public HoodieCombineRealtimeHiveSplit(JobConf jobConf, CombineFileSplit + combineFileSplit, Map map) + throws IOException { + super(jobConf, combineFileSplit, map); + } +} diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java index 1c484e858..3edae5cf3 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java @@ -360,7 +360,9 @@ public abstract class AbstractRealtimeRecordReader { private Schema constructHiveOrderedSchema(Schema writerSchema, Map schemaFieldsMap) { // Get all column names of hive table String hiveColumnString = jobConf.get(hive_metastoreConstants.META_TABLE_COLUMNS); + LOG.info("Hive Columns : " + hiveColumnString); String[] hiveColumns = hiveColumnString.split(","); + LOG.info("Hive Columns : " + hiveColumnString); List hiveSchemaFields = new ArrayList<>(); for (String columnName : hiveColumns) { @@ -378,6 +380,7 @@ public abstract class AbstractRealtimeRecordReader { Schema hiveSchema = Schema.createRecord(writerSchema.getName(), writerSchema.getDoc(), writerSchema.getNamespace(), writerSchema.isError()); hiveSchema.setFields(hiveSchemaFields); + LOG.info("HIVE Schema is :" + hiveSchema.toString(true)); return hiveSchema; } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieCombineRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieCombineRealtimeRecordReader.java new file mode 100644 index 000000000..bdf11ed75 --- /dev/null +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieCombineRealtimeRecordReader.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.hadoop.realtime; + +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.hadoop.hive.HoodieCombineRealtimeFileSplit; + +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.lib.CombineFileSplit; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; + +/** + * Allows to read multiple realtime file splits grouped together by CombineInputFormat. + */ +public class HoodieCombineRealtimeRecordReader implements RecordReader { + + private static final transient Logger LOG = LogManager.getLogger(HoodieCombineRealtimeRecordReader.class); + // RecordReaders for each split + List recordReaders = new LinkedList<>(); + // Points to the currently iterating record reader + HoodieRealtimeRecordReader currentRecordReader; + + public HoodieCombineRealtimeRecordReader(JobConf jobConf, CombineFileSplit split, + List readers) { + try { + ValidationUtils.checkArgument(((HoodieCombineRealtimeFileSplit) split).getRealtimeFileSplits().size() == readers + .size(), "Num Splits does not match number of unique RecordReaders!"); + for (InputSplit rtSplit : ((HoodieCombineRealtimeFileSplit) split).getRealtimeFileSplits()) { + LOG.info("Creating new RealtimeRecordReader for split"); + recordReaders.add( + new HoodieRealtimeRecordReader((HoodieRealtimeFileSplit) rtSplit, jobConf, readers.remove(0))); + } + currentRecordReader = recordReaders.remove(0); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public boolean next(NullWritable key, ArrayWritable value) throws IOException { + if (this.currentRecordReader.next(key, value)) { + LOG.info("Reading from record reader"); + LOG.info(AbstractRealtimeRecordReader.arrayWritableToString(value)); + return true; + } else if (recordReaders.size() > 0) { + this.currentRecordReader.close(); + this.currentRecordReader = recordReaders.remove(0); + return this.currentRecordReader.next(key, value); + } else { + return false; + } + } + + @Override + public NullWritable createKey() { + return this.currentRecordReader.createKey(); + } + + @Override + public ArrayWritable createValue() { + return this.currentRecordReader.createValue(); + } + + @Override + public long getPos() throws IOException { + return this.currentRecordReader.getPos(); + } + + @Override + public void close() throws IOException { + this.currentRecordReader.close(); + } + + @Override + public float getProgress() throws IOException { + return this.currentRecordReader.getProgress(); + } +} diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java index ebb784df5..ce8680723 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java @@ -262,4 +262,4 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i public Configuration getConf() { return conf; } -} +} \ No newline at end of file diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeRecordReader.java 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeRecordReader.java index c4b79cb10..a8af067e5 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeRecordReader.java @@ -66,6 +66,7 @@ public class HoodieRealtimeRecordReader implements RecordReader { @@ -84,7 +82,7 @@ class RealtimeUnmergedRecordReader extends AbstractRealtimeRecordReader GenericRecord rec = (GenericRecord) record.getData().getInsertValue(getReaderSchema()).get(); ArrayWritable aWritable = (ArrayWritable) avroToArrayWritable(rec, getWriterSchema()); this.executor.getQueue().insertRecord(aWritable); - }); + }); // Start reading and buffering this.executor.startProducers(); } diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/InputFormatTestUtil.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/InputFormatTestUtil.java index 0f7bc1e5c..a57540356 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/InputFormatTestUtil.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/InputFormatTestUtil.java @@ -20,14 +20,23 @@ package org.apache.hudi.hadoop; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTestUtils; +import org.apache.hudi.common.table.log.HoodieLogFormat; +import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock; +import org.apache.hudi.common.table.log.block.HoodieCommandBlock; +import org.apache.hudi.common.table.log.block.HoodieLogBlock; import org.apache.hudi.common.util.SchemaTestUtil; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.mapred.JobConf; import org.apache.parquet.avro.AvroParquetWriter; import org.junit.rules.TemporaryFolder; @@ -37,9 +46,12 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.UUID; +import java.util.stream.Collectors; public class InputFormatTestUtil { @@ -202,4 +214,92 @@ public class InputFormatTestUtil { } } + + public static HoodieLogFormat.Writer writeRollback(File partitionDir, FileSystem fs, String fileId, String baseCommit, + String newCommit, String rolledBackInstant, int logVersion) + throws InterruptedException, IOException { + HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(new Path(partitionDir.getPath())).withFileId(fileId) + .overBaseCommit(baseCommit).withFs(fs).withLogVersion(logVersion).withLogWriteToken("1-0-1") + .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build(); + // generate metadata + Map header = new HashMap<>(); + header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, newCommit); + header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, rolledBackInstant); + header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, + 
String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal())); + // if update belongs to an existing log file + writer = writer.appendBlock(new HoodieCommandBlock(header)); + return writer; + } + + public static HoodieLogFormat.Writer writeDataBlockToLogFile(File partitionDir, FileSystem fs, Schema schema, String + fileId, + String baseCommit, String newCommit, int numberOfRecords, int offset, int logVersion) + throws InterruptedException, IOException { + HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(new Path(partitionDir.getPath())) + .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(fileId).withLogVersion(logVersion) + .withLogWriteToken("1-0-1").overBaseCommit(baseCommit).withFs(fs).build(); + List records = new ArrayList<>(); + for (int i = offset; i < offset + numberOfRecords; i++) { + records.add(SchemaTestUtil.generateAvroRecordFromJson(schema, i, newCommit, "fileid0")); + } + Schema writeSchema = records.get(0).getSchema(); + Map header = new HashMap<>(); + header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, newCommit); + header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, writeSchema.toString()); + HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, header); + writer = writer.appendBlock(dataBlock); + return writer; + } + + public static HoodieLogFormat.Writer writeRollbackBlockToLogFile(File partitionDir, FileSystem fs, Schema schema, + String + fileId, String baseCommit, String newCommit, String oldCommit, int logVersion) + throws InterruptedException, IOException { + HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(new Path(partitionDir.getPath())) + .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(fileId).overBaseCommit(baseCommit) + .withLogVersion(logVersion).withFs(fs).build(); + + Map header = new HashMap<>(); + header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, newCommit); + header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString()); + header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, oldCommit); + header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, + String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal())); + HoodieCommandBlock rollbackBlock = new HoodieCommandBlock(header); + writer = writer.appendBlock(rollbackBlock); + return writer; + } + + public static void setPropsForInputFormat(JobConf jobConf, + Schema schema, String hiveColumnTypes) { + List fields = schema.getFields(); + String names = fields.stream().map(f -> f.name().toString()).collect(Collectors.joining(",")); + String postions = fields.stream().map(f -> String.valueOf(f.pos())).collect(Collectors.joining(",")); + Configuration conf = HoodieTestUtils.getDefaultHadoopConf(); + + String hiveColumnNames = fields.stream().filter(field -> !field.name().equalsIgnoreCase("datestr")) + .map(Schema.Field::name).collect(Collectors.joining(",")); + hiveColumnNames = hiveColumnNames + ",datestr"; + String modifiedHiveColumnTypes = HoodieAvroUtils.addMetadataColumnTypes(hiveColumnTypes); + modifiedHiveColumnTypes = modifiedHiveColumnTypes + ",string"; + jobConf.set(hive_metastoreConstants.META_TABLE_COLUMNS, hiveColumnNames); + jobConf.set(hive_metastoreConstants.META_TABLE_COLUMN_TYPES, modifiedHiveColumnTypes); + jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names); + jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, postions); + 
jobConf.set(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "datestr"); + conf.set(hive_metastoreConstants.META_TABLE_COLUMNS, hiveColumnNames); + conf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names); + conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, postions); + conf.set(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "datestr"); + conf.set(hive_metastoreConstants.META_TABLE_COLUMN_TYPES, modifiedHiveColumnTypes); + jobConf.addResource(conf); + } + + public static void setInputPath(JobConf jobConf, String inputPath) { + jobConf.set("mapreduce.input.fileinputformat.inputdir", inputPath); + jobConf.set("mapreduce.input.fileinputformat.inputdir", inputPath); + jobConf.set("map.input.dir", inputPath); + } + } diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java index 174175785..536e8f6ba 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java @@ -56,9 +56,23 @@ import static org.junit.Assert.assertTrue; public class TestHoodieParquetInputFormat { + @Rule + public TemporaryFolder basePath = new TemporaryFolder(); + private HoodieParquetInputFormat inputFormat; private JobConf jobConf; + public static void ensureFilesInCommit(String msg, FileStatus[] files, String commit, int expected) { + int count = 0; + for (FileStatus file : files) { + String commitTs = FSUtils.getCommitTime(file.getPath().getName()); + if (commit.equals(commitTs)) { + count++; + } + } + assertEquals(msg, expected, count); + } + @Before public void setUp() { inputFormat = new HoodieParquetInputFormat(); @@ -66,9 +80,6 @@ public class TestHoodieParquetInputFormat { inputFormat.setConf(jobConf); } - @Rule - public TemporaryFolder basePath = new TemporaryFolder(); - // Verify that HoodieParquetInputFormat does not return instants after pending compaction @Test public void testPendingCompactionWithActiveCommits() throws IOException { @@ -373,15 +384,4 @@ public class TestHoodieParquetInputFormat { assertEquals(msg, expectedNumberOfRecordsInCommit, actualCount); assertEquals(msg, totalExpected, totalCount); } - - public static void ensureFilesInCommit(String msg, FileStatus[] files, String commit, int expected) { - int count = 0; - for (FileStatus file : files) { - String commitTs = FSUtils.getCommitTime(file.getPath().getName()); - if (commit.equals(commitTs)) { - count++; - } - } - assertEquals(msg, expected, count); - } } diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieCombineHiveInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieCombineHiveInputFormat.java new file mode 100644 index 000000000..b1839ff19 --- /dev/null +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieCombineHiveInputFormat.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hadoop.realtime; + +import static org.apache.hadoop.hive.ql.exec.Utilities.HAS_MAP_WORK; +import static org.apache.hadoop.hive.ql.exec.Utilities.MAPRED_MAPPER_CLASS; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.common.HoodieCommonTestHarness; +import org.apache.hudi.common.minicluster.MiniClusterUtil; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.HoodieTestUtils; +import org.apache.hudi.common.table.log.HoodieLogFormat; +import org.apache.hudi.common.util.SchemaTestUtil; +import org.apache.hudi.hadoop.InputFormatTestUtil; + +import org.apache.avro.Schema; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.mr.ExecMapper; +import org.apache.hadoop.hive.ql.plan.MapredWork; +import org.apache.hadoop.hive.ql.plan.PartitionDesc; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.LinkedHashMap; + +public class TestHoodieCombineHiveInputFormat extends HoodieCommonTestHarness { + + @Rule + public TemporaryFolder basePath = new TemporaryFolder(); + private JobConf jobConf; + private FileSystem fs; + private Configuration hadoopConf; + + @BeforeClass + public static void setUpClass() throws IOException, InterruptedException { + // Append is not supported in LocalFileSystem. HDFS needs to be setup. 
+ MiniClusterUtil.setUp(); + } + + @AfterClass + public static void tearDownClass() { + MiniClusterUtil.shutdown(); + } + + @Before + public void setUp() throws IOException, InterruptedException { + this.fs = MiniClusterUtil.fileSystem; + jobConf = new JobConf(); + hadoopConf = HoodieTestUtils.getDefaultHadoopConf(); + assertTrue(fs.mkdirs(new Path(folder.getRoot().getPath()))); + HoodieTestUtils.init(MiniClusterUtil.configuration, basePath.getRoot().getPath(), HoodieTableType.MERGE_ON_READ); + } + + @Test + @Ignore + public void testHoodieRealtimeCombineHoodieInputFormat() throws Exception { + + Configuration conf = new Configuration(); + // initial commit + Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema()); + HoodieTestUtils.init(hadoopConf, basePath.getRoot().getAbsolutePath(), HoodieTableType.MERGE_ON_READ); + String commitTime = "100"; + final int numRecords = 1000; + // Create 3 parquet files with 1000 records each + File partitionDir = InputFormatTestUtil.prepareParquetTable(basePath, schema, 3, numRecords, commitTime); + InputFormatTestUtil.commit(basePath, commitTime); + + // insert 1000 update records to log file 0 + String newCommitTime = "101"; + HoodieLogFormat.Writer writer = + InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, "fileid0", commitTime, newCommitTime, + numRecords, numRecords, 0); + writer.close(); + // insert 1000 update records to log file 1 + writer = + InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, "fileid1", commitTime, newCommitTime, + numRecords, numRecords, 0); + writer.close(); + // insert 1000 update records to log file 2 + writer = + InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, "fileid2", commitTime, newCommitTime, + numRecords, numRecords, 0); + writer.close(); + + TableDesc tblDesc = Utilities.defaultTd; + // Set the input format + tblDesc.setInputFileFormatClass(HoodieCombineHiveInputFormat.class); + PartitionDesc partDesc = new PartitionDesc(tblDesc, null); + LinkedHashMap pt = new LinkedHashMap<>(); + pt.put(new Path(basePath.getRoot().getAbsolutePath()), partDesc); + MapredWork mrwork = new MapredWork(); + mrwork.getMapWork().setPathToPartitionInfo(pt); + Path mapWorkPath = new Path(basePath.getRoot().getAbsolutePath()); + Utilities.setMapRedWork(conf, mrwork, mapWorkPath); + jobConf = new JobConf(conf); + // Add the paths + FileInputFormat.setInputPaths(jobConf, partitionDir.getPath()); + jobConf.set(HAS_MAP_WORK, "true"); + // The following config tells Hive to choose ExecMapper to read the MAP_WORK + jobConf.set(MAPRED_MAPPER_CLASS, ExecMapper.class.getName()); + // setting the split size to be 3 to create one split for 3 file groups + jobConf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.SPLIT_MAXSIZE, "3"); + + HoodieCombineHiveInputFormat combineHiveInputFormat = new HoodieCombineHiveInputFormat(); + String tripsHiveColumnTypes = "double,string,string,string,double,double,double,double,double"; + InputFormatTestUtil.setPropsForInputFormat(jobConf, schema, tripsHiveColumnTypes); + InputSplit[] splits = combineHiveInputFormat.getSplits(jobConf, 1); + // Since the SPLIT_SIZE is 3, we should create only 1 split with all 3 file groups + assertEquals(splits.length, 1); + RecordReader recordReader = + combineHiveInputFormat.getRecordReader(splits[0], jobConf, null); + NullWritable nullWritable = recordReader.createKey(); + ArrayWritable arrayWritable = recordReader.createValue(); + int counter = 0; + while 
(recordReader.next(nullWritable, arrayWritable)) { + // read over all the splits + counter++; + } + // should read out 3 splits, each for file0, file1, file2 containing 1000 records each + assertEquals(3000, counter); + } + +} diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java index 33856ae0a..be444b42f 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java @@ -26,11 +26,6 @@ import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieTestUtils; import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.table.log.HoodieLogFormat.Writer; -import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock; -import org.apache.hudi.common.table.log.block.HoodieCommandBlock; -import org.apache.hudi.common.table.log.block.HoodieCommandBlock.HoodieCommandBlockTypeEnum; -import org.apache.hudi.common.table.log.block.HoodieLogBlock; -import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.SchemaTestUtil; import org.apache.hudi.common.util.collection.Pair; @@ -39,7 +34,6 @@ import org.apache.hudi.hadoop.InputFormatTestUtil; import org.apache.avro.Schema; import org.apache.avro.Schema.Field; -import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -68,10 +62,8 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -83,7 +75,8 @@ import static org.junit.Assert.assertTrue; public class TestHoodieRealtimeRecordReader { private static final String PARTITION_COLUMN = "datestr"; - + @Rule + public TemporaryFolder basePath = new TemporaryFolder(); private JobConf jobConf; private FileSystem fs; private Configuration hadoopConf; @@ -96,64 +89,12 @@ public class TestHoodieRealtimeRecordReader { fs = FSUtils.getFs(basePath.getRoot().getAbsolutePath(), hadoopConf); } - @Rule - public TemporaryFolder basePath = new TemporaryFolder(); - - private Writer writeLogFile(File partitionDir, Schema schema, String fileId, String baseCommit, String newCommit, + private Writer writeLogFile(File partitionDir, Schema schema, String fileId, String baseCommit, String + newCommit, int numberOfRecords) throws InterruptedException, IOException { - return writeDataBlockToLogFile(partitionDir, schema, fileId, baseCommit, newCommit, numberOfRecords, 0, 0); - } - - private Writer writeRollback(File partitionDir, Schema schema, String fileId, String baseCommit, String newCommit, - String rolledBackInstant, int logVersion) throws InterruptedException, IOException { - Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(new Path(partitionDir.getPath())).withFileId(fileId) - .overBaseCommit(baseCommit).withFs(fs).withLogVersion(logVersion).withLogWriteToken("1-0-1") - .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build(); - // generate metadata - Map header = new HashMap<>(); - 
header.put(HeaderMetadataType.INSTANT_TIME, newCommit); - header.put(HeaderMetadataType.TARGET_INSTANT_TIME, rolledBackInstant); - header.put(HeaderMetadataType.COMMAND_BLOCK_TYPE, - String.valueOf(HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal())); - // if update belongs to an existing log file - writer = writer.appendBlock(new HoodieCommandBlock(header)); - return writer; - } - - private HoodieLogFormat.Writer writeDataBlockToLogFile(File partitionDir, Schema schema, String fileId, - String baseCommit, String newCommit, int numberOfRecords, int offset, int logVersion) - throws InterruptedException, IOException { - HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(new Path(partitionDir.getPath())) - .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(fileId).withLogVersion(logVersion) - .withLogWriteToken("1-0-1").overBaseCommit(baseCommit).withFs(fs).build(); - List records = new ArrayList<>(); - for (int i = offset; i < offset + numberOfRecords; i++) { - records.add(SchemaTestUtil.generateAvroRecordFromJson(schema, i, newCommit, "fileid0")); - } - Schema writeSchema = records.get(0).getSchema(); - Map header = new HashMap<>(); - header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, newCommit); - header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, writeSchema.toString()); - HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, header); - writer = writer.appendBlock(dataBlock); - return writer; - } - - private HoodieLogFormat.Writer writeRollbackBlockToLogFile(File partitionDir, Schema schema, String fileId, - String baseCommit, String newCommit, String oldCommit, int logVersion) throws InterruptedException, IOException { - HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(new Path(partitionDir.getPath())) - .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(fileId).overBaseCommit(baseCommit) - .withLogVersion(logVersion).withFs(fs).build(); - - Map header = new HashMap<>(); - header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, newCommit); - header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString()); - header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, oldCommit); - header.put(HeaderMetadataType.COMMAND_BLOCK_TYPE, - String.valueOf(HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal())); - HoodieCommandBlock rollbackBlock = new HoodieCommandBlock(header); - writer = writer.appendBlock(rollbackBlock); - return writer; + return InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, fileId, baseCommit, newCommit, + numberOfRecords, 0, + 0); } @Test @@ -213,11 +154,12 @@ public class TestHoodieRealtimeRecordReader { HoodieLogFormat.Writer writer; if (action.equals(HoodieTimeline.ROLLBACK_ACTION)) { - writer = writeRollback(partitionDir, schema, "fileid0", baseInstant, instantTime, + writer = InputFormatTestUtil.writeRollback(partitionDir, fs, "fileid0", baseInstant, instantTime, String.valueOf(baseInstantTs + logVersion - 1), logVersion); } else { writer = - writeDataBlockToLogFile(partitionDir, schema, "fileid0", baseInstant, instantTime, 100, 0, logVersion); + InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, "fileid0", baseInstant, + instantTime, 100, 0, logVersion); } long size = writer.getCurrentSize(); writer.close(); @@ -228,7 +170,7 @@ public class TestHoodieRealtimeRecordReader { HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit( new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + 
baseInstant + ".parquet"), 0, 1, jobConf), basePath.getRoot().getPath(), fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()) - .map(h -> h.getPath().toString()).collect(Collectors.toList()), + .map(h -> h.getPath().toString()).collect(Collectors.toList()), instantTime); // create a RecordReader to be used by HoodieRealtimeRecordReader @@ -281,7 +223,8 @@ public class TestHoodieRealtimeRecordReader { // insert new records to log file String newCommitTime = "101"; HoodieLogFormat.Writer writer = - writeDataBlockToLogFile(partitionDir, schema, "fileid0", instantTime, newCommitTime, numRecords, numRecords, 0); + InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, "fileid0", instantTime, newCommitTime, + numRecords, numRecords, 0); long size = writer.getCurrentSize(); writer.close(); assertTrue("block - size should be > 0", size > 0); @@ -302,7 +245,7 @@ public class TestHoodieRealtimeRecordReader { jobConf.set(REALTIME_SKIP_MERGE_PROP, "true"); // validate unmerged record reader - HoodieRealtimeRecordReader recordReader = new HoodieRealtimeRecordReader(split, jobConf, reader); + RealtimeUnmergedRecordReader recordReader = new RealtimeUnmergedRecordReader(split, jobConf, reader); // use reader to read base Parquet File and log file // here all records should be present. Also ensure log records are in order. @@ -486,7 +429,8 @@ public class TestHoodieRealtimeRecordReader { schema = SchemaTestUtil.getComplexEvolvedSchema(); String newCommitTime = "101"; HoodieLogFormat.Writer writer = - writeDataBlockToLogFile(partitionDir, schema, "fileid0", instantTime, newCommitTime, numberOfLogRecords, 0, 1); + InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, "fileid0", instantTime, newCommitTime, + numberOfLogRecords, 0, 1); long size = writer.getCurrentSize(); logFilePaths.add(writer.getLogFile().getPath().toString()); writer.close(); @@ -494,7 +438,8 @@ public class TestHoodieRealtimeRecordReader { // write rollback for the previous block in new log file version newCommitTime = "102"; - writer = writeRollbackBlockToLogFile(partitionDir, schema, "fileid0", instantTime, newCommitTime, "101", 1); + writer = InputFormatTestUtil.writeRollbackBlockToLogFile(partitionDir, fs, schema, "fileid0", instantTime, + newCommitTime, "101", 1); logFilePaths.add(writer.getLogFile().getPath().toString()); writer.close(); InputFormatTestUtil.deltaCommit(basePath, newCommitTime);
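The new ArrayUtils.toPrimitive helper introduced earlier in this patch is what lets HoodieCombineRealtimeFileSplit feed boxed Long start offsets and lengths into the CombineFileSplit super constructor. A minimal usage sketch of its documented contract follows; the class name and values below are illustrative only and are not part of the patch:

    import org.apache.hudi.common.util.collection.ArrayUtils;

    public class ToPrimitiveSketch {
      public static void main(String[] args) {
        // Boxed offsets, e.g. collected from HoodieRealtimeFileSplit#getStart()
        Long[] starts = new Long[] {0L, 128L, 256L};
        long[] primitiveStarts = ArrayUtils.toPrimitive(starts); // {0, 128, 256}

        // A null array is passed through rather than throwing
        long[] fromNull = ArrayUtils.toPrimitive(null);          // null

        // An empty array returns the shared EMPTY_LONG_ARRAY constant
        long[] fromEmpty = ArrayUtils.toPrimitive(new Long[0]);  // length == 0

        // A null element, however, throws NullPointerException per the javadoc
        // ArrayUtils.toPrimitive(new Long[] {1L, null});
      }
    }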