From 3f263b82ce6687a8275419ce53ba23348561bdcb Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Mon, 7 Feb 2022 11:06:28 -0800 Subject: [PATCH] [HUDI-3206] Unify Hive's MOR implementations to avoid duplication (#4559) Unify Hive's MOR implementations to avoid duplication to avoid duplication across implementations for different file-formats (Parquet, HFile, etc) - Extracted HoodieRealtimeFileInputFormatBase (extending COW HoodieFileInputFormatBase base) - Rebased Parquet, HFile implementations onto HoodieRealtimeFileInputFormatBase - Tidying up --- .../hadoop/HoodieFileInputFormatBase.java | 204 ++++++++----- .../hudi/hadoop/HoodieParquetInputFormat.java | 55 +--- .../hive/HoodieCombineHiveInputFormat.java | 5 +- .../HoodieHFileRealtimeInputFormat.java | 32 +-- .../HoodieParquetRealtimeInputFormat.java | 268 ++---------------- .../HoodieRealtimeFileInputFormatBase.java | 259 +++++++++++++++++ .../hadoop/TestHoodieParquetInputFormat.java | 7 +- .../index/zorder/z-index-table-merged.json | 16 +- .../resources/index/zorder/z-index-table.json | 8 +- .../functional/TestColumnStatsIndex.scala | 21 +- 10 files changed, 460 insertions(+), 415 deletions(-) create mode 100644 hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeFileInputFormatBase.java diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java index 010b8d6ac..2a62cc898 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java @@ -21,10 +21,12 @@ package org.apache.hudi.hadoop; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Job; import org.apache.hudi.common.config.TypedProperties; @@ -65,12 +67,32 @@ import static org.apache.hudi.common.util.ValidationUtils.checkState; *
 *   <li>Incremental mode: reading table's state as of particular timestamp (or instant, in Hudi's terms)</li>
 *   <li>External mode: reading non-Hudi partitions</li>
  • * + * + * NOTE: This class is invariant of the underlying file-format of the files being read */ public abstract class HoodieFileInputFormatBase extends FileInputFormat implements Configurable { protected Configuration conf; + @Nonnull + private static RealtimeFileStatus createRealtimeFileStatusUnchecked(HoodieBaseFile baseFile, Stream logFiles) { + List sortedLogFiles = logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList()); + FileStatus baseFileStatus = getFileStatusUnchecked(baseFile); + try { + RealtimeFileStatus rtFileStatus = new RealtimeFileStatus(baseFileStatus); + rtFileStatus.setDeltaLogFiles(sortedLogFiles); + rtFileStatus.setBaseFilePath(baseFile.getPath()); + if (baseFileStatus instanceof LocatedFileStatusWithBootstrapBaseFile || baseFileStatus instanceof FileStatusWithBootstrapBaseFile) { + rtFileStatus.setBootStrapFileStatus(baseFileStatus); + } + + return rtFileStatus; + } catch (IOException e) { + throw new HoodieIOException(String.format("Failed to init %s", RealtimeFileStatus.class.getSimpleName()), e); + } + } + @Override public final Configuration getConf() { return conf; @@ -81,6 +103,24 @@ public abstract class HoodieFileInputFormatBase extends FileInputFormat targetFiles, List legacyFileStatuses) { - List diff = CollectionUtils.diff(targetFiles, legacyFileStatuses); - checkState(diff.isEmpty(), "Should be empty"); - } - - @Nonnull - private static FileStatus getFileStatusUnchecked(HoodieBaseFile baseFile) { - try { - return HoodieInputFormatUtils.getFileStatus(baseFile); - } catch (IOException ioe) { - throw new HoodieIOException("Failed to get file-status", ioe); - } - } - /** * Abstracts and exposes {@link FileInputFormat#listStatus(JobConf)} operation to subclasses that * lists files (returning an array of {@link FileStatus}) corresponding to the input paths specified @@ -172,35 +198,25 @@ public abstract class HoodieFileInputFormatBase extends FileInputFormat logFiles, - Option latestCompletedInstantOpt, - HoodieTableMetaClient tableMetaClient) { - List sortedLogFiles = logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList()); - FileStatus baseFileStatus = getFileStatusUnchecked(baseFile); + private BootstrapBaseFileSplit makeExternalFileSplit(PathWithBootstrapFileStatus file, FileSplit split) { try { - RealtimeFileStatus rtFileStatus = new RealtimeFileStatus(baseFileStatus); - rtFileStatus.setDeltaLogFiles(sortedLogFiles); - rtFileStatus.setBaseFilePath(baseFile.getPath()); - rtFileStatus.setBasePath(tableMetaClient.getBasePath()); - - if (latestCompletedInstantOpt.isPresent()) { - HoodieInstant latestCompletedInstant = latestCompletedInstantOpt.get(); - checkState(latestCompletedInstant.isCompleted()); - - rtFileStatus.setMaxCommitTime(latestCompletedInstant.getTimestamp()); - } - - if (baseFileStatus instanceof LocatedFileStatusWithBootstrapBaseFile || baseFileStatus instanceof FileStatusWithBootstrapBaseFile) { - rtFileStatus.setBootStrapFileStatus(baseFileStatus); - } - - return rtFileStatus; + LOG.info("Making external data split for " + file); + FileStatus externalFileStatus = file.getBootstrapFileStatus(); + FileSplit externalFileSplit = makeSplit(externalFileStatus.getPath(), 0, externalFileStatus.getLen(), + new String[0], new String[0]); + return new BootstrapBaseFileSplit(split, externalFileSplit); } catch (IOException e) { - throw new HoodieIOException(String.format("Failed to init %s", RealtimeFileStatus.class.getSimpleName()), e); + throw new HoodieIOException(e.getMessage(), e); } } @@ 
-209,38 +225,6 @@ public abstract class HoodieFileInputFormatBase extends FileInputFormat logFiles, - Option latestCompletedInstantOpt, - HoodieTableMetaClient tableMetaClient) { - List sortedLogFiles = logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList()); - try { - RealtimeFileStatus rtFileStatus = new RealtimeFileStatus(latestLogFile.getFileStatus()); - rtFileStatus.setDeltaLogFiles(sortedLogFiles); - rtFileStatus.setBasePath(tableMetaClient.getBasePath()); - - if (latestCompletedInstantOpt.isPresent()) { - HoodieInstant latestCompletedInstant = latestCompletedInstantOpt.get(); - checkState(latestCompletedInstant.isCompleted()); - - rtFileStatus.setMaxCommitTime(latestCompletedInstant.getTimestamp()); - } - - return rtFileStatus; - } catch (IOException e) { - throw new HoodieIOException(String.format("Failed to init %s", RealtimeFileStatus.class.getSimpleName()), e); - } - } - - private static Option fromScala(scala.Option opt) { - if (opt.isDefined()) { - return Option.of(opt.get()); - } - - return Option.empty(); - } - @Nonnull private List listStatusForSnapshotMode(JobConf job, Map tableMetaClientMap, @@ -317,4 +301,80 @@ public abstract class HoodieFileInputFormatBase extends FileInputFormat targetFiles, List legacyFileStatuses) { + List diff = CollectionUtils.diff(targetFiles, legacyFileStatuses); + checkState(diff.isEmpty(), "Should be empty"); + } + + @Nonnull + private static FileStatus getFileStatusUnchecked(HoodieBaseFile baseFile) { + try { + return HoodieInputFormatUtils.getFileStatus(baseFile); + } catch (IOException ioe) { + throw new HoodieIOException("Failed to get file-status", ioe); + } + } + + @Nonnull + private static RealtimeFileStatus createRealtimeFileStatusUnchecked(HoodieBaseFile baseFile, + Stream logFiles, + Option latestCompletedInstantOpt, + HoodieTableMetaClient tableMetaClient) { + List sortedLogFiles = logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList()); + FileStatus baseFileStatus = getFileStatusUnchecked(baseFile); + try { + RealtimeFileStatus rtFileStatus = new RealtimeFileStatus(baseFileStatus); + rtFileStatus.setDeltaLogFiles(sortedLogFiles); + rtFileStatus.setBaseFilePath(baseFile.getPath()); + rtFileStatus.setBasePath(tableMetaClient.getBasePath()); + + if (latestCompletedInstantOpt.isPresent()) { + HoodieInstant latestCompletedInstant = latestCompletedInstantOpt.get(); + checkState(latestCompletedInstant.isCompleted()); + + rtFileStatus.setMaxCommitTime(latestCompletedInstant.getTimestamp()); + } + + if (baseFileStatus instanceof LocatedFileStatusWithBootstrapBaseFile || baseFileStatus instanceof FileStatusWithBootstrapBaseFile) { + rtFileStatus.setBootStrapFileStatus(baseFileStatus); + } + + return rtFileStatus; + } catch (IOException e) { + throw new HoodieIOException(String.format("Failed to init %s", RealtimeFileStatus.class.getSimpleName()), e); + } + } + + @Nonnull + private static RealtimeFileStatus createRealtimeFileStatusUnchecked(HoodieLogFile latestLogFile, + Stream logFiles, + Option latestCompletedInstantOpt, + HoodieTableMetaClient tableMetaClient) { + List sortedLogFiles = logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList()); + try { + RealtimeFileStatus rtFileStatus = new RealtimeFileStatus(latestLogFile.getFileStatus()); + rtFileStatus.setDeltaLogFiles(sortedLogFiles); + rtFileStatus.setBasePath(tableMetaClient.getBasePath()); + + if (latestCompletedInstantOpt.isPresent()) { + HoodieInstant latestCompletedInstant = 
latestCompletedInstantOpt.get(); + checkState(latestCompletedInstant.isCompleted()); + + rtFileStatus.setMaxCommitTime(latestCompletedInstant.getTimestamp()); + } + + return rtFileStatus; + } catch (IOException e) { + throw new HoodieIOException(String.format("Failed to init %s", RealtimeFileStatus.class.getSimpleName()), e); + } + } + + private static Option fromScala(scala.Option opt) { + if (opt.isDefined()) { + return Option.of(opt.get()); + } + + return Option.empty(); + } } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java index 078258f3e..01eda3b35 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java @@ -18,17 +18,7 @@ package org.apache.hudi.hadoop; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline; -import org.apache.hudi.common.util.collection.Pair; -import org.apache.hudi.exception.HoodieIOException; -import org.apache.hudi.hadoop.utils.HoodieHiveUtils; -import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; - import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat; import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg; import org.apache.hadoop.hive.ql.plan.TableScanDesc; @@ -39,6 +29,9 @@ import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.hadoop.utils.HoodieHiveUtils; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -62,10 +55,6 @@ public class HoodieParquetInputFormat extends HoodieFileInputFormatBase implemen // {@code RecordReader} private final MapredParquetInputFormat mapredParquetInputFormat = new MapredParquetInputFormat(); - protected HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline timeline) { - return HoodieInputFormatUtils.filterInstantsTimeline(timeline); - } - protected boolean includeLogFilesForSnapshotView() { return false; } @@ -96,32 +85,6 @@ public class HoodieParquetInputFormat extends HoodieFileInputFormatBase implemen return getRecordReaderInternal(split, job, reporter); } - @Override - protected boolean isSplitable(FileSystem fs, Path filename) { - return !(filename instanceof PathWithBootstrapFileStatus); - } - - @Override - protected FileSplit makeSplit(Path file, long start, long length, - String[] hosts) { - FileSplit split = new FileSplit(file, start, length, hosts); - - if (file instanceof PathWithBootstrapFileStatus) { - return makeExternalFileSplit((PathWithBootstrapFileStatus)file, split); - } - return split; - } - - @Override - protected FileSplit makeSplit(Path file, long start, long length, - String[] hosts, String[] inMemoryHosts) { - FileSplit split = new FileSplit(file, start, length, hosts, inMemoryHosts); - if (file instanceof PathWithBootstrapFileStatus) { - return makeExternalFileSplit((PathWithBootstrapFileStatus)file, split); - } - return split; - } - private RecordReader getRecordReaderInternal(InputSplit split, JobConf job, Reporter reporter) 
throws IOException { @@ -176,16 +139,4 @@ public class HoodieParquetInputFormat extends HoodieFileInputFormatBase implemen true); } } - - private BootstrapBaseFileSplit makeExternalFileSplit(PathWithBootstrapFileStatus file, FileSplit split) { - try { - LOG.info("Making external data split for " + file); - FileStatus externalFileStatus = file.getBootstrapFileStatus(); - FileSplit externalFileSplit = makeSplit(externalFileStatus.getPath(), 0, externalFileStatus.getLen(), - new String[0], new String[0]); - return new BootstrapBaseFileSplit(split, externalFileSplit); - } catch (IOException e) { - throw new HoodieIOException(e.getMessage(), e); - } - } } \ No newline at end of file diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java index c24c75359..f23e6ac86 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java @@ -20,6 +20,7 @@ package org.apache.hudi.hadoop.hive; import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.hadoop.HoodieFileInputFormatBase; import org.apache.hudi.hadoop.HoodieParquetInputFormat; import org.apache.hudi.hadoop.realtime.HoodieCombineRealtimeRecordReader; import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat; @@ -876,7 +877,7 @@ public class HoodieCombineHiveInputFormat result; if (hoodieFilter) { - HoodieParquetInputFormat input; + HoodieFileInputFormatBase input; if (isRealTime) { LOG.info("Using HoodieRealtimeInputFormat"); input = createParquetRealtimeInputFormat(); @@ -916,7 +917,7 @@ public class HoodieCombineHiveInputFormat fileSplits = Arrays.stream(super.getSplits(job, numSplits)) - .map(is -> (FileSplit) is) - .collect(Collectors.toList()); - return HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits); - } - - @Override - protected HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline timeline) { - // no specific filtering for Realtime format - return timeline; - } + // NOTE: We're only using {@code HoodieHFileInputFormat} to compose {@code RecordReader} + private final HoodieHFileInputFormat hFileInputFormat = new HoodieHFileInputFormat(); @Override public RecordReader getRecordReader(final InputSplit split, final JobConf jobConf, @@ -102,6 +88,12 @@ public class HoodieHFileRealtimeInputFormat extends HoodieHFileInputFormat { "HoodieRealtimeRecordReader can only work on HoodieRealtimeFileSplit and not with " + split); return new HoodieRealtimeRecordReader((HoodieRealtimeFileSplit) split, jobConf, - super.getRecordReader(split, jobConf, reporter)); + hFileInputFormat.getRecordReader(split, jobConf, reporter)); + } + + @Override + protected boolean isSplitable(FileSystem fs, Path filename) { + // This file isn't splittable. 
+ return false; } } \ No newline at end of file diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java index d8bfce67d..5f8b11cb1 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java @@ -18,256 +18,60 @@ package org.apache.hudi.hadoop.realtime; -import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.model.FileSlice; -import org.apache.hudi.common.model.HoodieCommitMetadata; -import org.apache.hudi.common.model.HoodieFileGroup; -import org.apache.hudi.common.model.HoodieLogFile; -import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline; -import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.common.table.view.HoodieTableFileSystemView; -import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; -import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.hadoop.BootstrapBaseFileSplit; -import org.apache.hudi.hadoop.FileStatusWithBootstrapBaseFile; -import org.apache.hudi.hadoop.LocatedFileStatusWithBootstrapBaseFile; -import org.apache.hudi.hadoop.RealtimeFileStatus; -import org.apache.hudi.hadoop.PathWithLogFilePath; import org.apache.hudi.hadoop.HoodieParquetInputFormat; import org.apache.hudi.hadoop.UseFileSplitsFromInputFormat; import org.apache.hudi.hadoop.UseRecordReaderFromInputFormat; import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; -import org.apache.hadoop.io.ArrayWritable; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.mapred.FileSplit; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.mapred.Reporter; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.HashMap; -import java.util.stream.Collectors; - /** * Input Format, that provides a real-time view of data in a Hoodie table. 
*/ @UseRecordReaderFromInputFormat @UseFileSplitsFromInputFormat -public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat implements Configurable { +public class HoodieParquetRealtimeInputFormat extends HoodieRealtimeFileInputFormatBase implements Configurable { private static final Logger LOG = LogManager.getLogger(HoodieParquetRealtimeInputFormat.class); + // NOTE: We're only using {@code HoodieParquetInputFormat} to compose {@code RecordReader} + private final HoodieParquetInputFormat parquetInputFormat = new HoodieParquetInputFormat(); + // To make Hive on Spark queries work with RT tables. Our theory is that due to // {@link org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher} // not handling empty list correctly, the ParquetRecordReaderWrapper ends up adding the same column ids multiple // times which ultimately breaks the query. - @Override - public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { - List fileSplits = Arrays.stream(super.getSplits(job, numSplits)).map(is -> (FileSplit) is).collect(Collectors.toList()); + public RecordReader getRecordReader(final InputSplit split, final JobConf jobConf, + final Reporter reporter) throws IOException { + // sanity check + ValidationUtils.checkArgument(split instanceof RealtimeSplit, + "HoodieRealtimeRecordReader can only work on RealtimeSplit and not with " + split); + RealtimeSplit realtimeSplit = (RealtimeSplit) split; + addProjectionToJobConf(realtimeSplit, jobConf); + LOG.info("Creating record reader with readCols :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR) + + ", Ids :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR)); - return HoodieRealtimeInputFormatUtils.isIncrementalQuerySplits(fileSplits) - ? HoodieRealtimeInputFormatUtils.getIncrementalRealtimeSplits(job, fileSplits) - : HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits); - } - - /** - * Keep the logic of mor_incr_view as same as spark datasource. - * Step1: Get list of commits to be fetched based on start commit and max commits(for snapshot max commits is -1). - * Step2: Get list of affected files status for these affected file status. - * Step3: Construct HoodieTableFileSystemView based on those affected file status. - * a. Filter affected partitions based on inputPaths. - * b. Get list of fileGroups based on affected partitions by fsView.getAllFileGroups. - * Step4: Set input paths based on filtered affected partition paths. changes that amony original input paths passed to - * this method. some partitions did not have commits as part of the trimmed down list of commits and hence we need this step. - * Step5: Find candidate fileStatus, since when we get baseFileStatus from HoodieTableFileSystemView, - * the BaseFileStatus will missing file size information. - * We should use candidate fileStatus to update the size information for BaseFileStatus. - * Step6: For every file group from step3(b) - * Get 1st available base file from all file slices. then we use candidate file status to update the baseFileStatus, - * and construct RealTimeFileStatus and add it to result along with log files. - * If file group just has log files, construct RealTimeFileStatus and add it to result. 
- * TODO: unify the incremental view code between hive/spark-sql and spark datasource - */ - @Override - protected List listStatusForIncrementalMode(JobConf job, - HoodieTableMetaClient tableMetaClient, - List inputPaths, - String incrementalTable) throws IOException { - List result = new ArrayList<>(); - Job jobContext = Job.getInstance(job); - - // step1 - Option timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient); - if (!timeline.isPresent()) { - return result; - } - HoodieTimeline commitsTimelineToReturn = HoodieInputFormatUtils.getHoodieTimelineForIncrementalQuery(jobContext, incrementalTable, timeline.get()); - Option> commitsToCheck = Option.of(commitsTimelineToReturn.getInstants().collect(Collectors.toList())); - if (!commitsToCheck.isPresent()) { - return result; - } - // step2 - commitsToCheck.get().sort(HoodieInstant::compareTo); - List metadataList = commitsToCheck - .get().stream().map(instant -> { - try { - return HoodieInputFormatUtils.getCommitMetadata(instant, commitsTimelineToReturn); - } catch (IOException e) { - throw new HoodieException(String.format("cannot get metadata for instant: %s", instant)); - } - }).collect(Collectors.toList()); - - // build fileGroup from fsView - List affectedFileStatus = Arrays.asList(HoodieInputFormatUtils - .listAffectedFilesForCommits(new Path(tableMetaClient.getBasePath()), metadataList)); - // step3 - HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(tableMetaClient, commitsTimelineToReturn, affectedFileStatus.toArray(new FileStatus[0])); - // build fileGroup from fsView - Path basePath = new Path(tableMetaClient.getBasePath()); - // filter affectedPartition by inputPaths - List affectedPartition = HoodieInputFormatUtils.getWritePartitionPaths(metadataList).stream() - .filter(k -> k.isEmpty() ? inputPaths.contains(basePath) : inputPaths.contains(new Path(basePath, k))).collect(Collectors.toList()); - if (affectedPartition.isEmpty()) { - return result; - } - List fileGroups = affectedPartition.stream() - .flatMap(partitionPath -> fsView.getAllFileGroups(partitionPath)).collect(Collectors.toList()); - // step4 - setInputPaths(job, affectedPartition.stream() - .map(p -> p.isEmpty() ? basePath.toString() : new Path(basePath, p).toString()).collect(Collectors.joining(","))); - - // step5 - // find all file status in partitionPaths. - FileStatus[] fileStatuses = doListStatus(job); - Map candidateFileStatus = new HashMap<>(); - for (int i = 0; i < fileStatuses.length; i++) { - String key = fileStatuses[i].getPath().toString(); - candidateFileStatus.put(key, fileStatuses[i]); + // for log only split, set the parquet reader as empty. 
+ if (FSUtils.isLogFile(realtimeSplit.getPath())) { + return new HoodieRealtimeRecordReader(realtimeSplit, jobConf, new HoodieEmptyRecordReader(realtimeSplit, jobConf)); } - String maxCommitTime = fsView.getLastInstant().get().getTimestamp(); - // step6 - result.addAll(collectAllIncrementalFiles(fileGroups, maxCommitTime, basePath.toString(), candidateFileStatus)); - return result; - } - - private List collectAllIncrementalFiles(List fileGroups, String maxCommitTime, String basePath, Map candidateFileStatus) { - List result = new ArrayList<>(); - fileGroups.stream().forEach(f -> { - try { - List baseFiles = f.getAllFileSlices().filter(slice -> slice.getBaseFile().isPresent()).collect(Collectors.toList()); - if (!baseFiles.isEmpty()) { - FileStatus baseFileStatus = HoodieInputFormatUtils.getFileStatus(baseFiles.get(0).getBaseFile().get()); - String baseFilePath = baseFileStatus.getPath().toUri().toString(); - if (!candidateFileStatus.containsKey(baseFilePath)) { - throw new HoodieException("Error obtaining fileStatus for file: " + baseFilePath); - } - // We cannot use baseFileStatus.getPath() here, since baseFileStatus.getPath() missing file size information. - // So we use candidateFileStatus.get(baseFileStatus.getPath()) to get a correct path. - RealtimeFileStatus fileStatus = new RealtimeFileStatus(candidateFileStatus.get(baseFilePath)); - fileStatus.setMaxCommitTime(maxCommitTime); - fileStatus.setBelongToIncrementalFileStatus(true); - fileStatus.setBasePath(basePath); - fileStatus.setBaseFilePath(baseFilePath); - fileStatus.setDeltaLogFiles(f.getLatestFileSlice().get().getLogFiles().collect(Collectors.toList())); - // try to set bootstrapfileStatus - if (baseFileStatus instanceof LocatedFileStatusWithBootstrapBaseFile || baseFileStatus instanceof FileStatusWithBootstrapBaseFile) { - fileStatus.setBootStrapFileStatus(baseFileStatus); - } - result.add(fileStatus); - } - // add file group which has only logs. - if (f.getLatestFileSlice().isPresent() && baseFiles.isEmpty()) { - List logFileStatus = f.getLatestFileSlice().get().getLogFiles().map(logFile -> logFile.getFileStatus()).collect(Collectors.toList()); - if (logFileStatus.size() > 0) { - RealtimeFileStatus fileStatus = new RealtimeFileStatus(logFileStatus.get(0)); - fileStatus.setBelongToIncrementalFileStatus(true); - fileStatus.setDeltaLogFiles(logFileStatus.stream().map(l -> new HoodieLogFile(l.getPath(), l.getLen())).collect(Collectors.toList())); - fileStatus.setMaxCommitTime(maxCommitTime); - fileStatus.setBasePath(basePath); - result.add(fileStatus); - } - } - } catch (IOException e) { - throw new HoodieException("Error obtaining data file/log file grouping ", e); - } - }); - return result; - } - - @Override - protected boolean includeLogFilesForSnapshotView() { - return true; - } - - @Override - protected boolean isSplitable(FileSystem fs, Path filename) { - if (filename instanceof PathWithLogFilePath) { - return ((PathWithLogFilePath)filename).splitable(); - } - return super.isSplitable(fs, filename); - } - - // make split for path. - // When query the incremental view, the read files may be bootstrap files, we wrap those bootstrap files into - // PathWithLogFilePath, so those bootstrap files should be processed int this function. 
- @Override - protected FileSplit makeSplit(Path file, long start, long length, String[] hosts) { - if (file instanceof PathWithLogFilePath) { - return doMakeSplitForPathWithLogFilePath((PathWithLogFilePath) file, start, length, hosts, null); - } - return super.makeSplit(file, start, length, hosts); - } - - @Override - protected FileSplit makeSplit(Path file, long start, long length, String[] hosts, String[] inMemoryHosts) { - if (file instanceof PathWithLogFilePath) { - return doMakeSplitForPathWithLogFilePath((PathWithLogFilePath) file, start, length, hosts, inMemoryHosts); - } - return super.makeSplit(file, start, length, hosts, inMemoryHosts); - } - - private FileSplit doMakeSplitForPathWithLogFilePath(PathWithLogFilePath path, long start, long length, String[] hosts, String[] inMemoryHosts) { - if (!path.includeBootstrapFilePath()) { - return path.buildSplit(path, start, length, hosts); - } - - FileSplit bf = inMemoryHosts == null - ? super.makeSplit(path.getPathWithBootstrapFileStatus(), start, length, hosts) - : super.makeSplit(path.getPathWithBootstrapFileStatus(), start, length, hosts, inMemoryHosts); - - return HoodieRealtimeInputFormatUtils.createRealtimeBoostrapBaseFileSplit( - (BootstrapBaseFileSplit) bf, - path.getBasePath(), - path.getDeltaLogFiles(), - path.getMaxCommitTime(), - path.getBelongsToIncrementalQuery()); - } - - @Override - protected HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline timeline) { - // no specific filtering for Realtime format - return timeline; + return new HoodieRealtimeRecordReader(realtimeSplit, jobConf, + parquetInputFormat.getRecordReader(split, jobConf, reporter)); } void addProjectionToJobConf(final RealtimeSplit realtimeSplit, final JobConf jobConf) { @@ -297,25 +101,7 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i } } } + HoodieRealtimeInputFormatUtils.cleanProjectionColumnIds(jobConf); } - - @Override - public RecordReader getRecordReader(final InputSplit split, final JobConf jobConf, - final Reporter reporter) throws IOException { - // sanity check - ValidationUtils.checkArgument(split instanceof RealtimeSplit, - "HoodieRealtimeRecordReader can only work on RealtimeSplit and not with " + split); - RealtimeSplit realtimeSplit = (RealtimeSplit) split; - addProjectionToJobConf(realtimeSplit, jobConf); - LOG.info("Creating record reader with readCols :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR) - + ", Ids :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR)); - - // for log only split, set the parquet reader as empty. - if (FSUtils.isLogFile(realtimeSplit.getPath())) { - return new HoodieRealtimeRecordReader(realtimeSplit, jobConf, new HoodieEmptyRecordReader(realtimeSplit, jobConf)); - } - return new HoodieRealtimeRecordReader(realtimeSplit, jobConf, - super.getRecordReader(split, jobConf, reporter)); - } } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeFileInputFormatBase.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeFileInputFormatBase.java new file mode 100644 index 000000000..2fe3bbdc1 --- /dev/null +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeFileInputFormatBase.java @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hadoop.realtime; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieFileGroup; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.hadoop.BootstrapBaseFileSplit; +import org.apache.hudi.hadoop.FileStatusWithBootstrapBaseFile; +import org.apache.hudi.hadoop.HoodieFileInputFormatBase; +import org.apache.hudi.hadoop.LocatedFileStatusWithBootstrapBaseFile; +import org.apache.hudi.hadoop.PathWithLogFilePath; +import org.apache.hudi.hadoop.RealtimeFileStatus; +import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; +import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Base implementation of the Hive's {@link FileInputFormat} allowing for reading of Hudi's + * Merge-on-Read (COW) tables in various configurations: + * + *
+ * <ul>
+ *   <li>Snapshot mode: reading table's state as of particular timestamp (or instant, in Hudi's terms)</li>
+ *   <li>Incremental mode: reading table's state as of particular timestamp (or instant, in Hudi's terms)</li>
+ *   <li>External mode: reading non-Hudi partitions</li>
+ * </ul>
    + * + * NOTE: This class is invariant of the underlying file-format of the files being read + */ +public abstract class HoodieRealtimeFileInputFormatBase extends HoodieFileInputFormatBase implements Configurable { + + private static final Logger LOG = LogManager.getLogger(HoodieParquetRealtimeInputFormat.class); + + @Override + public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { + List fileSplits = Arrays.stream(super.getSplits(job, numSplits)).map(is -> (FileSplit) is).collect(Collectors.toList()); + + return HoodieRealtimeInputFormatUtils.isIncrementalQuerySplits(fileSplits) + ? HoodieRealtimeInputFormatUtils.getIncrementalRealtimeSplits(job, fileSplits) + : HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits); + } + + /** + * Keep the logic of mor_incr_view as same as spark datasource. + * Step1: Get list of commits to be fetched based on start commit and max commits(for snapshot max commits is -1). + * Step2: Get list of affected files status for these affected file status. + * Step3: Construct HoodieTableFileSystemView based on those affected file status. + * a. Filter affected partitions based on inputPaths. + * b. Get list of fileGroups based on affected partitions by fsView.getAllFileGroups. + * Step4: Set input paths based on filtered affected partition paths. changes that amony original input paths passed to + * this method. some partitions did not have commits as part of the trimmed down list of commits and hence we need this step. + * Step5: Find candidate fileStatus, since when we get baseFileStatus from HoodieTableFileSystemView, + * the BaseFileStatus will missing file size information. + * We should use candidate fileStatus to update the size information for BaseFileStatus. + * Step6: For every file group from step3(b) + * Get 1st available base file from all file slices. then we use candidate file status to update the baseFileStatus, + * and construct RealTimeFileStatus and add it to result along with log files. + * If file group just has log files, construct RealTimeFileStatus and add it to result. 
+ * TODO: unify the incremental view code between hive/spark-sql and spark datasource + */ + @Override + protected List listStatusForIncrementalMode(JobConf job, + HoodieTableMetaClient tableMetaClient, + List inputPaths, + String incrementalTableName) throws IOException { + List result = new ArrayList<>(); + Job jobContext = Job.getInstance(job); + + // step1 + Option timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient); + if (!timeline.isPresent()) { + return result; + } + HoodieTimeline commitsTimelineToReturn = HoodieInputFormatUtils.getHoodieTimelineForIncrementalQuery(jobContext, incrementalTableName, timeline.get()); + Option> commitsToCheck = Option.of(commitsTimelineToReturn.getInstants().collect(Collectors.toList())); + if (!commitsToCheck.isPresent()) { + return result; + } + // step2 + commitsToCheck.get().sort(HoodieInstant::compareTo); + List metadataList = commitsToCheck + .get().stream().map(instant -> { + try { + return HoodieInputFormatUtils.getCommitMetadata(instant, commitsTimelineToReturn); + } catch (IOException e) { + throw new HoodieException(String.format("cannot get metadata for instant: %s", instant)); + } + }).collect(Collectors.toList()); + + // build fileGroup from fsView + List affectedFileStatus = Arrays.asList(HoodieInputFormatUtils + .listAffectedFilesForCommits(new Path(tableMetaClient.getBasePath()), metadataList)); + // step3 + HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(tableMetaClient, commitsTimelineToReturn, affectedFileStatus.toArray(new FileStatus[0])); + // build fileGroup from fsView + Path basePath = new Path(tableMetaClient.getBasePath()); + // filter affectedPartition by inputPaths + List affectedPartition = HoodieInputFormatUtils.getWritePartitionPaths(metadataList).stream() + .filter(k -> k.isEmpty() ? inputPaths.contains(basePath) : inputPaths.contains(new Path(basePath, k))).collect(Collectors.toList()); + if (affectedPartition.isEmpty()) { + return result; + } + List fileGroups = affectedPartition.stream() + .flatMap(partitionPath -> fsView.getAllFileGroups(partitionPath)).collect(Collectors.toList()); + // step4 + setInputPaths(job, affectedPartition.stream() + .map(p -> p.isEmpty() ? basePath.toString() : new Path(basePath, p).toString()).collect(Collectors.joining(","))); + + // step5 + // find all file status in partitionPaths. + FileStatus[] fileStatuses = doListStatus(job); + Map candidateFileStatus = new HashMap<>(); + for (int i = 0; i < fileStatuses.length; i++) { + String key = fileStatuses[i].getPath().toString(); + candidateFileStatus.put(key, fileStatuses[i]); + } + + String maxCommitTime = fsView.getLastInstant().get().getTimestamp(); + // step6 + result.addAll(collectAllIncrementalFiles(fileGroups, maxCommitTime, basePath.toString(), candidateFileStatus)); + return result; + } + + @Override + protected boolean includeLogFilesForSnapshotView() { + return true; + } + + @Override + protected boolean isSplitable(FileSystem fs, Path filename) { + if (filename instanceof PathWithLogFilePath) { + return ((PathWithLogFilePath)filename).splitable(); + } + + return super.isSplitable(fs, filename); + } + + // make split for path. + // When query the incremental view, the read files may be bootstrap files, we wrap those bootstrap files into + // PathWithLogFilePath, so those bootstrap files should be processed int this function. 
+ @Override + protected FileSplit makeSplit(Path file, long start, long length, String[] hosts) { + if (file instanceof PathWithLogFilePath) { + return doMakeSplitForPathWithLogFilePath((PathWithLogFilePath) file, start, length, hosts, null); + } + return super.makeSplit(file, start, length, hosts); + } + + @Override + protected FileSplit makeSplit(Path file, long start, long length, String[] hosts, String[] inMemoryHosts) { + if (file instanceof PathWithLogFilePath) { + return doMakeSplitForPathWithLogFilePath((PathWithLogFilePath) file, start, length, hosts, inMemoryHosts); + } + return super.makeSplit(file, start, length, hosts, inMemoryHosts); + } + + private List collectAllIncrementalFiles(List fileGroups, String maxCommitTime, String basePath, Map candidateFileStatus) { + List result = new ArrayList<>(); + fileGroups.stream().forEach(f -> { + try { + List baseFiles = f.getAllFileSlices().filter(slice -> slice.getBaseFile().isPresent()).collect(Collectors.toList()); + if (!baseFiles.isEmpty()) { + FileStatus baseFileStatus = HoodieInputFormatUtils.getFileStatus(baseFiles.get(0).getBaseFile().get()); + String baseFilePath = baseFileStatus.getPath().toUri().toString(); + if (!candidateFileStatus.containsKey(baseFilePath)) { + throw new HoodieException("Error obtaining fileStatus for file: " + baseFilePath); + } + // We cannot use baseFileStatus.getPath() here, since baseFileStatus.getPath() missing file size information. + // So we use candidateFileStatus.get(baseFileStatus.getPath()) to get a correct path. + RealtimeFileStatus fileStatus = new RealtimeFileStatus(candidateFileStatus.get(baseFilePath)); + fileStatus.setMaxCommitTime(maxCommitTime); + fileStatus.setBelongToIncrementalFileStatus(true); + fileStatus.setBasePath(basePath); + fileStatus.setBaseFilePath(baseFilePath); + fileStatus.setDeltaLogFiles(f.getLatestFileSlice().get().getLogFiles().collect(Collectors.toList())); + // try to set bootstrapfileStatus + if (baseFileStatus instanceof LocatedFileStatusWithBootstrapBaseFile || baseFileStatus instanceof FileStatusWithBootstrapBaseFile) { + fileStatus.setBootStrapFileStatus(baseFileStatus); + } + result.add(fileStatus); + } + // add file group which has only logs. + if (f.getLatestFileSlice().isPresent() && baseFiles.isEmpty()) { + List logFileStatus = f.getLatestFileSlice().get().getLogFiles().map(logFile -> logFile.getFileStatus()).collect(Collectors.toList()); + if (logFileStatus.size() > 0) { + RealtimeFileStatus fileStatus = new RealtimeFileStatus(logFileStatus.get(0)); + fileStatus.setBelongToIncrementalFileStatus(true); + fileStatus.setDeltaLogFiles(logFileStatus.stream().map(l -> new HoodieLogFile(l.getPath(), l.getLen())).collect(Collectors.toList())); + fileStatus.setMaxCommitTime(maxCommitTime); + fileStatus.setBasePath(basePath); + result.add(fileStatus); + } + } + } catch (IOException e) { + throw new HoodieException("Error obtaining data file/log file grouping ", e); + } + }); + return result; + } + + private FileSplit doMakeSplitForPathWithLogFilePath(PathWithLogFilePath path, long start, long length, String[] hosts, String[] inMemoryHosts) { + if (!path.includeBootstrapFilePath()) { + return path.buildSplit(path, start, length, hosts); + } else { + FileSplit bf = + inMemoryHosts == null + ? 
super.makeSplit(path.getPathWithBootstrapFileStatus(), start, length, hosts) + : super.makeSplit(path.getPathWithBootstrapFileStatus(), start, length, hosts, inMemoryHosts); + return HoodieRealtimeInputFormatUtils.createRealtimeBoostrapBaseFileSplit( + (BootstrapBaseFileSplit) bf, + path.getBasePath(), + path.getDeltaLogFiles(), + path.getMaxCommitTime(), + path.getBelongsToIncrementalQuery()); + } + } +} + diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java index 5c7a1fdf2..b7cb72e38 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java @@ -44,6 +44,7 @@ import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapreduce.Job; +import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; @@ -115,7 +116,7 @@ public class TestHoodieParquetInputFormat { timeline.setInstants(instants); // Verify getCommitsTimelineBeforePendingCompaction does not return instants after first compaction instant - HoodieTimeline filteredTimeline = inputFormat.filterInstantsTimeline(timeline); + HoodieTimeline filteredTimeline = HoodieInputFormatUtils.filterInstantsTimeline(timeline); assertTrue(filteredTimeline.containsInstant(t1)); assertTrue(filteredTimeline.containsInstant(t2)); assertFalse(filteredTimeline.containsInstant(t3)); @@ -126,7 +127,7 @@ public class TestHoodieParquetInputFormat { instants.remove(t3); timeline = new HoodieActiveTimeline(metaClient); timeline.setInstants(instants); - filteredTimeline = inputFormat.filterInstantsTimeline(timeline); + filteredTimeline = HoodieInputFormatUtils.filterInstantsTimeline(timeline); // verify all remaining instants are returned. assertTrue(filteredTimeline.containsInstant(t1)); @@ -140,7 +141,7 @@ public class TestHoodieParquetInputFormat { instants.remove(t5); timeline = new HoodieActiveTimeline(metaClient); timeline.setInstants(instants); - filteredTimeline = inputFormat.filterInstantsTimeline(timeline); + filteredTimeline = HoodieInputFormatUtils.filterInstantsTimeline(timeline); // verify all remaining instants are returned. 
assertTrue(filteredTimeline.containsInstant(t1)); diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/z-index-table-merged.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/z-index-table-merged.json index c8fead0c1..00d16c660 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/z-index-table-merged.json +++ b/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/z-index-table-merged.json @@ -1,8 +1,8 @@ -{"c1_maxValue":639,"c1_minValue":323,"c1_num_nulls":0,"c2_maxValue":" 639sdc","c2_minValue":" 323sdc","c2_num_nulls":0,"c3_maxValue":811.638,"c3_minValue":100.556,"c3_num_nulls":0,"c5_maxValue":65,"c5_minValue":33,"c5_num_nulls":0,"c6_maxValue":"2020-09-09","c6_minValue":"2020-01-23","c6_num_nulls":0,"c7_maxValue":"fw==","c7_minValue":"Kw==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00000-0-c000.snappy.parquet"} -{"c1_maxValue":770,"c1_minValue":300,"c1_num_nulls":0,"c2_maxValue":" 770sdc","c2_minValue":" 300sdc","c2_num_nulls":0,"c3_maxValue":977.328,"c3_minValue":64.768,"c3_num_nulls":0,"c5_maxValue":78,"c5_minValue":31,"c5_num_nulls":0,"c6_maxValue":"2020-10-21","c6_minValue":"2020-01-01","c6_num_nulls":0,"c7_maxValue":"rw==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00000-1-c000.snappy.parquet"} -{"c1_maxValue":559,"c1_minValue":74,"c1_num_nulls":0,"c2_maxValue":" 74sdc","c2_minValue":" 181sdc","c2_num_nulls":0,"c3_maxValue":980.213,"c3_minValue":38.740,"c3_num_nulls":0,"c5_maxValue":57,"c5_minValue":9,"c5_num_nulls":0,"c6_maxValue":"2020-11-09","c6_minValue":"2020-01-08","c6_num_nulls":0,"c7_maxValue":"1Q==","c7_minValue":"Gw==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00001-0-c000.snappy.parquet"} -{"c1_maxValue":719,"c1_minValue":125,"c1_num_nulls":0,"c2_maxValue":" 719sdc","c2_minValue":" 125sdc","c2_num_nulls":0,"c3_maxValue":958.579,"c3_minValue":153.125,"c3_num_nulls":0,"c5_maxValue":73,"c5_minValue":14,"c5_num_nulls":0,"c6_maxValue":"2020-09-27","c6_minValue":"2020-01-16","c6_num_nulls":0,"c7_maxValue":"+g==","c7_minValue":"OA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00001-1-c000.snappy.parquet"} -{"c1_maxValue":945,"c1_minValue":355,"c1_num_nulls":0,"c2_maxValue":" 945sdc","c2_minValue":" 355sdc","c2_num_nulls":0,"c3_maxValue":994.355,"c3_minValue":374.882,"c3_num_nulls":0,"c5_maxValue":96,"c5_minValue":37,"c5_num_nulls":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-02-25","c6_num_nulls":0,"c7_maxValue":"sQ==","c7_minValue":"AQ==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00002-0-c000.snappy.parquet"} -{"c1_maxValue":486,"c1_minValue":59,"c1_num_nulls":0,"c2_maxValue":" 79sdc","c2_minValue":" 111sdc","c2_num_nulls":0,"c3_maxValue":771.590,"c3_minValue":82.111,"c3_num_nulls":0,"c5_maxValue":50,"c5_minValue":7,"c5_num_nulls":0,"c6_maxValue":"2020-11-21","c6_minValue":"2020-01-22","c6_num_nulls":0,"c7_maxValue":"5g==","c7_minValue":"Ow==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00002-1-c000.snappy.parquet"} -{"c1_maxValue":959,"c1_minValue":0,"c1_num_nulls":0,"c2_maxValue":" 959sdc","c2_minValue":" 
0sdc","c2_num_nulls":0,"c3_maxValue":916.697,"c3_minValue":19.000,"c3_num_nulls":0,"c5_maxValue":97,"c5_minValue":1,"c5_num_nulls":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_num_nulls":0,"c7_maxValue":"yA==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00003-0-c000.snappy.parquet"} -{"c1_maxValue":272,"c1_minValue":8,"c1_num_nulls":0,"c2_maxValue":" 8sdc","c2_minValue":" 129sdc","c2_num_nulls":0,"c3_maxValue":979.272,"c3_minValue":430.129,"c3_num_nulls":0,"c5_maxValue":28,"c5_minValue":2,"c5_num_nulls":0,"c6_maxValue":"2020-11-20","c6_minValue":"2020-03-23","c6_num_nulls":0,"c7_maxValue":"8A==","c7_minValue":"Ag==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00003-1-c000.snappy.parquet"} \ No newline at end of file +{"c1_maxValue":272,"c1_minValue":8,"c1_num_nulls":0,"c2_maxValue":" 8sdc","c2_minValue":" 129sdc","c2_num_nulls":0,"c3_maxValue":979.272,"c3_minValue":430.129,"c3_num_nulls":0,"c5_maxValue":28,"c5_minValue":2,"c5_num_nulls":0,"c6_maxValue":"2020-11-20","c6_minValue":"2020-03-23","c6_num_nulls":0,"c7_maxValue":"8A==","c7_minValue":"Ag==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00003-xxx-c000.snappy.parquet"} +{"c1_maxValue":486,"c1_minValue":59,"c1_num_nulls":0,"c2_maxValue":" 79sdc","c2_minValue":" 111sdc","c2_num_nulls":0,"c3_maxValue":771.590,"c3_minValue":82.111,"c3_num_nulls":0,"c5_maxValue":50,"c5_minValue":7,"c5_num_nulls":0,"c6_maxValue":"2020-11-21","c6_minValue":"2020-01-22","c6_num_nulls":0,"c7_maxValue":"5g==","c7_minValue":"Ow==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00002-xxx-c000.snappy.parquet"} +{"c1_maxValue":559,"c1_minValue":74,"c1_num_nulls":0,"c2_maxValue":" 74sdc","c2_minValue":" 181sdc","c2_num_nulls":0,"c3_maxValue":980.213,"c3_minValue":38.740,"c3_num_nulls":0,"c5_maxValue":57,"c5_minValue":9,"c5_num_nulls":0,"c6_maxValue":"2020-11-09","c6_minValue":"2020-01-08","c6_num_nulls":0,"c7_maxValue":"1Q==","c7_minValue":"Gw==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00001-xxx-c000.snappy.parquet"} +{"c1_maxValue":639,"c1_minValue":323,"c1_num_nulls":0,"c2_maxValue":" 639sdc","c2_minValue":" 323sdc","c2_num_nulls":0,"c3_maxValue":811.638,"c3_minValue":100.556,"c3_num_nulls":0,"c5_maxValue":65,"c5_minValue":33,"c5_num_nulls":0,"c6_maxValue":"2020-09-09","c6_minValue":"2020-01-23","c6_num_nulls":0,"c7_maxValue":"fw==","c7_minValue":"Kw==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00000-xxx-c000.snappy.parquet"} +{"c1_maxValue":719,"c1_minValue":125,"c1_num_nulls":0,"c2_maxValue":" 719sdc","c2_minValue":" 125sdc","c2_num_nulls":0,"c3_maxValue":958.579,"c3_minValue":153.125,"c3_num_nulls":0,"c5_maxValue":73,"c5_minValue":14,"c5_num_nulls":0,"c6_maxValue":"2020-09-27","c6_minValue":"2020-01-16","c6_num_nulls":0,"c7_maxValue":"+g==","c7_minValue":"OA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00001-xxx-c000.snappy.parquet"} +{"c1_maxValue":770,"c1_minValue":300,"c1_num_nulls":0,"c2_maxValue":" 770sdc","c2_minValue":" 300sdc","c2_num_nulls":0,"c3_maxValue":977.328,"c3_minValue":64.768,"c3_num_nulls":0,"c5_maxValue":78,"c5_minValue":31,"c5_num_nulls":0,"c6_maxValue":"2020-10-21","c6_minValue":"2020-01-01","c6_num_nulls":0,"c7_maxValue":"rw==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00000-xxx-c000.snappy.parquet"} 
+{"c1_maxValue":945,"c1_minValue":355,"c1_num_nulls":0,"c2_maxValue":" 945sdc","c2_minValue":" 355sdc","c2_num_nulls":0,"c3_maxValue":994.355,"c3_minValue":374.882,"c3_num_nulls":0,"c5_maxValue":96,"c5_minValue":37,"c5_num_nulls":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-02-25","c6_num_nulls":0,"c7_maxValue":"sQ==","c7_minValue":"AQ==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00002-xxx-c000.snappy.parquet"} +{"c1_maxValue":959,"c1_minValue":0,"c1_num_nulls":0,"c2_maxValue":" 959sdc","c2_minValue":" 0sdc","c2_num_nulls":0,"c3_maxValue":916.697,"c3_minValue":19.000,"c3_num_nulls":0,"c5_maxValue":97,"c5_minValue":1,"c5_num_nulls":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_num_nulls":0,"c7_maxValue":"yA==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00003-xxx-c000.snappy.parquet"} \ No newline at end of file diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/z-index-table.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/z-index-table.json index 30fccb3f9..a633e3170 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/z-index-table.json +++ b/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/z-index-table.json @@ -1,4 +1,4 @@ -{"c1_maxValue":639,"c1_minValue":323,"c1_num_nulls":0,"c2_maxValue":" 639sdc","c2_minValue":" 323sdc","c2_num_nulls":0,"c3_maxValue":811.638,"c3_minValue":100.556,"c3_num_nulls":0,"c5_maxValue":65,"c5_minValue":33,"c5_num_nulls":0,"c6_maxValue":"2020-09-09","c6_minValue":"2020-01-23","c6_num_nulls":0,"c7_maxValue":"fw==","c7_minValue":"Kw==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00000-0-c000.snappy.parquet"} -{"c1_maxValue":559,"c1_minValue":74,"c1_num_nulls":0,"c2_maxValue":" 74sdc","c2_minValue":" 181sdc","c2_num_nulls":0,"c3_maxValue":980.213,"c3_minValue":38.740,"c3_num_nulls":0,"c5_maxValue":57,"c5_minValue":9,"c5_num_nulls":0,"c6_maxValue":"2020-11-09","c6_minValue":"2020-01-08","c6_num_nulls":0,"c7_maxValue":"1Q==","c7_minValue":"Gw==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00001-0-c000.snappy.parquet"} -{"c1_maxValue":945,"c1_minValue":355,"c1_num_nulls":0,"c2_maxValue":" 945sdc","c2_minValue":" 355sdc","c2_num_nulls":0,"c3_maxValue":994.355,"c3_minValue":374.882,"c3_num_nulls":0,"c5_maxValue":96,"c5_minValue":37,"c5_num_nulls":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-02-25","c6_num_nulls":0,"c7_maxValue":"sQ==","c7_minValue":"AQ==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00002-0-c000.snappy.parquet"} -{"c1_maxValue":959,"c1_minValue":0,"c1_num_nulls":0,"c2_maxValue":" 959sdc","c2_minValue":" 0sdc","c2_num_nulls":0,"c3_maxValue":916.697,"c3_minValue":19.000,"c3_num_nulls":0,"c5_maxValue":97,"c5_minValue":1,"c5_num_nulls":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_num_nulls":0,"c7_maxValue":"yA==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00003-0-c000.snappy.parquet"} \ No newline at end of file +{"c1_maxValue":559,"c1_minValue":74,"c1_num_nulls":0,"c2_maxValue":" 74sdc","c2_minValue":" 
181sdc","c2_num_nulls":0,"c3_maxValue":980.213,"c3_minValue":38.740,"c3_num_nulls":0,"c5_maxValue":57,"c5_minValue":9,"c5_num_nulls":0,"c6_maxValue":"2020-11-09","c6_minValue":"2020-01-08","c6_num_nulls":0,"c7_maxValue":"1Q==","c7_minValue":"Gw==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00001-xxx-c000.snappy.parquet"} +{"c1_maxValue":639,"c1_minValue":323,"c1_num_nulls":0,"c2_maxValue":" 639sdc","c2_minValue":" 323sdc","c2_num_nulls":0,"c3_maxValue":811.638,"c3_minValue":100.556,"c3_num_nulls":0,"c5_maxValue":65,"c5_minValue":33,"c5_num_nulls":0,"c6_maxValue":"2020-09-09","c6_minValue":"2020-01-23","c6_num_nulls":0,"c7_maxValue":"fw==","c7_minValue":"Kw==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00000-xxx-c000.snappy.parquet"} +{"c1_maxValue":945,"c1_minValue":355,"c1_num_nulls":0,"c2_maxValue":" 945sdc","c2_minValue":" 355sdc","c2_num_nulls":0,"c3_maxValue":994.355,"c3_minValue":374.882,"c3_num_nulls":0,"c5_maxValue":96,"c5_minValue":37,"c5_num_nulls":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-02-25","c6_num_nulls":0,"c7_maxValue":"sQ==","c7_minValue":"AQ==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00002-xxx-c000.snappy.parquet"} +{"c1_maxValue":959,"c1_minValue":0,"c1_num_nulls":0,"c2_maxValue":" 959sdc","c2_minValue":" 0sdc","c2_num_nulls":0,"c3_maxValue":916.697,"c3_minValue":19.000,"c3_num_nulls":0,"c5_maxValue":97,"c5_minValue":1,"c5_num_nulls":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_num_nulls":0,"c7_maxValue":"yA==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00003-xxx-c000.snappy.parquet"} \ No newline at end of file diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala index 7b06bca87..ae41fa8eb 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala @@ -20,23 +20,20 @@ package org.apache.hudi.functional import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path} -import org.apache.hudi.common.fs.FSUtils -import org.apache.hudi.common.model.HoodieColumnRangeMetadata import org.apache.hudi.common.util.ParquetUtils import org.apache.hudi.index.columnstats.ColumnStatsIndexHelper import org.apache.hudi.testutils.HoodieClientTestBase +import org.apache.spark.sql._ import org.apache.spark.sql.expressions.UserDefinedFunction import org.apache.spark.sql.functions.typedLit import org.apache.spark.sql.types._ -import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, RowFactory, SaveMode, SparkSession, functions} import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertTrue} -import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test} +import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import java.math.BigInteger -import java.nio.charset.StandardCharsets import java.sql.{Date, Timestamp} import scala.collection.JavaConverters._ -import scala.util.{Random, Success} +import scala.util.Random class TestColumnStatsIndex extends HoodieClientTestBase { var spark: SparkSession = _ @@ -354,11 +351,10 @@ class TestColumnStatsIndex extends HoodieClientTestBase { .distinct() .collect() 
.map(_.getString(0)) - .sorted val uuidToIdx: UserDefinedFunction = functions.udf((fileName: String) => { - val (uuid, idx) = uuids.zipWithIndex.find { case (uuid, _) => fileName.contains(uuid) }.get - fileName.replace(uuid, idx.toString) + val uuid = uuids.find(uuid => fileName.contains(uuid)).get + fileName.replace(uuid, "xxx") }) ds.withColumn("file", uuidToIdx(ds("file"))) @@ -409,12 +405,11 @@ class TestColumnStatsIndex extends HoodieClientTestBase { .mkString("\n") private def sort(df: DataFrame): DataFrame = { - // Since upon parsing JSON, Spark re-order columns in lexicographical order - // of their names, we have to shuffle new Z-index table columns order to match - // Rows are sorted by filename as well to avoid val sortedCols = df.columns.sorted + // Sort dataset by the first 2 columns (to minimize non-determinism in case multiple files have the same + // value of the first column) df.select(sortedCols.head, sortedCols.tail: _*) - .sort("file") + .sort("c1_maxValue", "c1_minValue") } }
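
For illustration only, a minimal, hypothetical MapReduce-style read against the unified realtime input format could look like the sketch below. It is not part of this patch: the table path is made up, and in practice Hive drives the column projection (via ColumnProjectionUtils) before the reader is created. The point is that callers keep using HoodieParquetRealtimeInputFormat as before, while the shared MOR split/listing logic now lives in HoodieRealtimeFileInputFormatBase.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.ArrayWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.RecordReader;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;

    public class RealtimeReadSketch {
      public static void main(String[] args) throws Exception {
        JobConf jobConf = new JobConf();
        // Hypothetical MOR table partition path; replace with a real Hudi base/partition path.
        FileInputFormat.setInputPaths(jobConf, new Path("/tmp/hudi_trips_mor/2022/02/07"));

        HoodieParquetRealtimeInputFormat inputFormat = new HoodieParquetRealtimeInputFormat();
        inputFormat.setConf(jobConf);

        // Splits are produced by the shared HoodieRealtimeFileInputFormatBase#getSplits and
        // carry both the base parquet file and its delta log files (RealtimeSplit).
        for (InputSplit split : inputFormat.getSplits(jobConf, 1)) {
          RecordReader<NullWritable, ArrayWritable> reader =
              inputFormat.getRecordReader(split, jobConf, Reporter.NULL);
          NullWritable key = reader.createKey();
          ArrayWritable value = reader.createValue();
          while (reader.next(key, value)) {
            // Each value is a merged (base file + log files) record, projected per the JobConf.
          }
          reader.close();
        }
      }
    }

The same wiring applies to the HFile flavor: HoodieHFileRealtimeInputFormat now extends the same realtime base class and only composes HoodieHFileInputFormat to obtain its RecordReader.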