1
0

[HUDI-2005] Removing direct fs call in HoodieLogFileReader (#3865)

This commit is contained in:
Sivabalan Narayanan
2021-11-25 18:51:38 -05:00
committed by GitHub
parent 6f5d8d04cd
commit 8340ccb503
17 changed files with 116 additions and 80 deletions

View File

@@ -18,6 +18,8 @@
package org.apache.hudi.hadoop;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
@@ -36,7 +38,7 @@ public class BaseFileWithLogsSplit extends FileSplit {
// a flag indicating whether this split was produced by an incremental query.
private boolean belongToIncrementalSplit = false;
// the log file paths of this split.
private List<String> deltaLogPaths = new ArrayList<>();
private List<HoodieLogFile> deltaLogFiles = new ArrayList<>();
// max commit time of current split.
private String maxCommitTime = "";
// the basePath of current hoodie table.
@@ -55,9 +57,10 @@ public class BaseFileWithLogsSplit extends FileSplit {
Text.writeString(out, maxCommitTime);
Text.writeString(out, basePath);
Text.writeString(out, baseFilePath);
out.writeInt(deltaLogPaths.size());
for (String logPath : deltaLogPaths) {
Text.writeString(out, logPath);
out.writeInt(deltaLogFiles.size());
for (HoodieLogFile logFile : deltaLogFiles) {
Text.writeString(out, logFile.getPath().toString());
out.writeLong(logFile.getFileSize());
}
}
@@ -69,11 +72,13 @@ public class BaseFileWithLogsSplit extends FileSplit {
basePath = Text.readString(in);
baseFilePath = Text.readString(in);
int deltaLogSize = in.readInt();
List<String> tempDeltaLogs = new ArrayList<>();
List<HoodieLogFile> tempDeltaLogs = new ArrayList<>();
for (int i = 0; i < deltaLogSize; i++) {
tempDeltaLogs.add(Text.readString(in));
String logPath = Text.readString(in);
long logFileSize = in.readLong();
tempDeltaLogs.add(new HoodieLogFile(new Path(logPath), logFileSize));
}
deltaLogPaths = tempDeltaLogs;
deltaLogFiles = tempDeltaLogs;
}
public boolean getBelongToIncrementalSplit() {
@@ -84,12 +89,12 @@ public class BaseFileWithLogsSplit extends FileSplit {
this.belongToIncrementalSplit = belongToIncrementalSplit;
}
public List<String> getDeltaLogPaths() {
return deltaLogPaths;
public List<HoodieLogFile> getDeltaLogFiles() {
return deltaLogFiles;
}
public void setDeltaLogPaths(List<String> deltaLogPaths) {
this.deltaLogPaths = deltaLogPaths;
public void setDeltaLogFiles(List<HoodieLogFile> deltaLogFiles) {
this.deltaLogFiles = deltaLogFiles;
}
public String getMaxCommitTime() {

View File

@@ -18,6 +18,8 @@
package org.apache.hudi.hadoop;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hadoop.fs.Path;
import java.util.ArrayList;
@@ -31,7 +33,7 @@ public class PathWithLogFilePath extends Path {
// a flag indicating whether this path was produced by an incremental query.
private boolean belongToIncrementalPath = false;
// the log files belonging to this path.
private List<String> deltaLogPaths = new ArrayList<>();
private List<HoodieLogFile> deltaLogFiles = new ArrayList<>();
// max commit time of current path.
private String maxCommitTime = "";
// the basePath of current hoodie table.
@@ -50,12 +52,12 @@ public class PathWithLogFilePath extends Path {
this.belongToIncrementalPath = belongToIncrementalPath;
}
public List<String> getDeltaLogPaths() {
return deltaLogPaths;
public List<HoodieLogFile> getDeltaLogFiles() {
return deltaLogFiles;
}
public void setDeltaLogPaths(List<String> deltaLogPaths) {
this.deltaLogPaths = deltaLogPaths;
public void setDeltaLogFiles(List<HoodieLogFile> deltaLogFiles) {
this.deltaLogFiles = deltaLogFiles;
}
public String getMaxCommitTime() {
@@ -97,7 +99,7 @@ public class PathWithLogFilePath extends Path {
public BaseFileWithLogsSplit buildSplit(Path file, long start, long length, String[] hosts) {
BaseFileWithLogsSplit bs = new BaseFileWithLogsSplit(file, start, length, hosts);
bs.setBelongToIncrementalSplit(belongToIncrementalPath);
bs.setDeltaLogPaths(deltaLogPaths);
bs.setDeltaLogFiles(deltaLogFiles);
bs.setMaxCommitTime(maxCommitTime);
bs.setBasePath(basePath);
bs.setBaseFilePath(baseFilePath);

View File

@@ -18,6 +18,8 @@
package org.apache.hudi.hadoop;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
@@ -35,7 +37,7 @@ public class RealtimeFileStatus extends FileStatus {
// a flag indicating whether this file status was produced by an incremental query.
private boolean belongToIncrementalFileStatus = false;
// the log files belonging to this fileStatus.
private List<String> deltaLogPaths = new ArrayList<>();
private List<HoodieLogFile> deltaLogFiles = new ArrayList<>();
// max commit time of current fileStatus.
private String maxCommitTime = "";
// the basePath of current hoodie table.
@@ -55,7 +57,7 @@ public class RealtimeFileStatus extends FileStatus {
Path path = super.getPath();
PathWithLogFilePath pathWithLogFilePath = new PathWithLogFilePath(path.getParent(), path.getName());
pathWithLogFilePath.setBelongToIncrementalPath(belongToIncrementalFileStatus);
pathWithLogFilePath.setDeltaLogPaths(deltaLogPaths);
pathWithLogFilePath.setDeltaLogFiles(deltaLogFiles);
pathWithLogFilePath.setMaxCommitTime(maxCommitTime);
pathWithLogFilePath.setBasePath(basePath);
pathWithLogFilePath.setBaseFilePath(baseFilePath);
@@ -69,12 +71,12 @@ public class RealtimeFileStatus extends FileStatus {
this.belongToIncrementalFileStatus = belongToIncrementalFileStatus;
}
public List<String> getDeltaLogPaths() {
return deltaLogPaths;
public List<HoodieLogFile> getDeltaLogFiles() {
return deltaLogFiles;
}
public void setDeltaLogPaths(List<String> deltaLogPaths) {
this.deltaLogPaths = deltaLogPaths;
public void setDeltaLogFiles(List<HoodieLogFile> deltaLogFiles) {
this.deltaLogFiles = deltaLogFiles;
}
public String getMaxCommitTime() {

View File

@@ -82,7 +82,7 @@ public abstract class AbstractRealtimeRecordReader {
* job conf.
*/
private void init() throws IOException {
Schema schemaFromLogFile = LogReaderUtils.readLatestSchemaFromLogFiles(split.getBasePath(), split.getDeltaLogPaths(), jobConf);
Schema schemaFromLogFile = LogReaderUtils.readLatestSchemaFromLogFiles(split.getBasePath(), split.getDeltaLogFiles(), jobConf);
if (schemaFromLogFile == null) {
writerSchema = InputSplitUtils.getBaseFileSchema((FileSplit)split, jobConf);
LOG.info("Writer Schema From Parquet => " + writerSchema.getFields());

View File

@@ -23,6 +23,7 @@ import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -189,7 +190,7 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
fileStatus.setBelongToIncrementalFileStatus(true);
fileStatus.setBasePath(basePath);
fileStatus.setBaseFilePath(baseFilePath);
fileStatus.setDeltaLogPaths(f.getLatestFileSlice().get().getLogFiles().map(l -> l.getPath().toString()).collect(Collectors.toList()));
fileStatus.setDeltaLogFiles(f.getLatestFileSlice().get().getLogFiles().collect(Collectors.toList()));
// try to set bootstrapfileStatus
if (baseFileStatus instanceof LocatedFileStatusWithBootstrapBaseFile || baseFileStatus instanceof FileStatusWithBootstrapBaseFile) {
fileStatus.setBootStrapFileStatus(baseFileStatus);
@@ -202,7 +203,7 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
if (logFileStatus.size() > 0) {
RealtimeFileStatus fileStatus = new RealtimeFileStatus(logFileStatus.get(0));
fileStatus.setBelongToIncrementalFileStatus(true);
fileStatus.setDeltaLogPaths(logFileStatus.stream().map(l -> l.getPath().toString()).collect(Collectors.toList()));
fileStatus.setDeltaLogFiles(logFileStatus.stream().map(l -> new HoodieLogFile(l.getPath(), l.getLen())).collect(Collectors.toList()));
fileStatus.setMaxCommitTime(maxCommitTime);
fileStatus.setBasePath(basePath);
result.add(fileStatus);
@@ -256,7 +257,7 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
? super.makeSplit(path.getPathWithBootstrapFileStatus(), start, length, hosts)
: super.makeSplit(path.getPathWithBootstrapFileStatus(), start, length, hosts, inMemoryHosts);
return HoodieRealtimeInputFormatUtils
.createRealtimeBoostrapBaseFileSplit((BootstrapBaseFileSplit) bf, path.getBasePath(), path.getDeltaLogPaths(), path.getMaxCommitTime());
.createRealtimeBoostrapBaseFileSplit((BootstrapBaseFileSplit) bf, path.getBasePath(), path.getDeltaLogFiles(), path.getMaxCommitTime());
}
}

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.hadoop.realtime;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.util.Option;
import org.apache.hadoop.mapred.FileSplit;
@@ -25,7 +26,9 @@ import org.apache.hadoop.mapred.FileSplit;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
/**
* Filesplit that wraps the base split and a list of log files to merge deltas from.
@@ -33,6 +36,7 @@ import java.util.List;
public class HoodieRealtimeFileSplit extends FileSplit implements RealtimeSplit {
private List<String> deltaLogPaths;
private List<HoodieLogFile> deltaLogFiles = new ArrayList<>();
private String maxCommitTime;
@@ -44,11 +48,12 @@ public class HoodieRealtimeFileSplit extends FileSplit implements RealtimeSplit
super();
}
public HoodieRealtimeFileSplit(FileSplit baseSplit, String basePath, List<String> deltaLogPaths, String maxCommitTime,
public HoodieRealtimeFileSplit(FileSplit baseSplit, String basePath, List<HoodieLogFile> deltaLogFiles, String maxCommitTime,
Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo)
throws IOException {
super(baseSplit.getPath(), baseSplit.getStart(), baseSplit.getLength(), baseSplit.getLocations());
this.deltaLogPaths = deltaLogPaths;
this.deltaLogFiles = deltaLogFiles;
this.deltaLogPaths = deltaLogFiles.stream().map(entry -> entry.getPath().toString()).collect(Collectors.toList());
this.maxCommitTime = maxCommitTime;
this.basePath = basePath;
this.hoodieVirtualKeyInfo = hoodieVirtualKeyInfo;
@@ -58,6 +63,10 @@ public class HoodieRealtimeFileSplit extends FileSplit implements RealtimeSplit
return deltaLogPaths;
}
/**
 * Returns the delta log files held by this split.
 *
 * @return the {@code deltaLogFiles} list; the same list instance stored in the field, not a copy
 */
public List<HoodieLogFile> getDeltaLogFiles() {
return deltaLogFiles;
}
/**
 * Returns the max commit time recorded for this split.
 *
 * @return the value of the {@code maxCommitTime} field
 */
public String getMaxCommitTime() {
return maxCommitTime;
}

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.hadoop.realtime;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hadoop.BootstrapBaseFileSplit;
@@ -26,7 +27,9 @@ import org.apache.hadoop.mapred.FileSplit;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
/**
* Realtime File Split with external base file.
@@ -34,6 +37,7 @@ import java.util.List;
public class RealtimeBootstrapBaseFileSplit extends BootstrapBaseFileSplit implements RealtimeSplit {
private List<String> deltaLogPaths;
private List<HoodieLogFile> deltaLogFiles = new ArrayList<>();
private String maxInstantTime;
@@ -43,11 +47,12 @@ public class RealtimeBootstrapBaseFileSplit extends BootstrapBaseFileSplit imple
super();
}
public RealtimeBootstrapBaseFileSplit(FileSplit baseSplit, String basePath, List<String> deltaLogPaths,
public RealtimeBootstrapBaseFileSplit(FileSplit baseSplit, String basePath, List<HoodieLogFile> deltaLogFiles,
String maxInstantTime, FileSplit externalFileSplit) throws IOException {
super(baseSplit, externalFileSplit);
this.maxInstantTime = maxInstantTime;
this.deltaLogPaths = deltaLogPaths;
this.deltaLogFiles = deltaLogFiles;
this.deltaLogPaths = deltaLogFiles.stream().map(entry -> entry.getPath().toString()).collect(Collectors.toList());
this.basePath = basePath;
}
@@ -68,6 +73,11 @@ public class RealtimeBootstrapBaseFileSplit extends BootstrapBaseFileSplit imple
return deltaLogPaths;
}
/**
 * Returns the delta log files held by this bootstrap split.
 *
 * @return the {@code deltaLogFiles} list; the same list instance stored in the field, not a copy
 */
@Override
public List<HoodieLogFile> getDeltaLogFiles() {
return deltaLogFiles;
}
@Override
public String getMaxCommitTime() {
return maxInstantTime;

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.hadoop.realtime;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hadoop.InputSplitUtils;
@@ -41,6 +42,8 @@ public interface RealtimeSplit extends InputSplitWithLocationInfo {
*/
List<String> getDeltaLogPaths();
List<HoodieLogFile> getDeltaLogFiles();
/**
* Return Max Instant Time.
* @return

View File

@@ -466,7 +466,7 @@ public class HoodieInputFormatUtils {
HoodieTableFileSystemView fsView = fsViewCache.computeIfAbsent(metaClient, tableMetaClient ->
FileSystemViewManager.createInMemoryFileSystemViewWithTimeline(engineContext, tableMetaClient, buildMetadataConfig(job), timeline));
List<HoodieBaseFile> filteredBaseFiles = new ArrayList<>();
Map<FileStatus, List<String>> filteredLogs = new HashMap<>();
Map<FileStatus, List<HoodieLogFile>> filteredLogs = new HashMap<>();
for (Path p : entry.getValue()) {
String relativePartitionPath = FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), p);
List<HoodieBaseFile> matched = fsView.getLatestBaseFiles(relativePartitionPath).collect(Collectors.toList());
@@ -476,9 +476,8 @@ public class HoodieInputFormatUtils {
.filter(f -> !f.getBaseFile().isPresent() && f.getLatestLogFile().isPresent())
.collect(Collectors.toList());
logMatched.forEach(f -> {
List<String> logPaths = f.getLogFiles().sorted(HoodieLogFile.getLogFileComparator())
.map(log -> log.getPath().toString()).collect(Collectors.toList());
filteredLogs.put(f.getLatestLogFile().get().getFileStatus(), logPaths);
List<HoodieLogFile> logPathSizePairs = f.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
filteredLogs.put(f.getLatestLogFile().get().getFileStatus(), logPathSizePairs);
});
}
}
@@ -492,9 +491,9 @@ public class HoodieInputFormatUtils {
returns.add(getFileStatus(filteredFile));
}
for (Map.Entry<FileStatus, List<String>> filterLogEntry : filteredLogs.entrySet()) {
for (Map.Entry<FileStatus, List<HoodieLogFile>> filterLogEntry : filteredLogs.entrySet()) {
RealtimeFileStatus rs = new RealtimeFileStatus(filterLogEntry.getKey());
rs.setDeltaLogPaths(filterLogEntry.getValue());
rs.setDeltaLogFiles(filterLogEntry.getValue());
returns.add(rs);
}
}

View File

@@ -120,13 +120,13 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
List<FileSplit> dataFileSplits = groupedInputSplits.getOrDefault(fileSlice.getFileId(), new ArrayList<>());
dataFileSplits.forEach(split -> {
try {
List<String> logFilePaths = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator())
.map(logFile -> logFile.getPath().toString()).collect(Collectors.toList());
List<HoodieLogFile> logFiles = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator())
.collect(Collectors.toList());
if (split instanceof BootstrapBaseFileSplit) {
BootstrapBaseFileSplit eSplit = (BootstrapBaseFileSplit) split;
rtSplits.add(createRealtimeBoostrapBaseFileSplit(eSplit, metaClient.getBasePath(), logFilePaths, maxCommitTime));
rtSplits.add(createRealtimeBoostrapBaseFileSplit(eSplit, metaClient.getBasePath(), logFiles, maxCommitTime));
} else {
rtSplits.add(new HoodieRealtimeFileSplit(split, metaClient.getBasePath(), logFilePaths, maxCommitTime, finalHoodieVirtualKeyInfo));
rtSplits.add(new HoodieRealtimeFileSplit(split, metaClient.getBasePath(), logFiles, maxCommitTime, finalHoodieVirtualKeyInfo));
}
} catch (IOException e) {
throw new HoodieIOException("Error creating hoodie real time split ", e);
@@ -162,7 +162,7 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
if (s instanceof BaseFileWithLogsSplit) {
BaseFileWithLogsSplit bs = (BaseFileWithLogsSplit)s;
if (bs.getBelongToIncrementalSplit()) {
rtSplits.add(new HoodieRealtimeFileSplit(bs, bs.getBasePath(), bs.getDeltaLogPaths(), bs.getMaxCommitTime(), finalHoodieVirtualKeyInfo));
rtSplits.add(new HoodieRealtimeFileSplit(bs, bs.getBasePath(), bs.getDeltaLogFiles(), bs.getMaxCommitTime(), finalHoodieVirtualKeyInfo));
}
} else if (s instanceof RealtimeBootstrapBaseFileSplit) {
rtSplits.add(s);
@@ -206,7 +206,7 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
}
public static RealtimeBootstrapBaseFileSplit createRealtimeBoostrapBaseFileSplit(
BootstrapBaseFileSplit split, String basePath, List<String> deltaLogPaths, String maxInstantTime) {
BootstrapBaseFileSplit split, String basePath, List<HoodieLogFile> logFiles, String maxInstantTime) {
try {
String[] hosts = split.getLocationInfo() != null ? Arrays.stream(split.getLocationInfo())
.filter(x -> !x.isInMemory()).toArray(String[]::new) : new String[0];
@@ -214,7 +214,7 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
.filter(SplitLocationInfo::isInMemory).toArray(String[]::new) : new String[0];
FileSplit baseSplit = new FileSplit(split.getPath(), split.getStart(), split.getLength(),
hosts, inMemoryHosts);
return new RealtimeBootstrapBaseFileSplit(baseSplit, basePath, deltaLogPaths, maxInstantTime, split.getBootstrapFileSplit());
return new RealtimeBootstrapBaseFileSplit(baseSplit, basePath, logFiles, maxInstantTime, split.getBootstrapFileSplit());
} catch (IOException e) {
throw new HoodieIOException("Error creating hoodie real time split ", e);
}