1
0

[HUDI-3206] Unify Hive's MOR implementations to avoid duplication (#4559)

Unify Hive's MOR implementations to avoid duplication to avoid duplication across implementations for different file-formats (Parquet, HFile, etc)

- Extracted HoodieRealtimeFileInputFormatBase (extending COW HoodieFileInputFormatBase base)
- Rebased Parquet, HFile implementations onto HoodieRealtimeFileInputFormatBase
- Tidying up
This commit is contained in:
Alexey Kudinkin
2022-02-07 11:06:28 -08:00
committed by GitHub
parent 773b317983
commit 3f263b82ce
10 changed files with 460 additions and 415 deletions

View File

@@ -21,10 +21,12 @@ package org.apache.hudi.hadoop;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hudi.common.config.TypedProperties;
@@ -65,12 +67,32 @@ import static org.apache.hudi.common.util.ValidationUtils.checkState;
* <li>Incremental mode: reading table's state as of particular timestamp (or instant, in Hudi's terms)</li>
* <li>External mode: reading non-Hudi partitions</li>
* </ul>
*
* NOTE: This class is invariant of the underlying file-format of the files being read
*/
public abstract class HoodieFileInputFormatBase extends FileInputFormat<NullWritable, ArrayWritable>
implements Configurable {
protected Configuration conf;
@Nonnull
private static RealtimeFileStatus createRealtimeFileStatusUnchecked(HoodieBaseFile baseFile, Stream<HoodieLogFile> logFiles) {
List<HoodieLogFile> sortedLogFiles = logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
FileStatus baseFileStatus = getFileStatusUnchecked(baseFile);
try {
RealtimeFileStatus rtFileStatus = new RealtimeFileStatus(baseFileStatus);
rtFileStatus.setDeltaLogFiles(sortedLogFiles);
rtFileStatus.setBaseFilePath(baseFile.getPath());
if (baseFileStatus instanceof LocatedFileStatusWithBootstrapBaseFile || baseFileStatus instanceof FileStatusWithBootstrapBaseFile) {
rtFileStatus.setBootStrapFileStatus(baseFileStatus);
}
return rtFileStatus;
} catch (IOException e) {
throw new HoodieIOException(String.format("Failed to init %s", RealtimeFileStatus.class.getSimpleName()), e);
}
}
@Override
public final Configuration getConf() {
return conf;
@@ -81,6 +103,24 @@ public abstract class HoodieFileInputFormatBase extends FileInputFormat<NullWrit
this.conf = conf;
}
protected abstract boolean includeLogFilesForSnapshotView();
@Override
protected boolean isSplitable(FileSystem fs, Path filename) {
return !(filename instanceof PathWithBootstrapFileStatus);
}
@Override
protected FileSplit makeSplit(Path file, long start, long length,
String[] hosts) {
FileSplit split = new FileSplit(file, start, length, hosts);
if (file instanceof PathWithBootstrapFileStatus) {
return makeExternalFileSplit((PathWithBootstrapFileStatus)file, split);
}
return split;
}
@Override
public FileStatus[] listStatus(JobConf job) throws IOException {
// Segregate inputPaths[] to incremental, snapshot and non hoodie paths
@@ -121,20 +161,6 @@ public abstract class HoodieFileInputFormatBase extends FileInputFormat<NullWrit
return returns.toArray(new FileStatus[0]);
}
private void validate(List<FileStatus> targetFiles, List<FileStatus> legacyFileStatuses) {
List<FileStatus> diff = CollectionUtils.diff(targetFiles, legacyFileStatuses);
checkState(diff.isEmpty(), "Should be empty");
}
@Nonnull
private static FileStatus getFileStatusUnchecked(HoodieBaseFile baseFile) {
try {
return HoodieInputFormatUtils.getFileStatus(baseFile);
} catch (IOException ioe) {
throw new HoodieIOException("Failed to get file-status", ioe);
}
}
/**
* Abstracts and exposes {@link FileInputFormat#listStatus(JobConf)} operation to subclasses that
* lists files (returning an array of {@link FileStatus}) corresponding to the input paths specified
@@ -172,35 +198,25 @@ public abstract class HoodieFileInputFormatBase extends FileInputFormat<NullWrit
return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext, tableMetaClient, timeline.get(), fileStatuses, commitsToCheck.get());
}
protected abstract boolean includeLogFilesForSnapshotView();
@Override
protected FileSplit makeSplit(Path file, long start, long length,
String[] hosts, String[] inMemoryHosts) {
FileSplit split = new FileSplit(file, start, length, hosts, inMemoryHosts);
if (file instanceof PathWithBootstrapFileStatus) {
return makeExternalFileSplit((PathWithBootstrapFileStatus)file, split);
}
return split;
}
@Nonnull
private static RealtimeFileStatus createRealtimeFileStatusUnchecked(HoodieBaseFile baseFile,
Stream<HoodieLogFile> logFiles,
Option<HoodieInstant> latestCompletedInstantOpt,
HoodieTableMetaClient tableMetaClient) {
List<HoodieLogFile> sortedLogFiles = logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
FileStatus baseFileStatus = getFileStatusUnchecked(baseFile);
private BootstrapBaseFileSplit makeExternalFileSplit(PathWithBootstrapFileStatus file, FileSplit split) {
try {
RealtimeFileStatus rtFileStatus = new RealtimeFileStatus(baseFileStatus);
rtFileStatus.setDeltaLogFiles(sortedLogFiles);
rtFileStatus.setBaseFilePath(baseFile.getPath());
rtFileStatus.setBasePath(tableMetaClient.getBasePath());
if (latestCompletedInstantOpt.isPresent()) {
HoodieInstant latestCompletedInstant = latestCompletedInstantOpt.get();
checkState(latestCompletedInstant.isCompleted());
rtFileStatus.setMaxCommitTime(latestCompletedInstant.getTimestamp());
}
if (baseFileStatus instanceof LocatedFileStatusWithBootstrapBaseFile || baseFileStatus instanceof FileStatusWithBootstrapBaseFile) {
rtFileStatus.setBootStrapFileStatus(baseFileStatus);
}
return rtFileStatus;
LOG.info("Making external data split for " + file);
FileStatus externalFileStatus = file.getBootstrapFileStatus();
FileSplit externalFileSplit = makeSplit(externalFileStatus.getPath(), 0, externalFileStatus.getLen(),
new String[0], new String[0]);
return new BootstrapBaseFileSplit(split, externalFileSplit);
} catch (IOException e) {
throw new HoodieIOException(String.format("Failed to init %s", RealtimeFileStatus.class.getSimpleName()), e);
throw new HoodieIOException(e.getMessage(), e);
}
}
@@ -209,38 +225,6 @@ public abstract class HoodieFileInputFormatBase extends FileInputFormat<NullWrit
return HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job, tableMetaClientMap, snapshotPaths, includeLogFilesForSnapshotView());
}
@Nonnull
private static RealtimeFileStatus createRealtimeFileStatusUnchecked(HoodieLogFile latestLogFile,
Stream<HoodieLogFile> logFiles,
Option<HoodieInstant> latestCompletedInstantOpt,
HoodieTableMetaClient tableMetaClient) {
List<HoodieLogFile> sortedLogFiles = logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
try {
RealtimeFileStatus rtFileStatus = new RealtimeFileStatus(latestLogFile.getFileStatus());
rtFileStatus.setDeltaLogFiles(sortedLogFiles);
rtFileStatus.setBasePath(tableMetaClient.getBasePath());
if (latestCompletedInstantOpt.isPresent()) {
HoodieInstant latestCompletedInstant = latestCompletedInstantOpt.get();
checkState(latestCompletedInstant.isCompleted());
rtFileStatus.setMaxCommitTime(latestCompletedInstant.getTimestamp());
}
return rtFileStatus;
} catch (IOException e) {
throw new HoodieIOException(String.format("Failed to init %s", RealtimeFileStatus.class.getSimpleName()), e);
}
}
private static Option<HoodieInstant> fromScala(scala.Option<HoodieInstant> opt) {
if (opt.isDefined()) {
return Option.of(opt.get());
}
return Option.empty();
}
@Nonnull
private List<FileStatus> listStatusForSnapshotMode(JobConf job,
Map<String, HoodieTableMetaClient> tableMetaClientMap,
@@ -317,4 +301,80 @@ public abstract class HoodieFileInputFormatBase extends FileInputFormat<NullWrit
return targetFiles;
}
private void validate(List<FileStatus> targetFiles, List<FileStatus> legacyFileStatuses) {
List<FileStatus> diff = CollectionUtils.diff(targetFiles, legacyFileStatuses);
checkState(diff.isEmpty(), "Should be empty");
}
@Nonnull
private static FileStatus getFileStatusUnchecked(HoodieBaseFile baseFile) {
try {
return HoodieInputFormatUtils.getFileStatus(baseFile);
} catch (IOException ioe) {
throw new HoodieIOException("Failed to get file-status", ioe);
}
}
@Nonnull
private static RealtimeFileStatus createRealtimeFileStatusUnchecked(HoodieBaseFile baseFile,
Stream<HoodieLogFile> logFiles,
Option<HoodieInstant> latestCompletedInstantOpt,
HoodieTableMetaClient tableMetaClient) {
List<HoodieLogFile> sortedLogFiles = logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
FileStatus baseFileStatus = getFileStatusUnchecked(baseFile);
try {
RealtimeFileStatus rtFileStatus = new RealtimeFileStatus(baseFileStatus);
rtFileStatus.setDeltaLogFiles(sortedLogFiles);
rtFileStatus.setBaseFilePath(baseFile.getPath());
rtFileStatus.setBasePath(tableMetaClient.getBasePath());
if (latestCompletedInstantOpt.isPresent()) {
HoodieInstant latestCompletedInstant = latestCompletedInstantOpt.get();
checkState(latestCompletedInstant.isCompleted());
rtFileStatus.setMaxCommitTime(latestCompletedInstant.getTimestamp());
}
if (baseFileStatus instanceof LocatedFileStatusWithBootstrapBaseFile || baseFileStatus instanceof FileStatusWithBootstrapBaseFile) {
rtFileStatus.setBootStrapFileStatus(baseFileStatus);
}
return rtFileStatus;
} catch (IOException e) {
throw new HoodieIOException(String.format("Failed to init %s", RealtimeFileStatus.class.getSimpleName()), e);
}
}
@Nonnull
private static RealtimeFileStatus createRealtimeFileStatusUnchecked(HoodieLogFile latestLogFile,
Stream<HoodieLogFile> logFiles,
Option<HoodieInstant> latestCompletedInstantOpt,
HoodieTableMetaClient tableMetaClient) {
List<HoodieLogFile> sortedLogFiles = logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
try {
RealtimeFileStatus rtFileStatus = new RealtimeFileStatus(latestLogFile.getFileStatus());
rtFileStatus.setDeltaLogFiles(sortedLogFiles);
rtFileStatus.setBasePath(tableMetaClient.getBasePath());
if (latestCompletedInstantOpt.isPresent()) {
HoodieInstant latestCompletedInstant = latestCompletedInstantOpt.get();
checkState(latestCompletedInstant.isCompleted());
rtFileStatus.setMaxCommitTime(latestCompletedInstant.getTimestamp());
}
return rtFileStatus;
} catch (IOException e) {
throw new HoodieIOException(String.format("Failed to init %s", RealtimeFileStatus.class.getSimpleName()), e);
}
}
private static Option<HoodieInstant> fromScala(scala.Option<HoodieInstant> opt) {
if (opt.isDefined()) {
return Option.of(opt.get());
}
return Option.empty();
}
}

View File

@@ -18,17 +18,7 @@
package org.apache.hudi.hadoop;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
@@ -39,6 +29,9 @@ import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -62,10 +55,6 @@ public class HoodieParquetInputFormat extends HoodieFileInputFormatBase implemen
// {@code RecordReader}
private final MapredParquetInputFormat mapredParquetInputFormat = new MapredParquetInputFormat();
protected HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline timeline) {
return HoodieInputFormatUtils.filterInstantsTimeline(timeline);
}
protected boolean includeLogFilesForSnapshotView() {
return false;
}
@@ -96,32 +85,6 @@ public class HoodieParquetInputFormat extends HoodieFileInputFormatBase implemen
return getRecordReaderInternal(split, job, reporter);
}
@Override
protected boolean isSplitable(FileSystem fs, Path filename) {
return !(filename instanceof PathWithBootstrapFileStatus);
}
@Override
protected FileSplit makeSplit(Path file, long start, long length,
String[] hosts) {
FileSplit split = new FileSplit(file, start, length, hosts);
if (file instanceof PathWithBootstrapFileStatus) {
return makeExternalFileSplit((PathWithBootstrapFileStatus)file, split);
}
return split;
}
@Override
protected FileSplit makeSplit(Path file, long start, long length,
String[] hosts, String[] inMemoryHosts) {
FileSplit split = new FileSplit(file, start, length, hosts, inMemoryHosts);
if (file instanceof PathWithBootstrapFileStatus) {
return makeExternalFileSplit((PathWithBootstrapFileStatus)file, split);
}
return split;
}
private RecordReader<NullWritable, ArrayWritable> getRecordReaderInternal(InputSplit split,
JobConf job,
Reporter reporter) throws IOException {
@@ -176,16 +139,4 @@ public class HoodieParquetInputFormat extends HoodieFileInputFormatBase implemen
true);
}
}
private BootstrapBaseFileSplit makeExternalFileSplit(PathWithBootstrapFileStatus file, FileSplit split) {
try {
LOG.info("Making external data split for " + file);
FileStatus externalFileStatus = file.getBootstrapFileStatus();
FileSplit externalFileSplit = makeSplit(externalFileStatus.getPath(), 0, externalFileStatus.getLen(),
new String[0], new String[0]);
return new BootstrapBaseFileSplit(split, externalFileSplit);
} catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e);
}
}
}

View File

@@ -20,6 +20,7 @@ package org.apache.hudi.hadoop.hive;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.hadoop.HoodieFileInputFormatBase;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.realtime.HoodieCombineRealtimeRecordReader;
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
@@ -876,7 +877,7 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
LOG.info("Listing status in HoodieCombineHiveInputFormat.HoodieCombineFileInputFormatShim");
List<FileStatus> result;
if (hoodieFilter) {
HoodieParquetInputFormat input;
HoodieFileInputFormatBase input;
if (isRealTime) {
LOG.info("Using HoodieRealtimeInputFormat");
input = createParquetRealtimeInputFormat();
@@ -916,7 +917,7 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
job.set("hudi.hive.realtime", "true");
InputSplit[] splits;
if (hoodieFilter) {
HoodieParquetInputFormat input = createParquetRealtimeInputFormat();
HoodieParquetRealtimeInputFormat input = createParquetRealtimeInputFormat();
input.setConf(job);
splits = input.getSplits(job, numSplits);
} else {

View File

@@ -18,15 +18,15 @@
package org.apache.hudi.hadoop.realtime;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.hadoop.HoodieHFileInputFormat;
@@ -38,32 +38,18 @@ import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
/**
* HoodieRealtimeInputFormat for HUDI datasets which store data in HFile base file format.
*/
@UseRecordReaderFromInputFormat
@UseFileSplitsFromInputFormat
public class HoodieHFileRealtimeInputFormat extends HoodieHFileInputFormat {
public class HoodieHFileRealtimeInputFormat extends HoodieRealtimeFileInputFormatBase {
private static final Logger LOG = LogManager.getLogger(HoodieHFileRealtimeInputFormat.class);
@Override
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
List<FileSplit> fileSplits = Arrays.stream(super.getSplits(job, numSplits))
.map(is -> (FileSplit) is)
.collect(Collectors.toList());
return HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits);
}
@Override
protected HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline timeline) {
// no specific filtering for Realtime format
return timeline;
}
// NOTE: We're only using {@code HoodieHFileInputFormat} to compose {@code RecordReader}
private final HoodieHFileInputFormat hFileInputFormat = new HoodieHFileInputFormat();
@Override
public RecordReader<NullWritable, ArrayWritable> getRecordReader(final InputSplit split, final JobConf jobConf,
@@ -102,6 +88,12 @@ public class HoodieHFileRealtimeInputFormat extends HoodieHFileInputFormat {
"HoodieRealtimeRecordReader can only work on HoodieRealtimeFileSplit and not with " + split);
return new HoodieRealtimeRecordReader((HoodieRealtimeFileSplit) split, jobConf,
super.getRecordReader(split, jobConf, reporter));
hFileInputFormat.getRecordReader(split, jobConf, reporter));
}
@Override
protected boolean isSplitable(FileSystem fs, Path filename) {
// This file isn't splittable.
return false;
}
}

View File

@@ -18,256 +18,60 @@
package org.apache.hudi.hadoop.realtime;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.BootstrapBaseFileSplit;
import org.apache.hudi.hadoop.FileStatusWithBootstrapBaseFile;
import org.apache.hudi.hadoop.LocatedFileStatusWithBootstrapBaseFile;
import org.apache.hudi.hadoop.RealtimeFileStatus;
import org.apache.hudi.hadoop.PathWithLogFilePath;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.UseFileSplitsFromInputFormat;
import org.apache.hudi.hadoop.UseRecordReaderFromInputFormat;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.stream.Collectors;
/**
* Input Format, that provides a real-time view of data in a Hoodie table.
*/
@UseRecordReaderFromInputFormat
@UseFileSplitsFromInputFormat
public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat implements Configurable {
public class HoodieParquetRealtimeInputFormat extends HoodieRealtimeFileInputFormatBase implements Configurable {
private static final Logger LOG = LogManager.getLogger(HoodieParquetRealtimeInputFormat.class);
// NOTE: We're only using {@code HoodieParquetInputFormat} to compose {@code RecordReader}
private final HoodieParquetInputFormat parquetInputFormat = new HoodieParquetInputFormat();
// To make Hive on Spark queries work with RT tables. Our theory is that due to
// {@link org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher}
// not handling empty list correctly, the ParquetRecordReaderWrapper ends up adding the same column ids multiple
// times which ultimately breaks the query.
@Override
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
List<FileSplit> fileSplits = Arrays.stream(super.getSplits(job, numSplits)).map(is -> (FileSplit) is).collect(Collectors.toList());
public RecordReader<NullWritable, ArrayWritable> getRecordReader(final InputSplit split, final JobConf jobConf,
final Reporter reporter) throws IOException {
// sanity check
ValidationUtils.checkArgument(split instanceof RealtimeSplit,
"HoodieRealtimeRecordReader can only work on RealtimeSplit and not with " + split);
RealtimeSplit realtimeSplit = (RealtimeSplit) split;
addProjectionToJobConf(realtimeSplit, jobConf);
LOG.info("Creating record reader with readCols :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
+ ", Ids :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
return HoodieRealtimeInputFormatUtils.isIncrementalQuerySplits(fileSplits)
? HoodieRealtimeInputFormatUtils.getIncrementalRealtimeSplits(job, fileSplits)
: HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits);
}
/**
* Keep the logic of mor_incr_view as same as spark datasource.
* Step1: Get list of commits to be fetched based on start commit and max commits(for snapshot max commits is -1).
* Step2: Get list of affected files status for these affected file status.
* Step3: Construct HoodieTableFileSystemView based on those affected file status.
* a. Filter affected partitions based on inputPaths.
* b. Get list of fileGroups based on affected partitions by fsView.getAllFileGroups.
* Step4: Set input paths based on filtered affected partition paths. changes that amony original input paths passed to
* this method. some partitions did not have commits as part of the trimmed down list of commits and hence we need this step.
* Step5: Find candidate fileStatus, since when we get baseFileStatus from HoodieTableFileSystemView,
* the BaseFileStatus will missing file size information.
* We should use candidate fileStatus to update the size information for BaseFileStatus.
* Step6: For every file group from step3(b)
* Get 1st available base file from all file slices. then we use candidate file status to update the baseFileStatus,
* and construct RealTimeFileStatus and add it to result along with log files.
* If file group just has log files, construct RealTimeFileStatus and add it to result.
* TODO: unify the incremental view code between hive/spark-sql and spark datasource
*/
@Override
protected List<FileStatus> listStatusForIncrementalMode(JobConf job,
HoodieTableMetaClient tableMetaClient,
List<Path> inputPaths,
String incrementalTable) throws IOException {
List<FileStatus> result = new ArrayList<>();
Job jobContext = Job.getInstance(job);
// step1
Option<HoodieTimeline> timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
if (!timeline.isPresent()) {
return result;
}
HoodieTimeline commitsTimelineToReturn = HoodieInputFormatUtils.getHoodieTimelineForIncrementalQuery(jobContext, incrementalTable, timeline.get());
Option<List<HoodieInstant>> commitsToCheck = Option.of(commitsTimelineToReturn.getInstants().collect(Collectors.toList()));
if (!commitsToCheck.isPresent()) {
return result;
}
// step2
commitsToCheck.get().sort(HoodieInstant::compareTo);
List<HoodieCommitMetadata> metadataList = commitsToCheck
.get().stream().map(instant -> {
try {
return HoodieInputFormatUtils.getCommitMetadata(instant, commitsTimelineToReturn);
} catch (IOException e) {
throw new HoodieException(String.format("cannot get metadata for instant: %s", instant));
}
}).collect(Collectors.toList());
// build fileGroup from fsView
List<FileStatus> affectedFileStatus = Arrays.asList(HoodieInputFormatUtils
.listAffectedFilesForCommits(new Path(tableMetaClient.getBasePath()), metadataList));
// step3
HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(tableMetaClient, commitsTimelineToReturn, affectedFileStatus.toArray(new FileStatus[0]));
// build fileGroup from fsView
Path basePath = new Path(tableMetaClient.getBasePath());
// filter affectedPartition by inputPaths
List<String> affectedPartition = HoodieInputFormatUtils.getWritePartitionPaths(metadataList).stream()
.filter(k -> k.isEmpty() ? inputPaths.contains(basePath) : inputPaths.contains(new Path(basePath, k))).collect(Collectors.toList());
if (affectedPartition.isEmpty()) {
return result;
}
List<HoodieFileGroup> fileGroups = affectedPartition.stream()
.flatMap(partitionPath -> fsView.getAllFileGroups(partitionPath)).collect(Collectors.toList());
// step4
setInputPaths(job, affectedPartition.stream()
.map(p -> p.isEmpty() ? basePath.toString() : new Path(basePath, p).toString()).collect(Collectors.joining(",")));
// step5
// find all file status in partitionPaths.
FileStatus[] fileStatuses = doListStatus(job);
Map<String, FileStatus> candidateFileStatus = new HashMap<>();
for (int i = 0; i < fileStatuses.length; i++) {
String key = fileStatuses[i].getPath().toString();
candidateFileStatus.put(key, fileStatuses[i]);
// for log only split, set the parquet reader as empty.
if (FSUtils.isLogFile(realtimeSplit.getPath())) {
return new HoodieRealtimeRecordReader(realtimeSplit, jobConf, new HoodieEmptyRecordReader(realtimeSplit, jobConf));
}
String maxCommitTime = fsView.getLastInstant().get().getTimestamp();
// step6
result.addAll(collectAllIncrementalFiles(fileGroups, maxCommitTime, basePath.toString(), candidateFileStatus));
return result;
}
private List<FileStatus> collectAllIncrementalFiles(List<HoodieFileGroup> fileGroups, String maxCommitTime, String basePath, Map<String, FileStatus> candidateFileStatus) {
List<FileStatus> result = new ArrayList<>();
fileGroups.stream().forEach(f -> {
try {
List<FileSlice> baseFiles = f.getAllFileSlices().filter(slice -> slice.getBaseFile().isPresent()).collect(Collectors.toList());
if (!baseFiles.isEmpty()) {
FileStatus baseFileStatus = HoodieInputFormatUtils.getFileStatus(baseFiles.get(0).getBaseFile().get());
String baseFilePath = baseFileStatus.getPath().toUri().toString();
if (!candidateFileStatus.containsKey(baseFilePath)) {
throw new HoodieException("Error obtaining fileStatus for file: " + baseFilePath);
}
// We cannot use baseFileStatus.getPath() here, since baseFileStatus.getPath() missing file size information.
// So we use candidateFileStatus.get(baseFileStatus.getPath()) to get a correct path.
RealtimeFileStatus fileStatus = new RealtimeFileStatus(candidateFileStatus.get(baseFilePath));
fileStatus.setMaxCommitTime(maxCommitTime);
fileStatus.setBelongToIncrementalFileStatus(true);
fileStatus.setBasePath(basePath);
fileStatus.setBaseFilePath(baseFilePath);
fileStatus.setDeltaLogFiles(f.getLatestFileSlice().get().getLogFiles().collect(Collectors.toList()));
// try to set bootstrapfileStatus
if (baseFileStatus instanceof LocatedFileStatusWithBootstrapBaseFile || baseFileStatus instanceof FileStatusWithBootstrapBaseFile) {
fileStatus.setBootStrapFileStatus(baseFileStatus);
}
result.add(fileStatus);
}
// add file group which has only logs.
if (f.getLatestFileSlice().isPresent() && baseFiles.isEmpty()) {
List<FileStatus> logFileStatus = f.getLatestFileSlice().get().getLogFiles().map(logFile -> logFile.getFileStatus()).collect(Collectors.toList());
if (logFileStatus.size() > 0) {
RealtimeFileStatus fileStatus = new RealtimeFileStatus(logFileStatus.get(0));
fileStatus.setBelongToIncrementalFileStatus(true);
fileStatus.setDeltaLogFiles(logFileStatus.stream().map(l -> new HoodieLogFile(l.getPath(), l.getLen())).collect(Collectors.toList()));
fileStatus.setMaxCommitTime(maxCommitTime);
fileStatus.setBasePath(basePath);
result.add(fileStatus);
}
}
} catch (IOException e) {
throw new HoodieException("Error obtaining data file/log file grouping ", e);
}
});
return result;
}
@Override
protected boolean includeLogFilesForSnapshotView() {
return true;
}
@Override
protected boolean isSplitable(FileSystem fs, Path filename) {
if (filename instanceof PathWithLogFilePath) {
return ((PathWithLogFilePath)filename).splitable();
}
return super.isSplitable(fs, filename);
}
// make split for path.
// When query the incremental view, the read files may be bootstrap files, we wrap those bootstrap files into
// PathWithLogFilePath, so those bootstrap files should be processed int this function.
@Override
protected FileSplit makeSplit(Path file, long start, long length, String[] hosts) {
if (file instanceof PathWithLogFilePath) {
return doMakeSplitForPathWithLogFilePath((PathWithLogFilePath) file, start, length, hosts, null);
}
return super.makeSplit(file, start, length, hosts);
}
@Override
protected FileSplit makeSplit(Path file, long start, long length, String[] hosts, String[] inMemoryHosts) {
if (file instanceof PathWithLogFilePath) {
return doMakeSplitForPathWithLogFilePath((PathWithLogFilePath) file, start, length, hosts, inMemoryHosts);
}
return super.makeSplit(file, start, length, hosts, inMemoryHosts);
}
private FileSplit doMakeSplitForPathWithLogFilePath(PathWithLogFilePath path, long start, long length, String[] hosts, String[] inMemoryHosts) {
if (!path.includeBootstrapFilePath()) {
return path.buildSplit(path, start, length, hosts);
}
FileSplit bf = inMemoryHosts == null
? super.makeSplit(path.getPathWithBootstrapFileStatus(), start, length, hosts)
: super.makeSplit(path.getPathWithBootstrapFileStatus(), start, length, hosts, inMemoryHosts);
return HoodieRealtimeInputFormatUtils.createRealtimeBoostrapBaseFileSplit(
(BootstrapBaseFileSplit) bf,
path.getBasePath(),
path.getDeltaLogFiles(),
path.getMaxCommitTime(),
path.getBelongsToIncrementalQuery());
}
@Override
protected HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline timeline) {
// no specific filtering for Realtime format
return timeline;
return new HoodieRealtimeRecordReader(realtimeSplit, jobConf,
parquetInputFormat.getRecordReader(split, jobConf, reporter));
}
void addProjectionToJobConf(final RealtimeSplit realtimeSplit, final JobConf jobConf) {
@@ -297,25 +101,7 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
}
}
}
HoodieRealtimeInputFormatUtils.cleanProjectionColumnIds(jobConf);
}
@Override
public RecordReader<NullWritable, ArrayWritable> getRecordReader(final InputSplit split, final JobConf jobConf,
final Reporter reporter) throws IOException {
// sanity check
ValidationUtils.checkArgument(split instanceof RealtimeSplit,
"HoodieRealtimeRecordReader can only work on RealtimeSplit and not with " + split);
RealtimeSplit realtimeSplit = (RealtimeSplit) split;
addProjectionToJobConf(realtimeSplit, jobConf);
LOG.info("Creating record reader with readCols :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
+ ", Ids :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
// for log only split, set the parquet reader as empty.
if (FSUtils.isLogFile(realtimeSplit.getPath())) {
return new HoodieRealtimeRecordReader(realtimeSplit, jobConf, new HoodieEmptyRecordReader(realtimeSplit, jobConf));
}
return new HoodieRealtimeRecordReader(realtimeSplit, jobConf,
super.getRecordReader(split, jobConf, reporter));
}
}

View File

@@ -0,0 +1,259 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.hadoop.realtime;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.BootstrapBaseFileSplit;
import org.apache.hudi.hadoop.FileStatusWithBootstrapBaseFile;
import org.apache.hudi.hadoop.HoodieFileInputFormatBase;
import org.apache.hudi.hadoop.LocatedFileStatusWithBootstrapBaseFile;
import org.apache.hudi.hadoop.PathWithLogFilePath;
import org.apache.hudi.hadoop.RealtimeFileStatus;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* Base implementation of the Hive's {@link FileInputFormat} allowing for reading of Hudi's
* Merge-on-Read (COW) tables in various configurations:
*
* <ul>
* <li>Snapshot mode: reading table's state as of particular timestamp (or instant, in Hudi's terms)</li>
* <li>Incremental mode: reading table's state as of particular timestamp (or instant, in Hudi's terms)</li>
* <li>External mode: reading non-Hudi partitions</li>
* </ul>
*
* NOTE: This class is invariant of the underlying file-format of the files being read
*/
public abstract class HoodieRealtimeFileInputFormatBase extends HoodieFileInputFormatBase implements Configurable {
private static final Logger LOG = LogManager.getLogger(HoodieParquetRealtimeInputFormat.class);
@Override
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
List<FileSplit> fileSplits = Arrays.stream(super.getSplits(job, numSplits)).map(is -> (FileSplit) is).collect(Collectors.toList());
return HoodieRealtimeInputFormatUtils.isIncrementalQuerySplits(fileSplits)
? HoodieRealtimeInputFormatUtils.getIncrementalRealtimeSplits(job, fileSplits)
: HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits);
}
/**
* Keep the logic of mor_incr_view as same as spark datasource.
* Step1: Get list of commits to be fetched based on start commit and max commits(for snapshot max commits is -1).
* Step2: Get list of affected files status for these affected file status.
* Step3: Construct HoodieTableFileSystemView based on those affected file status.
* a. Filter affected partitions based on inputPaths.
* b. Get list of fileGroups based on affected partitions by fsView.getAllFileGroups.
* Step4: Set input paths based on filtered affected partition paths. changes that amony original input paths passed to
* this method. some partitions did not have commits as part of the trimmed down list of commits and hence we need this step.
* Step5: Find candidate fileStatus, since when we get baseFileStatus from HoodieTableFileSystemView,
* the BaseFileStatus will missing file size information.
* We should use candidate fileStatus to update the size information for BaseFileStatus.
* Step6: For every file group from step3(b)
* Get 1st available base file from all file slices. then we use candidate file status to update the baseFileStatus,
* and construct RealTimeFileStatus and add it to result along with log files.
* If file group just has log files, construct RealTimeFileStatus and add it to result.
* TODO: unify the incremental view code between hive/spark-sql and spark datasource
*/
@Override
protected List<FileStatus> listStatusForIncrementalMode(JobConf job,
HoodieTableMetaClient tableMetaClient,
List<Path> inputPaths,
String incrementalTableName) throws IOException {
List<FileStatus> result = new ArrayList<>();
Job jobContext = Job.getInstance(job);
// step1
Option<HoodieTimeline> timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
if (!timeline.isPresent()) {
return result;
}
HoodieTimeline commitsTimelineToReturn = HoodieInputFormatUtils.getHoodieTimelineForIncrementalQuery(jobContext, incrementalTableName, timeline.get());
Option<List<HoodieInstant>> commitsToCheck = Option.of(commitsTimelineToReturn.getInstants().collect(Collectors.toList()));
if (!commitsToCheck.isPresent()) {
return result;
}
// step2
commitsToCheck.get().sort(HoodieInstant::compareTo);
List<HoodieCommitMetadata> metadataList = commitsToCheck
.get().stream().map(instant -> {
try {
return HoodieInputFormatUtils.getCommitMetadata(instant, commitsTimelineToReturn);
} catch (IOException e) {
throw new HoodieException(String.format("cannot get metadata for instant: %s", instant));
}
}).collect(Collectors.toList());
// build fileGroup from fsView
List<FileStatus> affectedFileStatus = Arrays.asList(HoodieInputFormatUtils
.listAffectedFilesForCommits(new Path(tableMetaClient.getBasePath()), metadataList));
// step3
HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(tableMetaClient, commitsTimelineToReturn, affectedFileStatus.toArray(new FileStatus[0]));
// build fileGroup from fsView
Path basePath = new Path(tableMetaClient.getBasePath());
// filter affectedPartition by inputPaths
List<String> affectedPartition = HoodieInputFormatUtils.getWritePartitionPaths(metadataList).stream()
.filter(k -> k.isEmpty() ? inputPaths.contains(basePath) : inputPaths.contains(new Path(basePath, k))).collect(Collectors.toList());
if (affectedPartition.isEmpty()) {
return result;
}
List<HoodieFileGroup> fileGroups = affectedPartition.stream()
.flatMap(partitionPath -> fsView.getAllFileGroups(partitionPath)).collect(Collectors.toList());
// step4
setInputPaths(job, affectedPartition.stream()
.map(p -> p.isEmpty() ? basePath.toString() : new Path(basePath, p).toString()).collect(Collectors.joining(",")));
// step5
// find all file status in partitionPaths.
FileStatus[] fileStatuses = doListStatus(job);
Map<String, FileStatus> candidateFileStatus = new HashMap<>();
for (int i = 0; i < fileStatuses.length; i++) {
String key = fileStatuses[i].getPath().toString();
candidateFileStatus.put(key, fileStatuses[i]);
}
String maxCommitTime = fsView.getLastInstant().get().getTimestamp();
// step6
result.addAll(collectAllIncrementalFiles(fileGroups, maxCommitTime, basePath.toString(), candidateFileStatus));
return result;
}
@Override
protected boolean includeLogFilesForSnapshotView() {
return true;
}
@Override
protected boolean isSplitable(FileSystem fs, Path filename) {
if (filename instanceof PathWithLogFilePath) {
return ((PathWithLogFilePath)filename).splitable();
}
return super.isSplitable(fs, filename);
}
// make split for path.
// When query the incremental view, the read files may be bootstrap files, we wrap those bootstrap files into
// PathWithLogFilePath, so those bootstrap files should be processed int this function.
@Override
protected FileSplit makeSplit(Path file, long start, long length, String[] hosts) {
if (file instanceof PathWithLogFilePath) {
return doMakeSplitForPathWithLogFilePath((PathWithLogFilePath) file, start, length, hosts, null);
}
return super.makeSplit(file, start, length, hosts);
}
@Override
protected FileSplit makeSplit(Path file, long start, long length, String[] hosts, String[] inMemoryHosts) {
if (file instanceof PathWithLogFilePath) {
return doMakeSplitForPathWithLogFilePath((PathWithLogFilePath) file, start, length, hosts, inMemoryHosts);
}
return super.makeSplit(file, start, length, hosts, inMemoryHosts);
}
private List<FileStatus> collectAllIncrementalFiles(List<HoodieFileGroup> fileGroups, String maxCommitTime, String basePath, Map<String, FileStatus> candidateFileStatus) {
List<FileStatus> result = new ArrayList<>();
fileGroups.stream().forEach(f -> {
try {
List<FileSlice> baseFiles = f.getAllFileSlices().filter(slice -> slice.getBaseFile().isPresent()).collect(Collectors.toList());
if (!baseFiles.isEmpty()) {
FileStatus baseFileStatus = HoodieInputFormatUtils.getFileStatus(baseFiles.get(0).getBaseFile().get());
String baseFilePath = baseFileStatus.getPath().toUri().toString();
if (!candidateFileStatus.containsKey(baseFilePath)) {
throw new HoodieException("Error obtaining fileStatus for file: " + baseFilePath);
}
// We cannot use baseFileStatus.getPath() here, since baseFileStatus.getPath() missing file size information.
// So we use candidateFileStatus.get(baseFileStatus.getPath()) to get a correct path.
RealtimeFileStatus fileStatus = new RealtimeFileStatus(candidateFileStatus.get(baseFilePath));
fileStatus.setMaxCommitTime(maxCommitTime);
fileStatus.setBelongToIncrementalFileStatus(true);
fileStatus.setBasePath(basePath);
fileStatus.setBaseFilePath(baseFilePath);
fileStatus.setDeltaLogFiles(f.getLatestFileSlice().get().getLogFiles().collect(Collectors.toList()));
// try to set bootstrapfileStatus
if (baseFileStatus instanceof LocatedFileStatusWithBootstrapBaseFile || baseFileStatus instanceof FileStatusWithBootstrapBaseFile) {
fileStatus.setBootStrapFileStatus(baseFileStatus);
}
result.add(fileStatus);
}
// add file group which has only logs.
if (f.getLatestFileSlice().isPresent() && baseFiles.isEmpty()) {
List<FileStatus> logFileStatus = f.getLatestFileSlice().get().getLogFiles().map(logFile -> logFile.getFileStatus()).collect(Collectors.toList());
if (logFileStatus.size() > 0) {
RealtimeFileStatus fileStatus = new RealtimeFileStatus(logFileStatus.get(0));
fileStatus.setBelongToIncrementalFileStatus(true);
fileStatus.setDeltaLogFiles(logFileStatus.stream().map(l -> new HoodieLogFile(l.getPath(), l.getLen())).collect(Collectors.toList()));
fileStatus.setMaxCommitTime(maxCommitTime);
fileStatus.setBasePath(basePath);
result.add(fileStatus);
}
}
} catch (IOException e) {
throw new HoodieException("Error obtaining data file/log file grouping ", e);
}
});
return result;
}
private FileSplit doMakeSplitForPathWithLogFilePath(PathWithLogFilePath path, long start, long length, String[] hosts, String[] inMemoryHosts) {
if (!path.includeBootstrapFilePath()) {
return path.buildSplit(path, start, length, hosts);
} else {
FileSplit bf =
inMemoryHosts == null
? super.makeSplit(path.getPathWithBootstrapFileStatus(), start, length, hosts)
: super.makeSplit(path.getPathWithBootstrapFileStatus(), start, length, hosts, inMemoryHosts);
return HoodieRealtimeInputFormatUtils.createRealtimeBoostrapBaseFileSplit(
(BootstrapBaseFileSplit) bf,
path.getBasePath(),
path.getDeltaLogFiles(),
path.getMaxCommitTime(),
path.getBelongsToIncrementalQuery());
}
}
}

View File

@@ -44,6 +44,7 @@ import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
@@ -115,7 +116,7 @@ public class TestHoodieParquetInputFormat {
timeline.setInstants(instants);
// Verify getCommitsTimelineBeforePendingCompaction does not return instants after first compaction instant
HoodieTimeline filteredTimeline = inputFormat.filterInstantsTimeline(timeline);
HoodieTimeline filteredTimeline = HoodieInputFormatUtils.filterInstantsTimeline(timeline);
assertTrue(filteredTimeline.containsInstant(t1));
assertTrue(filteredTimeline.containsInstant(t2));
assertFalse(filteredTimeline.containsInstant(t3));
@@ -126,7 +127,7 @@ public class TestHoodieParquetInputFormat {
instants.remove(t3);
timeline = new HoodieActiveTimeline(metaClient);
timeline.setInstants(instants);
filteredTimeline = inputFormat.filterInstantsTimeline(timeline);
filteredTimeline = HoodieInputFormatUtils.filterInstantsTimeline(timeline);
// verify all remaining instants are returned.
assertTrue(filteredTimeline.containsInstant(t1));
@@ -140,7 +141,7 @@ public class TestHoodieParquetInputFormat {
instants.remove(t5);
timeline = new HoodieActiveTimeline(metaClient);
timeline.setInstants(instants);
filteredTimeline = inputFormat.filterInstantsTimeline(timeline);
filteredTimeline = HoodieInputFormatUtils.filterInstantsTimeline(timeline);
// verify all remaining instants are returned.
assertTrue(filteredTimeline.containsInstant(t1));

View File

@@ -1,8 +1,8 @@
{"c1_maxValue":639,"c1_minValue":323,"c1_num_nulls":0,"c2_maxValue":" 639sdc","c2_minValue":" 323sdc","c2_num_nulls":0,"c3_maxValue":811.638,"c3_minValue":100.556,"c3_num_nulls":0,"c5_maxValue":65,"c5_minValue":33,"c5_num_nulls":0,"c6_maxValue":"2020-09-09","c6_minValue":"2020-01-23","c6_num_nulls":0,"c7_maxValue":"fw==","c7_minValue":"Kw==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00000-0-c000.snappy.parquet"}
{"c1_maxValue":770,"c1_minValue":300,"c1_num_nulls":0,"c2_maxValue":" 770sdc","c2_minValue":" 300sdc","c2_num_nulls":0,"c3_maxValue":977.328,"c3_minValue":64.768,"c3_num_nulls":0,"c5_maxValue":78,"c5_minValue":31,"c5_num_nulls":0,"c6_maxValue":"2020-10-21","c6_minValue":"2020-01-01","c6_num_nulls":0,"c7_maxValue":"rw==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00000-1-c000.snappy.parquet"}
{"c1_maxValue":559,"c1_minValue":74,"c1_num_nulls":0,"c2_maxValue":" 74sdc","c2_minValue":" 181sdc","c2_num_nulls":0,"c3_maxValue":980.213,"c3_minValue":38.740,"c3_num_nulls":0,"c5_maxValue":57,"c5_minValue":9,"c5_num_nulls":0,"c6_maxValue":"2020-11-09","c6_minValue":"2020-01-08","c6_num_nulls":0,"c7_maxValue":"1Q==","c7_minValue":"Gw==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00001-0-c000.snappy.parquet"}
{"c1_maxValue":719,"c1_minValue":125,"c1_num_nulls":0,"c2_maxValue":" 719sdc","c2_minValue":" 125sdc","c2_num_nulls":0,"c3_maxValue":958.579,"c3_minValue":153.125,"c3_num_nulls":0,"c5_maxValue":73,"c5_minValue":14,"c5_num_nulls":0,"c6_maxValue":"2020-09-27","c6_minValue":"2020-01-16","c6_num_nulls":0,"c7_maxValue":"+g==","c7_minValue":"OA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00001-1-c000.snappy.parquet"}
{"c1_maxValue":945,"c1_minValue":355,"c1_num_nulls":0,"c2_maxValue":" 945sdc","c2_minValue":" 355sdc","c2_num_nulls":0,"c3_maxValue":994.355,"c3_minValue":374.882,"c3_num_nulls":0,"c5_maxValue":96,"c5_minValue":37,"c5_num_nulls":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-02-25","c6_num_nulls":0,"c7_maxValue":"sQ==","c7_minValue":"AQ==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00002-0-c000.snappy.parquet"}
{"c1_maxValue":486,"c1_minValue":59,"c1_num_nulls":0,"c2_maxValue":" 79sdc","c2_minValue":" 111sdc","c2_num_nulls":0,"c3_maxValue":771.590,"c3_minValue":82.111,"c3_num_nulls":0,"c5_maxValue":50,"c5_minValue":7,"c5_num_nulls":0,"c6_maxValue":"2020-11-21","c6_minValue":"2020-01-22","c6_num_nulls":0,"c7_maxValue":"5g==","c7_minValue":"Ow==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00002-1-c000.snappy.parquet"}
{"c1_maxValue":959,"c1_minValue":0,"c1_num_nulls":0,"c2_maxValue":" 959sdc","c2_minValue":" 0sdc","c2_num_nulls":0,"c3_maxValue":916.697,"c3_minValue":19.000,"c3_num_nulls":0,"c5_maxValue":97,"c5_minValue":1,"c5_num_nulls":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_num_nulls":0,"c7_maxValue":"yA==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00003-0-c000.snappy.parquet"}
{"c1_maxValue":272,"c1_minValue":8,"c1_num_nulls":0,"c2_maxValue":" 8sdc","c2_minValue":" 129sdc","c2_num_nulls":0,"c3_maxValue":979.272,"c3_minValue":430.129,"c3_num_nulls":0,"c5_maxValue":28,"c5_minValue":2,"c5_num_nulls":0,"c6_maxValue":"2020-11-20","c6_minValue":"2020-03-23","c6_num_nulls":0,"c7_maxValue":"8A==","c7_minValue":"Ag==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00003-1-c000.snappy.parquet"}
{"c1_maxValue":272,"c1_minValue":8,"c1_num_nulls":0,"c2_maxValue":" 8sdc","c2_minValue":" 129sdc","c2_num_nulls":0,"c3_maxValue":979.272,"c3_minValue":430.129,"c3_num_nulls":0,"c5_maxValue":28,"c5_minValue":2,"c5_num_nulls":0,"c6_maxValue":"2020-11-20","c6_minValue":"2020-03-23","c6_num_nulls":0,"c7_maxValue":"8A==","c7_minValue":"Ag==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00003-xxx-c000.snappy.parquet"}
{"c1_maxValue":486,"c1_minValue":59,"c1_num_nulls":0,"c2_maxValue":" 79sdc","c2_minValue":" 111sdc","c2_num_nulls":0,"c3_maxValue":771.590,"c3_minValue":82.111,"c3_num_nulls":0,"c5_maxValue":50,"c5_minValue":7,"c5_num_nulls":0,"c6_maxValue":"2020-11-21","c6_minValue":"2020-01-22","c6_num_nulls":0,"c7_maxValue":"5g==","c7_minValue":"Ow==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00002-xxx-c000.snappy.parquet"}
{"c1_maxValue":559,"c1_minValue":74,"c1_num_nulls":0,"c2_maxValue":" 74sdc","c2_minValue":" 181sdc","c2_num_nulls":0,"c3_maxValue":980.213,"c3_minValue":38.740,"c3_num_nulls":0,"c5_maxValue":57,"c5_minValue":9,"c5_num_nulls":0,"c6_maxValue":"2020-11-09","c6_minValue":"2020-01-08","c6_num_nulls":0,"c7_maxValue":"1Q==","c7_minValue":"Gw==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00001-xxx-c000.snappy.parquet"}
{"c1_maxValue":639,"c1_minValue":323,"c1_num_nulls":0,"c2_maxValue":" 639sdc","c2_minValue":" 323sdc","c2_num_nulls":0,"c3_maxValue":811.638,"c3_minValue":100.556,"c3_num_nulls":0,"c5_maxValue":65,"c5_minValue":33,"c5_num_nulls":0,"c6_maxValue":"2020-09-09","c6_minValue":"2020-01-23","c6_num_nulls":0,"c7_maxValue":"fw==","c7_minValue":"Kw==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00000-xxx-c000.snappy.parquet"}
{"c1_maxValue":719,"c1_minValue":125,"c1_num_nulls":0,"c2_maxValue":" 719sdc","c2_minValue":" 125sdc","c2_num_nulls":0,"c3_maxValue":958.579,"c3_minValue":153.125,"c3_num_nulls":0,"c5_maxValue":73,"c5_minValue":14,"c5_num_nulls":0,"c6_maxValue":"2020-09-27","c6_minValue":"2020-01-16","c6_num_nulls":0,"c7_maxValue":"+g==","c7_minValue":"OA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00001-xxx-c000.snappy.parquet"}
{"c1_maxValue":770,"c1_minValue":300,"c1_num_nulls":0,"c2_maxValue":" 770sdc","c2_minValue":" 300sdc","c2_num_nulls":0,"c3_maxValue":977.328,"c3_minValue":64.768,"c3_num_nulls":0,"c5_maxValue":78,"c5_minValue":31,"c5_num_nulls":0,"c6_maxValue":"2020-10-21","c6_minValue":"2020-01-01","c6_num_nulls":0,"c7_maxValue":"rw==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00000-xxx-c000.snappy.parquet"}
{"c1_maxValue":945,"c1_minValue":355,"c1_num_nulls":0,"c2_maxValue":" 945sdc","c2_minValue":" 355sdc","c2_num_nulls":0,"c3_maxValue":994.355,"c3_minValue":374.882,"c3_num_nulls":0,"c5_maxValue":96,"c5_minValue":37,"c5_num_nulls":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-02-25","c6_num_nulls":0,"c7_maxValue":"sQ==","c7_minValue":"AQ==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00002-xxx-c000.snappy.parquet"}
{"c1_maxValue":959,"c1_minValue":0,"c1_num_nulls":0,"c2_maxValue":" 959sdc","c2_minValue":" 0sdc","c2_num_nulls":0,"c3_maxValue":916.697,"c3_minValue":19.000,"c3_num_nulls":0,"c5_maxValue":97,"c5_minValue":1,"c5_num_nulls":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_num_nulls":0,"c7_maxValue":"yA==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00003-xxx-c000.snappy.parquet"}

View File

@@ -1,4 +1,4 @@
{"c1_maxValue":639,"c1_minValue":323,"c1_num_nulls":0,"c2_maxValue":" 639sdc","c2_minValue":" 323sdc","c2_num_nulls":0,"c3_maxValue":811.638,"c3_minValue":100.556,"c3_num_nulls":0,"c5_maxValue":65,"c5_minValue":33,"c5_num_nulls":0,"c6_maxValue":"2020-09-09","c6_minValue":"2020-01-23","c6_num_nulls":0,"c7_maxValue":"fw==","c7_minValue":"Kw==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00000-0-c000.snappy.parquet"}
{"c1_maxValue":559,"c1_minValue":74,"c1_num_nulls":0,"c2_maxValue":" 74sdc","c2_minValue":" 181sdc","c2_num_nulls":0,"c3_maxValue":980.213,"c3_minValue":38.740,"c3_num_nulls":0,"c5_maxValue":57,"c5_minValue":9,"c5_num_nulls":0,"c6_maxValue":"2020-11-09","c6_minValue":"2020-01-08","c6_num_nulls":0,"c7_maxValue":"1Q==","c7_minValue":"Gw==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00001-0-c000.snappy.parquet"}
{"c1_maxValue":945,"c1_minValue":355,"c1_num_nulls":0,"c2_maxValue":" 945sdc","c2_minValue":" 355sdc","c2_num_nulls":0,"c3_maxValue":994.355,"c3_minValue":374.882,"c3_num_nulls":0,"c5_maxValue":96,"c5_minValue":37,"c5_num_nulls":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-02-25","c6_num_nulls":0,"c7_maxValue":"sQ==","c7_minValue":"AQ==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00002-0-c000.snappy.parquet"}
{"c1_maxValue":959,"c1_minValue":0,"c1_num_nulls":0,"c2_maxValue":" 959sdc","c2_minValue":" 0sdc","c2_num_nulls":0,"c3_maxValue":916.697,"c3_minValue":19.000,"c3_num_nulls":0,"c5_maxValue":97,"c5_minValue":1,"c5_num_nulls":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_num_nulls":0,"c7_maxValue":"yA==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00003-0-c000.snappy.parquet"}
{"c1_maxValue":559,"c1_minValue":74,"c1_num_nulls":0,"c2_maxValue":" 74sdc","c2_minValue":" 181sdc","c2_num_nulls":0,"c3_maxValue":980.213,"c3_minValue":38.740,"c3_num_nulls":0,"c5_maxValue":57,"c5_minValue":9,"c5_num_nulls":0,"c6_maxValue":"2020-11-09","c6_minValue":"2020-01-08","c6_num_nulls":0,"c7_maxValue":"1Q==","c7_minValue":"Gw==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00001-xxx-c000.snappy.parquet"}
{"c1_maxValue":639,"c1_minValue":323,"c1_num_nulls":0,"c2_maxValue":" 639sdc","c2_minValue":" 323sdc","c2_num_nulls":0,"c3_maxValue":811.638,"c3_minValue":100.556,"c3_num_nulls":0,"c5_maxValue":65,"c5_minValue":33,"c5_num_nulls":0,"c6_maxValue":"2020-09-09","c6_minValue":"2020-01-23","c6_num_nulls":0,"c7_maxValue":"fw==","c7_minValue":"Kw==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00000-xxx-c000.snappy.parquet"}
{"c1_maxValue":945,"c1_minValue":355,"c1_num_nulls":0,"c2_maxValue":" 945sdc","c2_minValue":" 355sdc","c2_num_nulls":0,"c3_maxValue":994.355,"c3_minValue":374.882,"c3_num_nulls":0,"c5_maxValue":96,"c5_minValue":37,"c5_num_nulls":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-02-25","c6_num_nulls":0,"c7_maxValue":"sQ==","c7_minValue":"AQ==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00002-xxx-c000.snappy.parquet"}
{"c1_maxValue":959,"c1_minValue":0,"c1_num_nulls":0,"c2_maxValue":" 959sdc","c2_minValue":" 0sdc","c2_num_nulls":0,"c3_maxValue":916.697,"c3_minValue":19.000,"c3_num_nulls":0,"c5_maxValue":97,"c5_minValue":1,"c5_num_nulls":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_num_nulls":0,"c7_maxValue":"yA==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00003-xxx-c000.snappy.parquet"}

View File

@@ -20,23 +20,20 @@ package org.apache.hudi.functional
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieColumnRangeMetadata
import org.apache.hudi.common.util.ParquetUtils
import org.apache.hudi.index.columnstats.ColumnStatsIndexHelper
import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.spark.sql._
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.typedLit
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, RowFactory, SaveMode, SparkSession, functions}
import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import java.math.BigInteger
import java.nio.charset.StandardCharsets
import java.sql.{Date, Timestamp}
import scala.collection.JavaConverters._
import scala.util.{Random, Success}
import scala.util.Random
class TestColumnStatsIndex extends HoodieClientTestBase {
var spark: SparkSession = _
@@ -354,11 +351,10 @@ class TestColumnStatsIndex extends HoodieClientTestBase {
.distinct()
.collect()
.map(_.getString(0))
.sorted
val uuidToIdx: UserDefinedFunction = functions.udf((fileName: String) => {
val (uuid, idx) = uuids.zipWithIndex.find { case (uuid, _) => fileName.contains(uuid) }.get
fileName.replace(uuid, idx.toString)
val uuid = uuids.find(uuid => fileName.contains(uuid)).get
fileName.replace(uuid, "xxx")
})
ds.withColumn("file", uuidToIdx(ds("file")))
@@ -409,12 +405,11 @@ class TestColumnStatsIndex extends HoodieClientTestBase {
.mkString("\n")
private def sort(df: DataFrame): DataFrame = {
// Since upon parsing JSON, Spark re-order columns in lexicographical order
// of their names, we have to shuffle new Z-index table columns order to match
// Rows are sorted by filename as well to avoid
val sortedCols = df.columns.sorted
// Sort dataset by the first 2 columns (to minimize non-determinism in case multiple files have the same
// value of the first column)
df.select(sortedCols.head, sortedCols.tail: _*)
.sort("file")
.sort("c1_maxValue", "c1_minValue")
}
}