1
0

[HUDI-3276] Rebased Parquet-based FileInputFormat impls to inherit from MapredParquetInputFormat (#4667)

Rebased Parquet-based FileInputFormat impls to inherit from MapredParquetInputFormat, to make sure that Hive is appropriately recognizing those impls and applying corresponding optimizations.

- Converted HoodieRealtimeFileInputFormatBase and HoodieFileInputFormatBase into standalone implementations that could be instantiated as standalone objects (which could be used for delegation)
- Renamed HoodieFileInputFormatBase > HoodieCopyOnWriteTableInputFormat, HoodieRealtimeFileInputFormatBase > HoodieMergeOnReadTableInputFormat
- Scaffolded HoodieParquetFileInputFormatBase for all Parquet impls to inherit from
- Rebased Parquet impls onto HoodieParquetFileInputFormatBase
This commit is contained in:
Alexey Kudinkin
2022-02-08 12:21:45 -08:00
committed by GitHub
parent 60831d6906
commit 973087f385
8 changed files with 142 additions and 45 deletions

View File

@@ -27,7 +27,10 @@ import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
@@ -48,6 +51,7 @@ import scala.collection.Seq;
import javax.annotation.Nonnull;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -70,7 +74,7 @@ import static org.apache.hudi.common.util.ValidationUtils.checkState;
*
* NOTE: This class is invariant of the underlying file-format of the files being read
*/
public abstract class HoodieFileInputFormatBase extends FileInputFormat<NullWritable, ArrayWritable>
public class HoodieCopyOnWriteTableInputFormat extends FileInputFormat<NullWritable, ArrayWritable>
implements Configurable {
protected Configuration conf;
@@ -103,8 +107,6 @@ public abstract class HoodieFileInputFormatBase extends FileInputFormat<NullWrit
this.conf = conf;
}
protected abstract boolean includeLogFilesForSnapshotView();
@Override
protected boolean isSplitable(FileSystem fs, Path filename) {
return !(filename instanceof PathWithBootstrapFileStatus);
@@ -121,6 +123,16 @@ public abstract class HoodieFileInputFormatBase extends FileInputFormat<NullWrit
return split;
}
@Override
protected FileSplit makeSplit(Path file, long start, long length,
String[] hosts, String[] inMemoryHosts) {
FileSplit split = new FileSplit(file, start, length, hosts, inMemoryHosts);
if (file instanceof PathWithBootstrapFileStatus) {
return makeExternalFileSplit((PathWithBootstrapFileStatus)file, split);
}
return split;
}
@Override
public FileStatus[] listStatus(JobConf job) throws IOException {
// Segregate inputPaths[] to incremental, snapshot and non hoodie paths
@@ -161,6 +173,15 @@ public abstract class HoodieFileInputFormatBase extends FileInputFormat<NullWrit
return returns.toArray(new FileStatus[0]);
}
@Override
public RecordReader<NullWritable, ArrayWritable> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
throw new UnsupportedEncodingException("not implemented");
}
protected boolean includeLogFilesForSnapshotView() {
return false;
}
/**
* Abstracts and exposes {@link FileInputFormat#listStatus(JobConf)} operation to subclasses that
* lists files (returning an array of {@link FileStatus}) corresponding to the input paths specified
@@ -198,16 +219,6 @@ public abstract class HoodieFileInputFormatBase extends FileInputFormat<NullWrit
return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext, tableMetaClient, timeline.get(), fileStatuses, commitsToCheck.get());
}
@Override
protected FileSplit makeSplit(Path file, long start, long length,
String[] hosts, String[] inMemoryHosts) {
FileSplit split = new FileSplit(file, start, length, hosts, inMemoryHosts);
if (file instanceof PathWithBootstrapFileStatus) {
return makeExternalFileSplit((PathWithBootstrapFileStatus)file, split);
}
return split;
}
private BootstrapBaseFileSplit makeExternalFileSplit(PathWithBootstrapFileStatus file, FileSplit split) {
try {
LOG.info("Making external data split for " + file);

View File

@@ -35,7 +35,7 @@ import java.io.IOException;
* HoodieInputFormat for HUDI datasets which store data in HFile base file format.
*/
@UseFileSplitsFromInputFormat
public class HoodieHFileInputFormat extends HoodieFileInputFormatBase {
public class HoodieHFileInputFormat extends HoodieCopyOnWriteTableInputFormat {
protected HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline timeline) {
return HoodieInputFormatUtils.filterInstantsTimeline(timeline);

View File

@@ -18,8 +18,6 @@
package org.apache.hudi.hadoop;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.io.ArrayWritable;
@@ -47,16 +45,16 @@ import java.util.stream.IntStream;
*/
@UseRecordReaderFromInputFormat
@UseFileSplitsFromInputFormat
public class HoodieParquetInputFormat extends HoodieFileInputFormatBase implements Configurable {
public class HoodieParquetInputFormat extends HoodieParquetInputFormatBase {
private static final Logger LOG = LogManager.getLogger(HoodieParquetInputFormat.class);
// NOTE: We're only using {@code MapredParquetInputFormat} to compose vectorized
// {@code RecordReader}
private final MapredParquetInputFormat mapredParquetInputFormat = new MapredParquetInputFormat();
public HoodieParquetInputFormat() {
super(new HoodieCopyOnWriteTableInputFormat());
}
protected boolean includeLogFilesForSnapshotView() {
return false;
protected HoodieParquetInputFormat(HoodieCopyOnWriteTableInputFormat delegate) {
super(delegate);
}
@Override
@@ -88,7 +86,7 @@ public class HoodieParquetInputFormat extends HoodieFileInputFormatBase implemen
private RecordReader<NullWritable, ArrayWritable> getRecordReaderInternal(InputSplit split,
JobConf job,
Reporter reporter) throws IOException {
return mapredParquetInputFormat.getRecordReader(split, job, reporter);
return super.getRecordReader(split, job, reporter);
}
private RecordReader<NullWritable, ArrayWritable> createBootstrappingRecordReader(InputSplit split,

View File

@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.hadoop;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hudi.hadoop.realtime.HoodieMergeOnReadTableInputFormat;
import java.io.IOException;
/**
* !!! PLEASE READ CAREFULLY !!!
*
* NOTE: Hive bears optimizations which are based upon validating whether {@link FileInputFormat}
* implementation inherits from {@link MapredParquetInputFormat}.
*
* To make sure that Hudi implementations are leveraging these optimizations to the fullest, this class
* serves as a base-class for every {@link FileInputFormat} implementations working with Parquet file-format.
*
* However, this class serves as a simple delegate to the actual implementation hierarchy: it expects
* either {@link HoodieCopyOnWriteTableInputFormat} or {@link HoodieMergeOnReadTableInputFormat} to be supplied
* to which it delegates all of its necessary methods.
*/
public abstract class HoodieParquetInputFormatBase extends MapredParquetInputFormat implements Configurable {
private final HoodieCopyOnWriteTableInputFormat inputFormatDelegate;
protected HoodieParquetInputFormatBase(HoodieCopyOnWriteTableInputFormat inputFormatDelegate) {
this.inputFormatDelegate = inputFormatDelegate;
}
@Override
public final void setConf(Configuration conf) {
inputFormatDelegate.setConf(conf);
}
@Override
public final Configuration getConf() {
return inputFormatDelegate.getConf();
}
@Override
public final InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
return inputFormatDelegate.getSplits(job, numSplits);
}
@Override
protected final boolean isSplitable(FileSystem fs, Path filename) {
return inputFormatDelegate.isSplitable(fs, filename);
}
@Override
protected final FileSplit makeSplit(Path file, long start, long length,
String[] hosts) {
return inputFormatDelegate.makeSplit(file, start, length, hosts);
}
@Override
protected final FileSplit makeSplit(Path file, long start, long length,
String[] hosts, String[] inMemoryHosts) {
return inputFormatDelegate.makeSplit(file, start, length, hosts, inMemoryHosts);
}
@Override
public final FileStatus[] listStatus(JobConf job) throws IOException {
return inputFormatDelegate.listStatus(job);
}
}

View File

@@ -18,13 +18,6 @@
package org.apache.hudi.hadoop.hive;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.hadoop.HoodieFileInputFormatBase;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.realtime.HoodieCombineRealtimeRecordReader;
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
@@ -63,6 +56,13 @@ import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
import org.apache.hadoop.mapred.lib.CombineFileSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.HoodieParquetInputFormatBase;
import org.apache.hudi.hadoop.realtime.HoodieCombineRealtimeRecordReader;
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -877,7 +877,7 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
LOG.info("Listing status in HoodieCombineHiveInputFormat.HoodieCombineFileInputFormatShim");
List<FileStatus> result;
if (hoodieFilter) {
HoodieFileInputFormatBase input;
HoodieParquetInputFormatBase input;
if (isRealTime) {
LOG.info("Using HoodieRealtimeInputFormat");
input = createParquetRealtimeInputFormat();

View File

@@ -44,7 +44,7 @@ import java.io.IOException;
*/
@UseRecordReaderFromInputFormat
@UseFileSplitsFromInputFormat
public class HoodieHFileRealtimeInputFormat extends HoodieRealtimeFileInputFormatBase {
public class HoodieHFileRealtimeInputFormat extends HoodieMergeOnReadTableInputFormat {
private static final Logger LOG = LogManager.getLogger(HoodieHFileRealtimeInputFormat.class);

View File

@@ -39,14 +39,12 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.BootstrapBaseFileSplit;
import org.apache.hudi.hadoop.FileStatusWithBootstrapBaseFile;
import org.apache.hudi.hadoop.HoodieFileInputFormatBase;
import org.apache.hudi.hadoop.HoodieCopyOnWriteTableInputFormat;
import org.apache.hudi.hadoop.LocatedFileStatusWithBootstrapBaseFile;
import org.apache.hudi.hadoop.PathWithLogFilePath;
import org.apache.hudi.hadoop.RealtimeFileStatus;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
@@ -68,9 +66,7 @@ import java.util.stream.Collectors;
*
* NOTE: This class is invariant of the underlying file-format of the files being read
*/
public abstract class HoodieRealtimeFileInputFormatBase extends HoodieFileInputFormatBase implements Configurable {
private static final Logger LOG = LogManager.getLogger(HoodieParquetRealtimeInputFormat.class);
public class HoodieMergeOnReadTableInputFormat extends HoodieCopyOnWriteTableInputFormat implements Configurable {
@Override
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {

View File

@@ -18,7 +18,6 @@
package org.apache.hudi.hadoop.realtime;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
@@ -43,12 +42,13 @@ import java.io.IOException;
*/
@UseRecordReaderFromInputFormat
@UseFileSplitsFromInputFormat
public class HoodieParquetRealtimeInputFormat extends HoodieRealtimeFileInputFormatBase implements Configurable {
public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat {
private static final Logger LOG = LogManager.getLogger(HoodieParquetRealtimeInputFormat.class);
// NOTE: We're only using {@code HoodieParquetInputFormat} to compose {@code RecordReader}
private final HoodieParquetInputFormat parquetInputFormat = new HoodieParquetInputFormat();
public HoodieParquetRealtimeInputFormat() {
super(new HoodieMergeOnReadTableInputFormat());
}
// To make Hive on Spark queries work with RT tables. Our theory is that due to
// {@link org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher}
@@ -71,7 +71,7 @@ public class HoodieParquetRealtimeInputFormat extends HoodieRealtimeFileInputFor
}
return new HoodieRealtimeRecordReader(realtimeSplit, jobConf,
parquetInputFormat.getRecordReader(split, jobConf, reporter));
super.getRecordReader(split, jobConf, reporter));
}
void addProjectionToJobConf(final RealtimeSplit realtimeSplit, final JobConf jobConf) {
@@ -96,8 +96,8 @@ public class HoodieParquetRealtimeInputFormat extends HoodieRealtimeFileInputFor
if (!realtimeSplit.getDeltaLogPaths().isEmpty()) {
HoodieRealtimeInputFormatUtils.addRequiredProjectionFields(jobConf, realtimeSplit.getHoodieVirtualKeyInfo());
}
this.conf = jobConf;
this.conf.set(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP, "true");
jobConf.set(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP, "true");
setConf(jobConf);
}
}
}