From e3fc74668fc43fefd73087ff725245b8ed85b4a1 Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Mon, 25 Oct 2021 21:43:15 -0400 Subject: [PATCH] [HUDI-2625] Revert "[HUDI-2005] Avoiding direct fs calls in HoodieLogFileReader (#3757)" (#3863) This reverts commit 1bb05325637740498cac548872cf7223e34950d0. --- .../common/table/log/HoodieLogFileReader.java | 12 ++++++------ .../hudi/common/table/log/HoodieLogFormat.java | 2 +- .../table/log/HoodieLogFormatReader.java | 4 ++-- .../hudi/common/table/log/LogReaderUtils.java | 18 +++++++----------- .../metadata/HoodieMetadataFileSystemView.java | 2 +- .../realtime/AbstractRealtimeRecordReader.java | 2 +- .../realtime/HoodieRealtimeFileSplit.java | 12 ++---------- .../RealtimeBootstrapBaseFileSplit.java | 13 ++----------- .../hudi/hadoop/realtime/RealtimeSplit.java | 3 --- .../utils/HoodieRealtimeInputFormatUtils.java | 5 ++--- .../realtime/TestHoodieRealtimeFileSplit.java | 5 +---- .../TestHoodieRealtimeRecordReader.java | 17 ++++++++--------- 12 files changed, 33 insertions(+), 62 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java index 88b7e328a..f0f3842e9 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java @@ -73,11 +73,6 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader { private boolean closed = false; private transient Thread shutdownThread = null; - public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize, - boolean readBlockLazily) throws IOException { - this(fs, logFile, readerSchema, bufferSize, readBlockLazily, false); - } - public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize, boolean readBlockLazily, boolean reverseReader) throws IOException { FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(), bufferSize); @@ -87,11 +82,16 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader { this.readBlockLazily = readBlockLazily; this.reverseReader = reverseReader; if (this.reverseReader) { - this.reverseLogFilePosition = this.lastReverseLogFilePosition = logFile.getFileSize(); + this.reverseLogFilePosition = this.lastReverseLogFilePosition = fs.getFileStatus(logFile.getPath()).getLen(); } addShutDownHook(); } + public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, boolean readBlockLazily, + boolean reverseReader) throws IOException { + this(fs, logFile, readerSchema, DEFAULT_BUFFER_SIZE, readBlockLazily, reverseReader); + } + public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema) throws IOException { this(fs, logFile, readerSchema, DEFAULT_BUFFER_SIZE, false, false); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java index 569b4a23b..c566788fd 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java @@ -274,7 +274,7 @@ public interface HoodieLogFormat { static HoodieLogFormat.Reader newReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema) throws IOException { - return new HoodieLogFileReader(fs, logFile, readerSchema, HoodieLogFileReader.DEFAULT_BUFFER_SIZE, false); + return new HoodieLogFileReader(fs, logFile, readerSchema, HoodieLogFileReader.DEFAULT_BUFFER_SIZE, false, false); } static HoodieLogFormat.Reader newReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java index e64e1a1d8..72672278b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java @@ -59,7 +59,7 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader { this.prevReadersInOpenState = new ArrayList<>(); if (logFiles.size() > 0) { HoodieLogFile nextLogFile = logFiles.remove(0); - this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily); + this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false); } } @@ -99,7 +99,7 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader { this.prevReadersInOpenState.add(currentReader); } this.currentReader = - new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily); + new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false); } catch (IOException io) { throw new HoodieIOException("unable to initialize read with log file ", io); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java index c2a03965f..fe159df00 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java @@ -27,16 +27,14 @@ import org.apache.hudi.common.table.log.block.HoodieLogBlock; import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.common.util.collection.Pair; import org.apache.avro.Schema; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import java.io.IOException; import java.util.List; -import java.util.Map; import java.util.stream.Collectors; /** @@ -44,10 +42,9 @@ import java.util.stream.Collectors; */ public class LogReaderUtils { - private static Schema readSchemaFromLogFileInReverse(FileSystem fs, HoodieActiveTimeline activeTimeline, FileStatus logPathFileStatus) + private static Schema readSchemaFromLogFileInReverse(FileSystem fs, HoodieActiveTimeline activeTimeline, Path path) throws IOException { - // set length for the HoodieLogFile as it will be leveraged by HoodieLogFormat.Reader with reverseReading enabled - Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(logPathFileStatus.getPath(), logPathFileStatus.getLen()), null, true, true); + Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), null, true, true); Schema writerSchema = null; HoodieTimeline completedTimeline = activeTimeline.getCommitsTimeline().filterCompletedInstants(); while (reader.hasPrev()) { @@ -65,19 +62,17 @@ public class LogReaderUtils { return writerSchema; } - public static Schema readLatestSchemaFromLogFiles(String basePath, List deltaFileStatus, Configuration config) + public static Schema readLatestSchemaFromLogFiles(String basePath, List deltaFilePaths, Configuration config) throws IOException { HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(config).setBasePath(basePath).build(); - List deltaPaths = deltaFileStatus.stream().map(s -> new HoodieLogFile(s.getPath())) + List deltaPaths = deltaFilePaths.stream().map(s -> new HoodieLogFile(new Path(s))) .sorted(HoodieLogFile.getReverseLogFileComparator()).map(s -> s.getPath().toString()) .collect(Collectors.toList()); - Map deltaFilePathToFileStatus = deltaFileStatus.stream().map(entry -> Pair.of(entry.getPath().toString(), entry)) - .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); if (deltaPaths.size() > 0) { for (String logPath : deltaPaths) { FileSystem fs = FSUtils.getFs(logPath, config); Schema schemaFromLogFile = - readSchemaFromLogFileInReverse(fs, metaClient.getActiveTimeline(), deltaFilePathToFileStatus.get(logPath)); + readSchemaFromLogFileInReverse(fs, metaClient.getActiveTimeline(), new Path(logPath)); if (schemaFromLogFile != null) { return schemaFromLogFile; } @@ -85,4 +80,5 @@ public class LogReaderUtils { } return null; } + } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java index a9180552a..453ec8f15 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java @@ -61,7 +61,7 @@ public class HoodieMetadataFileSystemView extends HoodieTableFileSystemView { * @throws IOException */ @Override - public FileStatus[] listPartition(Path partitionPath) throws IOException { + protected FileStatus[] listPartition(Path partitionPath) throws IOException { return tableMetadata.getAllFilesInPartition(partitionPath); } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java index 45c01eac8..ef3d4f1c8 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java @@ -82,7 +82,7 @@ public abstract class AbstractRealtimeRecordReader { * job conf. */ private void init() throws IOException { - Schema schemaFromLogFile = LogReaderUtils.readLatestSchemaFromLogFiles(split.getBasePath(), split.getDeltaLogFileStatus(), jobConf); + Schema schemaFromLogFile = LogReaderUtils.readLatestSchemaFromLogFiles(split.getBasePath(), split.getDeltaLogPaths(), jobConf); if (schemaFromLogFile == null) { writerSchema = InputSplitUtils.getBaseFileSchema((FileSplit)split, jobConf); LOG.info("Writer Schema From Parquet => " + writerSchema.getFields()); diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeFileSplit.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeFileSplit.java index 3d9b62fed..6423f2cfd 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeFileSplit.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeFileSplit.java @@ -20,14 +20,12 @@ package org.apache.hudi.hadoop.realtime; import org.apache.hudi.common.util.Option; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.mapred.FileSplit; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.List; -import java.util.stream.Collectors; /** * Filesplit that wraps the base split and a list of log files to merge deltas from. @@ -35,7 +33,6 @@ import java.util.stream.Collectors; public class HoodieRealtimeFileSplit extends FileSplit implements RealtimeSplit { private List deltaLogPaths; - private List deltaLogFileStatus; private String maxCommitTime; @@ -47,12 +44,11 @@ public class HoodieRealtimeFileSplit extends FileSplit implements RealtimeSplit super(); } - public HoodieRealtimeFileSplit(FileSplit baseSplit, String basePath, List deltaLogFileStatus, String maxCommitTime, + public HoodieRealtimeFileSplit(FileSplit baseSplit, String basePath, List deltaLogPaths, String maxCommitTime, Option hoodieVirtualKeyInfo) throws IOException { super(baseSplit.getPath(), baseSplit.getStart(), baseSplit.getLength(), baseSplit.getLocations()); - this.deltaLogFileStatus = deltaLogFileStatus; - this.deltaLogPaths = deltaLogFileStatus.stream().map(entry -> entry.getPath().toString()).collect(Collectors.toList()); + this.deltaLogPaths = deltaLogPaths; this.maxCommitTime = maxCommitTime; this.basePath = basePath; this.hoodieVirtualKeyInfo = hoodieVirtualKeyInfo; @@ -62,10 +58,6 @@ public class HoodieRealtimeFileSplit extends FileSplit implements RealtimeSplit return deltaLogPaths; } - public List getDeltaLogFileStatus() { - return deltaLogFileStatus; - } - public String getMaxCommitTime() { return maxCommitTime; } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeBootstrapBaseFileSplit.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeBootstrapBaseFileSplit.java index f9b0bd0e6..4da310da4 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeBootstrapBaseFileSplit.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeBootstrapBaseFileSplit.java @@ -21,14 +21,12 @@ package org.apache.hudi.hadoop.realtime; import org.apache.hudi.common.util.Option; import org.apache.hudi.hadoop.BootstrapBaseFileSplit; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.mapred.FileSplit; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.List; -import java.util.stream.Collectors; /** * Realtime File Split with external base file. @@ -36,7 +34,6 @@ import java.util.stream.Collectors; public class RealtimeBootstrapBaseFileSplit extends BootstrapBaseFileSplit implements RealtimeSplit { private List deltaLogPaths; - private List deltaLogFileStatus; private String maxInstantTime; @@ -46,12 +43,11 @@ public class RealtimeBootstrapBaseFileSplit extends BootstrapBaseFileSplit imple super(); } - public RealtimeBootstrapBaseFileSplit(FileSplit baseSplit, String basePath, List deltaLogFileStatus, + public RealtimeBootstrapBaseFileSplit(FileSplit baseSplit, String basePath, List deltaLogPaths, String maxInstantTime, FileSplit externalFileSplit) throws IOException { super(baseSplit, externalFileSplit); this.maxInstantTime = maxInstantTime; - this.deltaLogFileStatus = deltaLogFileStatus; - this.deltaLogPaths = deltaLogFileStatus.stream().map(entry -> entry.getPath().toString()).collect(Collectors.toList()); + this.deltaLogPaths = deltaLogPaths; this.basePath = basePath; } @@ -72,11 +68,6 @@ public class RealtimeBootstrapBaseFileSplit extends BootstrapBaseFileSplit imple return deltaLogPaths; } - @Override - public List getDeltaLogFileStatus() { - return deltaLogFileStatus; - } - @Override public String getMaxCommitTime() { return maxInstantTime; diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeSplit.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeSplit.java index 6dfaf165b..108613c18 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeSplit.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeSplit.java @@ -21,7 +21,6 @@ package org.apache.hudi.hadoop.realtime; import org.apache.hudi.common.util.Option; import org.apache.hudi.hadoop.InputSplitUtils; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.InputSplitWithLocationInfo; @@ -42,8 +41,6 @@ public interface RealtimeSplit extends InputSplitWithLocationInfo { */ List getDeltaLogPaths(); - List getDeltaLogFileStatus(); - /** * Return Max Instant Time. * @return diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java index 9cf61b238..f84e34405 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java @@ -43,7 +43,6 @@ import org.apache.hudi.hadoop.realtime.RealtimeBootstrapBaseFileSplit; import org.apache.hudi.hadoop.realtime.RealtimeSplit; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.mapred.FileSplit; @@ -131,8 +130,8 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils { List dataFileSplits = groupedInputSplits.get(fileSlice.getFileId()); dataFileSplits.forEach(split -> { try { - List logFilePaths = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()) - .map(logFile -> logFile.getFileStatus()).collect(Collectors.toList()); + List logFilePaths = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()) + .map(logFile -> logFile.getPath().toString()).collect(Collectors.toList()); if (split instanceof BootstrapBaseFileSplit) { BootstrapBaseFileSplit eSplit = (BootstrapBaseFileSplit) split; String[] hosts = split.getLocationInfo() != null ? Arrays.stream(split.getLocationInfo()) diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeFileSplit.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeFileSplit.java index 06f7b721b..ac857868c 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeFileSplit.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeFileSplit.java @@ -20,7 +20,6 @@ package org.apache.hudi.hadoop.realtime; import org.apache.hudi.common.util.Option; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileSplit; @@ -57,7 +56,6 @@ public class TestHoodieRealtimeFileSplit { private HoodieRealtimeFileSplit split; private String basePath; - private List deltaLogFileStatus; private List deltaLogPaths; private String fileSplitName; private FileSplit baseFileSplit; @@ -66,13 +64,12 @@ public class TestHoodieRealtimeFileSplit { @BeforeEach public void setUp(@TempDir java.nio.file.Path tempDir) throws Exception { basePath = tempDir.toAbsolutePath().toString(); - deltaLogFileStatus = Collections.singletonList(new FileStatus(0L, false, 0, 0L, 0, new Path(basePath + "/1.log"))); deltaLogPaths = Collections.singletonList(basePath + "/1.log"); fileSplitName = basePath + "/test.file"; baseFileSplit = new FileSplit(new Path(fileSplitName), 0, 100, new String[] {}); maxCommitTime = "10001"; - split = new HoodieRealtimeFileSplit(baseFileSplit, basePath, deltaLogFileStatus, maxCommitTime, Option.empty()); + split = new HoodieRealtimeFileSplit(baseFileSplit, basePath, deltaLogPaths, maxCommitTime, Option.empty()); } @Test diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java index f0c1ab1b6..a647da9b9 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java @@ -41,7 +41,6 @@ import org.apache.hudi.hadoop.testutils.InputFormatTestUtil; import org.apache.avro.Schema; import org.apache.avro.Schema.Field; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; @@ -188,7 +187,7 @@ public class TestHoodieRealtimeRecordReader { HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit( new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + baseInstant + ".parquet"), 0, 1, baseJobConf), basePath.toUri().toString(), fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()) - .map(h -> new FileStatus(0L, false, 0, 0L, 0, h.getPath())).collect(Collectors.toList()), + .map(h -> h.getPath().toString()).collect(Collectors.toList()), instantTime, Option.empty()); // create a RecordReader to be used by HoodieRealtimeRecordReader @@ -257,10 +256,10 @@ public class TestHoodieRealtimeRecordReader { FileCreateUtils.createDeltaCommit(basePath.toString(), newCommitTime); // create a split with baseFile (parquet file written earlier) and new log file(s) - FileStatus logFileFileStatus = new FileStatus(0L, false, 0, 0L, 0, writer.getLogFile().getPath()); + String logFilePath = writer.getLogFile().getPath().toString(); HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit( new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + instantTime + ".parquet"), 0, 1, baseJobConf), - basePath.toUri().toString(), Collections.singletonList(logFileFileStatus), newCommitTime, Option.empty()); + basePath.toUri().toString(), Collections.singletonList(logFilePath), newCommitTime, Option.empty()); // create a RecordReader to be used by HoodieRealtimeRecordReader RecordReader reader = new MapredParquetInputFormat().getRecordReader( @@ -337,10 +336,10 @@ public class TestHoodieRealtimeRecordReader { InputFormatTestUtil.deltaCommit(basePath, newCommitTime); // create a split with baseFile (parquet file written earlier) and new log file(s) - FileStatus logFileStatus = new FileStatus(0L, false, 0, 0L, 0, writer.getLogFile().getPath()); + String logFilePath = writer.getLogFile().getPath().toString(); HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit( new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + instantTime + ".parquet"), 0, 1, baseJobConf), - basePath.toUri().toString(), Collections.singletonList(logFileStatus), newCommitTime, Option.empty()); + basePath.toUri().toString(), Collections.singletonList(logFilePath), newCommitTime, Option.empty()); // create a RecordReader to be used by HoodieRealtimeRecordReader RecordReader reader = new MapredParquetInputFormat().getRecordReader( @@ -450,7 +449,7 @@ public class TestHoodieRealtimeRecordReader { public void testSchemaEvolutionAndRollbackBlockInLastLogFile(ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled) throws Exception { // initial commit - List logFilePaths = new ArrayList<>(); + List logFilePaths = new ArrayList<>(); Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema()); HoodieTestUtils.init(hadoopConf, basePath.toString(), HoodieTableType.MERGE_ON_READ); String instantTime = "100"; @@ -471,7 +470,7 @@ public class TestHoodieRealtimeRecordReader { InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, "fileid0", instantTime, newCommitTime, numberOfLogRecords, 0, 1); long size = writer.getCurrentSize(); - logFilePaths.add(new FileStatus(0L, false, 0, 0L, 0, writer.getLogFile().getPath())); + logFilePaths.add(writer.getLogFile().getPath().toString()); writer.close(); assertTrue(size > 0, "block - size should be > 0"); @@ -479,7 +478,7 @@ public class TestHoodieRealtimeRecordReader { newCommitTime = "102"; writer = InputFormatTestUtil.writeRollbackBlockToLogFile(partitionDir, fs, schema, "fileid0", instantTime, newCommitTime, "101", 1); - logFilePaths.add(new FileStatus(0L, false, 0, 0L, 0, writer.getLogFile().getPath())); + logFilePaths.add(writer.getLogFile().getPath().toString()); writer.close(); InputFormatTestUtil.deltaCommit(basePath, newCommitTime);