diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java
similarity index 96%
rename from hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java
rename to hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java
index 2a62cc898..efb198ad6 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java
@@ -27,7 +27,10 @@ import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieLocalEngineContext;
@@ -48,6 +51,7 @@ import scala.collection.Seq;
 import javax.annotation.Nonnull;
 
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -70,7 +74,7 @@ import static org.apache.hudi.common.util.ValidationUtils.checkState;
  *
  * NOTE: This class is invariant of the underlying file-format of the files being read
  */
-public abstract class HoodieFileInputFormatBase extends FileInputFormat<NullWritable, ArrayWritable>
+public class HoodieCopyOnWriteTableInputFormat extends FileInputFormat<NullWritable, ArrayWritable>
     implements Configurable {
 
   protected Configuration conf;
@@ -103,8 +107,6 @@ public abstract class HoodieFileInputFormatBase extends FileInputFormat<NullWritable, ArrayWritable>
+  public RecordReader<NullWritable, ArrayWritable> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
+    throw new UnsupportedEncodingException("not implemented");
+  }
+
+  protected boolean includeLogFilesForSnapshotView() {
+    return false;
+  }
+
   /**
    * Abstracts and exposes {@link FileInputFormat#listStatus(JobConf)} operation to subclasses that
    * lists files (returning an array of {@link FileStatus}) corresponding to the input paths specified
@@ -198,16 +219,6 @@ public abstract class HoodieFileInputFormatBase extends FileInputFormat<NullWritable, ArrayWritable>
   private RecordReader<NullWritable, ArrayWritable> getRecordReaderInternal(InputSplit split, JobConf job, Reporter reporter) throws IOException {
-    return mapredParquetInputFormat.getRecordReader(split, job, reporter);
+    return super.getRecordReader(split, job, reporter);
   }
 
   private RecordReader createBootstrappingRecordReader(InputSplit split,
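With the base class now concrete, file-format-specific behavior hangs off two hooks: `getRecordReader` (which the base deliberately leaves as a throwing placeholder) and `includeLogFilesForSnapshotView`. A minimal sketch of how a subclass would plug in — the class name and comments are illustrative, not part of this patch:

```java
import java.io.IOException;

import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

// Hypothetical subclass, for illustration only.
public class ExampleFileFormatTableInputFormat extends HoodieCopyOnWriteTableInputFormat {

  @Override
  public RecordReader<NullWritable, ArrayWritable> getRecordReader(InputSplit split, JobConf job, Reporter reporter)
      throws IOException {
    // A real subclass returns a reader for its file format here; the base class
    // only throws, so every concrete format must override this method.
    throw new UnsupportedOperationException("wire in a format-specific reader");
  }

  @Override
  protected boolean includeLogFilesForSnapshotView() {
    // false keeps base-file-only (COW-style) splits; MOR snapshot views return
    // true so that delta log files are stitched into the splits.
    return false;
  }
}
```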
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormatBase.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormatBase.java
new file mode 100644
index 000000000..4f8d07bf8
--- /dev/null
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormatBase.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hudi.hadoop.realtime.HoodieMergeOnReadTableInputFormat;
+
+import java.io.IOException;
+
+/**
+ * !!! PLEASE READ CAREFULLY !!!
+ *
+ * NOTE: Hive bears optimizations which are based upon validating whether the {@link FileInputFormat}
+ * implementation inherits from {@link MapredParquetInputFormat}.
+ *
+ * To make sure that Hudi implementations are leveraging these optimizations to the fullest, this class
+ * serves as a base-class for every {@link FileInputFormat} implementation working with the Parquet file-format.
+ *
+ * However, this class serves as a simple delegate to the actual implementation hierarchy: it expects
+ * either {@link HoodieCopyOnWriteTableInputFormat} or {@link HoodieMergeOnReadTableInputFormat} to be supplied
+ * to which it delegates all of its necessary methods.
+ */
+public abstract class HoodieParquetInputFormatBase extends MapredParquetInputFormat implements Configurable {
+
+  private final HoodieCopyOnWriteTableInputFormat inputFormatDelegate;
+
+  protected HoodieParquetInputFormatBase(HoodieCopyOnWriteTableInputFormat inputFormatDelegate) {
+    this.inputFormatDelegate = inputFormatDelegate;
+  }
+
+  @Override
+  public final void setConf(Configuration conf) {
+    inputFormatDelegate.setConf(conf);
+  }
+
+  @Override
+  public final Configuration getConf() {
+    return inputFormatDelegate.getConf();
+  }
+
+  @Override
+  public final InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    return inputFormatDelegate.getSplits(job, numSplits);
+  }
+
+  @Override
+  protected final boolean isSplitable(FileSystem fs, Path filename) {
+    return inputFormatDelegate.isSplitable(fs, filename);
+  }
+
+  @Override
+  protected final FileSplit makeSplit(Path file, long start, long length,
+                                      String[] hosts) {
+    return inputFormatDelegate.makeSplit(file, start, length, hosts);
+  }
+
+  @Override
+  protected final FileSplit makeSplit(Path file, long start, long length,
+                                      String[] hosts, String[] inMemoryHosts) {
+    return inputFormatDelegate.makeSplit(file, start, length, hosts, inMemoryHosts);
+  }
+
+  @Override
+  public final FileStatus[] listStatus(JobConf job) throws IOException {
+    return inputFormatDelegate.listStatus(job);
+  }
+}
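The point of this indirection is easy to miss: per the javadoc above, Hive enables its Parquet-specific code paths only when the input format inherits from `MapredParquetInputFormat`, so the public Hudi classes must sit in that hierarchy even though all real planning happens in the delegate. A sketch of the invariant — it assumes `HoodieParquetInputFormat` has a no-arg constructor that supplies a `HoodieCopyOnWriteTableInputFormat` delegate (its own hunk is not fully shown above):

```java
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;

public class DelegationDemo {
  public static void main(String[] args) {
    // Assumption: the no-arg constructor passes a HoodieCopyOnWriteTableInputFormat
    // delegate to the protected HoodieParquetInputFormatBase constructor.
    HoodieParquetInputFormat format = new HoodieParquetInputFormat();

    // Hive's check sees a MapredParquetInputFormat and keeps its Parquet
    // optimizations enabled ...
    System.out.println(format instanceof MapredParquetInputFormat); // true

    // ... while getSplits/listStatus/setConf calls are forwarded to the
    // Hudi-aware delegate by the final methods of HoodieParquetInputFormatBase.
  }
}
```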
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java
index f23e6ac86..8736883ce 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java
@@ -18,13 +18,6 @@
 
 package org.apache.hudi.hadoop.hive;
 
-import org.apache.hudi.common.util.ReflectionUtils;
-import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.hadoop.HoodieFileInputFormatBase;
-import org.apache.hudi.hadoop.HoodieParquetInputFormat;
-import org.apache.hudi.hadoop.realtime.HoodieCombineRealtimeRecordReader;
-import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
-import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
@@ -63,6 +56,13 @@ import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
 import org.apache.hadoop.mapred.lib.CombineFileSplit;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.hadoop.HoodieParquetInputFormat;
+import org.apache.hudi.hadoop.HoodieParquetInputFormatBase;
+import org.apache.hudi.hadoop.realtime.HoodieCombineRealtimeRecordReader;
+import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -877,7 +877,7 @@ public class HoodieCombineHiveInputFormat
       result;
       if (hoodieFilter) {
-        HoodieFileInputFormatBase input;
+        HoodieParquetInputFormatBase input;
         if (isRealTime) {
           LOG.info("Using HoodieRealtimeInputFormat");
           input = createParquetRealtimeInputFormat();
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieHFileRealtimeInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieHFileRealtimeInputFormat.java
index 56f10b137..799d90bce 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieHFileRealtimeInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieHFileRealtimeInputFormat.java
@@ -44,7 +44,7 @@ import java.io.IOException;
  */
 @UseRecordReaderFromInputFormat
 @UseFileSplitsFromInputFormat
-public class HoodieHFileRealtimeInputFormat extends HoodieRealtimeFileInputFormatBase {
+public class HoodieHFileRealtimeInputFormat extends HoodieMergeOnReadTableInputFormat {
 
   private static final Logger LOG = LogManager.getLogger(HoodieHFileRealtimeInputFormat.class);
 
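The variable-type change in the `HoodieCombineHiveInputFormat` hunk above is the visible tip of the hierarchy swap: both branches must now meet in `HoodieParquetInputFormatBase` rather than the deleted `HoodieFileInputFormatBase`. Restated outside the combine-input-format plumbing (a simplified sketch; in the real code the instances come from the `createParquetRealtimeInputFormat()` / `createParquetInputFormat()` factory methods and are configured afterwards):

```java
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.HoodieParquetInputFormatBase;
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;

final class HoodieInputFormatSelection {

  // Simplified restatement of the patched branch: the realtime (MOR) and
  // non-realtime (COW) formats now share HoodieParquetInputFormatBase as
  // their narrowest common supertype.
  static HoodieParquetInputFormatBase select(boolean isRealTime) {
    return isRealTime
        ? new HoodieParquetRealtimeInputFormat() // snapshot reads merging delta logs
        : new HoodieParquetInputFormat();        // plain base-file reads
  }
}
```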
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeFileInputFormatBase.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
similarity index 97%
rename from hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeFileInputFormatBase.java
rename to hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
index 2fe3bbdc1..7b482f415 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeFileInputFormatBase.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
@@ -39,14 +39,12 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hadoop.BootstrapBaseFileSplit;
 import org.apache.hudi.hadoop.FileStatusWithBootstrapBaseFile;
-import org.apache.hudi.hadoop.HoodieFileInputFormatBase;
+import org.apache.hudi.hadoop.HoodieCopyOnWriteTableInputFormat;
 import org.apache.hudi.hadoop.LocatedFileStatusWithBootstrapBaseFile;
 import org.apache.hudi.hadoop.PathWithLogFilePath;
 import org.apache.hudi.hadoop.RealtimeFileStatus;
 import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
 import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -68,9 +66,7 @@ import java.util.stream.Collectors;
  *
  * NOTE: This class is invariant of the underlying file-format of the files being read
  */
-public abstract class HoodieRealtimeFileInputFormatBase extends HoodieFileInputFormatBase implements Configurable {
-
-  private static final Logger LOG = LogManager.getLogger(HoodieParquetRealtimeInputFormat.class);
+public class HoodieMergeOnReadTableInputFormat extends HoodieCopyOnWriteTableInputFormat implements Configurable {
 
   @Override
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
index 5f8b11cb1..e204c3b07 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.hadoop.realtime;
 
-import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.NullWritable;
@@ -43,12 +42,13 @@
  */
 @UseRecordReaderFromInputFormat
 @UseFileSplitsFromInputFormat
-public class HoodieParquetRealtimeInputFormat extends HoodieRealtimeFileInputFormatBase implements Configurable {
+public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat {
 
   private static final Logger LOG = LogManager.getLogger(HoodieParquetRealtimeInputFormat.class);
 
-  // NOTE: We're only using {@code HoodieParquetInputFormat} to compose {@code RecordReader}
-  private final HoodieParquetInputFormat parquetInputFormat = new HoodieParquetInputFormat();
+  public HoodieParquetRealtimeInputFormat() {
+    super(new HoodieMergeOnReadTableInputFormat());
+  }
 
   // To make Hive on Spark queries work with RT tables. Our theory is that due to
   // {@link org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher}
@@ -71,7 +71,7 @@ public class HoodieParquetRealtimeInputFormat extends HoodieRealtimeFileInputFor
     }
 
     return new HoodieRealtimeRecordReader(realtimeSplit, jobConf,
-        parquetInputFormat.getRecordReader(split, jobConf, reporter));
+        super.getRecordReader(split, jobConf, reporter));
   }
 
   void addProjectionToJobConf(final RealtimeSplit realtimeSplit, final JobConf jobConf) {
@@ -96,8 +96,8 @@ public class HoodieParquetRealtimeInputFormat extends HoodieRealtimeFileInputFor
       if (!realtimeSplit.getDeltaLogPaths().isEmpty()) {
         HoodieRealtimeInputFormatUtils.addRequiredProjectionFields(jobConf, realtimeSplit.getHoodieVirtualKeyInfo());
       }
-      this.conf = jobConf;
-      this.conf.set(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP, "true");
+      jobConf.set(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP, "true");
+      setConf(jobConf);
     }
   }
 }
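Putting the realtime pieces together: split planning now flows through the `HoodieMergeOnReadTableInputFormat` delegate, while `getRecordReader` wraps the Parquet reader obtained from `super` in a `HoodieRealtimeRecordReader` that merges delta-log records on top of it. A hypothetical driver sketch (job setup is elided; `Reporter.NULL` stands in for a real reporter):

```java
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;

public class RealtimeReadSketch {
  public static void main(String[] args) throws Exception {
    JobConf jobConf = new JobConf();
    // Input paths, projected columns, etc. would be configured here.

    HoodieParquetRealtimeInputFormat format = new HoodieParquetRealtimeInputFormat();
    format.setConf(jobConf); // forwarded to the HoodieMergeOnReadTableInputFormat delegate

    InputSplit[] splits = format.getSplits(jobConf, 1); // realtime splits carry delta-log paths
    RecordReader<NullWritable, ArrayWritable> reader =
        format.getRecordReader(splits[0], jobConf, Reporter.NULL); // a HoodieRealtimeRecordReader
  }
}
```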