diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java
index 010b8d6ac..2a62cc898 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java
@@ -21,10 +21,12 @@ package org.apache.hudi.hadoop;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hudi.common.config.TypedProperties;
@@ -65,12 +67,32 @@ import static org.apache.hudi.common.util.ValidationUtils.checkState;
 *
 * Snapshot mode: reading table's state as of particular timestamp (or instant, in Hudi's terms)
 * Incremental mode: reading records written since a particular commit (or instant, in Hudi's terms)
* External mode: reading non-Hudi partitions
*
+ *
+ * NOTE: This class is invariant of the underlying file-format of the files being read
*/
public abstract class HoodieFileInputFormatBase extends FileInputFormat<NullWritable, ArrayWritable>
implements Configurable {
protected Configuration conf;
+ @Nonnull
+ private static RealtimeFileStatus createRealtimeFileStatusUnchecked(HoodieBaseFile baseFile, Stream<HoodieLogFile> logFiles) {
+ List<HoodieLogFile> sortedLogFiles = logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
+ FileStatus baseFileStatus = getFileStatusUnchecked(baseFile);
+ try {
+ RealtimeFileStatus rtFileStatus = new RealtimeFileStatus(baseFileStatus);
+ rtFileStatus.setDeltaLogFiles(sortedLogFiles);
+ rtFileStatus.setBaseFilePath(baseFile.getPath());
+ if (baseFileStatus instanceof LocatedFileStatusWithBootstrapBaseFile || baseFileStatus instanceof FileStatusWithBootstrapBaseFile) {
+ rtFileStatus.setBootStrapFileStatus(baseFileStatus);
+ }
+
+ return rtFileStatus;
+ } catch (IOException e) {
+ throw new HoodieIOException(String.format("Failed to init %s", RealtimeFileStatus.class.getSimpleName()), e);
+ }
+ }
+
@Override
public final Configuration getConf() {
return conf;
@@ -81,6 +103,24 @@ public abstract class HoodieFileInputFormatBase extends FileInputFormat<NullWritable, ArrayWritable>
- private static void validate(List<FileStatus> targetFiles, List<FileStatus> legacyFileStatuses) {
- List<FileStatus> diff = CollectionUtils.diff(targetFiles, legacyFileStatuses);
- checkState(diff.isEmpty(), "Should be empty");
- }
-
- @Nonnull
- private static FileStatus getFileStatusUnchecked(HoodieBaseFile baseFile) {
- try {
- return HoodieInputFormatUtils.getFileStatus(baseFile);
- } catch (IOException ioe) {
- throw new HoodieIOException("Failed to get file-status", ioe);
- }
- }
-
/**
* Abstracts and exposes {@link FileInputFormat#listStatus(JobConf)} operation to subclasses that
* lists files (returning an array of {@link FileStatus}) corresponding to the input paths specified
@@ -172,35 +198,25 @@ public abstract class HoodieFileInputFormatBase extends FileInputFormat<NullWritable, ArrayWritable>
- @Nonnull
- private static RealtimeFileStatus createRealtimeFileStatusUnchecked(HoodieBaseFile baseFile,
- Stream<HoodieLogFile> logFiles,
- Option<HoodieInstant> latestCompletedInstantOpt,
- HoodieTableMetaClient tableMetaClient) {
- List<HoodieLogFile> sortedLogFiles = logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
- FileStatus baseFileStatus = getFileStatusUnchecked(baseFile);
+ private BootstrapBaseFileSplit makeExternalFileSplit(PathWithBootstrapFileStatus file, FileSplit split) {
try {
- RealtimeFileStatus rtFileStatus = new RealtimeFileStatus(baseFileStatus);
- rtFileStatus.setDeltaLogFiles(sortedLogFiles);
- rtFileStatus.setBaseFilePath(baseFile.getPath());
- rtFileStatus.setBasePath(tableMetaClient.getBasePath());
-
- if (latestCompletedInstantOpt.isPresent()) {
- HoodieInstant latestCompletedInstant = latestCompletedInstantOpt.get();
- checkState(latestCompletedInstant.isCompleted());
-
- rtFileStatus.setMaxCommitTime(latestCompletedInstant.getTimestamp());
- }
-
- if (baseFileStatus instanceof LocatedFileStatusWithBootstrapBaseFile || baseFileStatus instanceof FileStatusWithBootstrapBaseFile) {
- rtFileStatus.setBootStrapFileStatus(baseFileStatus);
- }
-
- return rtFileStatus;
+ LOG.info("Making external data split for " + file);
+ FileStatus externalFileStatus = file.getBootstrapFileStatus();
+ FileSplit externalFileSplit = makeSplit(externalFileStatus.getPath(), 0, externalFileStatus.getLen(),
+ new String[0], new String[0]);
+ return new BootstrapBaseFileSplit(split, externalFileSplit);
} catch (IOException e) {
- throw new HoodieIOException(String.format("Failed to init %s", RealtimeFileStatus.class.getSimpleName()), e);
+ throw new HoodieIOException(e.getMessage(), e);
}
}
@@ -209,38 +225,6 @@ public abstract class HoodieFileInputFormatBase extends FileInputFormat<NullWritable, ArrayWritable>
- @Nonnull
- private static RealtimeFileStatus createRealtimeFileStatusUnchecked(HoodieLogFile latestLogFile,
- Stream<HoodieLogFile> logFiles,
- Option<HoodieInstant> latestCompletedInstantOpt,
- HoodieTableMetaClient tableMetaClient) {
- List<HoodieLogFile> sortedLogFiles = logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
- try {
- RealtimeFileStatus rtFileStatus = new RealtimeFileStatus(latestLogFile.getFileStatus());
- rtFileStatus.setDeltaLogFiles(sortedLogFiles);
- rtFileStatus.setBasePath(tableMetaClient.getBasePath());
-
- if (latestCompletedInstantOpt.isPresent()) {
- HoodieInstant latestCompletedInstant = latestCompletedInstantOpt.get();
- checkState(latestCompletedInstant.isCompleted());
-
- rtFileStatus.setMaxCommitTime(latestCompletedInstant.getTimestamp());
- }
-
- return rtFileStatus;
- } catch (IOException e) {
- throw new HoodieIOException(String.format("Failed to init %s", RealtimeFileStatus.class.getSimpleName()), e);
- }
- }
-
- private static <T> Option<T> fromScala(scala.Option<T> opt) {
- if (opt.isDefined()) {
- return Option.of(opt.get());
- }
-
- return Option.empty();
- }
-
@Nonnull
private List<FileStatus> listStatusForSnapshotMode(JobConf job,
Map<String, HoodieTableMetaClient> tableMetaClientMap,
@@ -317,4 +301,80 @@ public abstract class HoodieFileInputFormatBase extends FileInputFormat<NullWritable, ArrayWritable>
+
+ private static void validate(List<FileStatus> targetFiles, List<FileStatus> legacyFileStatuses) {
+ List<FileStatus> diff = CollectionUtils.diff(targetFiles, legacyFileStatuses);
+ checkState(diff.isEmpty(), "Should be empty");
+ }
+
+ @Nonnull
+ private static FileStatus getFileStatusUnchecked(HoodieBaseFile baseFile) {
+ try {
+ return HoodieInputFormatUtils.getFileStatus(baseFile);
+ } catch (IOException ioe) {
+ throw new HoodieIOException("Failed to get file-status", ioe);
+ }
+ }
+
+ @Nonnull
+ private static RealtimeFileStatus createRealtimeFileStatusUnchecked(HoodieBaseFile baseFile,
+ Stream<HoodieLogFile> logFiles,
+ Option<HoodieInstant> latestCompletedInstantOpt,
+ HoodieTableMetaClient tableMetaClient) {
+ List<HoodieLogFile> sortedLogFiles = logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
+ FileStatus baseFileStatus = getFileStatusUnchecked(baseFile);
+ try {
+ RealtimeFileStatus rtFileStatus = new RealtimeFileStatus(baseFileStatus);
+ rtFileStatus.setDeltaLogFiles(sortedLogFiles);
+ rtFileStatus.setBaseFilePath(baseFile.getPath());
+ rtFileStatus.setBasePath(tableMetaClient.getBasePath());
+
+ if (latestCompletedInstantOpt.isPresent()) {
+ HoodieInstant latestCompletedInstant = latestCompletedInstantOpt.get();
+ checkState(latestCompletedInstant.isCompleted());
+
+ rtFileStatus.setMaxCommitTime(latestCompletedInstant.getTimestamp());
+ }
+
+ if (baseFileStatus instanceof LocatedFileStatusWithBootstrapBaseFile || baseFileStatus instanceof FileStatusWithBootstrapBaseFile) {
+ rtFileStatus.setBootStrapFileStatus(baseFileStatus);
+ }
+
+ return rtFileStatus;
+ } catch (IOException e) {
+ throw new HoodieIOException(String.format("Failed to init %s", RealtimeFileStatus.class.getSimpleName()), e);
+ }
+ }
+
+ @Nonnull
+ private static RealtimeFileStatus createRealtimeFileStatusUnchecked(HoodieLogFile latestLogFile,
+ Stream<HoodieLogFile> logFiles,
+ Option<HoodieInstant> latestCompletedInstantOpt,
+ HoodieTableMetaClient tableMetaClient) {
+ List<HoodieLogFile> sortedLogFiles = logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
+ try {
+ RealtimeFileStatus rtFileStatus = new RealtimeFileStatus(latestLogFile.getFileStatus());
+ rtFileStatus.setDeltaLogFiles(sortedLogFiles);
+ rtFileStatus.setBasePath(tableMetaClient.getBasePath());
+
+ if (latestCompletedInstantOpt.isPresent()) {
+ HoodieInstant latestCompletedInstant = latestCompletedInstantOpt.get();
+ checkState(latestCompletedInstant.isCompleted());
+
+ rtFileStatus.setMaxCommitTime(latestCompletedInstant.getTimestamp());
+ }
+
+ return rtFileStatus;
+ } catch (IOException e) {
+ throw new HoodieIOException(String.format("Failed to init %s", RealtimeFileStatus.class.getSimpleName()), e);
+ }
+ }
+
+ private static <T> Option<T> fromScala(scala.Option<T> opt) {
+ if (opt.isDefined()) {
+ return Option.of(opt.get());
+ }
+
+ return Option.empty();
+ }
}
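To make the query modes above concrete: a MapReduce/Hive job selects the mode purely through JobConf properties scoped by table name, and the input format resolves them while planning splits. The sketch below is illustrative only; the "hoodie.<table>.consume.*" property names follow HoodieHiveUtils' conventions and should be verified against the Hudi version in use, and the table name "trips", timestamp, and path are made up.

```java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;

public class HudiIncrementalScanSketch {
  public static void main(String[] args) throws Exception {
    JobConf jobConf = new JobConf();
    // Query mode and the commit range are conveyed purely through table-scoped properties.
    jobConf.set("hoodie.trips.consume.mode", "INCREMENTAL");
    jobConf.set("hoodie.trips.consume.start.timestamp", "20211201000000");
    jobConf.setInt("hoodie.trips.consume.max.commits", 3);

    HoodieParquetInputFormat inputFormat = new HoodieParquetInputFormat();
    inputFormat.setConf(jobConf);
    FileInputFormat.setInputPaths(jobConf, new Path("/tmp/hudi/trips"));

    // Split planning dispatches into listStatusForIncrementalMode(...) for tables in INCREMENTAL mode.
    InputSplit[] splits = inputFormat.getSplits(jobConf, 1);
    System.out.println("Planned " + splits.length + " splits");
  }
}
```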
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
index 078258f3e..01eda3b35 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
@@ -18,17 +18,7 @@
package org.apache.hudi.hadoop;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
-import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
-
import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
@@ -39,6 +29,9 @@ import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -62,10 +55,6 @@ public class HoodieParquetInputFormat extends HoodieFileInputFormatBase implemen
// {@code RecordReader}
private final MapredParquetInputFormat mapredParquetInputFormat = new MapredParquetInputFormat();
- protected HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline timeline) {
- return HoodieInputFormatUtils.filterInstantsTimeline(timeline);
- }
-
protected boolean includeLogFilesForSnapshotView() {
return false;
}
@@ -96,32 +85,6 @@ public class HoodieParquetInputFormat extends HoodieFileInputFormatBase implemen
return getRecordReaderInternal(split, job, reporter);
}
- @Override
- protected boolean isSplitable(FileSystem fs, Path filename) {
- return !(filename instanceof PathWithBootstrapFileStatus);
- }
-
- @Override
- protected FileSplit makeSplit(Path file, long start, long length,
- String[] hosts) {
- FileSplit split = new FileSplit(file, start, length, hosts);
-
- if (file instanceof PathWithBootstrapFileStatus) {
- return makeExternalFileSplit((PathWithBootstrapFileStatus)file, split);
- }
- return split;
- }
-
- @Override
- protected FileSplit makeSplit(Path file, long start, long length,
- String[] hosts, String[] inMemoryHosts) {
- FileSplit split = new FileSplit(file, start, length, hosts, inMemoryHosts);
- if (file instanceof PathWithBootstrapFileStatus) {
- return makeExternalFileSplit((PathWithBootstrapFileStatus)file, split);
- }
- return split;
- }
-
private RecordReader<NullWritable, ArrayWritable> getRecordReaderInternal(InputSplit split,
JobConf job,
Reporter reporter) throws IOException {
@@ -176,16 +139,4 @@ public class HoodieParquetInputFormat extends HoodieFileInputFormatBase implemen
true);
}
}
-
- private BootstrapBaseFileSplit makeExternalFileSplit(PathWithBootstrapFileStatus file, FileSplit split) {
- try {
- LOG.info("Making external data split for " + file);
- FileStatus externalFileStatus = file.getBootstrapFileStatus();
- FileSplit externalFileSplit = makeSplit(externalFileStatus.getPath(), 0, externalFileStatus.getLen(),
- new String[0], new String[0]);
- return new BootstrapBaseFileSplit(split, externalFileSplit);
- } catch (IOException e) {
- throw new HoodieIOException(e.getMessage(), e);
- }
- }
}
\ No newline at end of file
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java
index c24c75359..f23e6ac86 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java
@@ -20,6 +20,7 @@ package org.apache.hudi.hadoop.hive;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.hadoop.HoodieFileInputFormatBase;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.realtime.HoodieCombineRealtimeRecordReader;
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
@@ -876,7 +877,7 @@ public class HoodieCombineHiveInputFormat result;
if (hoodieFilter) {
- HoodieParquetInputFormat input;
+ HoodieFileInputFormatBase input;
if (isRealTime) {
LOG.info("Using HoodieRealtimeInputFormat");
input = createParquetRealtimeInputFormat();
@@ -916,7 +917,7 @@ public class HoodieCombineHiveInputFormat
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieHFileRealtimeInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieHFileRealtimeInputFormat.java
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieHFileRealtimeInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieHFileRealtimeInputFormat.java
- @Override
- public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
- List<FileSplit> fileSplits = Arrays.stream(super.getSplits(job, numSplits))
- .map(is -> (FileSplit) is)
- .collect(Collectors.toList());
- return HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits);
- }
-
- @Override
- protected HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline timeline) {
- // no specific filtering for Realtime format
- return timeline;
- }
+ // NOTE: We're only using {@code HoodieHFileInputFormat} to compose {@code RecordReader}
+ private final HoodieHFileInputFormat hFileInputFormat = new HoodieHFileInputFormat();
@Override
public RecordReader<NullWritable, ArrayWritable> getRecordReader(final InputSplit split, final JobConf jobConf,
@@ -102,6 +88,12 @@ public class HoodieHFileRealtimeInputFormat extends HoodieHFileInputFormat {
"HoodieRealtimeRecordReader can only work on HoodieRealtimeFileSplit and not with " + split);
return new HoodieRealtimeRecordReader((HoodieRealtimeFileSplit) split, jobConf,
- super.getRecordReader(split, jobConf, reporter));
+ hFileInputFormat.getRecordReader(split, jobConf, reporter));
+ }
+
+ @Override
+ protected boolean isSplitable(FileSystem fs, Path filename) {
+ // This file isn't splittable.
+ return false;
}
}
\ No newline at end of file
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
index d8bfce67d..5f8b11cb1 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
@@ -18,256 +18,60 @@
package org.apache.hudi.hadoop.realtime;
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.FileSlice;
-import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.HoodieFileGroup;
-import org.apache.hudi.common.model.HoodieLogFile;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
-import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.hadoop.BootstrapBaseFileSplit;
-import org.apache.hudi.hadoop.FileStatusWithBootstrapBaseFile;
-import org.apache.hudi.hadoop.LocatedFileStatusWithBootstrapBaseFile;
-import org.apache.hudi.hadoop.RealtimeFileStatus;
-import org.apache.hudi.hadoop.PathWithLogFilePath;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.UseFileSplitsFromInputFormat;
import org.apache.hudi.hadoop.UseRecordReaderFromInputFormat;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
-import org.apache.hadoop.io.ArrayWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.stream.Collectors;
-
/**
* Input Format, that provides a real-time view of data in a Hoodie table.
*/
@UseRecordReaderFromInputFormat
@UseFileSplitsFromInputFormat
-public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat implements Configurable {
+public class HoodieParquetRealtimeInputFormat extends HoodieRealtimeFileInputFormatBase implements Configurable {
private static final Logger LOG = LogManager.getLogger(HoodieParquetRealtimeInputFormat.class);
+ // NOTE: We're only using {@code HoodieParquetInputFormat} to compose {@code RecordReader}
+ private final HoodieParquetInputFormat parquetInputFormat = new HoodieParquetInputFormat();
+
// To make Hive on Spark queries work with RT tables. Our theory is that due to
// {@link org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher}
// not handling empty list correctly, the ParquetRecordReaderWrapper ends up adding the same column ids multiple
// times which ultimately breaks the query.
-
@Override
- public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
- List<FileSplit> fileSplits = Arrays.stream(super.getSplits(job, numSplits)).map(is -> (FileSplit) is).collect(Collectors.toList());
+ public RecordReader<NullWritable, ArrayWritable> getRecordReader(final InputSplit split, final JobConf jobConf,
+ final Reporter reporter) throws IOException {
+ // sanity check
+ ValidationUtils.checkArgument(split instanceof RealtimeSplit,
+ "HoodieRealtimeRecordReader can only work on RealtimeSplit and not with " + split);
+ RealtimeSplit realtimeSplit = (RealtimeSplit) split;
+ addProjectionToJobConf(realtimeSplit, jobConf);
+ LOG.info("Creating record reader with readCols :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
+ + ", Ids :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
- return HoodieRealtimeInputFormatUtils.isIncrementalQuerySplits(fileSplits)
- ? HoodieRealtimeInputFormatUtils.getIncrementalRealtimeSplits(job, fileSplits)
- : HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits);
- }
-
- /**
- * Keep the logic of mor_incr_view as same as spark datasource.
- * Step1: Get list of commits to be fetched based on start commit and max commits(for snapshot max commits is -1).
- * Step2: Get list of affected files status for these affected file status.
- * Step3: Construct HoodieTableFileSystemView based on those affected file status.
- * a. Filter affected partitions based on inputPaths.
- * b. Get list of fileGroups based on affected partitions by fsView.getAllFileGroups.
- * Step4: Set input paths based on filtered affected partition paths. changes that amony original input paths passed to
- * this method. some partitions did not have commits as part of the trimmed down list of commits and hence we need this step.
- * Step5: Find candidate fileStatus, since when we get baseFileStatus from HoodieTableFileSystemView,
- * the BaseFileStatus will missing file size information.
- * We should use candidate fileStatus to update the size information for BaseFileStatus.
- * Step6: For every file group from step3(b)
- * Get 1st available base file from all file slices. then we use candidate file status to update the baseFileStatus,
- * and construct RealTimeFileStatus and add it to result along with log files.
- * If file group just has log files, construct RealTimeFileStatus and add it to result.
- * TODO: unify the incremental view code between hive/spark-sql and spark datasource
- */
- @Override
- protected List<FileStatus> listStatusForIncrementalMode(JobConf job,
- HoodieTableMetaClient tableMetaClient,
- List<Path> inputPaths,
- String incrementalTable) throws IOException {
- List<FileStatus> result = new ArrayList<>();
- Job jobContext = Job.getInstance(job);
-
- // step1
- Option<HoodieTimeline> timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
- if (!timeline.isPresent()) {
- return result;
- }
- HoodieTimeline commitsTimelineToReturn = HoodieInputFormatUtils.getHoodieTimelineForIncrementalQuery(jobContext, incrementalTable, timeline.get());
- Option<List<HoodieInstant>> commitsToCheck = Option.of(commitsTimelineToReturn.getInstants().collect(Collectors.toList()));
- if (!commitsToCheck.isPresent()) {
- return result;
- }
- // step2
- commitsToCheck.get().sort(HoodieInstant::compareTo);
- List<HoodieCommitMetadata> metadataList = commitsToCheck
- .get().stream().map(instant -> {
- try {
- return HoodieInputFormatUtils.getCommitMetadata(instant, commitsTimelineToReturn);
- } catch (IOException e) {
- throw new HoodieException(String.format("cannot get metadata for instant: %s", instant));
- }
- }).collect(Collectors.toList());
-
- // build fileGroup from fsView
- List<FileStatus> affectedFileStatus = Arrays.asList(HoodieInputFormatUtils
- .listAffectedFilesForCommits(new Path(tableMetaClient.getBasePath()), metadataList));
- // step3
- HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(tableMetaClient, commitsTimelineToReturn, affectedFileStatus.toArray(new FileStatus[0]));
- // build fileGroup from fsView
- Path basePath = new Path(tableMetaClient.getBasePath());
- // filter affectedPartition by inputPaths
- List<String> affectedPartition = HoodieInputFormatUtils.getWritePartitionPaths(metadataList).stream()
- .filter(k -> k.isEmpty() ? inputPaths.contains(basePath) : inputPaths.contains(new Path(basePath, k))).collect(Collectors.toList());
- if (affectedPartition.isEmpty()) {
- return result;
- }
- List<HoodieFileGroup> fileGroups = affectedPartition.stream()
- .flatMap(partitionPath -> fsView.getAllFileGroups(partitionPath)).collect(Collectors.toList());
- // step4
- setInputPaths(job, affectedPartition.stream()
- .map(p -> p.isEmpty() ? basePath.toString() : new Path(basePath, p).toString()).collect(Collectors.joining(",")));
-
- // step5
- // find all file status in partitionPaths.
- FileStatus[] fileStatuses = doListStatus(job);
- Map<String, FileStatus> candidateFileStatus = new HashMap<>();
- for (int i = 0; i < fileStatuses.length; i++) {
- String key = fileStatuses[i].getPath().toString();
- candidateFileStatus.put(key, fileStatuses[i]);
+ // For a log-only split, use an empty record reader in place of the parquet reader.
+ if (FSUtils.isLogFile(realtimeSplit.getPath())) {
+ return new HoodieRealtimeRecordReader(realtimeSplit, jobConf, new HoodieEmptyRecordReader(realtimeSplit, jobConf));
}
- String maxCommitTime = fsView.getLastInstant().get().getTimestamp();
- // step6
- result.addAll(collectAllIncrementalFiles(fileGroups, maxCommitTime, basePath.toString(), candidateFileStatus));
- return result;
- }
-
- private List<FileStatus> collectAllIncrementalFiles(List<HoodieFileGroup> fileGroups, String maxCommitTime, String basePath, Map<String, FileStatus> candidateFileStatus) {
- List<FileStatus> result = new ArrayList<>();
- fileGroups.stream().forEach(f -> {
- try {
- List<FileSlice> baseFiles = f.getAllFileSlices().filter(slice -> slice.getBaseFile().isPresent()).collect(Collectors.toList());
- if (!baseFiles.isEmpty()) {
- FileStatus baseFileStatus = HoodieInputFormatUtils.getFileStatus(baseFiles.get(0).getBaseFile().get());
- String baseFilePath = baseFileStatus.getPath().toUri().toString();
- if (!candidateFileStatus.containsKey(baseFilePath)) {
- throw new HoodieException("Error obtaining fileStatus for file: " + baseFilePath);
- }
- // We cannot use baseFileStatus.getPath() here, since baseFileStatus.getPath() missing file size information.
- // So we use candidateFileStatus.get(baseFileStatus.getPath()) to get a correct path.
- RealtimeFileStatus fileStatus = new RealtimeFileStatus(candidateFileStatus.get(baseFilePath));
- fileStatus.setMaxCommitTime(maxCommitTime);
- fileStatus.setBelongToIncrementalFileStatus(true);
- fileStatus.setBasePath(basePath);
- fileStatus.setBaseFilePath(baseFilePath);
- fileStatus.setDeltaLogFiles(f.getLatestFileSlice().get().getLogFiles().collect(Collectors.toList()));
- // try to set bootstrapfileStatus
- if (baseFileStatus instanceof LocatedFileStatusWithBootstrapBaseFile || baseFileStatus instanceof FileStatusWithBootstrapBaseFile) {
- fileStatus.setBootStrapFileStatus(baseFileStatus);
- }
- result.add(fileStatus);
- }
- // add file group which has only logs.
- if (f.getLatestFileSlice().isPresent() && baseFiles.isEmpty()) {
- List<FileStatus> logFileStatus = f.getLatestFileSlice().get().getLogFiles().map(logFile -> logFile.getFileStatus()).collect(Collectors.toList());
- if (logFileStatus.size() > 0) {
- RealtimeFileStatus fileStatus = new RealtimeFileStatus(logFileStatus.get(0));
- fileStatus.setBelongToIncrementalFileStatus(true);
- fileStatus.setDeltaLogFiles(logFileStatus.stream().map(l -> new HoodieLogFile(l.getPath(), l.getLen())).collect(Collectors.toList()));
- fileStatus.setMaxCommitTime(maxCommitTime);
- fileStatus.setBasePath(basePath);
- result.add(fileStatus);
- }
- }
- } catch (IOException e) {
- throw new HoodieException("Error obtaining data file/log file grouping ", e);
- }
- });
- return result;
- }
-
- @Override
- protected boolean includeLogFilesForSnapshotView() {
- return true;
- }
-
- @Override
- protected boolean isSplitable(FileSystem fs, Path filename) {
- if (filename instanceof PathWithLogFilePath) {
- return ((PathWithLogFilePath)filename).splitable();
- }
- return super.isSplitable(fs, filename);
- }
-
- // make split for path.
- // When query the incremental view, the read files may be bootstrap files, we wrap those bootstrap files into
- // PathWithLogFilePath, so those bootstrap files should be processed int this function.
- @Override
- protected FileSplit makeSplit(Path file, long start, long length, String[] hosts) {
- if (file instanceof PathWithLogFilePath) {
- return doMakeSplitForPathWithLogFilePath((PathWithLogFilePath) file, start, length, hosts, null);
- }
- return super.makeSplit(file, start, length, hosts);
- }
-
- @Override
- protected FileSplit makeSplit(Path file, long start, long length, String[] hosts, String[] inMemoryHosts) {
- if (file instanceof PathWithLogFilePath) {
- return doMakeSplitForPathWithLogFilePath((PathWithLogFilePath) file, start, length, hosts, inMemoryHosts);
- }
- return super.makeSplit(file, start, length, hosts, inMemoryHosts);
- }
-
- private FileSplit doMakeSplitForPathWithLogFilePath(PathWithLogFilePath path, long start, long length, String[] hosts, String[] inMemoryHosts) {
- if (!path.includeBootstrapFilePath()) {
- return path.buildSplit(path, start, length, hosts);
- }
-
- FileSplit bf = inMemoryHosts == null
- ? super.makeSplit(path.getPathWithBootstrapFileStatus(), start, length, hosts)
- : super.makeSplit(path.getPathWithBootstrapFileStatus(), start, length, hosts, inMemoryHosts);
-
- return HoodieRealtimeInputFormatUtils.createRealtimeBoostrapBaseFileSplit(
- (BootstrapBaseFileSplit) bf,
- path.getBasePath(),
- path.getDeltaLogFiles(),
- path.getMaxCommitTime(),
- path.getBelongsToIncrementalQuery());
- }
-
- @Override
- protected HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline timeline) {
- // no specific filtering for Realtime format
- return timeline;
+ return new HoodieRealtimeRecordReader(realtimeSplit, jobConf,
+ parquetInputFormat.getRecordReader(split, jobConf, reporter));
}
void addProjectionToJobConf(final RealtimeSplit realtimeSplit, final JobConf jobConf) {
@@ -297,25 +101,7 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
}
}
}
+
HoodieRealtimeInputFormatUtils.cleanProjectionColumnIds(jobConf);
}
-
- @Override
- public RecordReader<NullWritable, ArrayWritable> getRecordReader(final InputSplit split, final JobConf jobConf,
- final Reporter reporter) throws IOException {
- // sanity check
- ValidationUtils.checkArgument(split instanceof RealtimeSplit,
- "HoodieRealtimeRecordReader can only work on RealtimeSplit and not with " + split);
- RealtimeSplit realtimeSplit = (RealtimeSplit) split;
- addProjectionToJobConf(realtimeSplit, jobConf);
- LOG.info("Creating record reader with readCols :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
- + ", Ids :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
-
- // for log only split, set the parquet reader as empty.
- if (FSUtils.isLogFile(realtimeSplit.getPath())) {
- return new HoodieRealtimeRecordReader(realtimeSplit, jobConf, new HoodieEmptyRecordReader(realtimeSplit, jobConf));
- }
- return new HoodieRealtimeRecordReader(realtimeSplit, jobConf,
- super.getRecordReader(split, jobConf, reporter));
- }
}
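The refactoring above turns inheritance into composition: HoodieParquetRealtimeInputFormat now extends the new HoodieRealtimeFileInputFormatBase (introduced below) for split planning and only composes HoodieParquetInputFormat to build the base-file RecordReader. As a hedged sketch (not part of this patch), the HFile realtime format could be ported onto the same base class in the same way; in this diff it still extends HoodieHFileInputFormat. Column-projection handling (addProjectionToJobConf) is omitted here, and the exact set of abstract methods to implement may differ.

```java
import java.io.IOException;

import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.hadoop.HoodieHFileInputFormat;
import org.apache.hudi.hadoop.realtime.HoodieEmptyRecordReader;
import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileInputFormatBase;
import org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader;
import org.apache.hudi.hadoop.realtime.RealtimeSplit;

// Sketch: an HFile-flavored realtime input format built on the new base class,
// mirroring the structure of HoodieParquetRealtimeInputFormat above.
public class HoodieHFileRealtimeInputFormatSketch extends HoodieRealtimeFileInputFormatBase {

  // Composed only to produce the base-file RecordReader; split planning comes from the base class.
  private final HoodieHFileInputFormat hFileInputFormat = new HoodieHFileInputFormat();

  @Override
  public RecordReader<NullWritable, ArrayWritable> getRecordReader(InputSplit split, JobConf job, Reporter reporter)
      throws IOException {
    ValidationUtils.checkArgument(split instanceof RealtimeSplit,
        "HoodieRealtimeRecordReader can only work on RealtimeSplit and not with " + split);
    RealtimeSplit realtimeSplit = (RealtimeSplit) split;
    // Log-only file slices have no base file, so hand the merging reader an empty base reader.
    if (FSUtils.isLogFile(realtimeSplit.getPath())) {
      return new HoodieRealtimeRecordReader(realtimeSplit, job, new HoodieEmptyRecordReader(realtimeSplit, job));
    }
    return new HoodieRealtimeRecordReader(realtimeSplit, job,
        hFileInputFormat.getRecordReader(split, job, reporter));
  }
}
```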
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeFileInputFormatBase.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeFileInputFormatBase.java
new file mode 100644
index 000000000..2fe3bbdc1
--- /dev/null
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeFileInputFormatBase.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop.realtime;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFileGroup;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.hadoop.BootstrapBaseFileSplit;
+import org.apache.hudi.hadoop.FileStatusWithBootstrapBaseFile;
+import org.apache.hudi.hadoop.HoodieFileInputFormatBase;
+import org.apache.hudi.hadoop.LocatedFileStatusWithBootstrapBaseFile;
+import org.apache.hudi.hadoop.PathWithLogFilePath;
+import org.apache.hudi.hadoop.RealtimeFileStatus;
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Base implementation of Hive's {@link FileInputFormat} allowing for reading of Hudi's
+ * Merge-on-Read (MOR) tables in various configurations:
+ *
+ *
+ * - Snapshot mode: reading table's state as of particular timestamp (or instant, in Hudi's terms)
+ * - Incremental mode: reading records written since a particular commit (or instant, in Hudi's terms)
+ * - External mode: reading non-Hudi partitions
+ *
+ *
+ * NOTE: This class is invariant of the underlying file-format of the files being read
+ */
+public abstract class HoodieRealtimeFileInputFormatBase extends HoodieFileInputFormatBase implements Configurable {
+
+ private static final Logger LOG = LogManager.getLogger(HoodieRealtimeFileInputFormatBase.class);
+
+ @Override
+ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+ List<FileSplit> fileSplits = Arrays.stream(super.getSplits(job, numSplits)).map(is -> (FileSplit) is).collect(Collectors.toList());
+
+ return HoodieRealtimeInputFormatUtils.isIncrementalQuerySplits(fileSplits)
+ ? HoodieRealtimeInputFormatUtils.getIncrementalRealtimeSplits(job, fileSplits)
+ : HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits);
+ }
+
+ /**
+ * Keep the logic of mor_incr_view the same as the Spark datasource.
+ * Step1: Get the list of commits to be fetched based on the start commit and max commits (for snapshot, max commits is -1).
+ * Step2: Get the list of file statuses affected by these commits.
+ * Step3: Construct a HoodieTableFileSystemView based on those affected file statuses.
+ * a. Filter affected partitions based on inputPaths.
+ * b. Get the list of fileGroups for the affected partitions via fsView.getAllFileGroups.
+ * Step4: Set input paths based on the filtered affected partition paths. This changes the original input paths passed to
+ * this method: some partitions had no commits in the trimmed-down list of commits, hence this step is needed.
+ * Step5: Find candidate fileStatuses, since the baseFileStatus obtained from HoodieTableFileSystemView
+ * is missing file size information.
+ * We use the candidate fileStatuses to fill in the size information of the baseFileStatus.
+ * Step6: For every file group from step3(b),
+ * get the first available base file from all file slices, then use the candidate file status to update the baseFileStatus,
+ * and construct a RealtimeFileStatus (including its log files) and add it to the result.
+ * If the file group has only log files, construct a RealtimeFileStatus and add it to the result.
+ * TODO: unify the incremental view code between hive/spark-sql and spark datasource
+ */
+ @Override
+ protected List<FileStatus> listStatusForIncrementalMode(JobConf job,
+ HoodieTableMetaClient tableMetaClient,
+ List<Path> inputPaths,
+ String incrementalTableName) throws IOException {
+ List<FileStatus> result = new ArrayList<>();
+ Job jobContext = Job.getInstance(job);
+
+ // step1
+ Option<HoodieTimeline> timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
+ if (!timeline.isPresent()) {
+ return result;
+ }
+ HoodieTimeline commitsTimelineToReturn = HoodieInputFormatUtils.getHoodieTimelineForIncrementalQuery(jobContext, incrementalTableName, timeline.get());
+ Option<List<HoodieInstant>> commitsToCheck = Option.of(commitsTimelineToReturn.getInstants().collect(Collectors.toList()));
+ if (!commitsToCheck.isPresent()) {
+ return result;
+ }
+ // step2
+ commitsToCheck.get().sort(HoodieInstant::compareTo);
+ List<HoodieCommitMetadata> metadataList = commitsToCheck
+ .get().stream().map(instant -> {
+ try {
+ return HoodieInputFormatUtils.getCommitMetadata(instant, commitsTimelineToReturn);
+ } catch (IOException e) {
+ throw new HoodieException(String.format("cannot get metadata for instant: %s", instant));
+ }
+ }).collect(Collectors.toList());
+
+ // build fileGroup from fsView
+ List<FileStatus> affectedFileStatus = Arrays.asList(HoodieInputFormatUtils
+ .listAffectedFilesForCommits(new Path(tableMetaClient.getBasePath()), metadataList));
+ // step3
+ HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(tableMetaClient, commitsTimelineToReturn, affectedFileStatus.toArray(new FileStatus[0]));
+ // build fileGroup from fsView
+ Path basePath = new Path(tableMetaClient.getBasePath());
+ // filter affectedPartition by inputPaths
+ List<String> affectedPartition = HoodieInputFormatUtils.getWritePartitionPaths(metadataList).stream()
+ .filter(k -> k.isEmpty() ? inputPaths.contains(basePath) : inputPaths.contains(new Path(basePath, k))).collect(Collectors.toList());
+ if (affectedPartition.isEmpty()) {
+ return result;
+ }
+ List<HoodieFileGroup> fileGroups = affectedPartition.stream()
+ .flatMap(partitionPath -> fsView.getAllFileGroups(partitionPath)).collect(Collectors.toList());
+ // step4
+ setInputPaths(job, affectedPartition.stream()
+ .map(p -> p.isEmpty() ? basePath.toString() : new Path(basePath, p).toString()).collect(Collectors.joining(",")));
+
+ // step5
+ // find all file status in partitionPaths.
+ FileStatus[] fileStatuses = doListStatus(job);
+ Map<String, FileStatus> candidateFileStatus = new HashMap<>();
+ for (int i = 0; i < fileStatuses.length; i++) {
+ String key = fileStatuses[i].getPath().toString();
+ candidateFileStatus.put(key, fileStatuses[i]);
+ }
+
+ String maxCommitTime = fsView.getLastInstant().get().getTimestamp();
+ // step6
+ result.addAll(collectAllIncrementalFiles(fileGroups, maxCommitTime, basePath.toString(), candidateFileStatus));
+ return result;
+ }
+
+ @Override
+ protected boolean includeLogFilesForSnapshotView() {
+ return true;
+ }
+
+ @Override
+ protected boolean isSplitable(FileSystem fs, Path filename) {
+ if (filename instanceof PathWithLogFilePath) {
+ return ((PathWithLogFilePath)filename).splitable();
+ }
+
+ return super.isSplitable(fs, filename);
+ }
+
+ // Make a split for the given path.
+ // When querying the incremental view, the files to read may be bootstrap files; we wrap those bootstrap files into
+ // PathWithLogFilePath, so they also need to be handled in this function.
+ @Override
+ protected FileSplit makeSplit(Path file, long start, long length, String[] hosts) {
+ if (file instanceof PathWithLogFilePath) {
+ return doMakeSplitForPathWithLogFilePath((PathWithLogFilePath) file, start, length, hosts, null);
+ }
+ return super.makeSplit(file, start, length, hosts);
+ }
+
+ @Override
+ protected FileSplit makeSplit(Path file, long start, long length, String[] hosts, String[] inMemoryHosts) {
+ if (file instanceof PathWithLogFilePath) {
+ return doMakeSplitForPathWithLogFilePath((PathWithLogFilePath) file, start, length, hosts, inMemoryHosts);
+ }
+ return super.makeSplit(file, start, length, hosts, inMemoryHosts);
+ }
+
+ private List<FileStatus> collectAllIncrementalFiles(List<HoodieFileGroup> fileGroups, String maxCommitTime, String basePath, Map<String, FileStatus> candidateFileStatus) {
+ List<FileStatus> result = new ArrayList<>();
+ fileGroups.stream().forEach(f -> {
+ try {
+ List<FileSlice> baseFiles = f.getAllFileSlices().filter(slice -> slice.getBaseFile().isPresent()).collect(Collectors.toList());
+ if (!baseFiles.isEmpty()) {
+ FileStatus baseFileStatus = HoodieInputFormatUtils.getFileStatus(baseFiles.get(0).getBaseFile().get());
+ String baseFilePath = baseFileStatus.getPath().toUri().toString();
+ if (!candidateFileStatus.containsKey(baseFilePath)) {
+ throw new HoodieException("Error obtaining fileStatus for file: " + baseFilePath);
+ }
+ // We cannot use baseFileStatus here directly, since it is missing file size information.
+ // So we use candidateFileStatus.get(baseFilePath) to obtain a FileStatus with the correct size.
+ RealtimeFileStatus fileStatus = new RealtimeFileStatus(candidateFileStatus.get(baseFilePath));
+ fileStatus.setMaxCommitTime(maxCommitTime);
+ fileStatus.setBelongToIncrementalFileStatus(true);
+ fileStatus.setBasePath(basePath);
+ fileStatus.setBaseFilePath(baseFilePath);
+ fileStatus.setDeltaLogFiles(f.getLatestFileSlice().get().getLogFiles().collect(Collectors.toList()));
+ // Try to set the bootstrap file status.
+ if (baseFileStatus instanceof LocatedFileStatusWithBootstrapBaseFile || baseFileStatus instanceof FileStatusWithBootstrapBaseFile) {
+ fileStatus.setBootStrapFileStatus(baseFileStatus);
+ }
+ result.add(fileStatus);
+ }
+ // add file group which has only logs.
+ if (f.getLatestFileSlice().isPresent() && baseFiles.isEmpty()) {
+ List<FileStatus> logFileStatus = f.getLatestFileSlice().get().getLogFiles().map(logFile -> logFile.getFileStatus()).collect(Collectors.toList());
+ if (logFileStatus.size() > 0) {
+ RealtimeFileStatus fileStatus = new RealtimeFileStatus(logFileStatus.get(0));
+ fileStatus.setBelongToIncrementalFileStatus(true);
+ fileStatus.setDeltaLogFiles(logFileStatus.stream().map(l -> new HoodieLogFile(l.getPath(), l.getLen())).collect(Collectors.toList()));
+ fileStatus.setMaxCommitTime(maxCommitTime);
+ fileStatus.setBasePath(basePath);
+ result.add(fileStatus);
+ }
+ }
+ } catch (IOException e) {
+ throw new HoodieException("Error obtaining data file/log file grouping ", e);
+ }
+ });
+ return result;
+ }
+
+ private FileSplit doMakeSplitForPathWithLogFilePath(PathWithLogFilePath path, long start, long length, String[] hosts, String[] inMemoryHosts) {
+ if (!path.includeBootstrapFilePath()) {
+ return path.buildSplit(path, start, length, hosts);
+ } else {
+ FileSplit bf =
+ inMemoryHosts == null
+ ? super.makeSplit(path.getPathWithBootstrapFileStatus(), start, length, hosts)
+ : super.makeSplit(path.getPathWithBootstrapFileStatus(), start, length, hosts, inMemoryHosts);
+ return HoodieRealtimeInputFormatUtils.createRealtimeBoostrapBaseFileSplit(
+ (BootstrapBaseFileSplit) bf,
+ path.getBasePath(),
+ path.getDeltaLogFiles(),
+ path.getMaxCommitTime(),
+ path.getBelongsToIncrementalQuery());
+ }
+ }
+}
+
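The Step5 comment in HoodieRealtimeFileInputFormatBase above is the subtle part of the incremental listing: FileStatus objects reconstructed from the file-system view lack real sizes, so the code re-lists the partitions and treats that listing as the source of truth. The toy sketch below illustrates only that lookup idea under simplified assumptions (plain string keys, no Hudi types beyond Hadoop's FileStatus); it is not the patch's code.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.fs.FileStatus;

// Toy illustration of "Step5": index the statuses returned by an actual listing by path,
// then prefer them over view-derived statuses that are missing length information.
public final class CandidateStatusLookupSketch {

  private CandidateStatusLookupSketch() {
  }

  /** Index listed statuses by their fully-qualified path string. */
  public static Map<String, FileStatus> indexByPath(FileStatus[] listedStatuses) {
    Map<String, FileStatus> candidates = new HashMap<>();
    for (FileStatus status : listedStatuses) {
      candidates.put(status.getPath().toString(), status);
    }
    return candidates;
  }

  /** Prefer the listed status (which carries the real file length) over the view-provided one. */
  public static FileStatus withRealSize(FileStatus fromView, Map<String, FileStatus> candidates) {
    FileStatus listed = candidates.get(fromView.getPath().toString());
    return listed != null ? listed : fromView;
  }
}
```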
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
index 5c7a1fdf2..b7cb72e38 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
@@ -115,7 +116,7 @@ public class TestHoodieParquetInputFormat {
timeline.setInstants(instants);
// Verify getCommitsTimelineBeforePendingCompaction does not return instants after first compaction instant
- HoodieTimeline filteredTimeline = inputFormat.filterInstantsTimeline(timeline);
+ HoodieTimeline filteredTimeline = HoodieInputFormatUtils.filterInstantsTimeline(timeline);
assertTrue(filteredTimeline.containsInstant(t1));
assertTrue(filteredTimeline.containsInstant(t2));
assertFalse(filteredTimeline.containsInstant(t3));
@@ -126,7 +127,7 @@ public class TestHoodieParquetInputFormat {
instants.remove(t3);
timeline = new HoodieActiveTimeline(metaClient);
timeline.setInstants(instants);
- filteredTimeline = inputFormat.filterInstantsTimeline(timeline);
+ filteredTimeline = HoodieInputFormatUtils.filterInstantsTimeline(timeline);
// verify all remaining instants are returned.
assertTrue(filteredTimeline.containsInstant(t1));
@@ -140,7 +141,7 @@ public class TestHoodieParquetInputFormat {
instants.remove(t5);
timeline = new HoodieActiveTimeline(metaClient);
timeline.setInstants(instants);
- filteredTimeline = inputFormat.filterInstantsTimeline(timeline);
+ filteredTimeline = HoodieInputFormatUtils.filterInstantsTimeline(timeline);
// verify all remaining instants are returned.
assertTrue(filteredTimeline.containsInstant(t1));
diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/z-index-table-merged.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/z-index-table-merged.json
index c8fead0c1..00d16c660 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/z-index-table-merged.json
+++ b/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/z-index-table-merged.json
@@ -1,8 +1,8 @@
-{"c1_maxValue":639,"c1_minValue":323,"c1_num_nulls":0,"c2_maxValue":" 639sdc","c2_minValue":" 323sdc","c2_num_nulls":0,"c3_maxValue":811.638,"c3_minValue":100.556,"c3_num_nulls":0,"c5_maxValue":65,"c5_minValue":33,"c5_num_nulls":0,"c6_maxValue":"2020-09-09","c6_minValue":"2020-01-23","c6_num_nulls":0,"c7_maxValue":"fw==","c7_minValue":"Kw==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00000-0-c000.snappy.parquet"}
-{"c1_maxValue":770,"c1_minValue":300,"c1_num_nulls":0,"c2_maxValue":" 770sdc","c2_minValue":" 300sdc","c2_num_nulls":0,"c3_maxValue":977.328,"c3_minValue":64.768,"c3_num_nulls":0,"c5_maxValue":78,"c5_minValue":31,"c5_num_nulls":0,"c6_maxValue":"2020-10-21","c6_minValue":"2020-01-01","c6_num_nulls":0,"c7_maxValue":"rw==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00000-1-c000.snappy.parquet"}
-{"c1_maxValue":559,"c1_minValue":74,"c1_num_nulls":0,"c2_maxValue":" 74sdc","c2_minValue":" 181sdc","c2_num_nulls":0,"c3_maxValue":980.213,"c3_minValue":38.740,"c3_num_nulls":0,"c5_maxValue":57,"c5_minValue":9,"c5_num_nulls":0,"c6_maxValue":"2020-11-09","c6_minValue":"2020-01-08","c6_num_nulls":0,"c7_maxValue":"1Q==","c7_minValue":"Gw==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00001-0-c000.snappy.parquet"}
-{"c1_maxValue":719,"c1_minValue":125,"c1_num_nulls":0,"c2_maxValue":" 719sdc","c2_minValue":" 125sdc","c2_num_nulls":0,"c3_maxValue":958.579,"c3_minValue":153.125,"c3_num_nulls":0,"c5_maxValue":73,"c5_minValue":14,"c5_num_nulls":0,"c6_maxValue":"2020-09-27","c6_minValue":"2020-01-16","c6_num_nulls":0,"c7_maxValue":"+g==","c7_minValue":"OA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00001-1-c000.snappy.parquet"}
-{"c1_maxValue":945,"c1_minValue":355,"c1_num_nulls":0,"c2_maxValue":" 945sdc","c2_minValue":" 355sdc","c2_num_nulls":0,"c3_maxValue":994.355,"c3_minValue":374.882,"c3_num_nulls":0,"c5_maxValue":96,"c5_minValue":37,"c5_num_nulls":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-02-25","c6_num_nulls":0,"c7_maxValue":"sQ==","c7_minValue":"AQ==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00002-0-c000.snappy.parquet"}
-{"c1_maxValue":486,"c1_minValue":59,"c1_num_nulls":0,"c2_maxValue":" 79sdc","c2_minValue":" 111sdc","c2_num_nulls":0,"c3_maxValue":771.590,"c3_minValue":82.111,"c3_num_nulls":0,"c5_maxValue":50,"c5_minValue":7,"c5_num_nulls":0,"c6_maxValue":"2020-11-21","c6_minValue":"2020-01-22","c6_num_nulls":0,"c7_maxValue":"5g==","c7_minValue":"Ow==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00002-1-c000.snappy.parquet"}
-{"c1_maxValue":959,"c1_minValue":0,"c1_num_nulls":0,"c2_maxValue":" 959sdc","c2_minValue":" 0sdc","c2_num_nulls":0,"c3_maxValue":916.697,"c3_minValue":19.000,"c3_num_nulls":0,"c5_maxValue":97,"c5_minValue":1,"c5_num_nulls":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_num_nulls":0,"c7_maxValue":"yA==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00003-0-c000.snappy.parquet"}
-{"c1_maxValue":272,"c1_minValue":8,"c1_num_nulls":0,"c2_maxValue":" 8sdc","c2_minValue":" 129sdc","c2_num_nulls":0,"c3_maxValue":979.272,"c3_minValue":430.129,"c3_num_nulls":0,"c5_maxValue":28,"c5_minValue":2,"c5_num_nulls":0,"c6_maxValue":"2020-11-20","c6_minValue":"2020-03-23","c6_num_nulls":0,"c7_maxValue":"8A==","c7_minValue":"Ag==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00003-1-c000.snappy.parquet"}
\ No newline at end of file
+{"c1_maxValue":272,"c1_minValue":8,"c1_num_nulls":0,"c2_maxValue":" 8sdc","c2_minValue":" 129sdc","c2_num_nulls":0,"c3_maxValue":979.272,"c3_minValue":430.129,"c3_num_nulls":0,"c5_maxValue":28,"c5_minValue":2,"c5_num_nulls":0,"c6_maxValue":"2020-11-20","c6_minValue":"2020-03-23","c6_num_nulls":0,"c7_maxValue":"8A==","c7_minValue":"Ag==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00003-xxx-c000.snappy.parquet"}
+{"c1_maxValue":486,"c1_minValue":59,"c1_num_nulls":0,"c2_maxValue":" 79sdc","c2_minValue":" 111sdc","c2_num_nulls":0,"c3_maxValue":771.590,"c3_minValue":82.111,"c3_num_nulls":0,"c5_maxValue":50,"c5_minValue":7,"c5_num_nulls":0,"c6_maxValue":"2020-11-21","c6_minValue":"2020-01-22","c6_num_nulls":0,"c7_maxValue":"5g==","c7_minValue":"Ow==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00002-xxx-c000.snappy.parquet"}
+{"c1_maxValue":559,"c1_minValue":74,"c1_num_nulls":0,"c2_maxValue":" 74sdc","c2_minValue":" 181sdc","c2_num_nulls":0,"c3_maxValue":980.213,"c3_minValue":38.740,"c3_num_nulls":0,"c5_maxValue":57,"c5_minValue":9,"c5_num_nulls":0,"c6_maxValue":"2020-11-09","c6_minValue":"2020-01-08","c6_num_nulls":0,"c7_maxValue":"1Q==","c7_minValue":"Gw==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00001-xxx-c000.snappy.parquet"}
+{"c1_maxValue":639,"c1_minValue":323,"c1_num_nulls":0,"c2_maxValue":" 639sdc","c2_minValue":" 323sdc","c2_num_nulls":0,"c3_maxValue":811.638,"c3_minValue":100.556,"c3_num_nulls":0,"c5_maxValue":65,"c5_minValue":33,"c5_num_nulls":0,"c6_maxValue":"2020-09-09","c6_minValue":"2020-01-23","c6_num_nulls":0,"c7_maxValue":"fw==","c7_minValue":"Kw==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00000-xxx-c000.snappy.parquet"}
+{"c1_maxValue":719,"c1_minValue":125,"c1_num_nulls":0,"c2_maxValue":" 719sdc","c2_minValue":" 125sdc","c2_num_nulls":0,"c3_maxValue":958.579,"c3_minValue":153.125,"c3_num_nulls":0,"c5_maxValue":73,"c5_minValue":14,"c5_num_nulls":0,"c6_maxValue":"2020-09-27","c6_minValue":"2020-01-16","c6_num_nulls":0,"c7_maxValue":"+g==","c7_minValue":"OA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00001-xxx-c000.snappy.parquet"}
+{"c1_maxValue":770,"c1_minValue":300,"c1_num_nulls":0,"c2_maxValue":" 770sdc","c2_minValue":" 300sdc","c2_num_nulls":0,"c3_maxValue":977.328,"c3_minValue":64.768,"c3_num_nulls":0,"c5_maxValue":78,"c5_minValue":31,"c5_num_nulls":0,"c6_maxValue":"2020-10-21","c6_minValue":"2020-01-01","c6_num_nulls":0,"c7_maxValue":"rw==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00000-xxx-c000.snappy.parquet"}
+{"c1_maxValue":945,"c1_minValue":355,"c1_num_nulls":0,"c2_maxValue":" 945sdc","c2_minValue":" 355sdc","c2_num_nulls":0,"c3_maxValue":994.355,"c3_minValue":374.882,"c3_num_nulls":0,"c5_maxValue":96,"c5_minValue":37,"c5_num_nulls":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-02-25","c6_num_nulls":0,"c7_maxValue":"sQ==","c7_minValue":"AQ==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00002-xxx-c000.snappy.parquet"}
+{"c1_maxValue":959,"c1_minValue":0,"c1_num_nulls":0,"c2_maxValue":" 959sdc","c2_minValue":" 0sdc","c2_num_nulls":0,"c3_maxValue":916.697,"c3_minValue":19.000,"c3_num_nulls":0,"c5_maxValue":97,"c5_minValue":1,"c5_num_nulls":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_num_nulls":0,"c7_maxValue":"yA==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00003-xxx-c000.snappy.parquet"}
\ No newline at end of file
diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/z-index-table.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/z-index-table.json
index 30fccb3f9..a633e3170 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/z-index-table.json
+++ b/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/z-index-table.json
@@ -1,4 +1,4 @@
-{"c1_maxValue":639,"c1_minValue":323,"c1_num_nulls":0,"c2_maxValue":" 639sdc","c2_minValue":" 323sdc","c2_num_nulls":0,"c3_maxValue":811.638,"c3_minValue":100.556,"c3_num_nulls":0,"c5_maxValue":65,"c5_minValue":33,"c5_num_nulls":0,"c6_maxValue":"2020-09-09","c6_minValue":"2020-01-23","c6_num_nulls":0,"c7_maxValue":"fw==","c7_minValue":"Kw==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00000-0-c000.snappy.parquet"}
-{"c1_maxValue":559,"c1_minValue":74,"c1_num_nulls":0,"c2_maxValue":" 74sdc","c2_minValue":" 181sdc","c2_num_nulls":0,"c3_maxValue":980.213,"c3_minValue":38.740,"c3_num_nulls":0,"c5_maxValue":57,"c5_minValue":9,"c5_num_nulls":0,"c6_maxValue":"2020-11-09","c6_minValue":"2020-01-08","c6_num_nulls":0,"c7_maxValue":"1Q==","c7_minValue":"Gw==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00001-0-c000.snappy.parquet"}
-{"c1_maxValue":945,"c1_minValue":355,"c1_num_nulls":0,"c2_maxValue":" 945sdc","c2_minValue":" 355sdc","c2_num_nulls":0,"c3_maxValue":994.355,"c3_minValue":374.882,"c3_num_nulls":0,"c5_maxValue":96,"c5_minValue":37,"c5_num_nulls":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-02-25","c6_num_nulls":0,"c7_maxValue":"sQ==","c7_minValue":"AQ==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00002-0-c000.snappy.parquet"}
-{"c1_maxValue":959,"c1_minValue":0,"c1_num_nulls":0,"c2_maxValue":" 959sdc","c2_minValue":" 0sdc","c2_num_nulls":0,"c3_maxValue":916.697,"c3_minValue":19.000,"c3_num_nulls":0,"c5_maxValue":97,"c5_minValue":1,"c5_num_nulls":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_num_nulls":0,"c7_maxValue":"yA==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00003-0-c000.snappy.parquet"}
\ No newline at end of file
+{"c1_maxValue":559,"c1_minValue":74,"c1_num_nulls":0,"c2_maxValue":" 74sdc","c2_minValue":" 181sdc","c2_num_nulls":0,"c3_maxValue":980.213,"c3_minValue":38.740,"c3_num_nulls":0,"c5_maxValue":57,"c5_minValue":9,"c5_num_nulls":0,"c6_maxValue":"2020-11-09","c6_minValue":"2020-01-08","c6_num_nulls":0,"c7_maxValue":"1Q==","c7_minValue":"Gw==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00001-xxx-c000.snappy.parquet"}
+{"c1_maxValue":639,"c1_minValue":323,"c1_num_nulls":0,"c2_maxValue":" 639sdc","c2_minValue":" 323sdc","c2_num_nulls":0,"c3_maxValue":811.638,"c3_minValue":100.556,"c3_num_nulls":0,"c5_maxValue":65,"c5_minValue":33,"c5_num_nulls":0,"c6_maxValue":"2020-09-09","c6_minValue":"2020-01-23","c6_num_nulls":0,"c7_maxValue":"fw==","c7_minValue":"Kw==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00000-xxx-c000.snappy.parquet"}
+{"c1_maxValue":945,"c1_minValue":355,"c1_num_nulls":0,"c2_maxValue":" 945sdc","c2_minValue":" 355sdc","c2_num_nulls":0,"c3_maxValue":994.355,"c3_minValue":374.882,"c3_num_nulls":0,"c5_maxValue":96,"c5_minValue":37,"c5_num_nulls":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-02-25","c6_num_nulls":0,"c7_maxValue":"sQ==","c7_minValue":"AQ==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00002-xxx-c000.snappy.parquet"}
+{"c1_maxValue":959,"c1_minValue":0,"c1_num_nulls":0,"c2_maxValue":" 959sdc","c2_minValue":" 0sdc","c2_num_nulls":0,"c3_maxValue":916.697,"c3_minValue":19.000,"c3_num_nulls":0,"c5_maxValue":97,"c5_minValue":1,"c5_num_nulls":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_num_nulls":0,"c7_maxValue":"yA==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00003-xxx-c000.snappy.parquet"}
\ No newline at end of file
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
index 7b06bca87..ae41fa8eb 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
@@ -20,23 +20,20 @@ package org.apache.hudi.functional
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path}
-import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.HoodieColumnRangeMetadata
import org.apache.hudi.common.util.ParquetUtils
import org.apache.hudi.index.columnstats.ColumnStatsIndexHelper
import org.apache.hudi.testutils.HoodieClientTestBase
+import org.apache.spark.sql._
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.typedLit
import org.apache.spark.sql.types._
-import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, RowFactory, SaveMode, SparkSession, functions}
import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertTrue}
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import java.math.BigInteger
-import java.nio.charset.StandardCharsets
import java.sql.{Date, Timestamp}
import scala.collection.JavaConverters._
-import scala.util.{Random, Success}
+import scala.util.Random
class TestColumnStatsIndex extends HoodieClientTestBase {
var spark: SparkSession = _
@@ -354,11 +351,10 @@ class TestColumnStatsIndex extends HoodieClientTestBase {
.distinct()
.collect()
.map(_.getString(0))
- .sorted
val uuidToIdx: UserDefinedFunction = functions.udf((fileName: String) => {
- val (uuid, idx) = uuids.zipWithIndex.find { case (uuid, _) => fileName.contains(uuid) }.get
- fileName.replace(uuid, idx.toString)
+ val uuid = uuids.find(uuid => fileName.contains(uuid)).get
+ fileName.replace(uuid, "xxx")
})
ds.withColumn("file", uuidToIdx(ds("file")))
@@ -409,12 +405,11 @@ class TestColumnStatsIndex extends HoodieClientTestBase {
.mkString("\n")
private def sort(df: DataFrame): DataFrame = {
- // Since upon parsing JSON, Spark re-order columns in lexicographical order
- // of their names, we have to shuffle new Z-index table columns order to match
- // Rows are sorted by filename as well to avoid
val sortedCols = df.columns.sorted
+ // Sort dataset by the first 2 columns (to minimize non-determinism in case multiple files have the same
+ // value of the first column)
df.select(sortedCols.head, sortedCols.tail: _*)
- .sort("file")
+ .sort("c1_maxValue", "c1_minValue")
}
}
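The Scala test change above makes the checked-in Z-index fixtures independent of non-deterministic file names: each file name's UUID-bearing segment is replaced with a fixed "xxx" token and rows are compared after sorting by column values rather than by file name. A hedged Java sketch of the same normalization idea follows; the regex and the example file name are assumptions for illustration, not taken from the Hudi test code.

```java
import java.util.regex.Pattern;

public final class FileNameNormalizer {

  // Matches a UUID such as 3f1e2d4c-9a7b-4c2d-8e1f-0a1b2c3d4e5f (shape assumed for illustration).
  private static final Pattern UUID_PATTERN =
      Pattern.compile("[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}");

  private FileNameNormalizer() {
  }

  /** Replace the non-deterministic UUID portion of a data file name with a stable placeholder. */
  public static String normalize(String fileName) {
    return UUID_PATTERN.matcher(fileName).replaceAll("xxx");
  }

  public static void main(String[] args) {
    // Prints: part-00001-xxx-c000.snappy.parquet
    System.out.println(normalize("part-00001-3f1e2d4c-9a7b-4c2d-8e1f-0a1b2c3d4e5f-c000.snappy.parquet"));
  }
}
```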