[HUDI-1445] Refactor AbstractHoodieLogRecordScanner to use Builder (#2313)

Author: Danny Chan
Date: 2020-12-10 20:02:02 +08:00
Committed by: GitHub
Parent: bd9cceccb5
Commit: 4bc45a391a
11 changed files with 431 additions and 82 deletions
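
This change replaces the long positional constructors of the log record scanners with fluent builders, so call sites name each setting instead of relying on argument order. In brief, a minimal before/after sketch distilled from the hunks below (the local variable names here are illustrative, not taken from any one call site):

    // Before: a ten-argument positional constructor, easy to mis-order.
    HoodieMergedLogRecordScanner scanner =
        new HoodieMergedLogRecordScanner(fs, basePath, logFilePaths, readerSchema, latestInstantTime,
            maxMemorySizeInBytes, readBlocksLazily, reverseReader, bufferSize, spillableMapBasePath);

    // After: every setting is named, and scanner-specific options live on the
    // scanner's own builder.
    HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
        .withFileSystem(fs)
        .withBasePath(basePath)
        .withLogFilePaths(logFilePaths)
        .withReaderSchema(readerSchema)
        .withLatestInstantTime(latestInstantTime)
        .withMaxMemorySizeInBytes(maxMemorySizeInBytes)
        .withReadBlocksLazily(readBlocksLazily)
        .withReverseReader(reverseReader)
        .withBufferSize(bufferSize)
        .withSpillableMapBasePath(spillableMapBasePath)
        .build();

    // HoodieUnMergedLogRecordScanner shares the core steps but takes a per-record
    // callback instead of the spillable-map settings (the callback body is illustrative):
    HoodieUnMergedLogRecordScanner unmerged = HoodieUnMergedLogRecordScanner.newBuilder()
        .withFileSystem(fs)
        .withBasePath(basePath)
        .withLogFilePaths(logFilePaths)
        .withReaderSchema(readerSchema)
        .withLatestInstantTime(latestInstantTime)
        .withReadBlocksLazily(readBlocksLazily)
        .withReverseReader(reverseReader)
        .withBufferSize(bufferSize)
        .withLogRecordScannerCallback(record -> consume(record)) // consume(...) is a placeholder
        .build();

Each concrete scanner ships its own Builder extending the new abstract AbstractHoodieLogRecordScanner.Builder, so the shared wiring stays in one place.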


@@ -193,13 +193,25 @@ public class HoodieLogFileCommand implements CommandMarker {
     if (shouldMerge) {
       System.out.println("===========================> MERGING RECORDS <===================");
       HoodieMergedLogRecordScanner scanner =
-          new HoodieMergedLogRecordScanner(fs, client.getBasePath(), logFilePaths, readerSchema,
-              client.getActiveTimeline().getCommitTimeline().lastInstant().get().getTimestamp(),
-              HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES,
-              Boolean.parseBoolean(HoodieCompactionConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED),
-              Boolean.parseBoolean(HoodieCompactionConfig.DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED),
-              HoodieMemoryConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE,
-              HoodieMemoryConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH);
+          HoodieMergedLogRecordScanner.newBuilder()
+              .withFileSystem(fs)
+              .withBasePath(client.getBasePath())
+              .withLogFilePaths(logFilePaths)
+              .withReaderSchema(readerSchema)
+              .withLatestInstantTime(
+                  client.getActiveTimeline()
+                      .getCommitTimeline().lastInstant().get().getTimestamp())
+              .withReadBlocksLazily(
+                  Boolean.parseBoolean(
+                      HoodieCompactionConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED))
+              .withReverseReader(
+                  Boolean.parseBoolean(
+                      HoodieCompactionConfig.DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED))
+              .withBufferSize(HoodieMemoryConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE)
+              .withMaxMemorySizeInBytes(
+                  HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES)
+              .withSpillableMapBasePath(HoodieMemoryConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH)
+              .build();
       for (HoodieRecord<? extends HoodieRecordPayload> hoodieRecord : scanner) {
         Option<IndexedRecord> record = hoodieRecord.getData().getInsertValue(readerSchema);
         if (allRecords.size() < limit) {


@@ -197,13 +197,23 @@ public class TestHoodieLogFileCommand extends AbstractShellIntegrationTest {
     // get expected result of 10 records.
     List<String> logFilePaths = Arrays.stream(fs.globStatus(new Path(partitionPath + "/*")))
         .map(status -> status.getPath().toString()).collect(Collectors.toList());
-    HoodieMergedLogRecordScanner scanner =
-        new HoodieMergedLogRecordScanner(fs, tablePath, logFilePaths, schema, INSTANT_TIME,
-            HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES,
-            Boolean.parseBoolean(HoodieCompactionConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED),
-            Boolean.parseBoolean(HoodieCompactionConfig.DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED),
-            HoodieMemoryConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE,
-            HoodieMemoryConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH);
+    HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(fs)
+        .withBasePath(tablePath)
+        .withLogFilePaths(logFilePaths)
+        .withReaderSchema(schema)
+        .withLatestInstantTime(INSTANT_TIME)
+        .withMaxMemorySizeInBytes(
+            HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES)
+        .withReadBlocksLazily(
+            Boolean.parseBoolean(
+                HoodieCompactionConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED))
+        .withReverseReader(
+            Boolean.parseBoolean(
+                HoodieCompactionConfig.DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED))
+        .withBufferSize(HoodieMemoryConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE)
+        .withSpillableMapBasePath(HoodieMemoryConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH)
+        .build();
     Iterator<HoodieRecord<? extends HoodieRecordPayload>> records = scanner.iterator();
     int num = 0;


@@ -129,10 +129,18 @@ public class HoodieSparkMergeOnReadTableCompactor<T extends HoodieRecordPayload>
     List<String> logFiles = operation.getDeltaFileNames().stream().map(
         p -> new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), operation.getPartitionPath()), p).toString())
         .collect(toList());
-    HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(fs, metaClient.getBasePath(), logFiles,
-        readerSchema, maxInstantTime, maxMemoryPerCompaction, config.getCompactionLazyBlockReadEnabled(),
-        config.getCompactionReverseLogReadEnabled(), config.getMaxDFSStreamBufferSize(),
-        config.getSpillableMapBasePath());
+    HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(fs)
+        .withBasePath(metaClient.getBasePath())
+        .withLogFilePaths(logFiles)
+        .withReaderSchema(readerSchema)
+        .withLatestInstantTime(maxInstantTime)
+        .withMaxMemorySizeInBytes(maxMemoryPerCompaction)
+        .withReadBlocksLazily(config.getCompactionLazyBlockReadEnabled())
+        .withReverseReader(config.getCompactionReverseLogReadEnabled())
+        .withBufferSize(config.getMaxDFSStreamBufferSize())
+        .withSpillableMapBasePath(config.getSpillableMapBasePath())
+        .build();
     if (!scanner.iterator().hasNext()) {
       return new ArrayList<>();
     }


@@ -105,7 +105,6 @@ public abstract class AbstractHoodieLogRecordScanner {
   // Progress
   private float progress = 0.0f;

-  // TODO (NA) - Change this to a builder, this constructor is too long
   public AbstractHoodieLogRecordScanner(FileSystem fs, String basePath, List<String> logFilePaths, Schema readerSchema,
                                         String latestInstantTime, boolean readBlocksLazily, boolean reverseReader, int bufferSize) {
     this.readerSchema = readerSchema;
@@ -358,4 +357,28 @@ public abstract class AbstractHoodieLogRecordScanner {
   public long getTotalCorruptBlocks() {
     return totalCorruptBlocks.get();
   }
+
+  /**
+   * Builder used to build {@code AbstractHoodieLogRecordScanner}.
+   */
+  public abstract static class Builder {
+
+    public abstract Builder withFileSystem(FileSystem fs);
+
+    public abstract Builder withBasePath(String basePath);
+
+    public abstract Builder withLogFilePaths(List<String> logFilePaths);
+
+    public abstract Builder withReaderSchema(Schema schema);
+
+    public abstract Builder withLatestInstantTime(String latestInstantTime);
+
+    public abstract Builder withReadBlocksLazily(boolean readBlocksLazily);
+
+    public abstract Builder withReverseReader(boolean reverseReader);
+
+    public abstract Builder withBufferSize(int bufferSize);
+
+    public abstract AbstractHoodieLogRecordScanner build();
+  }
 }


@@ -105,6 +105,13 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
     return numMergedRecordsInLog;
   }

+  /**
+   * Returns the builder for {@code HoodieMergedLogRecordScanner}.
+   */
+  public static HoodieMergedLogRecordScanner.Builder newBuilder() {
+    return new Builder();
+  }
+
   @Override
   protected void processNextRecord(HoodieRecord<? extends HoodieRecordPayload> hoodieRecord) throws IOException {
     String key = hoodieRecord.getRecordKey();
@@ -128,5 +135,79 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
   public long getTotalTimeTakenToReadAndMergeBlocks() {
     return totalTimeTakenToReadAndMergeBlocks;
   }

+  /**
+   * Builder used to build {@code HoodieMergedLogRecordScanner}.
+   */
+  public static class Builder extends AbstractHoodieLogRecordScanner.Builder {
+    private FileSystem fs;
+    private String basePath;
+    private List<String> logFilePaths;
+    private Schema readerSchema;
+    private String latestInstantTime;
+    private boolean readBlocksLazily;
+    private boolean reverseReader;
+    private int bufferSize;
+    // specific configurations
+    private Long maxMemorySizeInBytes;
+    private String spillableMapBasePath;
+
+    public Builder withFileSystem(FileSystem fs) {
+      this.fs = fs;
+      return this;
+    }
+
+    public Builder withBasePath(String basePath) {
+      this.basePath = basePath;
+      return this;
+    }
+
+    public Builder withLogFilePaths(List<String> logFilePaths) {
+      this.logFilePaths = logFilePaths;
+      return this;
+    }
+
+    public Builder withReaderSchema(Schema schema) {
+      this.readerSchema = schema;
+      return this;
+    }
+
+    public Builder withLatestInstantTime(String latestInstantTime) {
+      this.latestInstantTime = latestInstantTime;
+      return this;
+    }
+
+    public Builder withReadBlocksLazily(boolean readBlocksLazily) {
+      this.readBlocksLazily = readBlocksLazily;
+      return this;
+    }
+
+    public Builder withReverseReader(boolean reverseReader) {
+      this.reverseReader = reverseReader;
+      return this;
+    }
+
+    public Builder withBufferSize(int bufferSize) {
+      this.bufferSize = bufferSize;
+      return this;
+    }
+
+    public Builder withMaxMemorySizeInBytes(Long maxMemorySizeInBytes) {
+      this.maxMemorySizeInBytes = maxMemorySizeInBytes;
+      return this;
+    }
+
+    public Builder withSpillableMapBasePath(String spillableMapBasePath) {
+      this.spillableMapBasePath = spillableMapBasePath;
+      return this;
+    }
+
+    @Override
+    public HoodieMergedLogRecordScanner build() {
+      return new HoodieMergedLogRecordScanner(fs, basePath, logFilePaths, readerSchema,
+          latestInstantTime, maxMemorySizeInBytes, readBlocksLazily, reverseReader,
+          bufferSize, spillableMapBasePath);
+    }
+  }
 }


@@ -41,6 +41,13 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
     this.callback = callback;
   }

+  /**
+   * Returns the builder for {@code HoodieUnMergedLogRecordScanner}.
+   */
+  public static HoodieUnMergedLogRecordScanner.Builder newBuilder() {
+    return new Builder();
+  }
+
   @Override
   protected void processNextRecord(HoodieRecord<? extends HoodieRecordPayload> hoodieRecord) throws Exception {
     // Just call callback without merging
@@ -60,4 +67,71 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
     public void apply(HoodieRecord<? extends HoodieRecordPayload> record) throws Exception;
   }

+  /**
+   * Builder used to build {@code HoodieUnMergedLogRecordScanner}.
+   */
+  public static class Builder extends AbstractHoodieLogRecordScanner.Builder {
+    private FileSystem fs;
+    private String basePath;
+    private List<String> logFilePaths;
+    private Schema readerSchema;
+    private String latestInstantTime;
+    private boolean readBlocksLazily;
+    private boolean reverseReader;
+    private int bufferSize;
+    // specific configurations
+    private LogRecordScannerCallback callback;
+
+    public Builder withFileSystem(FileSystem fs) {
+      this.fs = fs;
+      return this;
+    }
+
+    public Builder withBasePath(String basePath) {
+      this.basePath = basePath;
+      return this;
+    }
+
+    public Builder withLogFilePaths(List<String> logFilePaths) {
+      this.logFilePaths = logFilePaths;
+      return this;
+    }
+
+    public Builder withReaderSchema(Schema schema) {
+      this.readerSchema = schema;
+      return this;
+    }
+
+    public Builder withLatestInstantTime(String latestInstantTime) {
+      this.latestInstantTime = latestInstantTime;
+      return this;
+    }
+
+    public Builder withReadBlocksLazily(boolean readBlocksLazily) {
+      this.readBlocksLazily = readBlocksLazily;
+      return this;
+    }
+
+    public Builder withReverseReader(boolean reverseReader) {
+      this.reverseReader = reverseReader;
+      return this;
+    }
+
+    public Builder withBufferSize(int bufferSize) {
+      this.bufferSize = bufferSize;
+      return this;
+    }
+
+    public Builder withLogRecordScannerCallback(LogRecordScannerCallback callback) {
+      this.callback = callback;
+      return this;
+    }
+
+    @Override
+    public HoodieUnMergedLogRecordScanner build() {
+      return new HoodieUnMergedLogRecordScanner(fs, basePath, logFilePaths, readerSchema,
+          latestInstantTime, readBlocksLazily, reverseReader, bufferSize, callback);
+    }
+  }
 }


@@ -460,9 +460,20 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     writer.close();

     // scan all log blocks (across multiple log files)
-    HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(fs, basePath,
-        logFiles.stream().map(logFile -> logFile.getPath().toString()).collect(Collectors.toList()), schema, "100",
-        10240L, readBlocksLazily, false, bufferSize, BASE_OUTPUT_PATH);
+    HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(fs)
+        .withBasePath(basePath)
+        .withLogFilePaths(
+            logFiles.stream()
+                .map(logFile -> logFile.getPath().toString()).collect(Collectors.toList()))
+        .withReaderSchema(schema)
+        .withLatestInstantTime("100")
+        .withMaxMemorySizeInBytes(10240L)
+        .withReadBlocksLazily(readBlocksLazily)
+        .withReverseReader(false)
+        .withBufferSize(bufferSize)
+        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .build();

     List<IndexedRecord> scannedRecords = new ArrayList<>();
     for (HoodieRecord record : scanner) {
@@ -601,8 +612,18 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
         FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1", HoodieLogFile.DELTA_EXTENSION, "100")
             .map(s -> s.getPath().toString()).collect(Collectors.toList());

-    HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(fs, basePath, allLogFiles, schema, "100",
-        10240L, readBlocksLazily, false, bufferSize, BASE_OUTPUT_PATH);
+    HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(fs)
+        .withBasePath(basePath)
+        .withLogFilePaths(allLogFiles)
+        .withReaderSchema(schema)
+        .withLatestInstantTime("100")
+        .withMaxMemorySizeInBytes(10240L)
+        .withReadBlocksLazily(readBlocksLazily)
+        .withReverseReader(false)
+        .withBufferSize(bufferSize)
+        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .build();
     assertEquals(200, scanner.getTotalLogRecords());
     Set<String> readKeys = new HashSet<>(200);
     scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
@@ -663,8 +684,18 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
         FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1", HoodieLogFile.DELTA_EXTENSION, "100")
             .map(s -> s.getPath().toString()).collect(Collectors.toList());

-    HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(fs, basePath, allLogFiles, schema, "102",
-        10240L, readBlocksLazily, false, bufferSize, BASE_OUTPUT_PATH);
+    HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(fs)
+        .withBasePath(basePath)
+        .withLogFilePaths(allLogFiles)
+        .withReaderSchema(schema)
+        .withLatestInstantTime("102")
+        .withMaxMemorySizeInBytes(10240L)
+        .withReadBlocksLazily(readBlocksLazily)
+        .withReverseReader(false)
+        .withBufferSize(bufferSize)
+        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .build();
     assertEquals(200, scanner.getTotalLogRecords(), "We read 200 records from 2 write batches");
     Set<String> readKeys = new HashSet<>(200);
     scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
@@ -742,8 +773,18 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
         FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1", HoodieLogFile.DELTA_EXTENSION, "100")
             .map(s -> s.getPath().toString()).collect(Collectors.toList());

-    HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(fs, basePath, allLogFiles, schema, "103",
-        10240L, true, false, bufferSize, BASE_OUTPUT_PATH);
+    HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(fs)
+        .withBasePath(basePath)
+        .withLogFilePaths(allLogFiles)
+        .withReaderSchema(schema)
+        .withLatestInstantTime("103")
+        .withMaxMemorySizeInBytes(10240L)
+        .withReadBlocksLazily(true)
+        .withReverseReader(false)
+        .withBufferSize(bufferSize)
+        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .build();
     assertEquals(200, scanner.getTotalLogRecords(), "We would read 200 records");
     Set<String> readKeys = new HashSet<>(200);
     scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
@@ -802,8 +843,18 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
         FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1", HoodieLogFile.DELTA_EXTENSION, "100")
             .map(s -> s.getPath().toString()).collect(Collectors.toList());

-    HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(fs, basePath, allLogFiles, schema, "102",
-        10240L, readBlocksLazily, false, bufferSize, BASE_OUTPUT_PATH);
+    HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(fs)
+        .withBasePath(basePath)
+        .withLogFilePaths(allLogFiles)
+        .withReaderSchema(schema)
+        .withLatestInstantTime("102")
+        .withMaxMemorySizeInBytes(10240L)
+        .withReadBlocksLazily(readBlocksLazily)
+        .withReverseReader(false)
+        .withBufferSize(bufferSize)
+        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .build();
     assertEquals(200, scanner.getTotalLogRecords(), "We still would read 200 records");
     final List<String> readKeys = new ArrayList<>(200);
     final List<Boolean> emptyPayloads = new ArrayList<>();
@@ -833,8 +884,18 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     writer.appendBlock(commandBlock);

     readKeys.clear();
-    scanner = new HoodieMergedLogRecordScanner(fs, basePath, allLogFiles, schema, "101", 10240L, readBlocksLazily,
-        false, bufferSize, BASE_OUTPUT_PATH);
+    scanner = HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(fs)
+        .withBasePath(basePath)
+        .withLogFilePaths(allLogFiles)
+        .withReaderSchema(schema)
+        .withLatestInstantTime("101")
+        .withMaxMemorySizeInBytes(10240L)
+        .withReadBlocksLazily(readBlocksLazily)
+        .withReverseReader(false)
+        .withBufferSize(bufferSize)
+        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .build();
     scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
     assertEquals(200, readKeys.size(), "Stream collect should return all 200 records after rollback of delete");
   }
@@ -898,8 +959,18 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
             .map(s -> s.getPath().toString()).collect(Collectors.toList());

     // all data must be rolled back before merge
-    HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(fs, basePath, allLogFiles, schema, "100",
-        10240L, readBlocksLazily, false, bufferSize, BASE_OUTPUT_PATH);
+    HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(fs)
+        .withBasePath(basePath)
+        .withLogFilePaths(allLogFiles)
+        .withReaderSchema(schema)
+        .withLatestInstantTime("100")
+        .withMaxMemorySizeInBytes(10240L)
+        .withReadBlocksLazily(readBlocksLazily)
+        .withReverseReader(false)
+        .withBufferSize(bufferSize)
+        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .build();
     assertEquals(0, scanner.getTotalLogRecords(), "We would have scanned 0 records because of rollback");

     final List<String> readKeys = new ArrayList<>();
@@ -949,8 +1020,18 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
         FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1", HoodieLogFile.DELTA_EXTENSION, "100")
             .map(s -> s.getPath().toString()).collect(Collectors.toList());

-    HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(fs, basePath, allLogFiles, schema, "100",
-        10240L, readBlocksLazily, false, bufferSize, BASE_OUTPUT_PATH);
+    HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(fs)
+        .withBasePath(basePath)
+        .withLogFilePaths(allLogFiles)
+        .withReaderSchema(schema)
+        .withLatestInstantTime("100")
+        .withMaxMemorySizeInBytes(10240L)
+        .withReadBlocksLazily(readBlocksLazily)
+        .withReverseReader(false)
+        .withBufferSize(bufferSize)
+        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .build();
     assertEquals(0, scanner.getTotalLogRecords(), "We would read 0 records");
   }
@@ -983,8 +1064,18 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
         FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1", HoodieLogFile.DELTA_EXTENSION, "100")
             .map(s -> s.getPath().toString()).collect(Collectors.toList());

-    HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(fs, basePath, allLogFiles, schema, "100",
-        10240L, readBlocksLazily, false, bufferSize, BASE_OUTPUT_PATH);
+    HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(fs)
+        .withBasePath(basePath)
+        .withLogFilePaths(allLogFiles)
+        .withReaderSchema(schema)
+        .withLatestInstantTime("100")
+        .withMaxMemorySizeInBytes(10240L)
+        .withReadBlocksLazily(readBlocksLazily)
+        .withReverseReader(false)
+        .withBufferSize(bufferSize)
+        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .build();
     assertEquals(100, scanner.getTotalLogRecords(), "We still would read 100 records");
     final List<String> readKeys = new ArrayList<>(100);
     scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
@@ -1036,8 +1127,18 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
         FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1", HoodieLogFile.DELTA_EXTENSION, "100")
             .map(s -> s.getPath().toString()).collect(Collectors.toList());

-    HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(fs, basePath, allLogFiles, schema, "101",
-        10240L, readBlocksLazily, false, bufferSize, BASE_OUTPUT_PATH);
+    HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(fs)
+        .withBasePath(basePath)
+        .withLogFilePaths(allLogFiles)
+        .withReaderSchema(schema)
+        .withLatestInstantTime("101")
+        .withMaxMemorySizeInBytes(10240L)
+        .withReadBlocksLazily(readBlocksLazily)
+        .withReverseReader(false)
+        .withBufferSize(bufferSize)
+        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .build();
     assertEquals(0, scanner.getTotalLogRecords(), "We would read 0 records");
   }
@@ -1126,8 +1227,18 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
         FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1", HoodieLogFile.DELTA_EXTENSION, "100")
             .map(s -> s.getPath().toString()).collect(Collectors.toList());

-    HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(fs, basePath, allLogFiles, schema, "101",
-        10240L, readBlocksLazily, false, bufferSize, BASE_OUTPUT_PATH);
+    HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(fs)
+        .withBasePath(basePath)
+        .withLogFilePaths(allLogFiles)
+        .withReaderSchema(schema)
+        .withLatestInstantTime("101")
+        .withMaxMemorySizeInBytes(10240L)
+        .withReadBlocksLazily(readBlocksLazily)
+        .withReverseReader(false)
+        .withBufferSize(bufferSize)
+        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .build();
     assertEquals(0, scanner.getTotalLogRecords(), "We would read 0 records");
   }
@@ -1183,8 +1294,18 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     List<String> allLogFiles = FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1",
         HoodieLogFile.DELTA_EXTENSION, "100").map(s -> s.getPath().toString()).collect(Collectors.toList());

-    HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(fs, basePath, allLogFiles, schema,
-        "100", 10240L, readBlocksLazily, false, bufferSize, BASE_OUTPUT_PATH);
+    HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(fs)
+        .withBasePath(basePath)
+        .withLogFilePaths(allLogFiles)
+        .withReaderSchema(schema)
+        .withLatestInstantTime("100")
+        .withMaxMemorySizeInBytes(10240L)
+        .withReadBlocksLazily(readBlocksLazily)
+        .withReverseReader(false)
+        .withBufferSize(bufferSize)
+        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .build();
     assertEquals(Math.max(numRecordsInLog1, numRecordsInLog2), scanner.getNumMergedRecordsInLog(),
         "We would read 100 records");


@@ -63,17 +63,18 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
     // NOTE: HoodieCompactedLogRecordScanner will not return records for an in-flight commit
     // but can return records for completed commits > the commit we are trying to read (if using
     // readCommit() API)
-    return new HoodieMergedLogRecordScanner(
-        FSUtils.getFs(split.getPath().toString(), jobConf),
-        split.getBasePath(),
-        split.getDeltaLogPaths(),
-        usesCustomPayload ? getWriterSchema() : getReaderSchema(),
-        split.getMaxCommitTime(),
-        HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(jobConf),
-        Boolean.parseBoolean(jobConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)),
-        false,
-        jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE),
-        jobConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP, HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH));
+    return HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(FSUtils.getFs(split.getPath().toString(), jobConf))
+        .withBasePath(split.getBasePath())
+        .withLogFilePaths(split.getDeltaLogPaths())
+        .withReaderSchema(usesCustomPayload ? getWriterSchema() : getReaderSchema())
+        .withLatestInstantTime(split.getMaxCommitTime())
+        .withMaxMemorySizeInBytes(HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(jobConf))
+        .withReadBlocksLazily(Boolean.parseBoolean(jobConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)))
+        .withReverseReader(false)
+        .withBufferSize(jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
+        .withSpillableMapBasePath(jobConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP, HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
+        .build();
   }

   @Override


@@ -77,15 +77,22 @@ class RealtimeUnmergedRecordReader extends AbstractRealtimeRecordReader
         Option.empty(), x -> x, new DefaultSizeEstimator<>());
     // Consumer of this record reader
     this.iterator = this.executor.getQueue().iterator();
-    this.logRecordScanner = new HoodieUnMergedLogRecordScanner(FSUtils.getFs(split.getPath().toString(), this.jobConf),
-        split.getBasePath(), split.getDeltaLogPaths(), getReaderSchema(), split.getMaxCommitTime(),
-        Boolean.parseBoolean(this.jobConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)),
-        false, this.jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE), record -> {
+    this.logRecordScanner = HoodieUnMergedLogRecordScanner.newBuilder()
+        .withFileSystem(FSUtils.getFs(split.getPath().toString(), this.jobConf))
+        .withBasePath(split.getBasePath())
+        .withLogFilePaths(split.getDeltaLogPaths())
+        .withReaderSchema(getReaderSchema())
+        .withLatestInstantTime(split.getMaxCommitTime())
+        .withReadBlocksLazily(Boolean.parseBoolean(this.jobConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)))
+        .withReverseReader(false)
+        .withBufferSize(this.jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
+        .withLogRecordScannerCallback(record -> {
           // convert Hoodie log record to Hadoop AvroWritable and buffer
           GenericRecord rec = (GenericRecord) record.getData().getInsertValue(getReaderSchema()).get();
           ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(rec, getHiveSchema());
           this.executor.getQueue().insertRecord(aWritable);
-        });
+        })
+        .build();
     // Start reading and buffering
     this.executor.startProducers();
   }


@@ -249,14 +249,21 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader {
       return itr;
     } else {
       // If there is no data file, fall back to reading log files
-      HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(metaClient.getFs(),
-          metaClient.getBasePath(),
-          fileSlice.getLogFiles().map(l -> l.getPath().getName()).collect(Collectors.toList()),
-          new Schema.Parser().parse(schemaStr), metaClient.getActiveTimeline().getCommitsTimeline()
-              .filterCompletedInstants().lastInstant().get().getTimestamp(),
-          HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES, true, false,
-          HoodieMemoryConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE,
-          HoodieMemoryConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH);
+      HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
+          .withFileSystem(metaClient.getFs())
+          .withBasePath(metaClient.getBasePath())
+          .withLogFilePaths(
+              fileSlice.getLogFiles().map(l -> l.getPath().getName()).collect(Collectors.toList()))
+          .withReaderSchema(new Schema.Parser().parse(schemaStr))
+          .withLatestInstantTime(metaClient.getActiveTimeline().getCommitsTimeline()
+              .filterCompletedInstants().lastInstant().get().getTimestamp())
+          .withMaxMemorySizeInBytes(
+              HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES)
+          .withReadBlocksLazily(true)
+          .withReverseReader(false)
+          .withBufferSize(HoodieMemoryConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE)
+          .withSpillableMapBasePath(HoodieMemoryConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH)
+          .build();
       // readAvro log files
       Iterable<HoodieRecord<? extends HoodieRecordPayload>> iterable = () -> scanner.iterator();
       Schema schema = new Schema.Parser().parse(schemaStr);


@@ -255,19 +255,24 @@ private object HoodieMergeOnReadRDD {
   def scanLog(split: HoodieMergeOnReadFileSplit, logSchema: Schema, config: Configuration): HoodieMergedLogRecordScanner = {
     val fs = FSUtils.getFs(split.tablePath, config)
-    new HoodieMergedLogRecordScanner(
-      fs,
-      split.tablePath,
-      split.logPaths.get.asJava,
-      logSchema,
-      split.latestCommit,
-      split.maxCompactionMemoryInBytes,
-      Try(config.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
-        HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED).toBoolean).getOrElse(false),
-      false,
-      config.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
-        HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE),
-      config.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
-        HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
+    HoodieMergedLogRecordScanner.newBuilder()
+      .withFileSystem(fs)
+      .withBasePath(split.tablePath)
+      .withLogFilePaths(split.logPaths.get.asJava)
+      .withReaderSchema(logSchema)
+      .withLatestInstantTime(split.latestCommit)
+      .withReadBlocksLazily(
+        Try(config.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
+          HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED).toBoolean)
+          .getOrElse(false))
+      .withReverseReader(false)
+      .withBufferSize(
+        config.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
+          HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
+      .withMaxMemorySizeInBytes(split.maxCompactionMemoryInBytes)
+      .withSpillableMapBasePath(
+        config.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
+          HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
+      .build()
   }
 }