1
0

[HUDI-1294] Adding inline read and seek-based read (batch get) for HFile log blocks in metadata table (#3762)

This commit is contained in:
Sivabalan Narayanan
2021-10-29 12:12:44 -04:00
committed by GitHub
parent 0223c442ec
commit 69ee790a47
17 changed files with 591 additions and 142 deletions

View File

@@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.io.storage;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.BloomFilterFactory;
import org.apache.hudi.common.bloom.BloomFilterTypeCode;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.Mockito;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.hudi.common.testutils.FileSystemTestUtils.RANDOM;
import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
import static org.junit.jupiter.api.Assertions.assertEquals;
/**
 * Tests the HFile writer/reader round trip: writes Avro records keyed by row key,
 * then validates both a full sequential read and repeated random batch (seek-based) reads.
 */
public class TestHoodieHFileReaderWriter {

  private static final int NUM_RECORDS = 100;

  @TempDir
  File tempDir;
  private Path filePath;

  @BeforeEach
  public void setup() throws IOException {
    // Create the file *inside* the JUnit-managed temp dir (the original concatenation
    // dropped the path separator and placed it as a sibling of tempDir, escaping
    // @TempDir's automatic cleanup).
    filePath = new Path(new File(tempDir, "tempFile.txt").getAbsolutePath());
  }

  @AfterEach
  public void clearTempFile() {
    // Defensive cleanup; harmless no-op if the test never created the file.
    File file = new File(filePath.toString());
    if (file.exists()) {
      file.delete();
    }
  }

  /**
   * Builds an HFile writer over {@link #filePath} with a simple bloom filter,
   * GZ compression, 1MB block size and 120MB max file size.
   *
   * @param avroSchema schema of the records to be written.
   * @return a writer ready to accept Avro records.
   * @throws Exception if writer construction fails.
   */
  private HoodieHFileWriter createHFileWriter(Schema avroSchema) throws Exception {
    BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.00001, -1, BloomFilterTypeCode.SIMPLE.name());
    Configuration conf = new Configuration();
    TaskContextSupplier mockTaskContextSupplier = Mockito.mock(TaskContextSupplier.class);
    String instantTime = "000";
    HoodieHFileConfig hoodieHFileConfig = new HoodieHFileConfig(conf, Compression.Algorithm.GZ, 1024 * 1024, 120 * 1024 * 1024,
        filter);
    return new HoodieHFileWriter(instantTime, filePath, hoodieHFileConfig, avroSchema, mockTaskContextSupplier);
  }

  @Test
  public void testWriteReadHFile() throws Exception {
    // NOTE(review): schema resource is anchored off TestHoodieOrcReaderWriter; the absolute
    // resource path makes this work, but anchoring off this class would be clearer — confirm
    // the resource is visible from this module before changing.
    Schema avroSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleSchema.avsc");
    HoodieHFileWriter writer = createHFileWriter(avroSchema);
    List<String> keys = new ArrayList<>();
    Map<String, GenericRecord> recordMap = new HashMap<>();
    // Write NUM_RECORDS records with monotonically increasing, zero-padded keys so
    // HFile's sorted-key requirement is satisfied.
    for (int i = 0; i < NUM_RECORDS; i++) {
      GenericRecord record = new GenericData.Record(avroSchema);
      String key = String.format("%s%04d", "key", i);
      record.put("_row_key", key);
      keys.add(key);
      record.put("time", Integer.toString(RANDOM.nextInt()));
      record.put("number", i);
      writer.writeAvro(key, record);
      recordMap.put(key, record);
    }
    writer.close();

    Configuration conf = new Configuration();
    CacheConfig cacheConfig = new CacheConfig(conf);

    // Full sequential scan: every record read back must match what was written.
    HoodieHFileReader hoodieHFileReader = new HoodieHFileReader(conf, filePath, cacheConfig, filePath.getFileSystem(conf));
    List<Pair<String, IndexedRecord>> records = hoodieHFileReader.readAllRecords();
    records.forEach(entry -> assertEquals(recordMap.get(entry.getFirst()), entry.getSecond()));
    hoodieHFileReader.close();

    // Batch (seek-based) reads: repeatedly fetch a random sorted subset of keys and
    // verify the reader returns exactly those records.
    for (int i = 0; i < 20; i++) {
      int randomRowsToFetch = 5 + RANDOM.nextInt(50);
      Set<String> rowsToFetch = getRandomKeys(randomRowsToFetch, keys);
      List<String> rowsList = new ArrayList<>(rowsToFetch);
      // readRecords expects keys in sorted order for seek-based lookup.
      Collections.sort(rowsList);
      hoodieHFileReader = new HoodieHFileReader(conf, filePath, cacheConfig, filePath.getFileSystem(conf));
      List<Pair<String, GenericRecord>> result = hoodieHFileReader.readRecords(rowsList);
      // JUnit 5 convention: expected value first for meaningful failure messages.
      assertEquals(randomRowsToFetch, result.size());
      result.forEach(entry -> assertEquals(recordMap.get(entry.getFirst()), entry.getSecond()));
      hoodieHFileReader.close();
    }
  }

  /**
   * Picks {@code count} distinct keys from {@code keys} uniformly at random.
   *
   * <p>The original implementation guarded insertion with
   * {@code rowKeys.contains(index)} — an {@code Integer} lookup against a
   * {@code Set<String>}, which is always false and therefore dead code. Set
   * semantics already make duplicate insertions a no-op, so we simply loop
   * until the set reaches the requested size.
   *
   * @param count number of distinct keys to return; must be <= keys.size().
   * @param keys  candidate key pool.
   * @return a set of exactly {@code count} distinct keys.
   */
  private Set<String> getRandomKeys(int count, List<String> keys) {
    Set<String> rowKeys = new HashSet<>();
    int totalKeys = keys.size();
    while (rowKeys.size() < count) {
      rowKeys.add(keys.get(RANDOM.nextInt(totalKeys)));
    }
    return rowKeys;
  }
}

View File

@@ -160,9 +160,8 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
doRollbackAndValidate(testTable, "0000003", "0000004");
}
doWriteOperationAndValidate(testTable, "0000005");
// trigger an upsert and validate
// trigger couple of upserts
doWriteOperation(testTable, "0000005");
doWriteOperation(testTable, "0000006");
validateMetadata(testTable, true);
}
@@ -222,9 +221,9 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
* Test various table operations sync to Metadata Table correctly.
*/
@ParameterizedTest
@EnumSource(HoodieTableType.class)
public void testTableOperations(HoodieTableType tableType) throws Exception {
init(tableType);
@MethodSource("bootstrapAndTableOperationTestArgs")
public void testTableOperations(HoodieTableType tableType, boolean enableFullScan) throws Exception {
init(tableType, true, enableFullScan);
doWriteInsertAndUpsert(testTable);
// trigger an upsert
@@ -236,7 +235,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
}
// trigger an upsert
doWriteOperationAndValidate(testTable, "0000005");
doWriteOperation(testTable, "0000005");
// trigger clean
doCleanAndValidate(testTable, "0000006", singletonList("0000001"));
@@ -255,7 +254,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
doWriteOperation(testTable, "0000002");
doCleanAndValidate(testTable, "0000003", Arrays.asList("0000001"));
if (tableType == MERGE_ON_READ) {
doCompactionAndValidate(testTable, "0000004");
doCompaction(testTable, "0000004");
}
doWriteOperation(testTable, "0000005");
validateMetadata(testTable, emptyList(), true);
@@ -288,7 +287,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
doWriteOperationAndValidate(testTable, "0000003");
// trigger a commit and rollback
doWriteOperationAndValidate(testTable, "0000004");
doWriteOperation(testTable, "0000004");
doRollbackAndValidate(testTable, "0000004", "0000005");
// trigger few upserts and validate
@@ -297,7 +296,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
}
validateMetadata(testTable);
doWriteOperationAndValidate(testTable, "0000010");
doWriteOperation(testTable, "0000010");
// rollback last commit. and validate.
doRollbackAndValidate(testTable, "0000010", "0000011");
@@ -309,7 +308,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
}
// roll back of delete
doWriteOperationAndValidate(testTable, "0000014", DELETE);
doWriteOperation(testTable, "0000014", DELETE);
doRollbackAndValidate(testTable, "0000014", "0000015");
// rollback partial commit
@@ -394,9 +393,9 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
syncTableMetadata(writeConfig);
validateMetadata(testTable);
doWriteOperationAndValidate(testTable, "00000003", INSERT);
doWriteOperationAndValidate(testTable, "00000004", UPSERT);
doWriteOperationAndValidate(testTable, "00000005", UPSERT);
doWriteOperation(testTable, "00000003", INSERT);
doWriteOperation(testTable, "00000004", UPSERT);
doWriteOperation(testTable, "00000005", UPSERT);
// trigger compaction
if (MERGE_ON_READ.equals(tableType)) {
@@ -404,13 +403,13 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
}
// trigger an upsert
doWriteOperationAndValidate(testTable, "00000008");
doWriteOperation(testTable, "00000008");
// trigger delete
doWriteOperationAndValidate(testTable, "00000009", DELETE);
doWriteOperation(testTable, "00000009", DELETE);
// trigger clean
doCleanAndValidate(testTable, "00000010", asList("00000003", "00000004"));
// trigger another upsert
doWriteOperationAndValidate(testTable, "00000011");
doWriteOperation(testTable, "00000011");
// trigger clustering
doClusterAndValidate(testTable, "00000012");
@@ -528,7 +527,6 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
records = dataGen.generateUniqueUpdates(newCommitTime, 10);
writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
validateMetadata(client);
// Write 4 (updates and inserts)
newCommitTime = "0000004";
@@ -552,7 +550,6 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
records = dataGen.generateUpdates(newCommitTime, 5);
writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
validateMetadata(client);
// Compaction
if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
@@ -568,7 +565,6 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
JavaRDD<HoodieKey> deleteKeys = jsc.parallelize(records, 1).map(r -> r.getKey());
client.startCommitWithTime(newCommitTime);
client.delete(deleteKeys, newCommitTime);
validateMetadata(client);
// Clean
newCommitTime = "0000009";
@@ -1128,7 +1124,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
Collections.sort(fsFileNames);
Collections.sort(metadataFilenames);
assertEquals(fsStatuses.length, partitionToFilesMap.get(basePath + "/" + partition).length);
assertEquals(fsStatuses.length, partitionToFilesMap.get(partitionPath.toString()).length);
// File sizes should be valid
Arrays.stream(metaStatuses).forEach(s -> assertTrue(s.getLen() > 0));

View File

@@ -72,6 +72,10 @@ public class TestHoodieMetadataBase extends HoodieClientTestHarness {
}
public void init(HoodieTableType tableType, boolean enableMetadataTable) throws IOException {
init(tableType, enableMetadataTable, true);
}
public void init(HoodieTableType tableType, boolean enableMetadataTable, boolean enableFullScan) throws IOException {
this.tableType = tableType;
initPath();
initSparkContexts("TestHoodieMetadata");
@@ -80,7 +84,8 @@ public class TestHoodieMetadataBase extends HoodieClientTestHarness {
initMetaClient(tableType);
initTestDataGenerator();
metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(basePath);
writeConfig = getWriteConfig(true, enableMetadataTable);
writeConfig = getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, enableMetadataTable, false,
enableFullScan).build();
initWriteConfigAndMetatableWriter(writeConfig, enableMetadataTable);
}
@@ -256,7 +261,13 @@ public class TestHoodieMetadataBase extends HoodieClientTestHarness {
return getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, autoCommit, useFileListingMetadata, enableMetrics);
}
protected HoodieWriteConfig.Builder getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy policy, boolean autoCommit, boolean useFileListingMetadata, boolean enableMetrics) {
protected HoodieWriteConfig.Builder getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy policy, boolean autoCommit, boolean useFileListingMetadata,
boolean enableMetrics) {
return getWriteConfigBuilder(policy, autoCommit, useFileListingMetadata, enableMetrics, true);
}
protected HoodieWriteConfig.Builder getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy policy, boolean autoCommit, boolean useFileListingMetadata,
boolean enableMetrics, boolean enableFullScan) {
return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA)
.withParallelism(2, 2).withDeleteParallelism(2).withRollbackParallelism(2).withFinalizeWriteParallelism(2)
.withAutoCommit(autoCommit)
@@ -271,6 +282,7 @@ public class TestHoodieMetadataBase extends HoodieClientTestHarness {
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder()
.enable(useFileListingMetadata)
.enableFullScan(enableFullScan)
.enableMetrics(enableMetrics).build())
.withMetricsConfig(HoodieMetricsConfig.newBuilder().on(enableMetrics)
.withExecutorMetrics(true).build())

View File

@@ -115,6 +115,20 @@ public final class HoodieMetadataConfig extends HoodieConfig {
.sinceVersion("0.7.0")
.withDocumentation("Parallelism to use, when listing the table on lake storage.");
public static final ConfigProperty<Boolean> ENABLE_INLINE_READING = ConfigProperty
.key(METADATA_PREFIX + ".enable.inline.reading")
.defaultValue(true)
.sinceVersion("0.10.0")
.withDocumentation("Enable inline reading of Log files. By default log block contents are read as byte[] using regular input stream and records "
+ "are deserialized from it. Enabling this will read each log block as an inline file and read records from the same. For instance, "
+ "for HFileDataBlock, a inline file will be read using HFileReader.");
public static final ConfigProperty<Boolean> ENABLE_FULL_SCAN_LOG_FILES = ConfigProperty
.key(METADATA_PREFIX + ".enable.full.scan.log.files")
.defaultValue(true)
.sinceVersion("0.10.0")
.withDocumentation("Enable full scanning of log files while reading log records. If disabled, hudi does look up of only interested entries.");
private HoodieMetadataConfig() {
super();
}
@@ -143,6 +157,10 @@ public final class HoodieMetadataConfig extends HoodieConfig {
return getString(DIR_FILTER_REGEX);
}
public boolean enableFullScan() {
return getBoolean(ENABLE_FULL_SCAN_LOG_FILES);
}
public static class Builder {
private final HoodieMetadataConfig metadataConfig = new HoodieMetadataConfig();
@@ -210,6 +228,11 @@ public final class HoodieMetadataConfig extends HoodieConfig {
return this;
}
public Builder enableFullScan(boolean enableFullScan) {
metadataConfig.setValue(ENABLE_FULL_SCAN_LOG_FILES, String.valueOf(enableFullScan));
return this;
}
public HoodieMetadataConfig build() {
metadataConfig.setDefaults(HoodieMetadataConfig.class.getName());
return metadataConfig;

View File

@@ -47,6 +47,7 @@ import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashSet;
@@ -71,9 +72,9 @@ import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlo
* <p>
* This results in two I/O passes over the log file.
*/
public abstract class AbstractHoodieLogRecordScanner {
public abstract class AbstractHoodieLogRecordReader {
private static final Logger LOG = LogManager.getLogger(AbstractHoodieLogRecordScanner.class);
private static final Logger LOG = LogManager.getLogger(AbstractHoodieLogRecordReader.class);
// Reader schema for the records
protected final Schema readerSchema;
@@ -114,12 +115,23 @@ public abstract class AbstractHoodieLogRecordScanner {
private AtomicLong totalCorruptBlocks = new AtomicLong(0);
// Store the last instant log blocks (needed to implement rollback)
private Deque<HoodieLogBlock> currentInstantLogBlocks = new ArrayDeque<>();
// Enables full scan of log records
protected final boolean enableFullScan;
private int totalScannedLogFiles;
// Progress
private float progress = 0.0f;
protected AbstractHoodieLogRecordScanner(FileSystem fs, String basePath, List<String> logFilePaths, Schema readerSchema,
String latestInstantTime, boolean readBlocksLazily, boolean reverseReader,
int bufferSize, Option<InstantRange> instantRange, boolean withOperationField) {
protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List<String> logFilePaths, Schema readerSchema,
String latestInstantTime, boolean readBlocksLazily, boolean reverseReader,
int bufferSize, Option<InstantRange> instantRange, boolean withOperationField) {
this(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize, instantRange, withOperationField,
true);
}
protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List<String> logFilePaths, Schema readerSchema,
String latestInstantTime, boolean readBlocksLazily, boolean reverseReader,
int bufferSize, Option<InstantRange> instantRange, boolean withOperationField,
boolean enableFullScan) {
this.readerSchema = readerSchema;
this.latestInstantTime = latestInstantTime;
this.hoodieTableMetaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(basePath).build();
@@ -132,18 +144,27 @@ public abstract class AbstractHoodieLogRecordScanner {
}
this.totalLogFiles.addAndGet(logFilePaths.size());
this.logFilePaths = logFilePaths;
this.readBlocksLazily = readBlocksLazily;
this.reverseReader = reverseReader;
this.readBlocksLazily = readBlocksLazily;
this.fs = fs;
this.bufferSize = bufferSize;
this.instantRange = instantRange;
this.withOperationField = withOperationField;
this.enableFullScan = enableFullScan;
}
/**
* Scan Log files.
*/
public void scan() {
scan(Option.empty());
}
public void scan(Option<List<String>> keys) {
currentInstantLogBlocks = new ArrayDeque<>();
progress = 0.0f;
totalLogFiles = new AtomicLong(0);
totalRollbacks = new AtomicLong(0);
totalCorruptBlocks = new AtomicLong(0);
totalLogBlocks = new AtomicLong(0);
totalLogRecords = new AtomicLong(0);
HoodieLogFormatReader logFormatReaderWrapper = null;
HoodieTimeline commitsTimeline = this.hoodieTableMetaClient.getCommitsTimeline();
HoodieTimeline completedInstantsTimeline = commitsTimeline.filterCompletedInstants();
@@ -152,7 +173,7 @@ public abstract class AbstractHoodieLogRecordScanner {
// iterate over the paths
logFormatReaderWrapper = new HoodieLogFormatReader(fs,
logFilePaths.stream().map(logFile -> new HoodieLogFile(new Path(logFile))).collect(Collectors.toList()),
readerSchema, readBlocksLazily, reverseReader, bufferSize);
readerSchema, readBlocksLazily, reverseReader, bufferSize, !enableFullScan);
Set<HoodieLogFile> scannedLogFiles = new HashSet<>();
while (logFormatReaderWrapper.hasNext()) {
HoodieLogFile logFile = logFormatReaderWrapper.getLogFile();
@@ -160,16 +181,16 @@ public abstract class AbstractHoodieLogRecordScanner {
scannedLogFiles.add(logFile);
totalLogFiles.set(scannedLogFiles.size());
// Use the HoodieLogFileReader to iterate through the blocks in the log file
HoodieLogBlock r = logFormatReaderWrapper.next();
final String instantTime = r.getLogBlockHeader().get(INSTANT_TIME);
HoodieLogBlock logBlock = logFormatReaderWrapper.next();
final String instantTime = logBlock.getLogBlockHeader().get(INSTANT_TIME);
totalLogBlocks.incrementAndGet();
if (r.getBlockType() != CORRUPT_BLOCK
&& !HoodieTimeline.compareTimestamps(r.getLogBlockHeader().get(INSTANT_TIME), HoodieTimeline.LESSER_THAN_OR_EQUALS, this.latestInstantTime
if (logBlock.getBlockType() != CORRUPT_BLOCK
&& !HoodieTimeline.compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME), HoodieTimeline.LESSER_THAN_OR_EQUALS, this.latestInstantTime
)) {
// hit a block with instant time greater than should be processed, stop processing further
break;
}
if (r.getBlockType() != CORRUPT_BLOCK && r.getBlockType() != COMMAND_BLOCK) {
if (logBlock.getBlockType() != CORRUPT_BLOCK && logBlock.getBlockType() != COMMAND_BLOCK) {
if (!completedInstantsTimeline.containsOrBeforeTimelineStarts(instantTime)
|| inflightInstantsTimeline.containsInstant(instantTime)) {
// hit an uncommitted block possibly from a failed write, move to the next one and skip processing this one
@@ -180,28 +201,28 @@ public abstract class AbstractHoodieLogRecordScanner {
continue;
}
}
switch (r.getBlockType()) {
switch (logBlock.getBlockType()) {
case HFILE_DATA_BLOCK:
case AVRO_DATA_BLOCK:
LOG.info("Reading a data block from file " + logFile.getPath() + " at instant "
+ r.getLogBlockHeader().get(INSTANT_TIME));
if (isNewInstantBlock(r) && !readBlocksLazily) {
+ logBlock.getLogBlockHeader().get(INSTANT_TIME));
if (isNewInstantBlock(logBlock) && !readBlocksLazily) {
// If this is an avro data block belonging to a different commit/instant,
// then merge the last blocks and records into the main result
processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size());
processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keys);
}
// store the current block
currentInstantLogBlocks.push(r);
currentInstantLogBlocks.push(logBlock);
break;
case DELETE_BLOCK:
LOG.info("Reading a delete block from file " + logFile.getPath());
if (isNewInstantBlock(r) && !readBlocksLazily) {
if (isNewInstantBlock(logBlock) && !readBlocksLazily) {
// If this is a delete data block belonging to a different commit/instant,
// then merge the last blocks and records into the main result
processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size());
processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keys);
}
// store deletes so can be rolled back
currentInstantLogBlocks.push(r);
currentInstantLogBlocks.push(logBlock);
break;
case COMMAND_BLOCK:
// Consider the following scenario
@@ -218,9 +239,9 @@ public abstract class AbstractHoodieLogRecordScanner {
// both B1 & B2
LOG.info("Reading a command block from file " + logFile.getPath());
// This is a command block - take appropriate action based on the command
HoodieCommandBlock commandBlock = (HoodieCommandBlock) r;
HoodieCommandBlock commandBlock = (HoodieCommandBlock) logBlock;
String targetInstantForCommandBlock =
r.getLogBlockHeader().get(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME);
logBlock.getLogBlockHeader().get(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME);
switch (commandBlock.getType()) { // there can be different types of command blocks
case ROLLBACK_PREVIOUS_BLOCK:
// Rollback the last read log block
@@ -264,7 +285,7 @@ public abstract class AbstractHoodieLogRecordScanner {
LOG.info("Found a corrupt block in " + logFile.getPath());
totalCorruptBlocks.incrementAndGet();
// If there is a corrupt block - we will assume that this was the next data block
currentInstantLogBlocks.push(r);
currentInstantLogBlocks.push(logBlock);
break;
default:
throw new UnsupportedOperationException("Block type not supported yet");
@@ -273,7 +294,7 @@ public abstract class AbstractHoodieLogRecordScanner {
// merge the last read block when all the blocks are done reading
if (!currentInstantLogBlocks.isEmpty()) {
LOG.info("Merging the final data blocks");
processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size());
processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keys);
}
// Done
progress = 1.0f;
@@ -308,9 +329,14 @@ public abstract class AbstractHoodieLogRecordScanner {
* Iterate over the GenericRecord in the block, read the hoodie key and partition path and call subclass processors to
* handle it.
*/
private void processDataBlock(HoodieDataBlock dataBlock) throws Exception {
private void processDataBlock(HoodieDataBlock dataBlock, Option<List<String>> keys) throws Exception {
// TODO (NA) - Implement getRecordItr() in HoodieAvroDataBlock and use that here
List<IndexedRecord> recs = dataBlock.getRecords();
List<IndexedRecord> recs = new ArrayList<>();
if (!keys.isPresent()) {
recs = dataBlock.getRecords();
} else {
recs = dataBlock.getRecords(keys.get());
}
totalLogRecords.addAndGet(recs.size());
for (IndexedRecord rec : recs) {
processNextRecord(createHoodieRecord(rec));
@@ -342,17 +368,18 @@ public abstract class AbstractHoodieLogRecordScanner {
/**
* Process the set of log blocks belonging to the last instant which is read fully.
*/
private void processQueuedBlocksForInstant(Deque<HoodieLogBlock> lastBlocks, int numLogFilesSeen) throws Exception {
while (!lastBlocks.isEmpty()) {
LOG.info("Number of remaining logblocks to merge " + lastBlocks.size());
private void processQueuedBlocksForInstant(Deque<HoodieLogBlock> logBlocks, int numLogFilesSeen,
Option<List<String>> keys) throws Exception {
while (!logBlocks.isEmpty()) {
LOG.info("Number of remaining logblocks to merge " + logBlocks.size());
// poll the element at the bottom of the stack since that's the order it was inserted
HoodieLogBlock lastBlock = lastBlocks.pollLast();
HoodieLogBlock lastBlock = logBlocks.pollLast();
switch (lastBlock.getBlockType()) {
case AVRO_DATA_BLOCK:
processDataBlock((HoodieAvroDataBlock) lastBlock);
processDataBlock((HoodieAvroDataBlock) lastBlock, keys);
break;
case HFILE_DATA_BLOCK:
processDataBlock((HoodieHFileDataBlock) lastBlock);
processDataBlock((HoodieHFileDataBlock) lastBlock, keys);
break;
case DELETE_BLOCK:
Arrays.stream(((HoodieDeleteBlock) lastBlock).getKeysToDelete()).forEach(this::processNextDeletedKey);
@@ -432,6 +459,6 @@ public abstract class AbstractHoodieLogRecordScanner {
throw new UnsupportedOperationException();
}
public abstract AbstractHoodieLogRecordScanner build();
public abstract AbstractHoodieLogRecordReader build();
}
}

View File

@@ -70,17 +70,24 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
private long reverseLogFilePosition;
private long lastReverseLogFilePosition;
private boolean reverseReader;
private boolean enableInlineReading;
private boolean closed = false;
private transient Thread shutdownThread = null;
public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
boolean readBlockLazily, boolean reverseReader) throws IOException {
this(fs, logFile, readerSchema, bufferSize, readBlockLazily, reverseReader, false);
}
public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
boolean readBlockLazily, boolean reverseReader, boolean enableInlineReading) throws IOException {
FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(), bufferSize);
this.logFile = logFile;
this.inputStream = getFSDataInputStream(fsDataInputStream, fs, bufferSize);
this.readerSchema = readerSchema;
this.readBlockLazily = readBlockLazily;
this.reverseReader = reverseReader;
this.enableInlineReading = enableInlineReading;
if (this.reverseReader) {
this.reverseLogFilePosition = this.lastReverseLogFilePosition = fs.getFileStatus(logFile.getPath()).getLen();
}
@@ -248,7 +255,7 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
}
case HFILE_DATA_BLOCK:
return new HoodieHFileDataBlock(logFile, inputStream, Option.ofNullable(content), readBlockLazily,
contentPosition, contentLength, blockEndPos, readerSchema, header, footer);
contentPosition, contentLength, blockEndPos, readerSchema, header, footer, enableInlineReading);
case DELETE_BLOCK:
return HoodieDeleteBlock.getBlock(logFile, inputStream, Option.ofNullable(content), readBlockLazily,
contentPosition, contentLength, blockEndPos, header, footer);

View File

@@ -49,7 +49,12 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
private static final Logger LOG = LogManager.getLogger(HoodieLogFormatReader.class);
HoodieLogFormatReader(FileSystem fs, List<HoodieLogFile> logFiles, Schema readerSchema, boolean readBlocksLazily,
boolean reverseLogReader, int bufferSize) throws IOException {
boolean reverseLogReader, int bufferSize) throws IOException {
this(fs, logFiles, readerSchema, readBlocksLazily, reverseLogReader, bufferSize, false);
}
HoodieLogFormatReader(FileSystem fs, List<HoodieLogFile> logFiles, Schema readerSchema, boolean readBlocksLazily,
boolean reverseLogReader, int bufferSize, boolean enableInlineReading) throws IOException {
this.logFiles = logFiles;
this.fs = fs;
this.readerSchema = readerSchema;
@@ -59,7 +64,7 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
this.prevReadersInOpenState = new ArrayList<>();
if (logFiles.size() > 0) {
HoodieLogFile nextLogFile = logFiles.remove(0);
this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false);
this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false, enableInlineReading);
}
}

View File

@@ -54,7 +54,7 @@ import java.util.Map;
* This results in two I/O passes over the log file.
*/
public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
implements Iterable<HoodieRecord<? extends HoodieRecordPayload>> {
private static final Logger LOG = LogManager.getLogger(HoodieMergedLogRecordScanner.class);
@@ -77,8 +77,9 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
boolean reverseReader, int bufferSize, String spillableMapBasePath,
Option<InstantRange> instantRange, boolean autoScan,
ExternalSpillableMap.DiskMapType diskMapType, boolean isBitCaskDiskMapCompressionEnabled,
boolean withOperationField) {
super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize, instantRange, withOperationField);
boolean withOperationField, boolean enableFullScan) {
super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize, instantRange, withOperationField,
enableFullScan);
try {
// Store merged records for all versions for this log file, set the in-memory footprint to maxInMemoryMapSize
this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator(),
@@ -166,7 +167,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
/**
* Builder used to build {@code HoodieUnMergedLogRecordScanner}.
*/
public static class Builder extends AbstractHoodieLogRecordScanner.Builder {
public static class Builder extends AbstractHoodieLogRecordReader.Builder {
protected FileSystem fs;
protected String basePath;
protected List<String> logFilePaths;
@@ -276,7 +277,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
return new HoodieMergedLogRecordScanner(fs, basePath, logFilePaths, readerSchema,
latestInstantTime, maxMemorySizeInBytes, readBlocksLazily, reverseReader,
bufferSize, spillableMapBasePath, instantRange, autoScan,
diskMapType, isBitCaskDiskMapCompressionEnabled, withOperationField);
diskMapType, isBitCaskDiskMapCompressionEnabled, withOperationField, true);
}
}
}

View File

@@ -31,7 +31,7 @@ import java.util.List;
/**
* A scanner used to scan hoodie unmerged log records.
*/
public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordScanner {
public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordReader {
private final LogRecordScannerCallback callback;
@@ -72,7 +72,7 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordScann
/**
* Builder used to build {@code HoodieUnMergedLogRecordScanner}.
*/
public static class Builder extends AbstractHoodieLogRecordScanner.Builder {
public static class Builder extends AbstractHoodieLogRecordReader.Builder {
private FileSystem fs;
private String basePath;
private List<String> logFilePaths;

View File

@@ -111,6 +111,17 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
return records;
}
/**
 * Batch get of the records for the keys of interest. An implementation may either do a full
 * scan of the block and filter to the matching entries, or do seek-based parsing and return
 * only the matched entries.
 *
 * <p>This base implementation does not support batch gets and always throws.
 *
 * @param keys keys of interest.
 * @return List of IndexedRecords for the keys of interest.
 * @throws IOException on failure reading the block content.
 * @throws UnsupportedOperationException always, unless overridden by a subclass.
 */
public List<IndexedRecord> getRecords(List<String> keys) throws IOException {
throw new UnsupportedOperationException("On demand batch get based on interested keys not supported");
}
public Schema getSchema() {
// if getSchema was invoked before converting byte [] to records
if (records == null) {
@@ -119,7 +130,7 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
return schema;
}
private void createRecordsFromContentBytes() throws IOException {
protected void createRecordsFromContentBytes() throws IOException {
if (readBlockLazily && !getContent().isPresent()) {
// read log block contents from disk
inflate();

View File

@@ -19,12 +19,16 @@
package org.apache.hudi.common.table.log.block;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.fs.inline.InLineFSUtils;
import org.apache.hudi.common.fs.inline.InLineFileSystem;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.storage.HoodieHFileReader;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -44,6 +48,7 @@ import org.apache.hadoop.hbase.util.Pair;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -61,6 +66,7 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
private static final Logger LOG = LogManager.getLogger(HoodieHFileDataBlock.class);
private static Compression.Algorithm compressionAlgorithm = Compression.Algorithm.GZ;
private static int blockSize = 1 * 1024 * 1024;
private boolean enableInlineReading = false;
public HoodieHFileDataBlock(@Nonnull Map<HeaderMetadataType, String> logBlockHeader,
@Nonnull Map<HeaderMetadataType, String> logBlockFooter,
@@ -71,10 +77,11 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
public HoodieHFileDataBlock(HoodieLogFile logFile, FSDataInputStream inputStream, Option<byte[]> content,
boolean readBlockLazily, long position, long blockSize, long blockEndpos, Schema readerSchema,
Map<HeaderMetadataType, String> header, Map<HeaderMetadataType, String> footer) {
Map<HeaderMetadataType, String> header, Map<HeaderMetadataType, String> footer, boolean enableInlineReading) {
super(content, inputStream, readBlockLazily,
Option.of(new HoodieLogBlockContentLocation(logFile, position, blockSize, blockEndpos)), readerSchema, header,
footer);
this.enableInlineReading = enableInlineReading;
}
public HoodieHFileDataBlock(@Nonnull List<IndexedRecord> records, @Nonnull Map<HeaderMetadataType, String> header) {
@@ -141,6 +148,50 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
return baos.toByteArray();
}
@Override
protected void createRecordsFromContentBytes() throws IOException {
  // Without inline reading, fall back to the default path that deserializes
  // the already-loaded content bytes.
  if (!enableInlineReading) {
    super.createRecordsFromContentBytes();
    return;
  }
  // Inline reading: an empty key list means "full scan", which populates this.records
  // by reading the block's byte range through the inline file system.
  getRecords(Collections.emptyList());
}
@Override
public List<IndexedRecord> getRecords(List<String> keys) throws IOException {
// Seek-based (or full-scan, when keys is empty) read via the inline file system;
// populates this.records as a side effect before returning it.
readWithInlineFS(keys);
return records;
}
/**
 * Reads records from this block's byte range through the inline file system, either as a
 * full scan (when {@code keys} is empty) or as a seek-based lookup of the given keys, and
 * stores the result in {@code this.records}.
 *
 * @param keys record keys to look up; an empty list means "read every record in the block".
 * @throws IOException on any failure reading the inlined HFile.
 */
private void readWithInlineFS(List<String> keys) throws IOException {
  boolean enableFullScan = keys.isEmpty();
  // Get the writer schema from the block header; fall back to it if no reader schema was set.
  Schema writerSchema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
  if (schema == null) {
    schema = writerSchema;
  }
  Configuration conf = new Configuration();
  CacheConfig cacheConf = new CacheConfig(conf);
  Configuration inlineConf = new Configuration();
  inlineConf.set("fs." + InLineFileSystem.SCHEME + ".impl", InLineFileSystem.class.getName());
  // Build an inline path addressing exactly this block's content region inside the log file.
  Path inlinePath = InLineFSUtils.getInlineFilePath(
      getBlockContentLocation().get().getLogFile().getPath(),
      getBlockContentLocation().get().getLogFile().getPath().getFileSystem(conf).getScheme(),
      getBlockContentLocation().get().getContentPositionInLogFile(),
      getBlockContentLocation().get().getBlockSize());
  // HFile reads are efficient when keys are sorted, since records are stored sorted by key;
  // sorted lookups avoid unnecessary backward seeks. Sort a copy so the caller's list is
  // not mutated (the original sorted the caller's list in place).
  List<String> lookupKeys = enableFullScan
      ? keys
      : keys.stream().sorted().collect(Collectors.toList());
  HoodieHFileReader reader =
      new HoodieHFileReader(inlineConf, inlinePath, cacheConf, inlinePath.getFileSystem(inlineConf));
  try {
    List<org.apache.hadoop.hbase.util.Pair<String, IndexedRecord>> logRecords = enableFullScan
        ? reader.readAllRecords(writerSchema, schema)
        : reader.readRecords(lookupKeys, schema);
    this.records = logRecords.stream().map(t -> t.getSecond()).collect(Collectors.toList());
  } finally {
    // Always release the reader (and its underlying inline stream), even if the read fails;
    // the original leaked the reader on exception.
    reader.close();
  }
}
@Override
protected void deserializeRecords() throws IOException {
// Get schema from the header

View File

@@ -21,6 +21,7 @@ package org.apache.hudi.io.storage;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
@@ -33,6 +34,7 @@ import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.Seekable;
@@ -55,6 +57,7 @@ public class HoodieHFileReader<R extends IndexedRecord> implements HoodieFileRea
private Path path;
private Configuration conf;
private HFile.Reader reader;
private FSDataInputStream fsDataInputStream;
private Schema schema;
// Scanner used to read individual keys. This is cached to prevent the overhead of opening the scanner for each
// key retrieval.
@@ -72,6 +75,13 @@ public class HoodieHFileReader<R extends IndexedRecord> implements HoodieFileRea
this.reader = HFile.createReader(FSUtils.getFs(path.toString(), configuration), path, cacheConfig, conf);
}
public HoodieHFileReader(Configuration configuration, Path path, CacheConfig cacheConfig, FileSystem inlineFs) throws IOException {
this.conf = configuration;
this.path = path;
this.fsDataInputStream = inlineFs.open(path);
this.reader = HFile.createReader(inlineFs, path, cacheConfig, configuration);
}
public HoodieHFileReader(byte[] content) throws IOException {
Configuration conf = new Configuration();
Path path = new Path("hoodie");
@@ -164,6 +174,25 @@ public class HoodieHFileReader<R extends IndexedRecord> implements HoodieFileRea
return readAllRecords(schema, schema);
}
/**
 * Reads the given keys, parsing the writer schema out of the HFile's file-info block.
 *
 * @param keys record keys to look up.
 * @return pairs of (key, record) for the keys that were found.
 * @throws IOException on failure reading the HFile.
 */
public List<Pair<String, R>> readRecords(List<String> keys) throws IOException {
  // Load file info once and pull the schema entry out of it
  // (the original called loadFileInfo() twice and discarded the first result).
  byte[] schemaBytes = reader.loadFileInfo().get(KEY_SCHEMA.getBytes());
  Schema schema = new Schema.Parser().parse(new String(schemaBytes));
  return readRecords(keys, schema);
}
/**
 * Reads the given keys using the provided schema, via per-key seeks.
 *
 * @param keys record keys to look up.
 * @param schema schema used to deserialize the values.
 * @return pairs of (key, record) for the keys that were found; misses are skipped.
 * @throws IOException on failure reading the HFile.
 */
public List<Pair<String, R>> readRecords(List<String> keys, Schema schema) throws IOException {
  this.schema = schema;
  // Ensure the reader's file-info block is loaded before point lookups.
  reader.loadFileInfo();
  List<Pair<String, R>> records = new ArrayList<>();
  for (String key : keys) {
    Option<R> value = getRecordByKey(key, schema);
    if (value.isPresent()) {
      // Parameterized Pair instead of the raw type used before (unchecked warning).
      records.add(new Pair<>(key, value.get()));
    }
  }
  return records;
}
@Override
public Iterator getRecordIterator(Schema readerSchema) throws IOException {
final HFileScanner scanner = reader.getScanner(false, false);
@@ -217,7 +246,7 @@ public class HoodieHFileReader<R extends IndexedRecord> implements HoodieFileRea
synchronized (this) {
if (keyScanner == null) {
keyScanner = reader.getScanner(true, true);
keyScanner = reader.getScanner(false, true);
}
if (keyScanner.seekTo(kv) == 0) {
@@ -250,6 +279,9 @@ public class HoodieHFileReader<R extends IndexedRecord> implements HoodieFileRea
try {
reader.close();
reader = null;
if (fsDataInputStream != null) {
fsDataInputStream.close();
}
keyScanner = null;
} catch (IOException e) {
throw new HoodieIOException("Error closing the hfile reader", e);

View File

@@ -30,6 +30,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hadoop.fs.FileStatus;
@@ -38,10 +39,13 @@ import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public abstract class BaseTableMetadata implements HoodieTableMetadata {
@@ -126,15 +130,12 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
}
@Override
public Map<String, FileStatus[]> getAllFilesInPartitions(List<String> partitionPaths)
public Map<String, FileStatus[]> getAllFilesInPartitions(List<String> partitions)
throws IOException {
if (enabled) {
Map<String, FileStatus[]> partitionsFilesMap = new HashMap<>();
try {
for (String partitionPath : partitionPaths) {
partitionsFilesMap.put(partitionPath, fetchAllFilesInPartition(new Path(partitionPath)));
}
List<Path> partitionPaths = partitions.stream().map(entry -> new Path(entry)).collect(Collectors.toList());
Map<String, FileStatus[]> partitionsFilesMap = fetchAllFilesInPartitionPaths(partitionPaths);
return partitionsFilesMap;
} catch (Exception e) {
throw new HoodieMetadataException("Failed to retrieve files in partition from metadata", e);
@@ -142,7 +143,7 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
}
return new FileSystemBackedTableMetadata(getEngineContext(), hadoopConf, dataBasePath, metadataConfig.shouldAssumeDatePartitioning())
.getAllFilesInPartitions(partitionPaths);
.getAllFilesInPartitions(partitions);
}
/**
@@ -150,7 +151,7 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
*/
protected List<String> fetchAllPartitionPaths() throws IOException {
HoodieTimer timer = new HoodieTimer().startTimer();
Option<HoodieRecord<HoodieMetadataPayload>> hoodieRecord = getRecordByKeyFromMetadata(RECORDKEY_PARTITION_LIST, MetadataPartitionType.FILES.partitionPath());
Option<HoodieRecord<HoodieMetadataPayload>> hoodieRecord = getRecordByKey(RECORDKEY_PARTITION_LIST, MetadataPartitionType.FILES.partitionPath());
metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_PARTITIONS_STR, timer.endTimer()));
List<String> partitions = Collections.emptyList();
@@ -184,7 +185,7 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
}
HoodieTimer timer = new HoodieTimer().startTimer();
Option<HoodieRecord<HoodieMetadataPayload>> hoodieRecord = getRecordByKeyFromMetadata(partitionName, MetadataPartitionType.FILES.partitionPath());
Option<HoodieRecord<HoodieMetadataPayload>> hoodieRecord = getRecordByKey(partitionName, MetadataPartitionType.FILES.partitionPath());
metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_FILES_STR, timer.endTimer()));
FileStatus[] statuses = {};
@@ -200,7 +201,48 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
return statuses;
}
protected abstract Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKeyFromMetadata(String key, String partitionName);
/**
 * Lists all files under the given partition paths by reading the FILES partition of the
 * metadata table with a single batched key lookup.
 *
 * @param partitionPaths absolute partition paths to list.
 * @return map of partition path (as a string) to the file statuses under it; partitions with
 *         no metadata record are absent from the map.
 * @throws IOException on failure reading from the metadata table.
 * @throws HoodieMetadataException when partitioned and non-partitioned paths are mixed, or a
 *         metadata record is inconsistent.
 */
Map<String, FileStatus[]> fetchAllFilesInPartitionPaths(List<Path> partitionPaths) throws IOException {
  Map<String, Path> partitionInfo = new HashMap<>();
  boolean foundNonPartitionedPath = false;
  for (Path partitionPath : partitionPaths) {
    String partitionName = FSUtils.getRelativePartitionPath(new Path(dataBasePath), partitionPath);
    if (partitionName.isEmpty()) {
      // A non-partitioned path cannot be mixed with partitioned ones. The original check
      // (partitionInfo.size() > 1) missed the case where exactly one partitioned entry had
      // been seen before the non-partitioned path.
      if (!foundNonPartitionedPath && !partitionInfo.isEmpty()) {
        throw new HoodieMetadataException("Found mix of partitioned and non partitioned paths while fetching data from metadata table");
      }
      partitionInfo.put(NON_PARTITIONED_NAME, partitionPath);
      foundNonPartitionedPath = true;
    } else {
      if (foundNonPartitionedPath) {
        throw new HoodieMetadataException("Found mix of partitioned and non partitioned paths while fetching data from metadata table");
      }
      partitionInfo.put(partitionName, partitionPath);
    }
  }

  HoodieTimer timer = new HoodieTimer().startTimer();
  // Single batched lookup of all partition keys against the FILES metadata partition.
  List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> partitionsFileStatus =
      getRecordsByKeys(new ArrayList<>(partitionInfo.keySet()), MetadataPartitionType.FILES.partitionPath());
  metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_FILES_STR, timer.endTimer()));

  Map<String, FileStatus[]> result = new HashMap<>();
  for (Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry : partitionsFileStatus) {
    if (entry.getValue().isPresent()) {
      // A record that still carries deletions indicates inconsistent metadata for the key.
      if (!entry.getValue().get().getData().getDeletions().isEmpty()) {
        throw new HoodieMetadataException("Metadata record for partition " + entry.getKey() + " is inconsistent: "
            + entry.getValue().get().getData());
      }
      result.put(partitionInfo.get(entry.getKey()).toString(),
          entry.getValue().get().getData().getFileStatuses(hadoopConf.get(), partitionInfo.get(entry.getKey())));
    }
  }
  LOG.info("Listed files in partitions from metadata: partition list =" + Arrays.toString(partitionPaths.toArray()));
  return result;
}
protected abstract Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKey(String key, String partitionName);
protected abstract List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecordsByKeys(List<String> key, String partitionName);
protected HoodieEngineContext getEngineContext() {
return engineContext != null ? engineContext : new HoodieLocalEngineContext(hadoopConf.get());

View File

@@ -81,7 +81,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
private final boolean reuse;
// Readers for latest file slice corresponding to file groups in the metadata partition of interest
private Map<String, Pair<HoodieFileReader, HoodieMetadataMergedLogRecordScanner>> partitionReaders = new ConcurrentHashMap<>();
private Map<String, Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader>> partitionReaders = new ConcurrentHashMap<>();
public HoodieBackedTableMetadata(HoodieEngineContext engineContext, HoodieMetadataConfig metadataConfig,
String datasetBasePath, String spillableMapDirectory) {
@@ -120,48 +120,25 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
}
@Override
protected Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKeyFromMetadata(String key, String partitionName) {
Pair<HoodieFileReader, HoodieMetadataMergedLogRecordScanner> readers = openReadersIfNeeded(key, partitionName);
protected Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKey(String key, String partitionName) {
return getRecordsByKeys(Collections.singletonList(key), partitionName).get(0).getValue();
}
protected List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecordsByKeys(List<String> keys, String partitionName) {
Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> readers = openReadersIfNeeded(keys.get(0), partitionName);
try {
List<Long> timings = new ArrayList<>();
HoodieTimer timer = new HoodieTimer().startTimer();
HoodieFileReader baseFileReader = readers.getKey();
HoodieMetadataMergedLogRecordScanner logRecordScanner = readers.getRight();
HoodieMetadataMergedLogRecordReader logRecordScanner = readers.getRight();
// Retrieve record from base file
HoodieRecord<HoodieMetadataPayload> hoodieRecord = null;
if (baseFileReader != null) {
HoodieTimer readTimer = new HoodieTimer().startTimer();
Option<GenericRecord> baseRecord = baseFileReader.getRecordByKey(key);
if (baseRecord.isPresent()) {
hoodieRecord = metadataTableConfig.populateMetaFields()
? SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(), metadataTableConfig.getPayloadClass(), metadataTableConfig.getPreCombineField(), false)
: SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(), metadataTableConfig.getPayloadClass(), metadataTableConfig.getPreCombineField(),
Pair.of(metadataTableConfig.getRecordKeyFieldProp(), metadataTableConfig.getPartitionFieldProp()), false);
metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BASEFILE_READ_STR, readTimer.endTimer()));
}
}
timings.add(timer.endTimer());
// Retrieve record from log file
timer.startTimer();
if (logRecordScanner != null) {
Option<HoodieRecord<HoodieMetadataPayload>> logHoodieRecord = logRecordScanner.getRecordByKey(key);
if (logHoodieRecord.isPresent()) {
if (hoodieRecord != null) {
// Merge the payloads
HoodieRecordPayload mergedPayload = logHoodieRecord.get().getData().preCombine(hoodieRecord.getData());
hoodieRecord = new HoodieRecord(hoodieRecord.getKey(), mergedPayload);
} else {
hoodieRecord = logHoodieRecord.get();
}
}
}
timings.add(timer.endTimer());
LOG.info(String.format("Metadata read for key %s took [baseFileRead, logMerge] %s ms", key, timings));
return Option.ofNullable(hoodieRecord);
// local map to assist in merging with base file records
Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords = readLogRecords(logRecordScanner, keys, timings);
List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> result = readFromBaseAndMergeWithLogRecords(baseFileReader,
keys, logRecords, timings);
LOG.info(String.format("Metadata read for %s keys took [baseFileRead, logMerge] %s ms", keys.size(), timings));
return result;
} catch (IOException ioe) {
throw new HoodieIOException("Error merging records from metadata table for key :" + key, ioe);
throw new HoodieIOException("Error merging records from metadata table for " + keys.size() + " key : ", ioe);
} finally {
if (!reuse) {
close(partitionName);
@@ -169,16 +146,88 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
}
}
/**
 * Looks up the given keys in the log record reader for the partition's latest file slice.
 *
 * @param logRecordScanner merged log record reader; may be null when the slice has no log files.
 * @param keys record keys to look up.
 * @param timings accumulator the log-read elapsed time (ms) is appended to.
 * @return map of key to optional merged log record (empty Option when the key is absent).
 */
private Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> readLogRecords(HoodieMetadataMergedLogRecordReader logRecordScanner,
                                                                                List<String> keys, List<Long> timings) {
  // The original started the timer twice (once here and once via timer.startTimer());
  // starting it once is sufficient.
  HoodieTimer timer = new HoodieTimer().startTimer();
  Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords = new HashMap<>();
  if (logRecordScanner != null) {
    if (metadataConfig.enableFullScan()) {
      // Full-scan mode: the reader already materialized all records during its initial scan,
      // so each key is a plain in-memory lookup.
      for (String key : keys) {
        logRecords.put(key, logRecordScanner.getRecordByKey(key).get(0).getValue());
      }
    } else {
      // Seek-based mode: only the requested keys are read from the log blocks.
      for (Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry : logRecordScanner.getRecordsByKeys(keys)) {
        logRecords.put(entry.getKey(), entry.getValue());
      }
    }
  }
  timings.add(timer.endTimer());
  return logRecords;
}
/**
 * Looks up the given keys in the base file and merges each hit with its log record, if any.
 * Keys present only in the log records, or in neither place, are handled as well.
 *
 * @param baseFileReader reader for the base file of the latest file slice; may be null.
 * @param keys record keys to look up.
 * @param logRecords key to optional log record, as produced by readLogRecords.
 * @param timings accumulator the base-read/merge elapsed time (ms) is appended to.
 * @return one (key, optional merged record) pair per requested key.
 * @throws IOException on failure reading the base file.
 */
private List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> readFromBaseAndMergeWithLogRecords(HoodieFileReader baseFileReader,
    List<String> keys, Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords,
    List<Long> timings) throws IOException {
  List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> result = new ArrayList<>();
  HoodieTimer timer = new HoodieTimer().startTimer();
  if (baseFileReader == null) {
    // No base file at all: every answer comes straight from the log records.
    timings.add(timer.endTimer());
    for (Map.Entry<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry : logRecords.entrySet()) {
      result.add(Pair.of(entry.getKey(), entry.getValue()));
    }
    return result;
  }
  HoodieTimer readTimer = new HoodieTimer().startTimer();
  for (String key : keys) {
    Option<GenericRecord> baseRecord = baseFileReader.getRecordByKey(key);
    if (baseRecord.isPresent()) {
      // Convert the avro record to a HoodieRecord, with or without meta-field based key extraction.
      HoodieRecord<HoodieMetadataPayload> hoodieRecord = metadataTableConfig.populateMetaFields()
          ? SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(), metadataTableConfig.getPayloadClass(), metadataTableConfig.getPreCombineField(), false)
          : SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(), metadataTableConfig.getPayloadClass(), metadataTableConfig.getPreCombineField(),
              Pair.of(metadataTableConfig.getRecordKeyFieldProp(), metadataTableConfig.getPartitionFieldProp()), false);
      metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BASEFILE_READ_STR, readTimer.endTimer()));
      if (logRecords.containsKey(key) && logRecords.get(key).isPresent()) {
        // Merge the (newer) log payload on top of the base payload.
        HoodieRecordPayload mergedPayload = logRecords.get(key).get().getData().preCombine(hoodieRecord.getData());
        result.add(Pair.of(key, Option.of(new HoodieRecord(hoodieRecord.getKey(), mergedPayload))));
      } else {
        // Only the base record exists for this key.
        result.add(Pair.of(key, Option.of(hoodieRecord)));
      }
    } else if (logRecords.containsKey(key) && logRecords.get(key).isPresent()) {
      // Log-only hit. The original dereferenced a null (first iteration -> NPE) or stale
      // (from a previous key -> wrong merge) base record here; when the base file has no
      // entry, the log record alone is the correct answer.
      result.add(Pair.of(key, logRecords.get(key)));
    } else {
      // Found in neither the base file nor the log files.
      result.add(Pair.of(key, Option.empty()));
    }
  }
  timings.add(timer.endTimer());
  return result;
}
/**
* Returns a new pair of readers to the base and log files.
*/
private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordScanner> openReadersIfNeeded(String key, String partitionName) {
private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> openReadersIfNeeded(String key, String partitionName) {
return partitionReaders.computeIfAbsent(partitionName, k -> {
try {
final long baseFileOpenMs;
final long logScannerOpenMs;
HoodieFileReader baseFileReader = null;
HoodieMetadataMergedLogRecordScanner logRecordScanner = null;
HoodieMetadataMergedLogRecordReader logRecordScanner = null;
// Metadata is in sync till the latest completed instant on the dataset
HoodieTimer timer = new HoodieTimer().startTimer();
@@ -192,7 +241,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
baseFileOpenMs = baseFileReaderOpenTimePair.getValue();
// Open the log record scanner using the log files from the latest file slice
Pair<HoodieMetadataMergedLogRecordScanner, Long> logRecordScannerOpenTimePair = getLogRecordScanner(slice);
Pair<HoodieMetadataMergedLogRecordReader, Long> logRecordScannerOpenTimePair = getLogRecordScanner(slice);
logRecordScanner = logRecordScannerOpenTimePair.getKey();
logScannerOpenMs = logRecordScannerOpenTimePair.getValue();
@@ -244,7 +293,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
return validInstantTimestamps;
}
private Pair<HoodieMetadataMergedLogRecordScanner, Long> getLogRecordScanner(FileSlice slice) {
private Pair<HoodieMetadataMergedLogRecordReader, Long> getLogRecordScanner(FileSlice slice) {
HoodieTimer timer = new HoodieTimer().startTimer();
List<String> logFilePaths = slice.getLogFiles()
.sorted(HoodieLogFile.getLogFileComparator())
@@ -261,7 +310,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
// Load the schema
Schema schema = HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
HoodieCommonConfig commonConfig = HoodieCommonConfig.newBuilder().fromProperties(metadataConfig.getProps()).build();
HoodieMetadataMergedLogRecordScanner logRecordScanner = HoodieMetadataMergedLogRecordScanner.newBuilder()
HoodieMetadataMergedLogRecordReader logRecordScanner = HoodieMetadataMergedLogRecordReader.newBuilder()
.withFileSystem(metadataMetaClient.getFs())
.withBasePath(metadataBasePath)
.withLogFilePaths(logFilePaths)
@@ -273,6 +322,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
.withDiskMapType(commonConfig.getSpillableDiskMapType())
.withBitCaskDiskMapCompressionEnabled(commonConfig.isBitCaskDiskMapCompressionEnabled())
.withLogBlockTimestamps(validInstantTimestamps)
.enableFullScan(metadataConfig.enableFullScan())
.build();
Long logScannerOpenMs = timer.endTimer();
@@ -319,7 +369,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
}
private synchronized void close(String partitionName) {
Pair<HoodieFileReader, HoodieMetadataMergedLogRecordScanner> readers = partitionReaders.remove(partitionName);
Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> readers = partitionReaders.remove(partitionName);
if (readers != null) {
try {
if (readers.getKey() != null) {

View File

@@ -19,12 +19,16 @@
package org.apache.hudi.metadata;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -32,26 +36,30 @@ import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.common.util.collection.Pair;
/**
* A {@code HoodieMergedLogRecordScanner} implementation which only merged records matching providing keys. This is
* useful in limiting memory usage when only a small subset of updates records are to be read.
*/
public class HoodieMetadataMergedLogRecordScanner extends HoodieMergedLogRecordScanner {
public class HoodieMetadataMergedLogRecordReader extends HoodieMergedLogRecordScanner {
private static final Logger LOG = LogManager.getLogger(HoodieMetadataMergedLogRecordReader.class);
// Set of all record keys that are to be read in memory
private Set<String> mergeKeyFilter;
private HoodieMetadataMergedLogRecordScanner(FileSystem fs, String basePath, List<String> logFilePaths,
private HoodieMetadataMergedLogRecordReader(FileSystem fs, String basePath, List<String> logFilePaths,
Schema readerSchema, String latestInstantTime, Long maxMemorySizeInBytes, int bufferSize,
String spillableMapBasePath, Set<String> mergeKeyFilter,
ExternalSpillableMap.DiskMapType diskMapType, boolean isBitCaskDiskMapCompressionEnabled,
Option<InstantRange> instantRange) {
Option<InstantRange> instantRange, boolean enableFullScan) {
super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, maxMemorySizeInBytes, false, false, bufferSize,
spillableMapBasePath, instantRange, false, diskMapType, isBitCaskDiskMapCompressionEnabled, false);
spillableMapBasePath, instantRange, false, diskMapType, isBitCaskDiskMapCompressionEnabled, false, enableFullScan);
this.mergeKeyFilter = mergeKeyFilter;
performScan();
if (enableFullScan) {
performScan();
}
}
@Override
@@ -71,8 +79,8 @@ public class HoodieMetadataMergedLogRecordScanner extends HoodieMergedLogRecordS
/**
* Returns the builder for {@code HoodieMetadataMergedLogRecordScanner}.
*/
public static HoodieMetadataMergedLogRecordScanner.Builder newBuilder() {
return new HoodieMetadataMergedLogRecordScanner.Builder();
public static HoodieMetadataMergedLogRecordReader.Builder newBuilder() {
return new HoodieMetadataMergedLogRecordReader.Builder();
}
/**
@@ -81,8 +89,22 @@ public class HoodieMetadataMergedLogRecordScanner extends HoodieMergedLogRecordS
* @param key Key of the record to retrieve
* @return {@code HoodieRecord} if key was found else {@code Option.empty()}
*/
public Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKey(String key) {
return Option.ofNullable((HoodieRecord) records.get(key));
public List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecordByKey(String key) {
return Collections.singletonList(Pair.of(key, Option.ofNullable((HoodieRecord) records.get(key))));
}
/**
 * Performs a targeted (seek-based) scan of the log files for the given keys and returns one
 * (key, optional record) pair per requested key, in the same order as {@code keys}.
 */
public List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecordsByKeys(List<String> keys) {
  // Drop results of any previous lookup, then scan only the requested keys.
  records.clear();
  scan(Option.of(keys));
  List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> metadataRecords = new ArrayList<>(keys.size());
  for (String key : keys) {
    // Option.ofNullable collapses both "key absent" and "null value" to Option.empty(),
    // matching the original's containsKey branching exactly.
    metadataRecords.add(Pair.of(key, Option.ofNullable((HoodieRecord) records.get(key))));
  }
  return metadataRecords;
}
/**
@@ -90,6 +112,8 @@ public class HoodieMetadataMergedLogRecordScanner extends HoodieMergedLogRecordS
*/
public static class Builder extends HoodieMergedLogRecordScanner.Builder {
private Set<String> mergeKeyFilter = Collections.emptySet();
private boolean enableFullScan;
private boolean enableInlineReading;
@Override
public Builder withFileSystem(FileSystem fs) {
@@ -171,11 +195,16 @@ public class HoodieMetadataMergedLogRecordScanner extends HoodieMergedLogRecordS
return this;
}
/**
 * Sets whether the reader should eagerly scan all log blocks at construction time
 * (full-scan mode) instead of deferring to seek-based per-key lookups.
 *
 * @param enableFullScan true to scan everything up front.
 * @return this builder, for chaining.
 */
public Builder enableFullScan(boolean enableFullScan) {
this.enableFullScan = enableFullScan;
return this;
}
@Override
public HoodieMetadataMergedLogRecordScanner build() {
return new HoodieMetadataMergedLogRecordScanner(fs, basePath, logFilePaths, readerSchema,
public HoodieMetadataMergedLogRecordReader build() {
return new HoodieMetadataMergedLogRecordReader(fs, basePath, logFilePaths, readerSchema,
latestInstantTime, maxMemorySizeInBytes, bufferSize, spillableMapBasePath, mergeKeyFilter,
diskMapType, isBitCaskDiskMapCompressionEnabled, instantRange);
diskMapType, isBitCaskDiskMapCompressionEnabled, instantRange, enableFullScan);
}
}

View File

@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.FileCreateUtils;
import org.apache.hudi.common.testutils.HoodieTestUtils;
@@ -137,6 +138,24 @@ public class TestHoodieRealtimeRecordReader {
public void testReader(ExternalSpillableMap.DiskMapType diskMapType,
boolean isCompressionEnabled,
boolean partitioned) throws Exception {
testReaderInternal(diskMapType, isCompressionEnabled, partitioned);
}
@Test
public void testHFileInlineReader() throws Exception {
testReaderInternal(ExternalSpillableMap.DiskMapType.BITCASK, false, false,
HoodieLogBlock.HoodieLogBlockType.HFILE_DATA_BLOCK);
}
private void testReaderInternal(ExternalSpillableMap.DiskMapType diskMapType,
boolean isCompressionEnabled,
boolean partitioned) throws Exception {
testReaderInternal(diskMapType, isCompressionEnabled, partitioned, HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK);
}
private void testReaderInternal(ExternalSpillableMap.DiskMapType diskMapType,
boolean isCompressionEnabled,
boolean partitioned, HoodieLogBlock.HoodieLogBlockType logBlockType) throws Exception {
// initial commit
Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema());
HoodieTestUtils.init(hadoopConf, basePath.toString(), HoodieTableType.MERGE_ON_READ);
@@ -175,7 +194,7 @@ public class TestHoodieRealtimeRecordReader {
} else {
writer =
InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, "fileid0", baseInstant,
instantTime, 120, 0, logVersion);
instantTime, 120, 0, logVersion, logBlockType);
}
long size = writer.getCurrentSize();
writer.close();

View File

@@ -27,6 +27,8 @@ import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.table.log.block.HoodieHFileDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.SchemaTestUtil;
@@ -301,7 +303,14 @@ public class InputFormatTestUtil {
public static HoodieLogFormat.Writer writeDataBlockToLogFile(File partitionDir, FileSystem fs, Schema schema, String
fileId,
String baseCommit, String newCommit, int numberOfRecords, int offset, int logVersion)
String baseCommit, String newCommit, int numberOfRecords, int offset, int logVersion) throws IOException, InterruptedException {
return writeDataBlockToLogFile(partitionDir, fs, schema, fileId, baseCommit, newCommit, numberOfRecords, offset, logVersion, HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK);
}
public static HoodieLogFormat.Writer writeDataBlockToLogFile(File partitionDir, FileSystem fs, Schema schema, String
fileId,
String baseCommit, String newCommit, int numberOfRecords, int offset, int logVersion,
HoodieLogBlock.HoodieLogBlockType logBlockType)
throws InterruptedException, IOException {
HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(new Path(partitionDir.getPath()))
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(fileId).withLogVersion(logVersion)
@@ -314,7 +323,8 @@ public class InputFormatTestUtil {
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, newCommit);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, writeSchema.toString());
HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, header);
HoodieDataBlock dataBlock = (logBlockType == HoodieLogBlock.HoodieLogBlockType.HFILE_DATA_BLOCK) ? new HoodieHFileDataBlock(records, header) :
new HoodieAvroDataBlock(records, header);
writer.appendBlock(dataBlock);
return writer;
}