1
0

[HUDI-3094] Unify Hive's InputFormat implementations to avoid duplication (#4417)

This commit is contained in:
Alexey Kudinkin
2022-01-11 15:02:13 -08:00
committed by GitHub
parent 4b2fd37fb4
commit 6cdcd89afa
6 changed files with 226 additions and 268 deletions

View File

@@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.hadoop;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
/**
* Base implementation of the Hive's {@link FileInputFormat} allowing for reading of Hudi's
* Copy-on-Write (COW) tables in various configurations:
*
* <ul>
* <li>Snapshot mode: reading table's state as of particular timestamp (or instant, in Hudi's terms)</li>
* <li>Incremental mode: reading table's state as of particular timestamp (or instant, in Hudi's terms)</li>
* <li>External mode: reading non-Hudi partitions</li>
* </ul>
*/
public abstract class HoodieFileInputFormatBase extends FileInputFormat<NullWritable, ArrayWritable>
implements Configurable {
protected Configuration conf;
protected abstract boolean includeLogFilesForSnapShotView();
@Override
public final Configuration getConf() {
return conf;
}
@Override
public final void setConf(Configuration conf) {
this.conf = conf;
}
@Override
public FileStatus[] listStatus(JobConf job) throws IOException {
// Segregate inputPaths[] to incremental, snapshot and non hoodie paths
List<String> incrementalTables = HoodieHiveUtils.getIncrementalTableNames(Job.getInstance(job));
InputPathHandler inputPathHandler = new InputPathHandler(conf, getInputPaths(job), incrementalTables);
List<FileStatus> returns = new ArrayList<>();
Map<String, HoodieTableMetaClient> tableMetaClientMap = inputPathHandler.getTableMetaClientMap();
// process incremental pulls first
for (String table : incrementalTables) {
HoodieTableMetaClient metaClient = tableMetaClientMap.get(table);
if (metaClient == null) {
/* This can happen when the INCREMENTAL mode is set for a table but there were no InputPaths
* in the jobConf
*/
continue;
}
List<Path> inputPaths = inputPathHandler.getGroupedIncrementalPaths().get(metaClient);
List<FileStatus> result = listStatusForIncrementalMode(job, metaClient, inputPaths);
if (result != null) {
returns.addAll(result);
}
}
// process non hoodie Paths next.
List<Path> nonHoodiePaths = inputPathHandler.getNonHoodieInputPaths();
if (nonHoodiePaths.size() > 0) {
setInputPaths(job, nonHoodiePaths.toArray(new Path[nonHoodiePaths.size()]));
FileStatus[] fileStatuses = doListStatus(job);
returns.addAll(Arrays.asList(fileStatuses));
}
// process snapshot queries next.
List<Path> snapshotPaths = inputPathHandler.getSnapshotPaths();
if (snapshotPaths.size() > 0) {
returns.addAll(HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job, tableMetaClientMap, snapshotPaths, includeLogFilesForSnapShotView()));
}
return returns.toArray(new FileStatus[0]);
}
/**
* Achieves listStatus functionality for an incrementally queried table. Instead of listing all
* partitions and then filtering based on the commits of interest, this logic first extracts the
* partitions touched by the desired commits and then lists only those partitions.
*/
protected List<FileStatus> listStatusForIncrementalMode(
JobConf job, HoodieTableMetaClient tableMetaClient, List<Path> inputPaths) throws IOException {
String tableName = tableMetaClient.getTableConfig().getTableName();
Job jobContext = Job.getInstance(job);
Option<HoodieTimeline> timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
if (!timeline.isPresent()) {
return null;
}
Option<List<HoodieInstant>> commitsToCheck = HoodieInputFormatUtils.getCommitsForIncrementalQuery(jobContext, tableName, timeline.get());
if (!commitsToCheck.isPresent()) {
return null;
}
Option<String> incrementalInputPaths = HoodieInputFormatUtils.getAffectedPartitions(commitsToCheck.get(), tableMetaClient, timeline.get(), inputPaths);
// Mutate the JobConf to set the input paths to only partitions touched by incremental pull.
if (!incrementalInputPaths.isPresent()) {
return null;
}
setInputPaths(job, incrementalInputPaths.get());
FileStatus[] fileStatuses = doListStatus(job);
return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext, tableMetaClient, timeline.get(), fileStatuses, commitsToCheck.get());
}
/**
* Abstracts and exposes {@link FileInputFormat#listStatus(JobConf)} operation to subclasses that
* lists files (returning an array of {@link FileStatus}) corresponding to the input paths specified
* as part of provided {@link JobConf}
*/
protected final FileStatus[] doListStatus(JobConf job) throws IOException {
return super.listStatus(job);
}
}

View File

@@ -18,114 +18,32 @@
package org.apache.hudi.hadoop;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
/**
* HoodieInputFormat for HUDI datasets which store data in HFile base file format.
*/
@UseFileSplitsFromInputFormat
public class HoodieHFileInputFormat extends FileInputFormat<NullWritable, ArrayWritable> implements Configurable {
private static final Logger LOG = LogManager.getLogger(HoodieHFileInputFormat.class);
protected Configuration conf;
public class HoodieHFileInputFormat extends HoodieFileInputFormatBase {
protected HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline timeline) {
return HoodieInputFormatUtils.filterInstantsTimeline(timeline);
}
@Override
public FileStatus[] listStatus(JobConf job) throws IOException {
// Segregate inputPaths[] to incremental, snapshot and non hoodie paths
List<String> incrementalTables = HoodieHiveUtils.getIncrementalTableNames(Job.getInstance(job));
InputPathHandler inputPathHandler = new InputPathHandler(conf, getInputPaths(job), incrementalTables);
List<FileStatus> returns = new ArrayList<>();
Map<String, HoodieTableMetaClient> tableMetaClientMap = inputPathHandler.getTableMetaClientMap();
// process incremental pulls first
for (String table : incrementalTables) {
HoodieTableMetaClient metaClient = tableMetaClientMap.get(table);
if (metaClient == null) {
/* This can happen when the INCREMENTAL mode is set for a table but there were no InputPaths
* in the jobConf
*/
continue;
}
List<Path> inputPaths = inputPathHandler.getGroupedIncrementalPaths().get(metaClient);
List<FileStatus> result = listStatusForIncrementalMode(job, metaClient, inputPaths);
if (result != null) {
returns.addAll(result);
}
}
// process non hoodie Paths next.
List<Path> nonHoodiePaths = inputPathHandler.getNonHoodieInputPaths();
if (nonHoodiePaths.size() > 0) {
setInputPaths(job, nonHoodiePaths.toArray(new Path[nonHoodiePaths.size()]));
FileStatus[] fileStatuses = super.listStatus(job);
returns.addAll(Arrays.asList(fileStatuses));
}
// process snapshot queries next.
List<Path> snapshotPaths = inputPathHandler.getSnapshotPaths();
if (snapshotPaths.size() > 0) {
returns.addAll(HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job, tableMetaClientMap, snapshotPaths));
}
return returns.toArray(new FileStatus[0]);
}
/**
* Achieves listStatus functionality for an incrementally queried table. Instead of listing all
* partitions and then filtering based on the commits of interest, this logic first extracts the
* partitions touched by the desired commits and then lists only those partitions.
*/
private List<FileStatus> listStatusForIncrementalMode(
JobConf job, HoodieTableMetaClient tableMetaClient, List<Path> inputPaths) throws IOException {
String tableName = tableMetaClient.getTableConfig().getTableName();
Job jobContext = Job.getInstance(job);
Option<HoodieTimeline> timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
if (!timeline.isPresent()) {
return null;
}
Option<List<HoodieInstant>> commitsToCheck = HoodieInputFormatUtils.getCommitsForIncrementalQuery(jobContext, tableName, timeline.get());
if (!commitsToCheck.isPresent()) {
return null;
}
Option<String> incrementalInputPaths = HoodieInputFormatUtils.getAffectedPartitions(commitsToCheck.get(), tableMetaClient, timeline.get(), inputPaths);
// Mutate the JobConf to set the input paths to only partitions touched by incremental pull.
if (!incrementalInputPaths.isPresent()) {
return null;
}
setInputPaths(job, incrementalInputPaths.get());
FileStatus[] fileStatuses = super.listStatus(job);
return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext, tableMetaClient, timeline.get(), fileStatuses, commitsToCheck.get());
protected boolean includeLogFilesForSnapShotView() {
return false;
}
@Override
@@ -139,13 +57,4 @@ public class HoodieHFileInputFormat extends FileInputFormat<NullWritable, ArrayW
// This file isn't splittable.
return false;
}
public void setConf(Configuration conf) {
this.conf = conf;
}
@Override
public Configuration getConf() {
return conf;
}
}

View File

@@ -19,18 +19,13 @@
package org.apache.hudi.hadoop;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -44,15 +39,11 @@ import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -63,102 +54,22 @@ import java.util.stream.IntStream;
*/
@UseRecordReaderFromInputFormat
@UseFileSplitsFromInputFormat
public class HoodieParquetInputFormat extends MapredParquetInputFormat implements Configurable {
public class HoodieParquetInputFormat extends HoodieFileInputFormatBase implements Configurable {
private static final Logger LOG = LogManager.getLogger(HoodieParquetInputFormat.class);
protected Configuration conf;
// NOTE: We're only using {@code MapredParquetInputFormat} to compose vectorized
// {@code RecordReader}
private final MapredParquetInputFormat mapredParquetInputFormat = new MapredParquetInputFormat();
protected HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline timeline) {
return HoodieInputFormatUtils.filterInstantsTimeline(timeline);
}
protected FileStatus[] getStatus(JobConf job) throws IOException {
return super.listStatus(job);
}
protected boolean includeLogFilesForSnapShotView() {
return false;
}
@Override
public FileStatus[] listStatus(JobConf job) throws IOException {
// Segregate inputPaths[] to incremental, snapshot and non hoodie paths
List<String> incrementalTables = HoodieHiveUtils.getIncrementalTableNames(Job.getInstance(job));
InputPathHandler inputPathHandler = new InputPathHandler(conf, getInputPaths(job), incrementalTables);
List<FileStatus> returns = new ArrayList<>();
Map<String, HoodieTableMetaClient> tableMetaClientMap = inputPathHandler.getTableMetaClientMap();
// process incremental pulls first
for (String table : incrementalTables) {
HoodieTableMetaClient metaClient = tableMetaClientMap.get(table);
if (metaClient == null) {
/* This can happen when the INCREMENTAL mode is set for a table but there were no InputPaths
* in the jobConf
*/
continue;
}
List<Path> inputPaths = inputPathHandler.getGroupedIncrementalPaths().get(metaClient);
List<FileStatus> result = listStatusForIncrementalMode(job, metaClient, inputPaths);
if (result != null) {
returns.addAll(result);
}
}
// process non hoodie Paths next.
List<Path> nonHoodiePaths = inputPathHandler.getNonHoodieInputPaths();
if (nonHoodiePaths.size() > 0) {
setInputPaths(job, nonHoodiePaths.toArray(new Path[nonHoodiePaths.size()]));
FileStatus[] fileStatuses = super.listStatus(job);
returns.addAll(Arrays.asList(fileStatuses));
}
// process snapshot queries next.
List<Path> snapshotPaths = inputPathHandler.getSnapshotPaths();
if (snapshotPaths.size() > 0) {
returns.addAll(HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job, tableMetaClientMap, snapshotPaths, includeLogFilesForSnapShotView()));
}
return returns.toArray(new FileStatus[0]);
}
/**
* Achieves listStatus functionality for an incrementally queried table. Instead of listing all
* partitions and then filtering based on the commits of interest, this logic first extracts the
* partitions touched by the desired commits and then lists only those partitions.
*/
protected List<FileStatus> listStatusForIncrementalMode(
JobConf job, HoodieTableMetaClient tableMetaClient, List<Path> inputPaths) throws IOException {
String tableName = tableMetaClient.getTableConfig().getTableName();
Job jobContext = Job.getInstance(job);
Option<HoodieTimeline> timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
if (!timeline.isPresent()) {
return null;
}
Option<List<HoodieInstant>> commitsToCheck = HoodieInputFormatUtils.getCommitsForIncrementalQuery(jobContext, tableName, timeline.get());
if (!commitsToCheck.isPresent()) {
return null;
}
Option<String> incrementalInputPaths = HoodieInputFormatUtils.getAffectedPartitions(commitsToCheck.get(), tableMetaClient, timeline.get(), inputPaths);
// Mutate the JobConf to set the input paths to only partitions touched by incremental pull.
if (!incrementalInputPaths.isPresent()) {
return null;
}
setInputPaths(job, incrementalInputPaths.get());
FileStatus[] fileStatuses = super.listStatus(job);
return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext, tableMetaClient, timeline.get(), fileStatuses, commitsToCheck.get());
}
public void setConf(Configuration conf) {
this.conf = conf;
}
@Override
public Configuration getConf() {
return conf;
}
@Override
public RecordReader<NullWritable, ArrayWritable> getRecordReader(final InputSplit split, final JobConf job,
final Reporter reporter) throws IOException {
@@ -175,53 +86,14 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
// clearOutExistingPredicate(job);
// }
if (split instanceof BootstrapBaseFileSplit) {
BootstrapBaseFileSplit eSplit = (BootstrapBaseFileSplit) split;
String[] rawColNames = HoodieColumnProjectionUtils.getReadColumnNames(job);
List<Integer> rawColIds = HoodieColumnProjectionUtils.getReadColumnIDs(job);
List<Pair<Integer, String>> projectedColsWithIndex =
IntStream.range(0, rawColIds.size()).mapToObj(idx -> Pair.of(rawColIds.get(idx), rawColNames[idx]))
.collect(Collectors.toList());
List<Pair<Integer, String>> hoodieColsProjected = projectedColsWithIndex.stream()
.filter(idxWithName -> HoodieRecord.HOODIE_META_COLUMNS.contains(idxWithName.getValue()))
.collect(Collectors.toList());
List<Pair<Integer, String>> externalColsProjected = projectedColsWithIndex.stream()
.filter(idxWithName -> !HoodieRecord.HOODIE_META_COLUMNS.contains(idxWithName.getValue())
&& !HoodieHiveUtils.VIRTUAL_COLUMN_NAMES.contains(idxWithName.getValue()))
.collect(Collectors.toList());
// This always matches hive table description
List<Pair<String, String>> colNameWithTypes = HoodieColumnProjectionUtils.getIOColumnNameAndTypes(job);
List<Pair<String, String>> colNamesWithTypesForExternal = colNameWithTypes.stream()
.filter(p -> !HoodieRecord.HOODIE_META_COLUMNS.contains(p.getKey())).collect(Collectors.toList());
LOG.info("colNameWithTypes =" + colNameWithTypes + ", Num Entries =" + colNameWithTypes.size());
if (hoodieColsProjected.isEmpty()) {
return super.getRecordReader(eSplit.getBootstrapFileSplit(), job, reporter);
} else if (externalColsProjected.isEmpty()) {
return super.getRecordReader(split, job, reporter);
} else {
FileSplit rightSplit = eSplit.getBootstrapFileSplit();
// Hive PPD works at row-group level and only enabled when hive.optimize.index.filter=true;
// The above config is disabled by default. But when enabled, would cause misalignment between
// skeleton and bootstrap file. We will disable them specifically when query needs bootstrap and skeleton
// file to be stitched.
// This disables row-group filtering
JobConf jobConfCopy = new JobConf(job);
jobConfCopy.unset(TableScanDesc.FILTER_EXPR_CONF_STR);
jobConfCopy.unset(ConvertAstToSearchArg.SARG_PUSHDOWN);
LOG.info("Generating column stitching reader for " + eSplit.getPath() + " and " + rightSplit.getPath());
return new BootstrapColumnStichingRecordReader(super.getRecordReader(eSplit, jobConfCopy, reporter),
HoodieRecord.HOODIE_META_COLUMNS.size(),
super.getRecordReader(rightSplit, jobConfCopy, reporter),
colNamesWithTypesForExternal.size(),
true);
}
return createBootstrappingRecordReader(split, job, reporter);
}
if (LOG.isDebugEnabled()) {
LOG.debug("EMPLOYING DEFAULT RECORD READER - " + split);
}
return super.getRecordReader(split, job, reporter);
return getRecordReaderInternal(split, job, reporter);
}
@Override
@@ -250,6 +122,61 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
return split;
}
private RecordReader<NullWritable, ArrayWritable> getRecordReaderInternal(InputSplit split,
JobConf job,
Reporter reporter) throws IOException {
return mapredParquetInputFormat.getRecordReader(split, job, reporter);
}
private RecordReader<NullWritable, ArrayWritable> createBootstrappingRecordReader(InputSplit split,
JobConf job,
Reporter reporter) throws IOException {
BootstrapBaseFileSplit eSplit = (BootstrapBaseFileSplit) split;
String[] rawColNames = HoodieColumnProjectionUtils.getReadColumnNames(job);
List<Integer> rawColIds = HoodieColumnProjectionUtils.getReadColumnIDs(job);
List<Pair<Integer, String>> projectedColsWithIndex =
IntStream.range(0, rawColIds.size()).mapToObj(idx -> Pair.of(rawColIds.get(idx), rawColNames[idx]))
.collect(Collectors.toList());
List<Pair<Integer, String>> hoodieColsProjected = projectedColsWithIndex.stream()
.filter(idxWithName -> HoodieRecord.HOODIE_META_COLUMNS.contains(idxWithName.getValue()))
.collect(Collectors.toList());
List<Pair<Integer, String>> externalColsProjected = projectedColsWithIndex.stream()
.filter(idxWithName -> !HoodieRecord.HOODIE_META_COLUMNS.contains(idxWithName.getValue())
&& !HoodieHiveUtils.VIRTUAL_COLUMN_NAMES.contains(idxWithName.getValue()))
.collect(Collectors.toList());
// This always matches hive table description
List<Pair<String, String>> colNameWithTypes = HoodieColumnProjectionUtils.getIOColumnNameAndTypes(job);
List<Pair<String, String>> colNamesWithTypesForExternal = colNameWithTypes.stream()
.filter(p -> !HoodieRecord.HOODIE_META_COLUMNS.contains(p.getKey())).collect(Collectors.toList());
LOG.info("colNameWithTypes =" + colNameWithTypes + ", Num Entries =" + colNameWithTypes.size());
if (hoodieColsProjected.isEmpty()) {
return getRecordReaderInternal(eSplit.getBootstrapFileSplit(), job, reporter);
} else if (externalColsProjected.isEmpty()) {
return getRecordReaderInternal(split, job, reporter);
} else {
FileSplit rightSplit = eSplit.getBootstrapFileSplit();
// Hive PPD works at row-group level and only enabled when hive.optimize.index.filter=true;
// The above config is disabled by default. But when enabled, would cause misalignment between
// skeleton and bootstrap file. We will disable them specifically when query needs bootstrap and skeleton
// file to be stitched.
// This disables row-group filtering
JobConf jobConfCopy = new JobConf(job);
jobConfCopy.unset(TableScanDesc.FILTER_EXPR_CONF_STR);
jobConfCopy.unset(ConvertAstToSearchArg.SARG_PUSHDOWN);
LOG.info("Generating column stitching reader for " + eSplit.getPath() + " and " + rightSplit.getPath());
return new BootstrapColumnStichingRecordReader(getRecordReaderInternal(eSplit, jobConfCopy, reporter),
HoodieRecord.HOODIE_META_COLUMNS.size(),
getRecordReaderInternal(rightSplit, jobConfCopy, reporter),
colNamesWithTypesForExternal.size(),
true);
}
}
private BootstrapBaseFileSplit makeExternalFileSplit(PathWithBootstrapFileStatus file, FileSplit split) {
try {
LOG.info("Making external data split for " + file);

View File

@@ -18,16 +18,6 @@
package org.apache.hudi.hadoop.realtime;
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.hadoop.HoodieHFileInputFormat;
import org.apache.hudi.hadoop.UseFileSplitsFromInputFormat;
import org.apache.hudi.hadoop.UseRecordReaderFromInputFormat;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
@@ -36,6 +26,14 @@ import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.hadoop.HoodieHFileInputFormat;
import org.apache.hudi.hadoop.UseFileSplitsFromInputFormat;
import org.apache.hudi.hadoop.UseRecordReaderFromInputFormat;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -58,12 +56,6 @@ public class HoodieHFileRealtimeInputFormat extends HoodieHFileInputFormat {
return HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits);
}
@Override
public FileStatus[] listStatus(JobConf job) throws IOException {
// Call the HoodieInputFormat::listStatus to obtain all latest hfiles, based on commit timeline.
return super.listStatus(job);
}
@Override
protected HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline timeline) {
// no specific filtering for Realtime format

View File

@@ -46,7 +46,6 @@ import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.io.ArrayWritable;
@@ -89,7 +88,9 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
boolean isIncrementalSplits = HoodieRealtimeInputFormatUtils.isIncrementalQuerySplits(fileSplits);
return isIncrementalSplits ? HoodieRealtimeInputFormatUtils.getIncrementalRealtimeSplits(job, fileSplits.stream()) : HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits.stream());
return isIncrementalSplits
? HoodieRealtimeInputFormatUtils.getIncrementalRealtimeSplits(job, fileSplits.stream())
: HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits.stream());
}
/**
@@ -159,7 +160,7 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
// step5
// find all file status in partitionPaths.
FileStatus[] fileStatuses = getStatus(job);
FileStatus[] fileStatuses = doListStatus(job);
Map<String, FileStatus> candidateFileStatus = new HashMap<>();
for (int i = 0; i < fileStatuses.length; i++) {
String key = fileStatuses[i].getPath().toString();
@@ -261,13 +262,6 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
}
}
@Override
public FileStatus[] listStatus(JobConf job) throws IOException {
// Call the HoodieInputFormat::listStatus to obtain all latest parquet files, based on commit
// timeline.
return super.listStatus(job);
}
@Override
protected HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline timeline) {
// no specific filtering for Realtime format
@@ -322,9 +316,4 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
return new HoodieRealtimeRecordReader(realtimeSplit, jobConf,
super.getRecordReader(split, jobConf, reporter));
}
@Override
public Configuration getConf() {
return conf;
}
}

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.hadoop.utils;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.fs.FSUtils;
@@ -268,7 +269,7 @@ public class HoodieInputFormatUtils {
* @param tableMetaClient
* @return
*/
public static Option<HoodieTimeline> getFilteredCommitsTimeline(Job job, HoodieTableMetaClient tableMetaClient) {
public static Option<HoodieTimeline> getFilteredCommitsTimeline(JobContext job, HoodieTableMetaClient tableMetaClient) {
String tableName = tableMetaClient.getTableConfig().getTableName();
HoodieDefaultTimeline baseTimeline;
if (HoodieHiveUtils.stopAtCompaction(job, tableName)) {
@@ -299,7 +300,7 @@ public class HoodieInputFormatUtils {
* @param timeline
* @return
*/
public static HoodieTimeline getHoodieTimelineForIncrementalQuery(Job job, String tableName, HoodieTimeline timeline) {
public static HoodieTimeline getHoodieTimelineForIncrementalQuery(JobContext job, String tableName, HoodieTimeline timeline) {
String lastIncrementalTs = HoodieHiveUtils.readStartCommitTime(job, tableName);
// Total number of commits to return in this batch. Set this to -1 to get all the commits.
Integer maxCommits = HoodieHiveUtils.readMaxCommits(job, tableName);
@@ -439,11 +440,6 @@ public class HoodieInputFormatUtils {
.build();
}
public static List<FileStatus> filterFileStatusForSnapshotMode(JobConf job, Map<String, HoodieTableMetaClient> tableMetaClientMap,
List<Path> snapshotPaths) throws IOException {
return filterFileStatusForSnapshotMode(job, tableMetaClientMap, snapshotPaths, false);
}
public static List<FileStatus> filterFileStatusForSnapshotMode(JobConf job, Map<String, HoodieTableMetaClient> tableMetaClientMap,
List<Path> snapshotPaths, boolean includeLogFiles) throws IOException {
HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(job);