diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java
new file mode 100644
index 000000000..821e467e9
--- /dev/null
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Base implementation of the Hive's {@link FileInputFormat} allowing for reading of Hudi's
+ * Copy-on-Write (COW) tables in various configurations:
+ *
+ * <ul>
+ *   <li>Snapshot mode: reading table's state as of particular timestamp (or instant, in Hudi's terms)</li>
+ *   <li>Incremental mode: reading only the records added or updated within a particular range of instants</li>
+ *   <li>External mode: reading non-Hudi partitions</li>
+ * </ul>
+ */
+public abstract class HoodieFileInputFormatBase extends FileInputFormat<NullWritable, ArrayWritable>
+ implements Configurable {
+
+ protected Configuration conf;
+
+ protected abstract boolean includeLogFilesForSnapShotView();
+
+ @Override
+ public final Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public final void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public FileStatus[] listStatus(JobConf job) throws IOException {
+ // Segregate inputPaths[] to incremental, snapshot and non hoodie paths
+ List<String> incrementalTables = HoodieHiveUtils.getIncrementalTableNames(Job.getInstance(job));
+ InputPathHandler inputPathHandler = new InputPathHandler(conf, getInputPaths(job), incrementalTables);
+ List<FileStatus> returns = new ArrayList<>();
+
+ Map<String, HoodieTableMetaClient> tableMetaClientMap = inputPathHandler.getTableMetaClientMap();
+ // process incremental pulls first
+ for (String table : incrementalTables) {
+ HoodieTableMetaClient metaClient = tableMetaClientMap.get(table);
+ if (metaClient == null) {
+ /* This can happen when the INCREMENTAL mode is set for a table but there were no InputPaths
+ * in the jobConf
+ */
+ continue;
+ }
+ List<Path> inputPaths = inputPathHandler.getGroupedIncrementalPaths().get(metaClient);
+ List<FileStatus> result = listStatusForIncrementalMode(job, metaClient, inputPaths);
+ if (result != null) {
+ returns.addAll(result);
+ }
+ }
+
+ // process non hoodie Paths next.
+ List<Path> nonHoodiePaths = inputPathHandler.getNonHoodieInputPaths();
+ if (nonHoodiePaths.size() > 0) {
+ setInputPaths(job, nonHoodiePaths.toArray(new Path[nonHoodiePaths.size()]));
+ FileStatus[] fileStatuses = doListStatus(job);
+ returns.addAll(Arrays.asList(fileStatuses));
+ }
+
+ // process snapshot queries next.
+ List<Path> snapshotPaths = inputPathHandler.getSnapshotPaths();
+ if (snapshotPaths.size() > 0) {
+ returns.addAll(HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job, tableMetaClientMap, snapshotPaths, includeLogFilesForSnapShotView()));
+ }
+ return returns.toArray(new FileStatus[0]);
+ }
+
+ /**
+ * Achieves listStatus functionality for an incrementally queried table. Instead of listing all
+ * partitions and then filtering based on the commits of interest, this logic first extracts the
+ * partitions touched by the desired commits and then lists only those partitions.
+ */
+ protected List<FileStatus> listStatusForIncrementalMode(
+ JobConf job, HoodieTableMetaClient tableMetaClient, List<Path> inputPaths) throws IOException {
+ String tableName = tableMetaClient.getTableConfig().getTableName();
+ Job jobContext = Job.getInstance(job);
+ Option<HoodieTimeline> timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
+ if (!timeline.isPresent()) {
+ return null;
+ }
+ Option<List<HoodieInstant>> commitsToCheck = HoodieInputFormatUtils.getCommitsForIncrementalQuery(jobContext, tableName, timeline.get());
+ if (!commitsToCheck.isPresent()) {
+ return null;
+ }
+ Option<String> incrementalInputPaths = HoodieInputFormatUtils.getAffectedPartitions(commitsToCheck.get(), tableMetaClient, timeline.get(), inputPaths);
+ // Mutate the JobConf to set the input paths to only partitions touched by incremental pull.
+ if (!incrementalInputPaths.isPresent()) {
+ return null;
+ }
+ setInputPaths(job, incrementalInputPaths.get());
+ FileStatus[] fileStatuses = doListStatus(job);
+ return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext, tableMetaClient, timeline.get(), fileStatuses, commitsToCheck.get());
+ }
+
+ /**
+ * Abstracts and exposes {@link FileInputFormat#listStatus(JobConf)} operation to subclasses that
+ * lists files (returning an array of {@link FileStatus}) corresponding to the input paths specified
+ * as part of provided {@link JobConf}
+ */
+ protected final FileStatus[] doListStatus(JobConf job) throws IOException {
+ return super.listStatus(job);
+ }
+}
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileInputFormat.java
index e3bac0b4e..2baf140e2 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileInputFormat.java
@@ -18,114 +18,32 @@
package org.apache.hudi.hadoop;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
/**
* HoodieInputFormat for HUDI datasets which store data in HFile base file format.
*/
@UseFileSplitsFromInputFormat
-public class HoodieHFileInputFormat extends FileInputFormat<NullWritable, ArrayWritable> implements Configurable {
-
- private static final Logger LOG = LogManager.getLogger(HoodieHFileInputFormat.class);
-
- protected Configuration conf;
+public class HoodieHFileInputFormat extends HoodieFileInputFormatBase {
protected HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline timeline) {
return HoodieInputFormatUtils.filterInstantsTimeline(timeline);
}
@Override
- public FileStatus[] listStatus(JobConf job) throws IOException {
- // Segregate inputPaths[] to incremental, snapshot and non hoodie paths
- List<String> incrementalTables = HoodieHiveUtils.getIncrementalTableNames(Job.getInstance(job));
- InputPathHandler inputPathHandler = new InputPathHandler(conf, getInputPaths(job), incrementalTables);
- List<FileStatus> returns = new ArrayList<>();
-
- Map<String, HoodieTableMetaClient> tableMetaClientMap = inputPathHandler.getTableMetaClientMap();
- // process incremental pulls first
- for (String table : incrementalTables) {
- HoodieTableMetaClient metaClient = tableMetaClientMap.get(table);
- if (metaClient == null) {
- /* This can happen when the INCREMENTAL mode is set for a table but there were no InputPaths
- * in the jobConf
- */
- continue;
- }
- List<Path> inputPaths = inputPathHandler.getGroupedIncrementalPaths().get(metaClient);
- List<FileStatus> result = listStatusForIncrementalMode(job, metaClient, inputPaths);
- if (result != null) {
- returns.addAll(result);
- }
- }
-
- // process non hoodie Paths next.
- List<Path> nonHoodiePaths = inputPathHandler.getNonHoodieInputPaths();
- if (nonHoodiePaths.size() > 0) {
- setInputPaths(job, nonHoodiePaths.toArray(new Path[nonHoodiePaths.size()]));
- FileStatus[] fileStatuses = super.listStatus(job);
- returns.addAll(Arrays.asList(fileStatuses));
- }
-
- // process snapshot queries next.
- List<Path> snapshotPaths = inputPathHandler.getSnapshotPaths();
- if (snapshotPaths.size() > 0) {
- returns.addAll(HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job, tableMetaClientMap, snapshotPaths));
- }
- return returns.toArray(new FileStatus[0]);
- }
-
- /**
- * Achieves listStatus functionality for an incrementally queried table. Instead of listing all
- * partitions and then filtering based on the commits of interest, this logic first extracts the
- * partitions touched by the desired commits and then lists only those partitions.
- */
- private List<FileStatus> listStatusForIncrementalMode(
- JobConf job, HoodieTableMetaClient tableMetaClient, List<Path> inputPaths) throws IOException {
- String tableName = tableMetaClient.getTableConfig().getTableName();
- Job jobContext = Job.getInstance(job);
- Option<HoodieTimeline> timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
- if (!timeline.isPresent()) {
- return null;
- }
- Option<List<HoodieInstant>> commitsToCheck = HoodieInputFormatUtils.getCommitsForIncrementalQuery(jobContext, tableName, timeline.get());
- if (!commitsToCheck.isPresent()) {
- return null;
- }
- Option<String> incrementalInputPaths = HoodieInputFormatUtils.getAffectedPartitions(commitsToCheck.get(), tableMetaClient, timeline.get(), inputPaths);
- // Mutate the JobConf to set the input paths to only partitions touched by incremental pull.
- if (!incrementalInputPaths.isPresent()) {
- return null;
- }
- setInputPaths(job, incrementalInputPaths.get());
- FileStatus[] fileStatuses = super.listStatus(job);
- return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext, tableMetaClient, timeline.get(), fileStatuses, commitsToCheck.get());
+ protected boolean includeLogFilesForSnapShotView() {
+ return false;
}
@Override
@@ -139,13 +57,4 @@ public class HoodieHFileInputFormat extends FileInputFormat<NullWritable, ArrayWritable> implements Configurable {
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
- @Override
- public FileStatus[] listStatus(JobConf job) throws IOException {
- List<String> incrementalTables = HoodieHiveUtils.getIncrementalTableNames(Job.getInstance(job));
- InputPathHandler inputPathHandler = new InputPathHandler(conf, getInputPaths(job), incrementalTables);
- List<FileStatus> returns = new ArrayList<>();
-
- Map<String, HoodieTableMetaClient> tableMetaClientMap = inputPathHandler.getTableMetaClientMap();
- // process incremental pulls first
- for (String table : incrementalTables) {
- HoodieTableMetaClient metaClient = tableMetaClientMap.get(table);
- if (metaClient == null) {
- /* This can happen when the INCREMENTAL mode is set for a table but there were no InputPaths
- * in the jobConf
- */
- continue;
- }
- List<Path> inputPaths = inputPathHandler.getGroupedIncrementalPaths().get(metaClient);
- List<FileStatus> result = listStatusForIncrementalMode(job, metaClient, inputPaths);
- if (result != null) {
- returns.addAll(result);
- }
- }
-
- // process non hoodie Paths next.
- List<Path> nonHoodiePaths = inputPathHandler.getNonHoodieInputPaths();
- if (nonHoodiePaths.size() > 0) {
- setInputPaths(job, nonHoodiePaths.toArray(new Path[nonHoodiePaths.size()]));
- FileStatus[] fileStatuses = super.listStatus(job);
- returns.addAll(Arrays.asList(fileStatuses));
- }
-
- // process snapshot queries next.
- List<Path> snapshotPaths = inputPathHandler.getSnapshotPaths();
- if (snapshotPaths.size() > 0) {
- returns.addAll(HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job, tableMetaClientMap, snapshotPaths, includeLogFilesForSnapShotView()));
- }
- return returns.toArray(new FileStatus[0]);
- }
-
-
-
- /**
- * Achieves listStatus functionality for an incrementally queried table. Instead of listing all
- * partitions and then filtering based on the commits of interest, this logic first extracts the
- * partitions touched by the desired commits and then lists only those partitions.
- */
- protected List<FileStatus> listStatusForIncrementalMode(
- JobConf job, HoodieTableMetaClient tableMetaClient, List<Path> inputPaths) throws IOException {
- String tableName = tableMetaClient.getTableConfig().getTableName();
- Job jobContext = Job.getInstance(job);
- Option<HoodieTimeline> timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
- if (!timeline.isPresent()) {
- return null;
- }
- Option<List<HoodieInstant>> commitsToCheck = HoodieInputFormatUtils.getCommitsForIncrementalQuery(jobContext, tableName, timeline.get());
- if (!commitsToCheck.isPresent()) {
- return null;
- }
- Option<String> incrementalInputPaths = HoodieInputFormatUtils.getAffectedPartitions(commitsToCheck.get(), tableMetaClient, timeline.get(), inputPaths);
- // Mutate the JobConf to set the input paths to only partitions touched by incremental pull.
- if (!incrementalInputPaths.isPresent()) {
- return null;
- }
- setInputPaths(job, incrementalInputPaths.get());
- FileStatus[] fileStatuses = super.listStatus(job);
- return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext, tableMetaClient, timeline.get(), fileStatuses, commitsToCheck.get());
- }
-
- public void setConf(Configuration conf) {
- this.conf = conf;
- }
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
@Override
public RecordReader<NullWritable, ArrayWritable> getRecordReader(final InputSplit split, final JobConf job,
final Reporter reporter) throws IOException {
@@ -175,53 +86,14 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
// clearOutExistingPredicate(job);
// }
if (split instanceof BootstrapBaseFileSplit) {
- BootstrapBaseFileSplit eSplit = (BootstrapBaseFileSplit) split;
- String[] rawColNames = HoodieColumnProjectionUtils.getReadColumnNames(job);
- List<Integer> rawColIds = HoodieColumnProjectionUtils.getReadColumnIDs(job);
- List<Pair<Integer, String>> projectedColsWithIndex =
- IntStream.range(0, rawColIds.size()).mapToObj(idx -> Pair.of(rawColIds.get(idx), rawColNames[idx]))
- .collect(Collectors.toList());
-
- List<Pair<Integer, String>> hoodieColsProjected = projectedColsWithIndex.stream()
- .filter(idxWithName -> HoodieRecord.HOODIE_META_COLUMNS.contains(idxWithName.getValue()))
- .collect(Collectors.toList());
- List<Pair<Integer, String>> externalColsProjected = projectedColsWithIndex.stream()
- .filter(idxWithName -> !HoodieRecord.HOODIE_META_COLUMNS.contains(idxWithName.getValue())
- && !HoodieHiveUtils.VIRTUAL_COLUMN_NAMES.contains(idxWithName.getValue()))
- .collect(Collectors.toList());
-
- // This always matches hive table description
- List<Pair<String, String>> colNameWithTypes = HoodieColumnProjectionUtils.getIOColumnNameAndTypes(job);
- List<Pair<String, String>> colNamesWithTypesForExternal = colNameWithTypes.stream()
- .filter(p -> !HoodieRecord.HOODIE_META_COLUMNS.contains(p.getKey())).collect(Collectors.toList());
- LOG.info("colNameWithTypes =" + colNameWithTypes + ", Num Entries =" + colNameWithTypes.size());
- if (hoodieColsProjected.isEmpty()) {
- return super.getRecordReader(eSplit.getBootstrapFileSplit(), job, reporter);
- } else if (externalColsProjected.isEmpty()) {
- return super.getRecordReader(split, job, reporter);
- } else {
- FileSplit rightSplit = eSplit.getBootstrapFileSplit();
- // Hive PPD works at row-group level and only enabled when hive.optimize.index.filter=true;
- // The above config is disabled by default. But when enabled, would cause misalignment between
- // skeleton and bootstrap file. We will disable them specifically when query needs bootstrap and skeleton
- // file to be stitched.
- // This disables row-group filtering
- JobConf jobConfCopy = new JobConf(job);
- jobConfCopy.unset(TableScanDesc.FILTER_EXPR_CONF_STR);
- jobConfCopy.unset(ConvertAstToSearchArg.SARG_PUSHDOWN);
-
- LOG.info("Generating column stitching reader for " + eSplit.getPath() + " and " + rightSplit.getPath());
- return new BootstrapColumnStichingRecordReader(super.getRecordReader(eSplit, jobConfCopy, reporter),
- HoodieRecord.HOODIE_META_COLUMNS.size(),
- super.getRecordReader(rightSplit, jobConfCopy, reporter),
- colNamesWithTypesForExternal.size(),
- true);
- }
+ return createBootstrappingRecordReader(split, job, reporter);
}
+
if (LOG.isDebugEnabled()) {
LOG.debug("EMPLOYING DEFAULT RECORD READER - " + split);
}
- return super.getRecordReader(split, job, reporter);
+
+ return getRecordReaderInternal(split, job, reporter);
}
@Override
@@ -250,6 +122,61 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
return split;
}
+ private RecordReader<NullWritable, ArrayWritable> getRecordReaderInternal(InputSplit split,
+ JobConf job,
+ Reporter reporter) throws IOException {
+ return mapredParquetInputFormat.getRecordReader(split, job, reporter);
+ }
+
+ private RecordReader<NullWritable, ArrayWritable> createBootstrappingRecordReader(InputSplit split,
+ JobConf job,
+ Reporter reporter) throws IOException {
+ BootstrapBaseFileSplit eSplit = (BootstrapBaseFileSplit) split;
+ String[] rawColNames = HoodieColumnProjectionUtils.getReadColumnNames(job);
+ List<Integer> rawColIds = HoodieColumnProjectionUtils.getReadColumnIDs(job);
+ List<Pair<Integer, String>> projectedColsWithIndex =
+ IntStream.range(0, rawColIds.size()).mapToObj(idx -> Pair.of(rawColIds.get(idx), rawColNames[idx]))
+ .collect(Collectors.toList());
+
+ List<Pair<Integer, String>> hoodieColsProjected = projectedColsWithIndex.stream()
+ .filter(idxWithName -> HoodieRecord.HOODIE_META_COLUMNS.contains(idxWithName.getValue()))
+ .collect(Collectors.toList());
+ List<Pair<Integer, String>> externalColsProjected = projectedColsWithIndex.stream()
+ .filter(idxWithName -> !HoodieRecord.HOODIE_META_COLUMNS.contains(idxWithName.getValue())
+ && !HoodieHiveUtils.VIRTUAL_COLUMN_NAMES.contains(idxWithName.getValue()))
+ .collect(Collectors.toList());
+
+ // This always matches hive table description
+ List<Pair<String, String>> colNameWithTypes = HoodieColumnProjectionUtils.getIOColumnNameAndTypes(job);
+ List<Pair<String, String>> colNamesWithTypesForExternal = colNameWithTypes.stream()
+ .filter(p -> !HoodieRecord.HOODIE_META_COLUMNS.contains(p.getKey())).collect(Collectors.toList());
+
+ LOG.info("colNameWithTypes =" + colNameWithTypes + ", Num Entries =" + colNameWithTypes.size());
+
+ if (hoodieColsProjected.isEmpty()) {
+ return getRecordReaderInternal(eSplit.getBootstrapFileSplit(), job, reporter);
+ } else if (externalColsProjected.isEmpty()) {
+ return getRecordReaderInternal(split, job, reporter);
+ } else {
+ FileSplit rightSplit = eSplit.getBootstrapFileSplit();
+ // Hive PPD works at row-group level and only enabled when hive.optimize.index.filter=true;
+ // The above config is disabled by default. But when enabled, would cause misalignment between
+ // skeleton and bootstrap file. We will disable them specifically when query needs bootstrap and skeleton
+ // file to be stitched.
+ // This disables row-group filtering
+ JobConf jobConfCopy = new JobConf(job);
+ jobConfCopy.unset(TableScanDesc.FILTER_EXPR_CONF_STR);
+ jobConfCopy.unset(ConvertAstToSearchArg.SARG_PUSHDOWN);
+
+ LOG.info("Generating column stitching reader for " + eSplit.getPath() + " and " + rightSplit.getPath());
+ return new BootstrapColumnStichingRecordReader(getRecordReaderInternal(eSplit, jobConfCopy, reporter),
+ HoodieRecord.HOODIE_META_COLUMNS.size(),
+ getRecordReaderInternal(rightSplit, jobConfCopy, reporter),
+ colNamesWithTypesForExternal.size(),
+ true);
+ }
+ }
+
private BootstrapBaseFileSplit makeExternalFileSplit(PathWithBootstrapFileStatus file, FileSplit split) {
try {
LOG.info("Making external data split for " + file);
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieHFileRealtimeInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieHFileRealtimeInputFormat.java
index 6f92359b2..525bec613 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieHFileRealtimeInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieHFileRealtimeInputFormat.java
@@ -18,16 +18,6 @@
package org.apache.hudi.hadoop.realtime;
-import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.hadoop.HoodieHFileInputFormat;
-import org.apache.hudi.hadoop.UseFileSplitsFromInputFormat;
-import org.apache.hudi.hadoop.UseRecordReaderFromInputFormat;
-import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
-import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
-
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
@@ -36,6 +26,14 @@ import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.hadoop.HoodieHFileInputFormat;
+import org.apache.hudi.hadoop.UseFileSplitsFromInputFormat;
+import org.apache.hudi.hadoop.UseRecordReaderFromInputFormat;
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -58,12 +56,6 @@ public class HoodieHFileRealtimeInputFormat extends HoodieHFileInputFormat {
return HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits);
}
- @Override
- public FileStatus[] listStatus(JobConf job) throws IOException {
- // Call the HoodieInputFormat::listStatus to obtain all latest hfiles, based on commit timeline.
- return super.listStatus(job);
- }
-
@Override
protected HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline timeline) {
// no specific filtering for Realtime format
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
index e683840c6..b6a7fe9f4 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
@@ -46,7 +46,6 @@ import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.io.ArrayWritable;
@@ -89,7 +88,9 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
boolean isIncrementalSplits = HoodieRealtimeInputFormatUtils.isIncrementalQuerySplits(fileSplits);
- return isIncrementalSplits ? HoodieRealtimeInputFormatUtils.getIncrementalRealtimeSplits(job, fileSplits.stream()) : HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits.stream());
+ return isIncrementalSplits
+ ? HoodieRealtimeInputFormatUtils.getIncrementalRealtimeSplits(job, fileSplits.stream())
+ : HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits.stream());
}
/**
@@ -159,7 +160,7 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
// step5
// find all file status in partitionPaths.
- FileStatus[] fileStatuses = getStatus(job);
+ FileStatus[] fileStatuses = doListStatus(job);
Map<String, FileStatus> candidateFileStatus = new HashMap<>();
for (int i = 0; i < fileStatuses.length; i++) {
String key = fileStatuses[i].getPath().toString();
@@ -261,13 +262,6 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
}
}
- @Override
- public FileStatus[] listStatus(JobConf job) throws IOException {
- // Call the HoodieInputFormat::listStatus to obtain all latest parquet files, based on commit
- // timeline.
- return super.listStatus(job);
- }
-
@Override
protected HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline timeline) {
// no specific filtering for Realtime format
@@ -322,9 +316,4 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
return new HoodieRealtimeRecordReader(realtimeSplit, jobConf,
super.getRecordReader(split, jobConf, reporter));
}
-
- @Override
- public Configuration getConf() {
- return conf;
- }
}
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
index 4f4b69f98..95752bac3 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
@@ -18,6 +18,7 @@
package org.apache.hudi.hadoop.utils;
+import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.fs.FSUtils;
@@ -268,7 +269,7 @@ public class HoodieInputFormatUtils {
* @param tableMetaClient
* @return
*/
- public static Option<HoodieTimeline> getFilteredCommitsTimeline(Job job, HoodieTableMetaClient tableMetaClient) {
+ public static Option<HoodieTimeline> getFilteredCommitsTimeline(JobContext job, HoodieTableMetaClient tableMetaClient) {
String tableName = tableMetaClient.getTableConfig().getTableName();
HoodieDefaultTimeline baseTimeline;
if (HoodieHiveUtils.stopAtCompaction(job, tableName)) {
@@ -299,7 +300,7 @@ public class HoodieInputFormatUtils {
* @param timeline
* @return
*/
- public static HoodieTimeline getHoodieTimelineForIncrementalQuery(Job job, String tableName, HoodieTimeline timeline) {
+ public static HoodieTimeline getHoodieTimelineForIncrementalQuery(JobContext job, String tableName, HoodieTimeline timeline) {
String lastIncrementalTs = HoodieHiveUtils.readStartCommitTime(job, tableName);
// Total number of commits to return in this batch. Set this to -1 to get all the commits.
Integer maxCommits = HoodieHiveUtils.readMaxCommits(job, tableName);
@@ -439,11 +440,6 @@ public class HoodieInputFormatUtils {
.build();
}
- public static List<FileStatus> filterFileStatusForSnapshotMode(JobConf job, Map<String, HoodieTableMetaClient> tableMetaClientMap,
- List<Path> snapshotPaths) throws IOException {
- return filterFileStatusForSnapshotMode(job, tableMetaClientMap, snapshotPaths, false);
- }
-
public static List<FileStatus> filterFileStatusForSnapshotMode(JobConf job, Map<String, HoodieTableMetaClient> tableMetaClientMap,
List<Path> snapshotPaths, boolean includeLogFiles) throws IOException {
HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(job);