1
0

[HUDI-2625] Revert "[HUDI-2005] Avoiding direct fs calls in HoodieLogFileReader (#3757)" (#3863)

This reverts commit 1bb0532563.
This commit is contained in:
Sivabalan Narayanan
2021-10-25 21:43:15 -04:00
committed by GitHub
parent 4b5512e685
commit e3fc74668f
12 changed files with 33 additions and 62 deletions

View File

@@ -73,11 +73,6 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
private boolean closed = false; private boolean closed = false;
private transient Thread shutdownThread = null; private transient Thread shutdownThread = null;
public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
boolean readBlockLazily) throws IOException {
this(fs, logFile, readerSchema, bufferSize, readBlockLazily, false);
}
public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize, public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
boolean readBlockLazily, boolean reverseReader) throws IOException { boolean readBlockLazily, boolean reverseReader) throws IOException {
FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(), bufferSize); FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(), bufferSize);
@@ -87,11 +82,16 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
this.readBlockLazily = readBlockLazily; this.readBlockLazily = readBlockLazily;
this.reverseReader = reverseReader; this.reverseReader = reverseReader;
if (this.reverseReader) { if (this.reverseReader) {
this.reverseLogFilePosition = this.lastReverseLogFilePosition = logFile.getFileSize(); this.reverseLogFilePosition = this.lastReverseLogFilePosition = fs.getFileStatus(logFile.getPath()).getLen();
} }
addShutDownHook(); addShutDownHook();
} }
public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, boolean readBlockLazily,
boolean reverseReader) throws IOException {
this(fs, logFile, readerSchema, DEFAULT_BUFFER_SIZE, readBlockLazily, reverseReader);
}
public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema) throws IOException { public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema) throws IOException {
this(fs, logFile, readerSchema, DEFAULT_BUFFER_SIZE, false, false); this(fs, logFile, readerSchema, DEFAULT_BUFFER_SIZE, false, false);
} }

View File

@@ -274,7 +274,7 @@ public interface HoodieLogFormat {
static HoodieLogFormat.Reader newReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema) static HoodieLogFormat.Reader newReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema)
throws IOException { throws IOException {
return new HoodieLogFileReader(fs, logFile, readerSchema, HoodieLogFileReader.DEFAULT_BUFFER_SIZE, false); return new HoodieLogFileReader(fs, logFile, readerSchema, HoodieLogFileReader.DEFAULT_BUFFER_SIZE, false, false);
} }
static HoodieLogFormat.Reader newReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, static HoodieLogFormat.Reader newReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema,

View File

@@ -59,7 +59,7 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
this.prevReadersInOpenState = new ArrayList<>(); this.prevReadersInOpenState = new ArrayList<>();
if (logFiles.size() > 0) { if (logFiles.size() > 0) {
HoodieLogFile nextLogFile = logFiles.remove(0); HoodieLogFile nextLogFile = logFiles.remove(0);
this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily); this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false);
} }
} }
@@ -99,7 +99,7 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
this.prevReadersInOpenState.add(currentReader); this.prevReadersInOpenState.add(currentReader);
} }
this.currentReader = this.currentReader =
new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily); new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false);
} catch (IOException io) { } catch (IOException io) {
throw new HoodieIOException("unable to initialize read with log file ", io); throw new HoodieIOException("unable to initialize read with log file ", io);
} }

View File

@@ -27,16 +27,14 @@ import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType; import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.avro.Schema; import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
@@ -44,10 +42,9 @@ import java.util.stream.Collectors;
*/ */
public class LogReaderUtils { public class LogReaderUtils {
private static Schema readSchemaFromLogFileInReverse(FileSystem fs, HoodieActiveTimeline activeTimeline, FileStatus logPathFileStatus) private static Schema readSchemaFromLogFileInReverse(FileSystem fs, HoodieActiveTimeline activeTimeline, Path path)
throws IOException { throws IOException {
// set length for the HoodieLogFile as it will be leveraged by HoodieLogFormat.Reader with reverseReading enabled Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), null, true, true);
Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(logPathFileStatus.getPath(), logPathFileStatus.getLen()), null, true, true);
Schema writerSchema = null; Schema writerSchema = null;
HoodieTimeline completedTimeline = activeTimeline.getCommitsTimeline().filterCompletedInstants(); HoodieTimeline completedTimeline = activeTimeline.getCommitsTimeline().filterCompletedInstants();
while (reader.hasPrev()) { while (reader.hasPrev()) {
@@ -65,19 +62,17 @@ public class LogReaderUtils {
return writerSchema; return writerSchema;
} }
public static Schema readLatestSchemaFromLogFiles(String basePath, List<FileStatus> deltaFileStatus, Configuration config) public static Schema readLatestSchemaFromLogFiles(String basePath, List<String> deltaFilePaths, Configuration config)
throws IOException { throws IOException {
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(config).setBasePath(basePath).build(); HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(config).setBasePath(basePath).build();
List<String> deltaPaths = deltaFileStatus.stream().map(s -> new HoodieLogFile(s.getPath())) List<String> deltaPaths = deltaFilePaths.stream().map(s -> new HoodieLogFile(new Path(s)))
.sorted(HoodieLogFile.getReverseLogFileComparator()).map(s -> s.getPath().toString()) .sorted(HoodieLogFile.getReverseLogFileComparator()).map(s -> s.getPath().toString())
.collect(Collectors.toList()); .collect(Collectors.toList());
Map<String, FileStatus> deltaFilePathToFileStatus = deltaFileStatus.stream().map(entry -> Pair.of(entry.getPath().toString(), entry))
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
if (deltaPaths.size() > 0) { if (deltaPaths.size() > 0) {
for (String logPath : deltaPaths) { for (String logPath : deltaPaths) {
FileSystem fs = FSUtils.getFs(logPath, config); FileSystem fs = FSUtils.getFs(logPath, config);
Schema schemaFromLogFile = Schema schemaFromLogFile =
readSchemaFromLogFileInReverse(fs, metaClient.getActiveTimeline(), deltaFilePathToFileStatus.get(logPath)); readSchemaFromLogFileInReverse(fs, metaClient.getActiveTimeline(), new Path(logPath));
if (schemaFromLogFile != null) { if (schemaFromLogFile != null) {
return schemaFromLogFile; return schemaFromLogFile;
} }
@@ -85,4 +80,5 @@ public class LogReaderUtils {
} }
return null; return null;
} }
} }

View File

@@ -61,7 +61,7 @@ public class HoodieMetadataFileSystemView extends HoodieTableFileSystemView {
* @throws IOException * @throws IOException
*/ */
@Override @Override
public FileStatus[] listPartition(Path partitionPath) throws IOException { protected FileStatus[] listPartition(Path partitionPath) throws IOException {
return tableMetadata.getAllFilesInPartition(partitionPath); return tableMetadata.getAllFilesInPartition(partitionPath);
} }

View File

@@ -82,7 +82,7 @@ public abstract class AbstractRealtimeRecordReader {
* job conf. * job conf.
*/ */
private void init() throws IOException { private void init() throws IOException {
Schema schemaFromLogFile = LogReaderUtils.readLatestSchemaFromLogFiles(split.getBasePath(), split.getDeltaLogFileStatus(), jobConf); Schema schemaFromLogFile = LogReaderUtils.readLatestSchemaFromLogFiles(split.getBasePath(), split.getDeltaLogPaths(), jobConf);
if (schemaFromLogFile == null) { if (schemaFromLogFile == null) {
writerSchema = InputSplitUtils.getBaseFileSchema((FileSplit)split, jobConf); writerSchema = InputSplitUtils.getBaseFileSchema((FileSplit)split, jobConf);
LOG.info("Writer Schema From Parquet => " + writerSchema.getFields()); LOG.info("Writer Schema From Parquet => " + writerSchema.getFields());

View File

@@ -20,14 +20,12 @@ package org.apache.hudi.hadoop.realtime;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.FileSplit;
import java.io.DataInput; import java.io.DataInput;
import java.io.DataOutput; import java.io.DataOutput;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.stream.Collectors;
/** /**
* Filesplit that wraps the base split and a list of log files to merge deltas from. * Filesplit that wraps the base split and a list of log files to merge deltas from.
@@ -35,7 +33,6 @@ import java.util.stream.Collectors;
public class HoodieRealtimeFileSplit extends FileSplit implements RealtimeSplit { public class HoodieRealtimeFileSplit extends FileSplit implements RealtimeSplit {
private List<String> deltaLogPaths; private List<String> deltaLogPaths;
private List<FileStatus> deltaLogFileStatus;
private String maxCommitTime; private String maxCommitTime;
@@ -47,12 +44,11 @@ public class HoodieRealtimeFileSplit extends FileSplit implements RealtimeSplit
super(); super();
} }
public HoodieRealtimeFileSplit(FileSplit baseSplit, String basePath, List<FileStatus> deltaLogFileStatus, String maxCommitTime, public HoodieRealtimeFileSplit(FileSplit baseSplit, String basePath, List<String> deltaLogPaths, String maxCommitTime,
Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo) Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo)
throws IOException { throws IOException {
super(baseSplit.getPath(), baseSplit.getStart(), baseSplit.getLength(), baseSplit.getLocations()); super(baseSplit.getPath(), baseSplit.getStart(), baseSplit.getLength(), baseSplit.getLocations());
this.deltaLogFileStatus = deltaLogFileStatus; this.deltaLogPaths = deltaLogPaths;
this.deltaLogPaths = deltaLogFileStatus.stream().map(entry -> entry.getPath().toString()).collect(Collectors.toList());
this.maxCommitTime = maxCommitTime; this.maxCommitTime = maxCommitTime;
this.basePath = basePath; this.basePath = basePath;
this.hoodieVirtualKeyInfo = hoodieVirtualKeyInfo; this.hoodieVirtualKeyInfo = hoodieVirtualKeyInfo;
@@ -62,10 +58,6 @@ public class HoodieRealtimeFileSplit extends FileSplit implements RealtimeSplit
return deltaLogPaths; return deltaLogPaths;
} }
public List<FileStatus> getDeltaLogFileStatus() {
return deltaLogFileStatus;
}
public String getMaxCommitTime() { public String getMaxCommitTime() {
return maxCommitTime; return maxCommitTime;
} }

View File

@@ -21,14 +21,12 @@ package org.apache.hudi.hadoop.realtime;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.hadoop.BootstrapBaseFileSplit; import org.apache.hudi.hadoop.BootstrapBaseFileSplit;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.FileSplit;
import java.io.DataInput; import java.io.DataInput;
import java.io.DataOutput; import java.io.DataOutput;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.stream.Collectors;
/** /**
* Realtime File Split with external base file. * Realtime File Split with external base file.
@@ -36,7 +34,6 @@ import java.util.stream.Collectors;
public class RealtimeBootstrapBaseFileSplit extends BootstrapBaseFileSplit implements RealtimeSplit { public class RealtimeBootstrapBaseFileSplit extends BootstrapBaseFileSplit implements RealtimeSplit {
private List<String> deltaLogPaths; private List<String> deltaLogPaths;
private List<FileStatus> deltaLogFileStatus;
private String maxInstantTime; private String maxInstantTime;
@@ -46,12 +43,11 @@ public class RealtimeBootstrapBaseFileSplit extends BootstrapBaseFileSplit imple
super(); super();
} }
public RealtimeBootstrapBaseFileSplit(FileSplit baseSplit, String basePath, List<FileStatus> deltaLogFileStatus, public RealtimeBootstrapBaseFileSplit(FileSplit baseSplit, String basePath, List<String> deltaLogPaths,
String maxInstantTime, FileSplit externalFileSplit) throws IOException { String maxInstantTime, FileSplit externalFileSplit) throws IOException {
super(baseSplit, externalFileSplit); super(baseSplit, externalFileSplit);
this.maxInstantTime = maxInstantTime; this.maxInstantTime = maxInstantTime;
this.deltaLogFileStatus = deltaLogFileStatus; this.deltaLogPaths = deltaLogPaths;
this.deltaLogPaths = deltaLogFileStatus.stream().map(entry -> entry.getPath().toString()).collect(Collectors.toList());
this.basePath = basePath; this.basePath = basePath;
} }
@@ -72,11 +68,6 @@ public class RealtimeBootstrapBaseFileSplit extends BootstrapBaseFileSplit imple
return deltaLogPaths; return deltaLogPaths;
} }
@Override
public List<FileStatus> getDeltaLogFileStatus() {
return deltaLogFileStatus;
}
@Override @Override
public String getMaxCommitTime() { public String getMaxCommitTime() {
return maxInstantTime; return maxInstantTime;

View File

@@ -21,7 +21,6 @@ package org.apache.hudi.hadoop.realtime;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.hadoop.InputSplitUtils; import org.apache.hudi.hadoop.InputSplitUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.InputSplitWithLocationInfo; import org.apache.hadoop.mapred.InputSplitWithLocationInfo;
@@ -42,8 +41,6 @@ public interface RealtimeSplit extends InputSplitWithLocationInfo {
*/ */
List<String> getDeltaLogPaths(); List<String> getDeltaLogPaths();
List<FileStatus> getDeltaLogFileStatus();
/** /**
* Return Max Instant Time. * Return Max Instant Time.
* @return * @return

View File

@@ -43,7 +43,6 @@ import org.apache.hudi.hadoop.realtime.RealtimeBootstrapBaseFileSplit;
import org.apache.hudi.hadoop.realtime.RealtimeSplit; import org.apache.hudi.hadoop.realtime.RealtimeSplit;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.FileSplit;
@@ -131,8 +130,8 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
List<FileSplit> dataFileSplits = groupedInputSplits.get(fileSlice.getFileId()); List<FileSplit> dataFileSplits = groupedInputSplits.get(fileSlice.getFileId());
dataFileSplits.forEach(split -> { dataFileSplits.forEach(split -> {
try { try {
List<FileStatus> logFilePaths = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()) List<String> logFilePaths = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator())
.map(logFile -> logFile.getFileStatus()).collect(Collectors.toList()); .map(logFile -> logFile.getPath().toString()).collect(Collectors.toList());
if (split instanceof BootstrapBaseFileSplit) { if (split instanceof BootstrapBaseFileSplit) {
BootstrapBaseFileSplit eSplit = (BootstrapBaseFileSplit) split; BootstrapBaseFileSplit eSplit = (BootstrapBaseFileSplit) split;
String[] hosts = split.getLocationInfo() != null ? Arrays.stream(split.getLocationInfo()) String[] hosts = split.getLocationInfo() != null ? Arrays.stream(split.getLocationInfo())

View File

@@ -20,7 +20,6 @@ package org.apache.hudi.hadoop.realtime;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.FileSplit;
@@ -57,7 +56,6 @@ public class TestHoodieRealtimeFileSplit {
private HoodieRealtimeFileSplit split; private HoodieRealtimeFileSplit split;
private String basePath; private String basePath;
private List<FileStatus> deltaLogFileStatus;
private List<String> deltaLogPaths; private List<String> deltaLogPaths;
private String fileSplitName; private String fileSplitName;
private FileSplit baseFileSplit; private FileSplit baseFileSplit;
@@ -66,13 +64,12 @@ public class TestHoodieRealtimeFileSplit {
@BeforeEach @BeforeEach
public void setUp(@TempDir java.nio.file.Path tempDir) throws Exception { public void setUp(@TempDir java.nio.file.Path tempDir) throws Exception {
basePath = tempDir.toAbsolutePath().toString(); basePath = tempDir.toAbsolutePath().toString();
deltaLogFileStatus = Collections.singletonList(new FileStatus(0L, false, 0, 0L, 0, new Path(basePath + "/1.log")));
deltaLogPaths = Collections.singletonList(basePath + "/1.log"); deltaLogPaths = Collections.singletonList(basePath + "/1.log");
fileSplitName = basePath + "/test.file"; fileSplitName = basePath + "/test.file";
baseFileSplit = new FileSplit(new Path(fileSplitName), 0, 100, new String[] {}); baseFileSplit = new FileSplit(new Path(fileSplitName), 0, 100, new String[] {});
maxCommitTime = "10001"; maxCommitTime = "10001";
split = new HoodieRealtimeFileSplit(baseFileSplit, basePath, deltaLogFileStatus, maxCommitTime, Option.empty()); split = new HoodieRealtimeFileSplit(baseFileSplit, basePath, deltaLogPaths, maxCommitTime, Option.empty());
} }
@Test @Test

View File

@@ -41,7 +41,6 @@ import org.apache.hudi.hadoop.testutils.InputFormatTestUtil;
import org.apache.avro.Schema; import org.apache.avro.Schema;
import org.apache.avro.Schema.Field; import org.apache.avro.Schema.Field;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
@@ -188,7 +187,7 @@ public class TestHoodieRealtimeRecordReader {
HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit( HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(
new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + baseInstant + ".parquet"), 0, 1, baseJobConf), new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + baseInstant + ".parquet"), 0, 1, baseJobConf),
basePath.toUri().toString(), fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()) basePath.toUri().toString(), fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator())
.map(h -> new FileStatus(0L, false, 0, 0L, 0, h.getPath())).collect(Collectors.toList()), .map(h -> h.getPath().toString()).collect(Collectors.toList()),
instantTime, Option.empty()); instantTime, Option.empty());
// create a RecordReader to be used by HoodieRealtimeRecordReader // create a RecordReader to be used by HoodieRealtimeRecordReader
@@ -257,10 +256,10 @@ public class TestHoodieRealtimeRecordReader {
FileCreateUtils.createDeltaCommit(basePath.toString(), newCommitTime); FileCreateUtils.createDeltaCommit(basePath.toString(), newCommitTime);
// create a split with baseFile (parquet file written earlier) and new log file(s) // create a split with baseFile (parquet file written earlier) and new log file(s)
FileStatus logFileFileStatus = new FileStatus(0L, false, 0, 0L, 0, writer.getLogFile().getPath()); String logFilePath = writer.getLogFile().getPath().toString();
HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit( HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(
new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + instantTime + ".parquet"), 0, 1, baseJobConf), new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + instantTime + ".parquet"), 0, 1, baseJobConf),
basePath.toUri().toString(), Collections.singletonList(logFileFileStatus), newCommitTime, Option.empty()); basePath.toUri().toString(), Collections.singletonList(logFilePath), newCommitTime, Option.empty());
// create a RecordReader to be used by HoodieRealtimeRecordReader // create a RecordReader to be used by HoodieRealtimeRecordReader
RecordReader<NullWritable, ArrayWritable> reader = new MapredParquetInputFormat().getRecordReader( RecordReader<NullWritable, ArrayWritable> reader = new MapredParquetInputFormat().getRecordReader(
@@ -337,10 +336,10 @@ public class TestHoodieRealtimeRecordReader {
InputFormatTestUtil.deltaCommit(basePath, newCommitTime); InputFormatTestUtil.deltaCommit(basePath, newCommitTime);
// create a split with baseFile (parquet file written earlier) and new log file(s) // create a split with baseFile (parquet file written earlier) and new log file(s)
FileStatus logFileStatus = new FileStatus(0L, false, 0, 0L, 0, writer.getLogFile().getPath()); String logFilePath = writer.getLogFile().getPath().toString();
HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit( HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(
new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + instantTime + ".parquet"), 0, 1, baseJobConf), new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + instantTime + ".parquet"), 0, 1, baseJobConf),
basePath.toUri().toString(), Collections.singletonList(logFileStatus), newCommitTime, Option.empty()); basePath.toUri().toString(), Collections.singletonList(logFilePath), newCommitTime, Option.empty());
// create a RecordReader to be used by HoodieRealtimeRecordReader // create a RecordReader to be used by HoodieRealtimeRecordReader
RecordReader<NullWritable, ArrayWritable> reader = new MapredParquetInputFormat().getRecordReader( RecordReader<NullWritable, ArrayWritable> reader = new MapredParquetInputFormat().getRecordReader(
@@ -450,7 +449,7 @@ public class TestHoodieRealtimeRecordReader {
public void testSchemaEvolutionAndRollbackBlockInLastLogFile(ExternalSpillableMap.DiskMapType diskMapType, public void testSchemaEvolutionAndRollbackBlockInLastLogFile(ExternalSpillableMap.DiskMapType diskMapType,
boolean isCompressionEnabled) throws Exception { boolean isCompressionEnabled) throws Exception {
// initial commit // initial commit
List<FileStatus> logFilePaths = new ArrayList<>(); List<String> logFilePaths = new ArrayList<>();
Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema()); Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
HoodieTestUtils.init(hadoopConf, basePath.toString(), HoodieTableType.MERGE_ON_READ); HoodieTestUtils.init(hadoopConf, basePath.toString(), HoodieTableType.MERGE_ON_READ);
String instantTime = "100"; String instantTime = "100";
@@ -471,7 +470,7 @@ public class TestHoodieRealtimeRecordReader {
InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, "fileid0", instantTime, newCommitTime, InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, "fileid0", instantTime, newCommitTime,
numberOfLogRecords, 0, 1); numberOfLogRecords, 0, 1);
long size = writer.getCurrentSize(); long size = writer.getCurrentSize();
logFilePaths.add(new FileStatus(0L, false, 0, 0L, 0, writer.getLogFile().getPath())); logFilePaths.add(writer.getLogFile().getPath().toString());
writer.close(); writer.close();
assertTrue(size > 0, "block - size should be > 0"); assertTrue(size > 0, "block - size should be > 0");
@@ -479,7 +478,7 @@ public class TestHoodieRealtimeRecordReader {
newCommitTime = "102"; newCommitTime = "102";
writer = InputFormatTestUtil.writeRollbackBlockToLogFile(partitionDir, fs, schema, "fileid0", instantTime, writer = InputFormatTestUtil.writeRollbackBlockToLogFile(partitionDir, fs, schema, "fileid0", instantTime,
newCommitTime, "101", 1); newCommitTime, "101", 1);
logFilePaths.add(new FileStatus(0L, false, 0, 0L, 0, writer.getLogFile().getPath())); logFilePaths.add(writer.getLogFile().getPath().toString());
writer.close(); writer.close();
InputFormatTestUtil.deltaCommit(basePath, newCommitTime); InputFormatTestUtil.deltaCommit(basePath, newCommitTime);