diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java index f0f3842e9..88b7e328a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java @@ -73,6 +73,11 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader { private boolean closed = false; private transient Thread shutdownThread = null; + public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize, + boolean readBlockLazily) throws IOException { + this(fs, logFile, readerSchema, bufferSize, readBlockLazily, false); + } + public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize, boolean readBlockLazily, boolean reverseReader) throws IOException { FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(), bufferSize); @@ -82,16 +87,11 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader { this.readBlockLazily = readBlockLazily; this.reverseReader = reverseReader; if (this.reverseReader) { - this.reverseLogFilePosition = this.lastReverseLogFilePosition = fs.getFileStatus(logFile.getPath()).getLen(); + this.reverseLogFilePosition = this.lastReverseLogFilePosition = logFile.getFileSize(); } addShutDownHook(); } - public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, boolean readBlockLazily, - boolean reverseReader) throws IOException { - this(fs, logFile, readerSchema, DEFAULT_BUFFER_SIZE, readBlockLazily, reverseReader); - } - public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema) throws IOException { this(fs, logFile, readerSchema, DEFAULT_BUFFER_SIZE, false, false); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java index c566788fd..569b4a23b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java @@ -274,7 +274,7 @@ public interface HoodieLogFormat { static HoodieLogFormat.Reader newReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema) throws IOException { - return new HoodieLogFileReader(fs, logFile, readerSchema, HoodieLogFileReader.DEFAULT_BUFFER_SIZE, false, false); + return new HoodieLogFileReader(fs, logFile, readerSchema, HoodieLogFileReader.DEFAULT_BUFFER_SIZE, false); } static HoodieLogFormat.Reader newReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java index 72672278b..e64e1a1d8 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java @@ -59,7 +59,7 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader { this.prevReadersInOpenState = new ArrayList<>(); if (logFiles.size() > 0) { HoodieLogFile nextLogFile = logFiles.remove(0); - this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false); + this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily); } } @@ -99,7 +99,7 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader { this.prevReadersInOpenState.add(currentReader); } this.currentReader = - new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false); + new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily); } catch (IOException 
io) { throw new HoodieIOException("unable to initialize read with log file ", io); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java index fe159df00..c2a03965f 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java @@ -27,14 +27,16 @@ import org.apache.hudi.common.table.log.block.HoodieLogBlock; import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.collection.Pair; import org.apache.avro.Schema; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import java.io.IOException; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; /** @@ -42,9 +44,10 @@ import java.util.stream.Collectors; */ public class LogReaderUtils { - private static Schema readSchemaFromLogFileInReverse(FileSystem fs, HoodieActiveTimeline activeTimeline, Path path) + private static Schema readSchemaFromLogFileInReverse(FileSystem fs, HoodieActiveTimeline activeTimeline, FileStatus logPathFileStatus) throws IOException { - Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), null, true, true); + // set length for the HoodieLogFile as it will be leveraged by HoodieLogFormat.Reader with reverseReading enabled + Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(logPathFileStatus.getPath(), logPathFileStatus.getLen()), null, true, true); Schema writerSchema = null; HoodieTimeline completedTimeline = activeTimeline.getCommitsTimeline().filterCompletedInstants(); while (reader.hasPrev()) { @@ -62,17 +65,19 @@ 
public class LogReaderUtils { return writerSchema; } - public static Schema readLatestSchemaFromLogFiles(String basePath, List<String> deltaFilePaths, Configuration config) + public static Schema readLatestSchemaFromLogFiles(String basePath, List<FileStatus> deltaFileStatus, Configuration config) throws IOException { HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(config).setBasePath(basePath).build(); - List<String> deltaPaths = deltaFilePaths.stream().map(s -> new HoodieLogFile(new Path(s))) + List<String> deltaPaths = deltaFileStatus.stream().map(s -> new HoodieLogFile(s.getPath())) .sorted(HoodieLogFile.getReverseLogFileComparator()).map(s -> s.getPath().toString()) .collect(Collectors.toList()); + Map<String, FileStatus> deltaFilePathToFileStatus = deltaFileStatus.stream().map(entry -> Pair.of(entry.getPath().toString(), entry)) + .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); if (deltaPaths.size() > 0) { for (String logPath : deltaPaths) { FileSystem fs = FSUtils.getFs(logPath, config); Schema schemaFromLogFile = - readSchemaFromLogFileInReverse(fs, metaClient.getActiveTimeline(), new Path(logPath)); + readSchemaFromLogFileInReverse(fs, metaClient.getActiveTimeline(), deltaFilePathToFileStatus.get(logPath)); if (schemaFromLogFile != null) { return schemaFromLogFile; } @@ -80,5 +85,4 @@ public class LogReaderUtils { } return null; } - } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java index 453ec8f15..a9180552a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java @@ -61,7 +61,7 @@ public class HoodieMetadataFileSystemView extends HoodieTableFileSystemView { * @throws IOException */ @Override - protected FileStatus[] listPartition(Path partitionPath) throws IOException { + public FileStatus[] listPartition(Path
partitionPath) throws IOException { return tableMetadata.getAllFilesInPartition(partitionPath); } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java index ef3d4f1c8..45c01eac8 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java @@ -82,7 +82,7 @@ public abstract class AbstractRealtimeRecordReader { * job conf. */ private void init() throws IOException { - Schema schemaFromLogFile = LogReaderUtils.readLatestSchemaFromLogFiles(split.getBasePath(), split.getDeltaLogPaths(), jobConf); + Schema schemaFromLogFile = LogReaderUtils.readLatestSchemaFromLogFiles(split.getBasePath(), split.getDeltaLogFileStatus(), jobConf); if (schemaFromLogFile == null) { writerSchema = InputSplitUtils.getBaseFileSchema((FileSplit)split, jobConf); LOG.info("Writer Schema From Parquet => " + writerSchema.getFields()); diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeFileSplit.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeFileSplit.java index 6423f2cfd..3d9b62fed 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeFileSplit.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeFileSplit.java @@ -20,12 +20,14 @@ package org.apache.hudi.hadoop.realtime; import org.apache.hudi.common.util.Option; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.mapred.FileSplit; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.List; +import java.util.stream.Collectors; /** * Filesplit that wraps the base split and a list of log files to merge deltas from. 
@@ -33,6 +35,7 @@ import java.util.List; public class HoodieRealtimeFileSplit extends FileSplit implements RealtimeSplit { private List<String> deltaLogPaths; + private List<FileStatus> deltaLogFileStatus; private String maxCommitTime; @@ -44,11 +47,12 @@ public class HoodieRealtimeFileSplit extends FileSplit implements RealtimeSplit super(); } - public HoodieRealtimeFileSplit(FileSplit baseSplit, String basePath, List<String> deltaLogPaths, String maxCommitTime, + public HoodieRealtimeFileSplit(FileSplit baseSplit, String basePath, List<FileStatus> deltaLogFileStatus, String maxCommitTime, Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo) throws IOException { super(baseSplit.getPath(), baseSplit.getStart(), baseSplit.getLength(), baseSplit.getLocations()); - this.deltaLogPaths = deltaLogPaths; + this.deltaLogFileStatus = deltaLogFileStatus; + this.deltaLogPaths = deltaLogFileStatus.stream().map(entry -> entry.getPath().toString()).collect(Collectors.toList()); this.maxCommitTime = maxCommitTime; this.basePath = basePath; this.hoodieVirtualKeyInfo = hoodieVirtualKeyInfo; @@ -58,6 +62,10 @@ public class HoodieRealtimeFileSplit extends FileSplit implements RealtimeSplit return deltaLogPaths; } + public List<FileStatus> getDeltaLogFileStatus() { + return deltaLogFileStatus; + } + public String getMaxCommitTime() { return maxCommitTime; } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeBootstrapBaseFileSplit.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeBootstrapBaseFileSplit.java index 4da310da4..f9b0bd0e6 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeBootstrapBaseFileSplit.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeBootstrapBaseFileSplit.java @@ -21,12 +21,14 @@ package org.apache.hudi.hadoop.realtime; import org.apache.hudi.common.util.Option; import org.apache.hudi.hadoop.BootstrapBaseFileSplit; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.mapred.FileSplit; import
java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.List; +import java.util.stream.Collectors; /** * Realtime File Split with external base file. @@ -34,6 +36,7 @@ import java.util.List; public class RealtimeBootstrapBaseFileSplit extends BootstrapBaseFileSplit implements RealtimeSplit { private List<String> deltaLogPaths; + private List<FileStatus> deltaLogFileStatus; private String maxInstantTime; @@ -43,11 +46,12 @@ public class RealtimeBootstrapBaseFileSplit extends BootstrapBaseFileSplit imple super(); } - public RealtimeBootstrapBaseFileSplit(FileSplit baseSplit, String basePath, List<String> deltaLogPaths, + public RealtimeBootstrapBaseFileSplit(FileSplit baseSplit, String basePath, List<FileStatus> deltaLogFileStatus, String maxInstantTime, FileSplit externalFileSplit) throws IOException { super(baseSplit, externalFileSplit); this.maxInstantTime = maxInstantTime; - this.deltaLogPaths = deltaLogPaths; + this.deltaLogFileStatus = deltaLogFileStatus; + this.deltaLogPaths = deltaLogFileStatus.stream().map(entry -> entry.getPath().toString()).collect(Collectors.toList()); this.basePath = basePath; } @@ -68,6 +72,11 @@ public class RealtimeBootstrapBaseFileSplit extends BootstrapBaseFileSplit imple return deltaLogPaths; } + @Override + public List<FileStatus> getDeltaLogFileStatus() { + return deltaLogFileStatus; + } + @Override public String getMaxCommitTime() { return maxInstantTime; } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeSplit.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeSplit.java index 108613c18..6dfaf165b 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeSplit.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeSplit.java @@ -21,6 +21,7 @@ package org.apache.hudi.hadoop.realtime; import org.apache.hudi.common.util.Option; import org.apache.hudi.hadoop.InputSplitUtils; +import org.apache.hadoop.fs.FileStatus; import
org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.InputSplitWithLocationInfo; @@ -41,6 +42,8 @@ public interface RealtimeSplit extends InputSplitWithLocationInfo { */ List<String> getDeltaLogPaths(); + List<FileStatus> getDeltaLogFileStatus(); + /** * Return Max Instant Time. * @return diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java index f84e34405..9cf61b238 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java @@ -43,6 +43,7 @@ import org.apache.hudi.hadoop.realtime.RealtimeBootstrapBaseFileSplit; import org.apache.hudi.hadoop.realtime.RealtimeSplit; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.mapred.FileSplit; @@ -130,8 +131,8 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils { List<FileSplit> dataFileSplits = groupedInputSplits.get(fileSlice.getFileId()); dataFileSplits.forEach(split -> { try { - List<String> logFilePaths = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()) - .map(logFile -> logFile.getPath().toString()).collect(Collectors.toList()); + List<FileStatus> logFilePaths = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()) + .map(logFile -> logFile.getFileStatus()).collect(Collectors.toList()); if (split instanceof BootstrapBaseFileSplit) { BootstrapBaseFileSplit eSplit = (BootstrapBaseFileSplit) split; String[] hosts = split.getLocationInfo() != null ?
Arrays.stream(split.getLocationInfo()) diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeFileSplit.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeFileSplit.java index ac857868c..06f7b721b 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeFileSplit.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeFileSplit.java @@ -20,6 +20,7 @@ package org.apache.hudi.hadoop.realtime; import org.apache.hudi.common.util.Option; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileSplit; @@ -56,6 +57,7 @@ public class TestHoodieRealtimeFileSplit { private HoodieRealtimeFileSplit split; private String basePath; + private List<FileStatus> deltaLogFileStatus; private List<String> deltaLogPaths; private String fileSplitName; private FileSplit baseFileSplit; @@ -64,12 +66,13 @@ public class TestHoodieRealtimeFileSplit { @BeforeEach public void setUp(@TempDir java.nio.file.Path tempDir) throws Exception { basePath = tempDir.toAbsolutePath().toString(); + deltaLogFileStatus = Collections.singletonList(new FileStatus(0L, false, 0, 0L, 0, new Path(basePath + "/1.log"))); deltaLogPaths = Collections.singletonList(basePath + "/1.log"); fileSplitName = basePath + "/test.file"; baseFileSplit = new FileSplit(new Path(fileSplitName), 0, 100, new String[] {}); maxCommitTime = "10001"; - split = new HoodieRealtimeFileSplit(baseFileSplit, basePath, deltaLogPaths, maxCommitTime, Option.empty()); + split = new HoodieRealtimeFileSplit(baseFileSplit, basePath, deltaLogFileStatus, maxCommitTime, Option.empty()); } @Test diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java index a647da9b9..f0c1ab1b6 100644 ---
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java @@ -41,6 +41,7 @@ import org.apache.hudi.hadoop.testutils.InputFormatTestUtil; import org.apache.avro.Schema; import org.apache.avro.Schema.Field; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; @@ -187,7 +188,7 @@ public class TestHoodieRealtimeRecordReader { HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit( new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + baseInstant + ".parquet"), 0, 1, baseJobConf), basePath.toUri().toString(), fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()) - .map(h -> h.getPath().toString()).collect(Collectors.toList()), + .map(h -> new FileStatus(0L, false, 0, 0L, 0, h.getPath())).collect(Collectors.toList()), instantTime, Option.empty()); // create a RecordReader to be used by HoodieRealtimeRecordReader @@ -256,10 +257,10 @@ public class TestHoodieRealtimeRecordReader { FileCreateUtils.createDeltaCommit(basePath.toString(), newCommitTime); // create a split with baseFile (parquet file written earlier) and new log file(s) - String logFilePath = writer.getLogFile().getPath().toString(); + FileStatus logFileFileStatus = new FileStatus(0L, false, 0, 0L, 0, writer.getLogFile().getPath()); HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit( new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + instantTime + ".parquet"), 0, 1, baseJobConf), - basePath.toUri().toString(), Collections.singletonList(logFilePath), newCommitTime, Option.empty()); + basePath.toUri().toString(), Collections.singletonList(logFileFileStatus), newCommitTime, Option.empty()); // create a RecordReader to be used by HoodieRealtimeRecordReader 
RecordReader<NullWritable, ArrayWritable> reader = new MapredParquetInputFormat().getRecordReader( @@ -336,10 +337,10 @@ public class TestHoodieRealtimeRecordReader { InputFormatTestUtil.deltaCommit(basePath, newCommitTime); // create a split with baseFile (parquet file written earlier) and new log file(s) - String logFilePath = writer.getLogFile().getPath().toString(); + FileStatus logFileStatus = new FileStatus(0L, false, 0, 0L, 0, writer.getLogFile().getPath()); HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit( new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + instantTime + ".parquet"), 0, 1, baseJobConf), - basePath.toUri().toString(), Collections.singletonList(logFilePath), newCommitTime, Option.empty()); + basePath.toUri().toString(), Collections.singletonList(logFileStatus), newCommitTime, Option.empty()); // create a RecordReader to be used by HoodieRealtimeRecordReader RecordReader<NullWritable, ArrayWritable> reader = new MapredParquetInputFormat().getRecordReader( @@ -449,7 +450,7 @@ public class TestHoodieRealtimeRecordReader { public void testSchemaEvolutionAndRollbackBlockInLastLogFile(ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled) throws Exception { // initial commit - List<String> logFilePaths = new ArrayList<>(); + List<FileStatus> logFilePaths = new ArrayList<>(); Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema()); HoodieTestUtils.init(hadoopConf, basePath.toString(), HoodieTableType.MERGE_ON_READ); String instantTime = "100"; @@ -470,7 +471,7 @@ public class TestHoodieRealtimeRecordReader { InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, "fileid0", instantTime, newCommitTime, numberOfLogRecords, 0, 1); long size = writer.getCurrentSize(); - logFilePaths.add(writer.getLogFile().getPath().toString()); + logFilePaths.add(new FileStatus(0L, false, 0, 0L, 0, writer.getLogFile().getPath())); writer.close(); assertTrue(size > 0, "block - size should be > 0"); @@ -478,7 +479,7 @@
newCommitTime = "102"; writer = InputFormatTestUtil.writeRollbackBlockToLogFile(partitionDir, fs, schema, "fileid0", instantTime, newCommitTime, "101", 1); - logFilePaths.add(writer.getLogFile().getPath().toString()); + logFilePaths.add(new FileStatus(0L, false, 0, 0L, 0, writer.getLogFile().getPath())); writer.close(); InputFormatTestUtil.deltaCommit(basePath, newCommitTime);