1
0

[HUDI-2005] Avoiding direct fs calls in HoodieLogFileReader (#3757)

This commit is contained in:
Sivabalan Narayanan
2021-10-25 01:21:08 -04:00
committed by GitHub
parent d8560377c3
commit 1bb0532563
12 changed files with 62 additions and 33 deletions

View File

@@ -82,7 +82,7 @@ public abstract class AbstractRealtimeRecordReader {
* job conf.
*/
private void init() throws IOException {
Schema schemaFromLogFile = LogReaderUtils.readLatestSchemaFromLogFiles(split.getBasePath(), split.getDeltaLogPaths(), jobConf);
Schema schemaFromLogFile = LogReaderUtils.readLatestSchemaFromLogFiles(split.getBasePath(), split.getDeltaLogFileStatus(), jobConf);
if (schemaFromLogFile == null) {
writerSchema = InputSplitUtils.getBaseFileSchema((FileSplit)split, jobConf);
LOG.info("Writer Schema From Parquet => " + writerSchema.getFields());

View File

@@ -20,12 +20,14 @@ package org.apache.hudi.hadoop.realtime;
import org.apache.hudi.common.util.Option;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.mapred.FileSplit;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
/**
* Filesplit that wraps the base split and a list of log files to merge deltas from.
@@ -33,6 +35,7 @@ import java.util.List;
public class HoodieRealtimeFileSplit extends FileSplit implements RealtimeSplit {
private List<String> deltaLogPaths;
private List<FileStatus> deltaLogFileStatus;
private String maxCommitTime;
@@ -44,11 +47,12 @@ public class HoodieRealtimeFileSplit extends FileSplit implements RealtimeSplit
super();
}
public HoodieRealtimeFileSplit(FileSplit baseSplit, String basePath, List<String> deltaLogPaths, String maxCommitTime,
public HoodieRealtimeFileSplit(FileSplit baseSplit, String basePath, List<FileStatus> deltaLogFileStatus, String maxCommitTime,
Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo)
throws IOException {
super(baseSplit.getPath(), baseSplit.getStart(), baseSplit.getLength(), baseSplit.getLocations());
this.deltaLogPaths = deltaLogPaths;
this.deltaLogFileStatus = deltaLogFileStatus;
this.deltaLogPaths = deltaLogFileStatus.stream().map(entry -> entry.getPath().toString()).collect(Collectors.toList());
this.maxCommitTime = maxCommitTime;
this.basePath = basePath;
this.hoodieVirtualKeyInfo = hoodieVirtualKeyInfo;
@@ -58,6 +62,10 @@ public class HoodieRealtimeFileSplit extends FileSplit implements RealtimeSplit
return deltaLogPaths;
}
/**
 * Returns the {@link FileStatus} entries of the delta log files attached to this split.
 *
 * <p>The list is the one handed to the constructor; the string paths exposed by
 * {@code getDeltaLogPaths()} are derived from it there.
 *
 * <p>NOTE(review): the field is only assigned in the parameterized constructor, so an
 * instance created through the no-arg constructor returns {@code null} unless the field is
 * populated elsewhere (e.g. during deserialization) - confirm against readFields.
 *
 * @return the log file statuses given at construction time, or {@code null} if never set
 */
@Override
public List<FileStatus> getDeltaLogFileStatus() {
  return this.deltaLogFileStatus;
}
/**
 * Returns the max commit time recorded for this split.
 *
 * @return the {@code maxCommitTime} value supplied to the constructor
 */
public String getMaxCommitTime() {
  return this.maxCommitTime;
}

View File

@@ -21,12 +21,14 @@ package org.apache.hudi.hadoop.realtime;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hadoop.BootstrapBaseFileSplit;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.mapred.FileSplit;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
/**
* Realtime File Split with external base file.
@@ -34,6 +36,7 @@ import java.util.List;
public class RealtimeBootstrapBaseFileSplit extends BootstrapBaseFileSplit implements RealtimeSplit {
private List<String> deltaLogPaths;
private List<FileStatus> deltaLogFileStatus;
private String maxInstantTime;
@@ -43,11 +46,12 @@ public class RealtimeBootstrapBaseFileSplit extends BootstrapBaseFileSplit imple
super();
}
public RealtimeBootstrapBaseFileSplit(FileSplit baseSplit, String basePath, List<String> deltaLogPaths,
public RealtimeBootstrapBaseFileSplit(FileSplit baseSplit, String basePath, List<FileStatus> deltaLogFileStatus,
String maxInstantTime, FileSplit externalFileSplit) throws IOException {
super(baseSplit, externalFileSplit);
this.maxInstantTime = maxInstantTime;
this.deltaLogPaths = deltaLogPaths;
this.deltaLogFileStatus = deltaLogFileStatus;
this.deltaLogPaths = deltaLogFileStatus.stream().map(entry -> entry.getPath().toString()).collect(Collectors.toList());
this.basePath = basePath;
}
@@ -68,6 +72,11 @@ public class RealtimeBootstrapBaseFileSplit extends BootstrapBaseFileSplit imple
return deltaLogPaths;
}
/**
 * Returns the {@link FileStatus} entries of the delta log files attached to this split,
 * exactly as they were passed to the constructor.
 *
 * @return the stored list of log file statuses
 */
@Override
public List<FileStatus> getDeltaLogFileStatus() {
  return this.deltaLogFileStatus;
}
@Override
public String getMaxCommitTime() {
return maxInstantTime;

View File

@@ -21,6 +21,7 @@ package org.apache.hudi.hadoop.realtime;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hadoop.InputSplitUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.InputSplitWithLocationInfo;
@@ -41,6 +42,8 @@ public interface RealtimeSplit extends InputSplitWithLocationInfo {
*/
List<String> getDeltaLogPaths();
/**
 * Return the {@link FileStatus} of each delta log file belonging to this split.
 * The implementations visible in this change derive the {@code getDeltaLogPaths()}
 * strings from this same list.
 * @return list of log file statuses
 */
List<FileStatus> getDeltaLogFileStatus();
/**
* Return Max Instant Time.
* @return

View File

@@ -43,6 +43,7 @@ import org.apache.hudi.hadoop.realtime.RealtimeBootstrapBaseFileSplit;
import org.apache.hudi.hadoop.realtime.RealtimeSplit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.mapred.FileSplit;
@@ -130,8 +131,8 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
List<FileSplit> dataFileSplits = groupedInputSplits.get(fileSlice.getFileId());
dataFileSplits.forEach(split -> {
try {
List<String> logFilePaths = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator())
.map(logFile -> logFile.getPath().toString()).collect(Collectors.toList());
List<FileStatus> logFilePaths = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator())
.map(logFile -> logFile.getFileStatus()).collect(Collectors.toList());
if (split instanceof BootstrapBaseFileSplit) {
BootstrapBaseFileSplit eSplit = (BootstrapBaseFileSplit) split;
String[] hosts = split.getLocationInfo() != null ? Arrays.stream(split.getLocationInfo())