1
0

[HUDI-2086]redo the logical of mor_incremental_view for hive (#3203)

This commit is contained in:
xiarixiaoyao
2021-11-10 15:41:07 +08:00
committed by GitHub
parent fd0f5df26d
commit a40ac62e0c
9 changed files with 1005 additions and 32 deletions

View File

@@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.hadoop;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* Encode additional information in split to track matching log file and base files.
* Hence, this class tracks a log/base file split.
*/
public class BaseFileWithLogsSplit extends FileSplit {
// a flag to mark this split is produced by incremental query or not.
private boolean belongToIncrementalSplit = false;
// the log file paths of this split.
private List<String> deltaLogPaths = new ArrayList<>();
// max commit time of current split.
private String maxCommitTime = "";
// the basePath of current hoodie table.
private String basePath = "";
// the base file belong to this split.
private String baseFilePath = "";
public BaseFileWithLogsSplit(Path file, long start, long length, String[] hosts) {
super(file, start, length, hosts);
}
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
out.writeBoolean(belongToIncrementalSplit);
Text.writeString(out, maxCommitTime);
Text.writeString(out, basePath);
Text.writeString(out, baseFilePath);
out.writeInt(deltaLogPaths.size());
for (String logPath : deltaLogPaths) {
Text.writeString(out, logPath);
}
}
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
belongToIncrementalSplit = in.readBoolean();
maxCommitTime = Text.readString(in);
basePath = Text.readString(in);
baseFilePath = Text.readString(in);
int deltaLogSize = in.readInt();
List<String> tempDeltaLogs = new ArrayList<>();
for (int i = 0; i < deltaLogSize; i++) {
tempDeltaLogs.add(Text.readString(in));
}
deltaLogPaths = tempDeltaLogs;
}
public boolean getBelongToIncrementalSplit() {
return belongToIncrementalSplit;
}
public void setBelongToIncrementalSplit(boolean belongToIncrementalSplit) {
this.belongToIncrementalSplit = belongToIncrementalSplit;
}
public List<String> getDeltaLogPaths() {
return deltaLogPaths;
}
public void setDeltaLogPaths(List<String> deltaLogPaths) {
this.deltaLogPaths = deltaLogPaths;
}
public String getMaxCommitTime() {
return maxCommitTime;
}
public void setMaxCommitTime(String maxCommitTime) {
this.maxCommitTime = maxCommitTime;
}
public String getBasePath() {
return basePath;
}
public void setBasePath(String basePath) {
this.basePath = basePath;
}
public String getBaseFilePath() {
return baseFilePath;
}
public void setBaseFilePath(String baseFilePath) {
this.baseFilePath = baseFilePath;
}
}

View File

@@ -73,6 +73,14 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
return HoodieInputFormatUtils.filterInstantsTimeline(timeline);
}
protected FileStatus[] getStatus(JobConf job) throws IOException {
return super.listStatus(job);
}
protected boolean includeLogFilesForSnapShotView() {
return false;
}
@Override
public FileStatus[] listStatus(JobConf job) throws IOException {
// Segregate inputPaths[] to incremental, snapshot and non hoodie paths
@@ -108,7 +116,7 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
// process snapshot queries next.
List<Path> snapshotPaths = inputPathHandler.getSnapshotPaths();
if (snapshotPaths.size() > 0) {
returns.addAll(HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job, tableMetaClientMap, snapshotPaths));
returns.addAll(HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job, tableMetaClientMap, snapshotPaths, includeLogFilesForSnapShotView()));
}
return returns.toArray(new FileStatus[0]);
}
@@ -120,7 +128,7 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
* partitions and then filtering based on the commits of interest, this logic first extracts the
* partitions touched by the desired commits and then lists only those partitions.
*/
private List<FileStatus> listStatusForIncrementalMode(
protected List<FileStatus> listStatusForIncrementalMode(
JobConf job, HoodieTableMetaClient tableMetaClient, List<Path> inputPaths) throws IOException {
String tableName = tableMetaClient.getTableConfig().getTableName();
Job jobContext = Job.getInstance(job);

View File

@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.hadoop;
import org.apache.hadoop.fs.Path;
import java.util.ArrayList;
import java.util.List;
/**
* Encode additional information in Path to track matching log file and base files.
* Hence, this class tracks a log/base file status.
*/
public class PathWithLogFilePath extends Path {
// a flag to mark this split is produced by incremental query or not.
private boolean belongToIncrementalPath = false;
// the log files belong this path.
private List<String> deltaLogPaths = new ArrayList<>();
// max commit time of current path.
private String maxCommitTime = "";
// the basePath of current hoodie table.
private String basePath = "";
// the base file belong to this path;
private String baseFilePath = "";
// the bootstrap file belong to this path.
// only if current query table is bootstrap table, this field is used.
private PathWithBootstrapFileStatus pathWithBootstrapFileStatus;
public PathWithLogFilePath(Path parent, String child) {
super(parent, child);
}
public void setBelongToIncrementalPath(boolean belongToIncrementalPath) {
this.belongToIncrementalPath = belongToIncrementalPath;
}
public List<String> getDeltaLogPaths() {
return deltaLogPaths;
}
public void setDeltaLogPaths(List<String> deltaLogPaths) {
this.deltaLogPaths = deltaLogPaths;
}
public String getMaxCommitTime() {
return maxCommitTime;
}
public void setMaxCommitTime(String maxCommitTime) {
this.maxCommitTime = maxCommitTime;
}
public String getBasePath() {
return basePath;
}
public void setBasePath(String basePath) {
this.basePath = basePath;
}
public void setBaseFilePath(String baseFilePath) {
this.baseFilePath = baseFilePath;
}
public boolean splitable() {
return !baseFilePath.isEmpty();
}
public PathWithBootstrapFileStatus getPathWithBootstrapFileStatus() {
return pathWithBootstrapFileStatus;
}
public void setPathWithBootstrapFileStatus(PathWithBootstrapFileStatus pathWithBootstrapFileStatus) {
this.pathWithBootstrapFileStatus = pathWithBootstrapFileStatus;
}
public boolean includeBootstrapFilePath() {
return pathWithBootstrapFileStatus != null;
}
public BaseFileWithLogsSplit buildSplit(Path file, long start, long length, String[] hosts) {
BaseFileWithLogsSplit bs = new BaseFileWithLogsSplit(file, start, length, hosts);
bs.setBelongToIncrementalSplit(belongToIncrementalPath);
bs.setDeltaLogPaths(deltaLogPaths);
bs.setMaxCommitTime(maxCommitTime);
bs.setBasePath(basePath);
bs.setBaseFilePath(baseFilePath);
return bs;
}
}

View File

@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.hadoop;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* With the base input format implementations in Hadoop/Hive,
* we need to encode additional information in Path to track base files and logs files for realtime read.
* Hence, this class tracks a log/base file status
* in Path.
*/
public class RealtimeFileStatus extends FileStatus {
// a flag to mark this split is produced by incremental query or not.
private boolean belongToIncrementalFileStatus = false;
// the log files belong this fileStatus.
private List<String> deltaLogPaths = new ArrayList<>();
// max commit time of current fileStatus.
private String maxCommitTime = "";
// the basePath of current hoodie table.
private String basePath = "";
// the base file belong to this status;
private String baseFilePath = "";
// the bootstrap file belong to this status.
// only if current query table is bootstrap table, this field is used.
private FileStatus bootStrapFileStatus;
public RealtimeFileStatus(FileStatus fileStatus) throws IOException {
super(fileStatus);
}
@Override
public Path getPath() {
Path path = super.getPath();
PathWithLogFilePath pathWithLogFilePath = new PathWithLogFilePath(path.getParent(), path.getName());
pathWithLogFilePath.setBelongToIncrementalPath(belongToIncrementalFileStatus);
pathWithLogFilePath.setDeltaLogPaths(deltaLogPaths);
pathWithLogFilePath.setMaxCommitTime(maxCommitTime);
pathWithLogFilePath.setBasePath(basePath);
pathWithLogFilePath.setBaseFilePath(baseFilePath);
if (bootStrapFileStatus != null) {
pathWithLogFilePath.setPathWithBootstrapFileStatus((PathWithBootstrapFileStatus)bootStrapFileStatus.getPath());
}
return pathWithLogFilePath;
}
public void setBelongToIncrementalFileStatus(boolean belongToIncrementalFileStatus) {
this.belongToIncrementalFileStatus = belongToIncrementalFileStatus;
}
public List<String> getDeltaLogPaths() {
return deltaLogPaths;
}
public void setDeltaLogPaths(List<String> deltaLogPaths) {
this.deltaLogPaths = deltaLogPaths;
}
public String getMaxCommitTime() {
return maxCommitTime;
}
public void setMaxCommitTime(String maxCommitTime) {
this.maxCommitTime = maxCommitTime;
}
public String getBasePath() {
return basePath;
}
public void setBasePath(String basePath) {
this.basePath = basePath;
}
public void setBaseFilePath(String baseFilePath) {
this.baseFilePath = baseFilePath;
}
public void setBootStrapFileStatus(FileStatus bootStrapFileStatus) {
this.bootStrapFileStatus = bootStrapFileStatus;
}
}

View File

@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.hadoop.realtime;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import java.io.IOException;
/**
* Dummy record for log only realtime split.
*/
public class HoodieEmptyRecordReader extends AbstractRealtimeRecordReader
implements RecordReader<NullWritable, ArrayWritable> {
public HoodieEmptyRecordReader(RealtimeSplit split, JobConf job) {
super(split, job);
}
@Override
public boolean next(NullWritable nullWritable, ArrayWritable arrayWritable) throws IOException {
return false;
}
@Override
public NullWritable createKey() {
return null;
}
@Override
public ArrayWritable createValue() {
return new ArrayWritable(Writable.class, new Writable[getHiveSchema().getFields().size()]);
}
@Override
public long getPos() throws IOException {
return 0;
}
@Override
public void close() throws IOException {
}
@Override
public float getProgress() throws IOException {
return 0;
}
}

View File

@@ -18,14 +18,32 @@
package org.apache.hudi.hadoop.realtime;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.BootstrapBaseFileSplit;
import org.apache.hudi.hadoop.FileStatusWithBootstrapBaseFile;
import org.apache.hudi.hadoop.LocatedFileStatusWithBootstrapBaseFile;
import org.apache.hudi.hadoop.RealtimeFileStatus;
import org.apache.hudi.hadoop.PathWithLogFilePath;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.UseFileSplitsFromInputFormat;
import org.apache.hudi.hadoop.UseRecordReaderFromInputFormat;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -41,8 +59,13 @@ import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.stream.Stream;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.stream.Collectors;
/**
* Input Format, that provides a real-time view of data in a Hoodie table.
@@ -61,9 +84,180 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
@Override
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
Stream<FileSplit> fileSplits = Arrays.stream(super.getSplits(job, numSplits)).map(is -> (FileSplit) is);
List<FileSplit> fileSplits = Arrays.stream(super.getSplits(job, numSplits)).map(is -> (FileSplit) is).collect(Collectors.toList());
return HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits);
boolean isIncrementalSplits = HoodieRealtimeInputFormatUtils.isIncrementalQuerySplits(fileSplits);
return isIncrementalSplits ? HoodieRealtimeInputFormatUtils.getIncrementalRealtimeSplits(job, fileSplits.stream()) : HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits.stream());
}
/**
* Keep the logic of mor_incr_view as same as spark datasource.
* Step1: Get list of commits to be fetched based on start commit and max commits(for snapshot max commits is -1).
* Step2: Get list of affected files status for these affected file status.
* Step3: Construct HoodieTableFileSystemView based on those affected file status.
* a. Filter affected partitions based on inputPaths.
* b. Get list of fileGroups based on affected partitions by fsView.getAllFileGroups.
* Step4: Set input paths based on filtered affected partition paths. changes that amony original input paths passed to
* this method. some partitions did not have commits as part of the trimmed down list of commits and hence we need this step.
* Step5: Find candidate fileStatus, since when we get baseFileStatus from HoodieTableFileSystemView,
* the BaseFileStatus will missing file size information.
* We should use candidate fileStatus to update the size information for BaseFileStatus.
* Step6: For every file group from step3(b)
* Get 1st available base file from all file slices. then we use candidate file status to update the baseFileStatus,
* and construct RealTimeFileStatus and add it to result along with log files.
* If file group just has log files, construct RealTimeFileStatus and add it to result.
* TODO: unify the incremental view code between hive/spark-sql and spark datasource
*/
@Override
protected List<FileStatus> listStatusForIncrementalMode(
JobConf job, HoodieTableMetaClient tableMetaClient, List<Path> inputPaths) throws IOException {
List<FileStatus> result = new ArrayList<>();
String tableName = tableMetaClient.getTableConfig().getTableName();
Job jobContext = Job.getInstance(job);
// step1
Option<HoodieTimeline> timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
if (!timeline.isPresent()) {
return result;
}
HoodieTimeline commitsTimelineToReturn = HoodieInputFormatUtils.getHoodieTimelineForIncrementalQuery(jobContext, tableName, timeline.get());
Option<List<HoodieInstant>> commitsToCheck = Option.of(commitsTimelineToReturn.getInstants().collect(Collectors.toList()));
if (!commitsToCheck.isPresent()) {
return result;
}
// step2
commitsToCheck.get().sort(HoodieInstant::compareTo);
List<HoodieCommitMetadata> metadataList = commitsToCheck
.get().stream().map(instant -> {
try {
return HoodieInputFormatUtils.getCommitMetadata(instant, commitsTimelineToReturn);
} catch (IOException e) {
throw new HoodieException(String.format("cannot get metadata for instant: %s", instant));
}
}).collect(Collectors.toList());
// build fileGroup from fsView
List<FileStatus> affectedFileStatus = Arrays.asList(HoodieInputFormatUtils
.listAffectedFilesForCommits(new Path(tableMetaClient.getBasePath()), metadataList));
// step3
HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(tableMetaClient, commitsTimelineToReturn, affectedFileStatus.toArray(new FileStatus[0]));
// build fileGroup from fsView
Path basePath = new Path(tableMetaClient.getBasePath());
// filter affectedPartition by inputPaths
List<String> affectedPartition = HoodieInputFormatUtils.getWritePartitionPaths(metadataList).stream()
.filter(k -> k.isEmpty() ? inputPaths.contains(basePath) : inputPaths.contains(new Path(basePath, k))).collect(Collectors.toList());
if (affectedPartition.isEmpty()) {
return result;
}
List<HoodieFileGroup> fileGroups = affectedPartition.stream()
.flatMap(partitionPath -> fsView.getAllFileGroups(partitionPath)).collect(Collectors.toList());
// step4
setInputPaths(job, affectedPartition.stream()
.map(p -> p.isEmpty() ? basePath.toString() : new Path(basePath, p).toString()).collect(Collectors.joining(",")));
// step5
// find all file status in partitionPaths.
FileStatus[] fileStatuses = getStatus(job);
Map<String, FileStatus> candidateFileStatus = new HashMap<>();
for (int i = 0; i < fileStatuses.length; i++) {
String key = fileStatuses[i].getPath().toString();
candidateFileStatus.put(key, fileStatuses[i]);
}
String maxCommitTime = fsView.getLastInstant().get().getTimestamp();
// step6
result.addAll(collectAllIncrementalFiles(fileGroups, maxCommitTime, basePath.toString(), candidateFileStatus));
return result;
}
private List<FileStatus> collectAllIncrementalFiles(List<HoodieFileGroup> fileGroups, String maxCommitTime, String basePath, Map<String, FileStatus> candidateFileStatus) {
List<FileStatus> result = new ArrayList<>();
fileGroups.stream().forEach(f -> {
try {
List<FileSlice> baseFiles = f.getAllFileSlices().filter(slice -> slice.getBaseFile().isPresent()).collect(Collectors.toList());
if (!baseFiles.isEmpty()) {
FileStatus baseFileStatus = HoodieInputFormatUtils.getFileStatus(baseFiles.get(0).getBaseFile().get());
String baseFilePath = baseFileStatus.getPath().toUri().toString();
if (!candidateFileStatus.containsKey(baseFilePath)) {
throw new HoodieException("Error obtaining fileStatus for file: " + baseFilePath);
}
// We cannot use baseFileStatus.getPath() here, since baseFileStatus.getPath() missing file size information.
// So we use candidateFileStatus.get(baseFileStatus.getPath()) to get a correct path.
RealtimeFileStatus fileStatus = new RealtimeFileStatus(candidateFileStatus.get(baseFilePath));
fileStatus.setMaxCommitTime(maxCommitTime);
fileStatus.setBelongToIncrementalFileStatus(true);
fileStatus.setBasePath(basePath);
fileStatus.setBaseFilePath(baseFilePath);
fileStatus.setDeltaLogPaths(f.getLatestFileSlice().get().getLogFiles().map(l -> l.getPath().toString()).collect(Collectors.toList()));
// try to set bootstrapfileStatus
if (baseFileStatus instanceof LocatedFileStatusWithBootstrapBaseFile || baseFileStatus instanceof FileStatusWithBootstrapBaseFile) {
fileStatus.setBootStrapFileStatus(baseFileStatus);
}
result.add(fileStatus);
}
// add file group which has only logs.
if (f.getLatestFileSlice().isPresent() && baseFiles.isEmpty()) {
List<FileStatus> logFileStatus = f.getLatestFileSlice().get().getLogFiles().map(logFile -> logFile.getFileStatus()).collect(Collectors.toList());
if (logFileStatus.size() > 0) {
RealtimeFileStatus fileStatus = new RealtimeFileStatus(logFileStatus.get(0));
fileStatus.setBelongToIncrementalFileStatus(true);
fileStatus.setDeltaLogPaths(logFileStatus.stream().map(l -> l.getPath().toString()).collect(Collectors.toList()));
fileStatus.setMaxCommitTime(maxCommitTime);
fileStatus.setBasePath(basePath);
result.add(fileStatus);
}
}
} catch (IOException e) {
throw new HoodieException("Error obtaining data file/log file grouping ", e);
}
});
return result;
}
@Override
protected boolean includeLogFilesForSnapShotView() {
return true;
}
@Override
protected boolean isSplitable(FileSystem fs, Path filename) {
if (filename instanceof PathWithLogFilePath) {
return ((PathWithLogFilePath)filename).splitable();
}
return super.isSplitable(fs, filename);
}
// make split for path.
// When query the incremental view, the read files may be bootstrap files, we wrap those bootstrap files into
// PathWithLogFilePath, so those bootstrap files should be processed int this function.
@Override
protected FileSplit makeSplit(Path file, long start, long length, String[] hosts) {
if (file instanceof PathWithLogFilePath) {
return doMakeSplitForPathWithLogFilePath((PathWithLogFilePath) file, start, length, hosts, null);
}
return super.makeSplit(file, start, length, hosts);
}
@Override
protected FileSplit makeSplit(Path file, long start, long length, String[] hosts, String[] inMemoryHosts) {
if (file instanceof PathWithLogFilePath) {
return doMakeSplitForPathWithLogFilePath((PathWithLogFilePath) file, start, length, hosts, inMemoryHosts);
}
return super.makeSplit(file, start, length, hosts, inMemoryHosts);
}
private FileSplit doMakeSplitForPathWithLogFilePath(PathWithLogFilePath path, long start, long length, String[] hosts, String[] inMemoryHosts) {
if (!path.includeBootstrapFilePath()) {
return path.buildSplit(path, start, length, hosts);
} else {
FileSplit bf =
inMemoryHosts == null
? super.makeSplit(path.getPathWithBootstrapFileStatus(), start, length, hosts)
: super.makeSplit(path.getPathWithBootstrapFileStatus(), start, length, hosts, inMemoryHosts);
return HoodieRealtimeInputFormatUtils
.createRealtimeBoostrapBaseFileSplit((BootstrapBaseFileSplit) bf, path.getBasePath(), path.getDeltaLogPaths(), path.getMaxCommitTime());
}
}
@Override
@@ -119,6 +313,11 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
addProjectionToJobConf(realtimeSplit, jobConf);
LOG.info("Creating record reader with readCols :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
+ ", Ids :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
// for log only split, set the parquet reader as empty.
if (FSUtils.isLogFile(realtimeSplit.getPath())) {
return new HoodieRealtimeRecordReader(realtimeSplit, jobConf, new HoodieEmptyRecordReader(realtimeSplit, jobConf));
}
return new HoodieRealtimeRecordReader(realtimeSplit, jobConf,
super.getRecordReader(split, jobConf, reporter));
}

View File

@@ -21,10 +21,12 @@ package org.apache.hudi.hadoop.utils;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -35,10 +37,11 @@ import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.FileStatusWithBootstrapBaseFile;
import org.apache.hudi.hadoop.HoodieHFileInputFormat;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.RealtimeFileStatus;
import org.apache.hudi.hadoop.LocatedFileStatusWithBootstrapBaseFile;
import org.apache.hudi.hadoop.FileStatusWithBootstrapBaseFile;
import org.apache.hudi.hadoop.realtime.HoodieHFileRealtimeInputFormat;
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
@@ -164,6 +167,10 @@ public class HoodieInputFormatUtils {
if (extension.equals(HoodieFileFormat.HFILE.getFileExtension())) {
return getInputFormat(HoodieFileFormat.HFILE, realtime, conf);
}
// now we support read log file, try to find log file
if (FSUtils.isLogFile(new Path(path)) && realtime) {
return getInputFormat(HoodieFileFormat.PARQUET, realtime, conf);
}
throw new HoodieIOException("Hoodie InputFormat not implemented for base file of type " + extension);
}
@@ -280,12 +287,24 @@ public class HoodieInputFormatUtils {
* @return
*/
public static Option<List<HoodieInstant>> getCommitsForIncrementalQuery(Job job, String tableName, HoodieTimeline timeline) {
return Option.of(getHoodieTimelineForIncrementalQuery(job, tableName, timeline)
.getInstants().collect(Collectors.toList()));
}
/**
* Get HoodieTimeline for incremental query from Hive map reduce configuration.
*
* @param job
* @param tableName
* @param timeline
* @return
*/
public static HoodieTimeline getHoodieTimelineForIncrementalQuery(Job job, String tableName, HoodieTimeline timeline) {
String lastIncrementalTs = HoodieHiveUtils.readStartCommitTime(job, tableName);
// Total number of commits to return in this batch. Set this to -1 to get all the commits.
Integer maxCommits = HoodieHiveUtils.readMaxCommits(job, tableName);
LOG.info("Last Incremental timestamp was set as " + lastIncrementalTs);
return Option.of(timeline.findInstantsAfter(lastIncrementalTs, maxCommits)
.getInstants().collect(Collectors.toList()));
return timeline.findInstantsAfter(lastIncrementalTs, maxCommits);
}
/**
@@ -422,6 +441,11 @@ public class HoodieInputFormatUtils {
public static List<FileStatus> filterFileStatusForSnapshotMode(JobConf job, Map<String, HoodieTableMetaClient> tableMetaClientMap,
List<Path> snapshotPaths) throws IOException {
return filterFileStatusForSnapshotMode(job, tableMetaClientMap, snapshotPaths, false);
}
public static List<FileStatus> filterFileStatusForSnapshotMode(JobConf job, Map<String, HoodieTableMetaClient> tableMetaClientMap,
List<Path> snapshotPaths, boolean includeLogFiles) throws IOException {
HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(job);
List<FileStatus> returns = new ArrayList<>();
@@ -442,10 +466,21 @@ public class HoodieInputFormatUtils {
HoodieTableFileSystemView fsView = fsViewCache.computeIfAbsent(metaClient, tableMetaClient ->
FileSystemViewManager.createInMemoryFileSystemViewWithTimeline(engineContext, tableMetaClient, buildMetadataConfig(job), timeline));
List<HoodieBaseFile> filteredBaseFiles = new ArrayList<>();
Map<FileStatus, List<String>> filteredLogs = new HashMap<>();
for (Path p : entry.getValue()) {
String relativePartitionPath = FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), p);
List<HoodieBaseFile> matched = fsView.getLatestBaseFiles(relativePartitionPath).collect(Collectors.toList());
filteredBaseFiles.addAll(matched);
if (includeLogFiles) {
List<FileSlice> logMatched = fsView.getLatestFileSlices(relativePartitionPath)
.filter(f -> !f.getBaseFile().isPresent() && f.getLatestLogFile().isPresent())
.collect(Collectors.toList());
logMatched.forEach(f -> {
List<String> logPaths = f.getLogFiles().sorted(HoodieLogFile.getLogFileComparator())
.map(log -> log.getPath().toString()).collect(Collectors.toList());
filteredLogs.put(f.getLatestLogFile().get().getFileStatus(), logPaths);
});
}
}
LOG.info("Total paths to process after hoodie filter " + filteredBaseFiles.size());
@@ -456,6 +491,12 @@ public class HoodieInputFormatUtils {
filteredFile = refreshFileStatus(job, filteredFile);
returns.add(getFileStatus(filteredFile));
}
for (Map.Entry<FileStatus, List<String>> filterLogEntry : filteredLogs.entrySet()) {
RealtimeFileStatus rs = new RealtimeFileStatus(filterLogEntry.getKey());
rs.setDeltaLogPaths(filterLogEntry.getValue());
returns.add(rs);
}
}
} finally {
fsViewCache.forEach(((metaClient, fsView) -> fsView.close()));

View File

@@ -36,6 +36,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.BaseFileWithLogsSplit;
import org.apache.hudi.hadoop.BootstrapBaseFileSplit;
import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit;
import org.apache.hudi.hadoop.realtime.HoodieVirtualKeyInfo;
@@ -85,18 +86,7 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo = Option.empty();
if (partitionsToParquetSplits.size() > 0) {
HoodieTableMetaClient metaClient = partitionsToMetaClient.get(partitionsToParquetSplits.keySet().iterator().next());
HoodieTableConfig tableConfig = metaClient.getTableConfig();
if (!tableConfig.populateMetaFields()) {
TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
try {
MessageType parquetSchema = tableSchemaResolver.getTableParquetSchema();
hoodieVirtualKeyInfo = Option.of(new HoodieVirtualKeyInfo(tableConfig.getRecordKeyFieldProp(),
tableConfig.getPartitionFieldProp(), parquetSchema.getFieldIndex(tableConfig.getRecordKeyFieldProp()),
parquetSchema.getFieldIndex(tableConfig.getPartitionFieldProp())));
} catch (Exception exception) {
throw new HoodieException("Fetching table schema failed with exception ", exception);
}
}
hoodieVirtualKeyInfo = getHoodieVirtualKeyInfo(metaClient);
}
Option<HoodieVirtualKeyInfo> finalHoodieVirtualKeyInfo = hoodieVirtualKeyInfo;
partitionsToParquetSplits.keySet().forEach(partitionPath -> {
@@ -121,27 +111,24 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
// subgroup splits again by file id & match with log files.
Map<String, List<FileSplit>> groupedInputSplits = partitionsToParquetSplits.get(partitionPath).stream()
.collect(Collectors.groupingBy(split -> FSUtils.getFileId(split.getPath().getName())));
.collect(Collectors.groupingBy(split -> FSUtils.getFileIdFromFilePath(split.getPath())));
// Get the maxCommit from the last delta or compaction or commit - when bootstrapped from COW table
String maxCommitTime = metaClient.getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION,
HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION, HoodieTimeline.REPLACE_COMMIT_ACTION))
.filterCompletedInstants().lastInstant().get().getTimestamp();
latestFileSlices.forEach(fileSlice -> {
List<FileSplit> dataFileSplits = groupedInputSplits.get(fileSlice.getFileId());
List<FileSplit> dataFileSplits = groupedInputSplits.getOrDefault(fileSlice.getFileId(), new ArrayList<>());
dataFileSplits.forEach(split -> {
try {
List<String> logFilePaths = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator())
.map(logFile -> logFile.getPath().toString()).collect(Collectors.toList());
if (split instanceof BootstrapBaseFileSplit) {
BootstrapBaseFileSplit eSplit = (BootstrapBaseFileSplit) split;
String[] hosts = split.getLocationInfo() != null ? Arrays.stream(split.getLocationInfo())
.filter(x -> !x.isInMemory()).toArray(String[]::new) : new String[0];
String[] inMemoryHosts = split.getLocationInfo() != null ? Arrays.stream(split.getLocationInfo())
.filter(SplitLocationInfo::isInMemory).toArray(String[]::new) : new String[0];
FileSplit baseSplit = new FileSplit(eSplit.getPath(), eSplit.getStart(), eSplit.getLength(),
hosts, inMemoryHosts);
rtSplits.add(new RealtimeBootstrapBaseFileSplit(baseSplit, metaClient.getBasePath(),
logFilePaths, maxCommitTime, eSplit.getBootstrapFileSplit()));
rtSplits.add(createRealtimeBoostrapBaseFileSplit(eSplit, metaClient.getBasePath(), logFilePaths, maxCommitTime));
} else if (split instanceof BaseFileWithLogsSplit) {
BaseFileWithLogsSplit bs = (BaseFileWithLogsSplit)split;
HoodieRealtimeFileSplit hoodieRealtimeFileSplit = new HoodieRealtimeFileSplit(bs, bs.getBasePath(), bs.getDeltaLogPaths(), bs.getMaxCommitTime(), finalHoodieVirtualKeyInfo);
rtSplits.add(hoodieRealtimeFileSplit);
} else {
rtSplits.add(new HoodieRealtimeFileSplit(split, metaClient.getBasePath(), logFilePaths, maxCommitTime, finalHoodieVirtualKeyInfo));
}
@@ -161,6 +148,82 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
return rtSplits.toArray(new InputSplit[0]);
}
// get IncrementalRealtimeSplits
public static InputSplit[] getIncrementalRealtimeSplits(Configuration conf, Stream<FileSplit> fileSplits) throws IOException {
List<InputSplit> rtSplits = new ArrayList<>();
List<FileSplit> fileSplitList = fileSplits.collect(Collectors.toList());
Set<Path> partitionSet = fileSplitList.stream().map(f -> f.getPath().getParent()).collect(Collectors.toSet());
Map<Path, HoodieTableMetaClient> partitionsToMetaClient = getTableMetaClientByPartitionPath(conf, partitionSet);
// Pre process tableConfig from first partition to fetch virtual key info
Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo = Option.empty();
if (partitionSet.size() > 0) {
hoodieVirtualKeyInfo = getHoodieVirtualKeyInfo(partitionsToMetaClient.get(partitionSet.iterator().next()));
}
Option<HoodieVirtualKeyInfo> finalHoodieVirtualKeyInfo = hoodieVirtualKeyInfo;
fileSplitList.stream().forEach(s -> {
// deal with incremental query.
try {
if (s instanceof BaseFileWithLogsSplit) {
BaseFileWithLogsSplit bs = (BaseFileWithLogsSplit)s;
if (bs.getBelongToIncrementalSplit()) {
rtSplits.add(new HoodieRealtimeFileSplit(bs, bs.getBasePath(), bs.getDeltaLogPaths(), bs.getMaxCommitTime(), finalHoodieVirtualKeyInfo));
}
} else if (s instanceof RealtimeBootstrapBaseFileSplit) {
rtSplits.add(s);
}
} catch (IOException e) {
throw new HoodieIOException("Error creating hoodie real time split ", e);
}
});
LOG.info("Returning a total splits of " + rtSplits.size());
return rtSplits.toArray(new InputSplit[0]);
}
public static Option<HoodieVirtualKeyInfo> getHoodieVirtualKeyInfo(HoodieTableMetaClient metaClient) {
HoodieTableConfig tableConfig = metaClient.getTableConfig();
if (!tableConfig.populateMetaFields()) {
TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
try {
MessageType parquetSchema = tableSchemaResolver.getTableParquetSchema();
return Option.of(new HoodieVirtualKeyInfo(tableConfig.getRecordKeyFieldProp(),
tableConfig.getPartitionFieldProp(), parquetSchema.getFieldIndex(tableConfig.getRecordKeyFieldProp()),
parquetSchema.getFieldIndex(tableConfig.getPartitionFieldProp())));
} catch (Exception exception) {
throw new HoodieException("Fetching table schema failed with exception ", exception);
}
}
return Option.empty();
}
public static boolean isIncrementalQuerySplits(List<FileSplit> fileSplits) {
if (fileSplits == null || fileSplits.size() == 0) {
return false;
}
return fileSplits.stream().anyMatch(s -> {
if (s instanceof BaseFileWithLogsSplit) {
BaseFileWithLogsSplit bs = (BaseFileWithLogsSplit)s;
return bs.getBelongToIncrementalSplit();
} else {
return s instanceof RealtimeBootstrapBaseFileSplit;
}
});
}
public static RealtimeBootstrapBaseFileSplit createRealtimeBoostrapBaseFileSplit(
BootstrapBaseFileSplit split, String basePath, List<String> deltaLogPaths, String maxInstantTime) {
try {
String[] hosts = split.getLocationInfo() != null ? Arrays.stream(split.getLocationInfo())
.filter(x -> !x.isInMemory()).toArray(String[]::new) : new String[0];
String[] inMemoryHosts = split.getLocationInfo() != null ? Arrays.stream(split.getLocationInfo())
.filter(SplitLocationInfo::isInMemory).toArray(String[]::new) : new String[0];
FileSplit baseSplit = new FileSplit(split.getPath(), split.getStart(), split.getLength(),
hosts, inMemoryHosts);
return new RealtimeBootstrapBaseFileSplit(baseSplit, basePath, deltaLogPaths, maxInstantTime, split.getBootstrapFileSplit());
} catch (IOException e) {
throw new HoodieIOException("Error creating hoodie real time split ", e);
}
}
// Return parquet file with a list of log files in the same file group.
public static List<Pair<Option<HoodieBaseFile>, List<String>>> groupLogsByBaseFile(Configuration conf, List<Path> partitionPaths) {
Set<Path> partitionSet = new HashSet<>(partitionPaths);

View File

@@ -18,17 +18,25 @@
package org.apache.hudi.hadoop.realtime;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.testutils.FileCreateUtils;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.SchemaTestUtil;
@@ -36,8 +44,11 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
import org.apache.hudi.hadoop.BaseFileWithLogsSplit;
import org.apache.hudi.hadoop.PathWithLogFilePath;
import org.apache.hudi.hadoop.RealtimeFileStatus;
import org.apache.hudi.hadoop.testutils.InputFormatTestUtil;
import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
@@ -67,13 +78,17 @@ import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.Map;
import java.util.HashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -554,4 +569,256 @@ public class TestHoodieRealtimeRecordReader {
arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, true, true)
);
}
@Test
public void testIncrementalWithOnlylog() throws Exception {
// initial commit
Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema());
HoodieTestUtils.init(hadoopConf, basePath.toString(), HoodieTableType.MERGE_ON_READ);
String instantTime = "100";
final int numRecords = 1000;
File partitionDir = InputFormatTestUtil.prepareParquetTable(basePath, schema, 1, numRecords, instantTime,
HoodieTableType.MERGE_ON_READ);
//FileCreateUtils.createDeltaCommit(basePath.toString(), instantTime);
createDeltaCommitFile(basePath, instantTime,"2016/05/01", "2016/05/01/fileid0_1-0-1_100.parquet", "fileid0");
// Add the paths
FileInputFormat.setInputPaths(baseJobConf, partitionDir.getPath());
// insert new records to log file
try {
String newCommitTime = "102";
HoodieLogFormat.Writer writer =
InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, "fileid0", instantTime, newCommitTime,
numRecords, numRecords, 0);
writer.close();
createDeltaCommitFile(basePath, newCommitTime,"2016/05/01", "2016/05/01/.fileid0_100.log.1_1-0-1", "fileid0");
InputFormatTestUtil.setupIncremental(baseJobConf, "101", 1);
HoodieParquetRealtimeInputFormat inputFormat = new HoodieParquetRealtimeInputFormat();
inputFormat.setConf(baseJobConf);
InputSplit[] splits = inputFormat.getSplits(baseJobConf, 1);
assertTrue(splits.length == 1);
JobConf newJobConf = new JobConf(baseJobConf);
List<Schema.Field> fields = schema.getFields();
setHiveColumnNameProps(fields, newJobConf, false);
RecordReader<NullWritable, ArrayWritable> reader = inputFormat.getRecordReader(splits[0], newJobConf, Reporter.NULL);
// use reader to read log file.
NullWritable key = reader.createKey();
ArrayWritable value = reader.createValue();
while (reader.next(key, value)) {
Writable[] values = value.get();
// since we set incremental start commit as 101 and commit_number as 1.
// the data belong to commit 102 should be read out.
assertEquals(newCommitTime, values[0].toString());
key = reader.createKey();
value = reader.createValue();
}
reader.close();
} catch (IOException e) {
throw new HoodieException(e.getMessage(), e);
}
}
@Test
public void testIncrementalWithReplace() throws Exception {
// initial commit
Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema());
HoodieTestUtils.init(hadoopConf, basePath.toString(), HoodieTableType.MERGE_ON_READ);
String baseInstant = "100";
File partitionDir = InputFormatTestUtil.prepareParquetTable(basePath, schema, 1, 100, baseInstant,
HoodieTableType.MERGE_ON_READ);
//FileCreateUtils.createDeltaCommit(basePath.toString(), instantTime);
createDeltaCommitFile(basePath, baseInstant,"2016/05/01", "2016/05/01/fileid0_1-0-1_100.parquet", "fileid0");
// Add the paths
FileInputFormat.setInputPaths(baseJobConf, partitionDir.getPath());
InputFormatTestUtil.simulateInserts(partitionDir, ".parquet", "fileid1", 1, "200");
Map<String, List<String>> partitionToReplaceFileIds = new HashMap<>();
List<String> replacedFileId = new ArrayList<>();
replacedFileId.add("fileid0");
partitionToReplaceFileIds.put("2016/05/01", replacedFileId);
createReplaceCommitFile(basePath,
"200","2016/05/01", "2016/05/01/fileid10_1-0-1_200.parquet", "fileid10", partitionToReplaceFileIds);
InputFormatTestUtil.setupIncremental(baseJobConf, "0", 1);
HoodieParquetRealtimeInputFormat inputFormat = new HoodieParquetRealtimeInputFormat();
inputFormat.setConf(baseJobConf);
InputSplit[] splits = inputFormat.getSplits(baseJobConf, 1);
assertTrue(splits.length == 1);
JobConf newJobConf = new JobConf(baseJobConf);
List<Schema.Field> fields = schema.getFields();
setHiveColumnNameProps(fields, newJobConf, false);
newJobConf.set("columns.types", "string,string,string,string,string,string,string,string,bigint,string,string");
RecordReader<NullWritable, ArrayWritable> reader = inputFormat.getRecordReader(splits[0], newJobConf, Reporter.NULL);
// use reader to read log file.
NullWritable key = reader.createKey();
ArrayWritable value = reader.createValue();
while (reader.next(key, value)) {
Writable[] values = value.get();
// since we set incremental start commit as 0 and commit_number as 1.
// the data belong to commit 100 should be read out.
assertEquals("100", values[0].toString());
key = reader.createKey();
value = reader.createValue();
}
reader.close();
}
private void createReplaceCommitFile(
java.nio.file.Path basePath,
String commitNumber,
String partitionPath,
String filePath,
String fileId,
Map<String, List<String>> partitionToReplaceFileIds) throws IOException {
List<HoodieWriteStat> writeStats = new ArrayList<>();
HoodieWriteStat writeStat = createHoodieWriteStat(basePath, commitNumber, partitionPath, filePath, fileId);
writeStats.add(writeStat);
HoodieReplaceCommitMetadata replaceMetadata = new HoodieReplaceCommitMetadata();
replaceMetadata.setPartitionToReplaceFileIds(partitionToReplaceFileIds);
writeStats.forEach(stat -> replaceMetadata.addWriteStat(partitionPath, stat));
File file = basePath.resolve(".hoodie").resolve(commitNumber + ".replacecommit").toFile();
file.createNewFile();
FileOutputStream fileOutputStream = new FileOutputStream(file);
fileOutputStream.write(replaceMetadata.toJsonString().getBytes(StandardCharsets.UTF_8));
fileOutputStream.flush();
fileOutputStream.close();
}
private HoodieWriteStat createHoodieWriteStat(java.nio.file.Path basePath, String commitNumber, String partitionPath, String filePath, String fileId) {
HoodieWriteStat writeStat = new HoodieWriteStat();
writeStat.setFileId(fileId);
writeStat.setNumDeletes(0);
writeStat.setNumUpdateWrites(100);
writeStat.setNumWrites(100);
writeStat.setPath(filePath);
writeStat.setPartitionPath(partitionPath);
writeStat.setTotalLogFilesCompacted(100L);
HoodieWriteStat.RuntimeStats runtimeStats = new HoodieWriteStat.RuntimeStats();
runtimeStats.setTotalScanTime(100);
runtimeStats.setTotalCreateTime(100);
runtimeStats.setTotalUpsertTime(100);
writeStat.setRuntimeStats(runtimeStats);
return writeStat;
}
private void createDeltaCommitFile(
java.nio.file.Path basePath,
String commitNumber,
String partitionPath,
String filePath,
String fileId) throws IOException {
List<HoodieWriteStat> writeStats = new ArrayList<>();
HoodieWriteStat writeStat = createHoodieWriteStat(basePath, commitNumber, partitionPath, filePath, fileId);
writeStats.add(writeStat);
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
writeStats.forEach(stat -> commitMetadata.addWriteStat(partitionPath, stat));
File file = basePath.resolve(".hoodie").resolve(commitNumber + ".deltacommit").toFile();
file.createNewFile();
FileOutputStream fileOutputStream = new FileOutputStream(file);
fileOutputStream.write(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8));
fileOutputStream.flush();
fileOutputStream.close();
}
@Test
public void testLogOnlyReader() throws Exception {
// initial commit
Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema());
HoodieTestUtils.init(hadoopConf, basePath.toString(), HoodieTableType.MERGE_ON_READ);
String baseInstant = "100";
File partitionDir = InputFormatTestUtil.prepareNonPartitionedParquetTable(basePath, schema, 1, 100, baseInstant,
HoodieTableType.MERGE_ON_READ);
FileCreateUtils.createDeltaCommit(basePath.toString(), baseInstant);
// Add the paths
FileInputFormat.setInputPaths(baseJobConf, partitionDir.getPath());
FileSlice fileSlice = new FileSlice("default", baseInstant, "fileid1");
try {
// update files or generate new log file
int logVersion = 1;
int baseInstantTs = Integer.parseInt(baseInstant);
String instantTime = String.valueOf(baseInstantTs + logVersion);
HoodieLogFormat.Writer writer = InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, "fileid1", baseInstant,
instantTime, 100, 0, logVersion);
long size = writer.getCurrentSize();
writer.close();
assertTrue(size > 0, "block - size should be > 0");
FileCreateUtils.createDeltaCommit(basePath.toString(), instantTime);
// create a split with new log file(s)
fileSlice.addLogFile(writer.getLogFile());
RealtimeFileStatus realtimeFileStatus = new RealtimeFileStatus(new FileStatus(0, false, 1, 1, 0, writer.getLogFile().getPath()));
realtimeFileStatus.setMaxCommitTime(instantTime);
realtimeFileStatus.setBasePath(basePath.toString());
realtimeFileStatus.setDeltaLogPaths(fileSlice.getLogFiles().map(l -> l.getPath().toString()).collect(Collectors.toList()));
PathWithLogFilePath pathWithLogFileStatus = (PathWithLogFilePath) realtimeFileStatus.getPath();
BaseFileWithLogsSplit bs = pathWithLogFileStatus.buildSplit(pathWithLogFileStatus, 0, 0, new String[] {""});
HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(bs, bs.getBasePath(), bs.getDeltaLogPaths(), bs.getMaxCommitTime(), Option.empty());
JobConf newJobConf = new JobConf(baseJobConf);
List<Schema.Field> fields = schema.getFields();
setHiveColumnNameProps(fields, newJobConf, false);
// create a dummy RecordReader to be used by HoodieRealtimeRecordReader
RecordReader<NullWritable, ArrayWritable> reader = new HoodieRealtimeRecordReader(split, newJobConf, new HoodieEmptyRecordReader(split, newJobConf));
// use reader to read log file.
NullWritable key = reader.createKey();
ArrayWritable value = reader.createValue();
while (reader.next(key, value)) {
Writable[] values = value.get();
assertEquals(instantTime, values[0].toString());
key = reader.createKey();
value = reader.createValue();
}
reader.close();
} catch (Exception e) {
throw new HoodieException(e.getMessage(), e);
}
}
@Test
public void testIncrementalWithCompaction() throws Exception {
// initial commit
Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema());
HoodieTestUtils.init(hadoopConf, basePath.toString(), HoodieTableType.MERGE_ON_READ);
String baseInstant = "100";
File partitionDir = InputFormatTestUtil.prepareParquetTable(basePath, schema, 1, 100, baseInstant,
HoodieTableType.MERGE_ON_READ);
createDeltaCommitFile(basePath, baseInstant,"2016/05/01", "2016/05/01/fileid0_1-0-1_100.parquet", "fileid0");
// Add the paths
FileInputFormat.setInputPaths(baseJobConf, partitionDir.getPath());
createCompactionFile(basePath, "125");
// add inserts after compaction timestamp
InputFormatTestUtil.simulateInserts(partitionDir, ".parquet", "fileId2", 5, "200");
InputFormatTestUtil.commit(basePath, "200");
InputFormatTestUtil.setupIncremental(baseJobConf, "100", 10);
// verify that incremental reads do NOT show inserts after compaction timestamp
HoodieParquetRealtimeInputFormat inputFormat = new HoodieParquetRealtimeInputFormat();
inputFormat.setConf(baseJobConf);
InputSplit[] splits = inputFormat.getSplits(baseJobConf, 1);
assertTrue(splits.length == 0);
}
private File createCompactionFile(java.nio.file.Path basePath, String commitTime)
throws IOException {
File file = basePath.resolve(".hoodie")
.resolve(HoodieTimeline.makeRequestedCompactionFileName(commitTime)).toFile();
assertTrue(file.createNewFile());
FileOutputStream os = new FileOutputStream(file);
try {
HoodieCompactionPlan compactionPlan = HoodieCompactionPlan.newBuilder().setVersion(2).build();
// Write empty commit metadata
os.write(TimelineMetadataUtils.serializeCompactionPlan(compactionPlan).get());
return file;
} finally {
os.close();
}
}
}