From 69ee790a47a5fa90a6acd954a9330cce3ae31c3b Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Fri, 29 Oct 2021 12:12:44 -0400 Subject: [PATCH] [HUDI-1294] Adding inline read and seek based read(batch get) for hfile log blocks in metadata table (#3762) --- .../storage/TestHoodieHFileReaderWriter.java | 134 +++++++++++++++++ .../functional/TestHoodieBackedMetadata.java | 38 +++-- .../functional/TestHoodieMetadataBase.java | 16 +- .../common/config/HoodieMetadataConfig.java | 23 +++ ...ava => AbstractHoodieLogRecordReader.java} | 99 ++++++++----- .../common/table/log/HoodieLogFileReader.java | 9 +- .../table/log/HoodieLogFormatReader.java | 9 +- .../log/HoodieMergedLogRecordScanner.java | 11 +- .../log/HoodieUnMergedLogRecordScanner.java | 4 +- .../table/log/block/HoodieDataBlock.java | 13 +- .../table/log/block/HoodieHFileDataBlock.java | 53 ++++++- .../hudi/io/storage/HoodieHFileReader.java | 34 ++++- .../hudi/metadata/BaseTableMetadata.java | 62 ++++++-- .../metadata/HoodieBackedTableMetadata.java | 138 ++++++++++++------ ... HoodieMetadataMergedLogRecordReader.java} | 55 +++++-- .../TestHoodieRealtimeRecordReader.java | 21 ++- .../hadoop/testutils/InputFormatTestUtil.java | 14 +- 17 files changed, 591 insertions(+), 142 deletions(-) create mode 100644 hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java rename hudi-common/src/main/java/org/apache/hudi/common/table/log/{AbstractHoodieLogRecordScanner.java => AbstractHoodieLogRecordReader.java} (83%) rename hudi-common/src/main/java/org/apache/hudi/metadata/{HoodieMetadataMergedLogRecordScanner.java => HoodieMetadataMergedLogRecordReader.java} (76%) diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java new file mode 100644 index 000000000..049206356 --- /dev/null +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.io.storage; + +import org.apache.hudi.common.bloom.BloomFilter; +import org.apache.hudi.common.bloom.BloomFilterFactory; +import org.apache.hudi.common.bloom.BloomFilterTypeCode; +import org.apache.hudi.common.engine.TaskContextSupplier; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.util.Pair; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.mockito.Mockito; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.hudi.common.testutils.FileSystemTestUtils.RANDOM; +import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class TestHoodieHFileReaderWriter { + @TempDir File tempDir; + private Path filePath; + + @BeforeEach + public void setup() throws IOException { + filePath = new Path(tempDir.toString() + "tempFile.txt"); + } + + @AfterEach + public void clearTempFile() { + File file = new File(filePath.toString()); + if (file.exists()) { + file.delete(); + } + } + + private HoodieHFileWriter createHFileWriter(Schema avroSchema) throws Exception { + BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.00001, -1, BloomFilterTypeCode.SIMPLE.name()); + Configuration conf = new Configuration(); + TaskContextSupplier mockTaskContextSupplier = Mockito.mock(TaskContextSupplier.class); + String instantTime = "000"; + + HoodieHFileConfig hoodieHFileConfig = new HoodieHFileConfig(conf, Compression.Algorithm.GZ, 1024 * 1024, 120 * 1024 * 1024, + filter); + return new HoodieHFileWriter(instantTime, filePath, hoodieHFileConfig, avroSchema, mockTaskContextSupplier); + } + + @Test + public void testWriteReadHFile() throws Exception { + Schema avroSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleSchema.avsc"); + HoodieHFileWriter writer = createHFileWriter(avroSchema); + List keys = new ArrayList<>(); + Map recordMap = new HashMap<>(); + for (int i = 0; i < 100; i++) { + GenericRecord record = new GenericData.Record(avroSchema); + String key = String.format("%s%04d", "key", i); + record.put("_row_key", key); + keys.add(key); + record.put("time", Integer.toString(RANDOM.nextInt())); + record.put("number", i); + writer.writeAvro(key, record); + recordMap.put(key, record); + } + writer.close(); + + Configuration conf = new Configuration(); + CacheConfig cacheConfig = new CacheConfig(conf); + HoodieHFileReader hoodieHFileReader = new HoodieHFileReader(conf, filePath, cacheConfig, filePath.getFileSystem(conf)); + List> records = hoodieHFileReader.readAllRecords(); + records.forEach(entry -> assertEquals(entry.getSecond(), recordMap.get(entry.getFirst()))); + hoodieHFileReader.close(); + + for (int i = 0; i < 20; i++) { + int randomRowstoFetch = 5 + RANDOM.nextInt(50); + Set rowsToFetch = getRandomKeys(randomRowstoFetch, keys); + List rowsList = new ArrayList<>(rowsToFetch); + Collections.sort(rowsList); + hoodieHFileReader = new HoodieHFileReader(conf, filePath, cacheConfig, filePath.getFileSystem(conf)); + List> result = hoodieHFileReader.readRecords(rowsList); + assertEquals(result.size(), randomRowstoFetch); + result.forEach(entry -> { + assertEquals(entry.getSecond(), recordMap.get(entry.getFirst())); + }); + hoodieHFileReader.close(); + } + } + + private Set getRandomKeys(int count, List keys) { + Set rowKeys = new HashSet<>(); + int totalKeys = keys.size(); + while (rowKeys.size() < count) { + int index = RANDOM.nextInt(totalKeys); + if (!rowKeys.contains(index)) { + rowKeys.add(keys.get(index)); + } + } + return rowKeys; + } +} \ No newline at end of file diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index 7ea976617..e0c61e157 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -160,9 +160,8 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { doRollbackAndValidate(testTable, "0000003", "0000004"); } - doWriteOperationAndValidate(testTable, "0000005"); - - // trigger an upsert and validate + // trigger couple of upserts + doWriteOperation(testTable, "0000005"); doWriteOperation(testTable, "0000006"); validateMetadata(testTable, true); } @@ -222,9 +221,9 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { * Test various table operations sync to Metadata Table correctly. */ @ParameterizedTest - @EnumSource(HoodieTableType.class) - public void testTableOperations(HoodieTableType tableType) throws Exception { - init(tableType); + @MethodSource("bootstrapAndTableOperationTestArgs") + public void testTableOperations(HoodieTableType tableType, boolean enableFullScan) throws Exception { + init(tableType, true, enableFullScan); doWriteInsertAndUpsert(testTable); // trigger an upsert @@ -236,7 +235,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { } // trigger an upsert - doWriteOperationAndValidate(testTable, "0000005"); + doWriteOperation(testTable, "0000005"); // trigger clean doCleanAndValidate(testTable, "0000006", singletonList("0000001")); @@ -255,7 +254,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { doWriteOperation(testTable, "0000002"); doCleanAndValidate(testTable, "0000003", Arrays.asList("0000001")); if (tableType == MERGE_ON_READ) { - doCompactionAndValidate(testTable, "0000004"); + doCompaction(testTable, "0000004"); } doWriteOperation(testTable, "0000005"); validateMetadata(testTable, emptyList(), true); @@ -288,7 +287,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { doWriteOperationAndValidate(testTable, "0000003"); // trigger a commit and rollback - doWriteOperationAndValidate(testTable, "0000004"); + doWriteOperation(testTable, "0000004"); doRollbackAndValidate(testTable, "0000004", "0000005"); // trigger few upserts and validate @@ -297,7 +296,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { } validateMetadata(testTable); - doWriteOperationAndValidate(testTable, "0000010"); + doWriteOperation(testTable, "0000010"); // rollback last commit. and validate. doRollbackAndValidate(testTable, "0000010", "0000011"); @@ -309,7 +308,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { } // roll back of delete - doWriteOperationAndValidate(testTable, "0000014", DELETE); + doWriteOperation(testTable, "0000014", DELETE); doRollbackAndValidate(testTable, "0000014", "0000015"); // rollback partial commit @@ -394,9 +393,9 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { syncTableMetadata(writeConfig); validateMetadata(testTable); - doWriteOperationAndValidate(testTable, "00000003", INSERT); - doWriteOperationAndValidate(testTable, "00000004", UPSERT); - doWriteOperationAndValidate(testTable, "00000005", UPSERT); + doWriteOperation(testTable, "00000003", INSERT); + doWriteOperation(testTable, "00000004", UPSERT); + doWriteOperation(testTable, "00000005", UPSERT); // trigger compaction if (MERGE_ON_READ.equals(tableType)) { @@ -404,13 +403,13 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { } // trigger an upsert - doWriteOperationAndValidate(testTable, "00000008"); + doWriteOperation(testTable, "00000008"); // trigger delete - doWriteOperationAndValidate(testTable, "00000009", DELETE); + doWriteOperation(testTable, "00000009", DELETE); // trigger clean doCleanAndValidate(testTable, "00000010", asList("00000003", "00000004")); // trigger another upsert - doWriteOperationAndValidate(testTable, "00000011"); + doWriteOperation(testTable, "00000011"); // trigger clustering doClusterAndValidate(testTable, "00000012"); @@ -528,7 +527,6 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { records = dataGen.generateUniqueUpdates(newCommitTime, 10); writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); assertNoWriteErrors(writeStatuses); - validateMetadata(client); // Write 4 (updates and inserts) newCommitTime = "0000004"; @@ -552,7 +550,6 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { records = dataGen.generateUpdates(newCommitTime, 5); writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); assertNoWriteErrors(writeStatuses); - validateMetadata(client); // Compaction if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) { @@ -568,7 +565,6 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { JavaRDD deleteKeys = jsc.parallelize(records, 1).map(r -> r.getKey()); client.startCommitWithTime(newCommitTime); client.delete(deleteKeys, newCommitTime); - validateMetadata(client); // Clean newCommitTime = "0000009"; @@ -1128,7 +1124,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { Collections.sort(fsFileNames); Collections.sort(metadataFilenames); - assertEquals(fsStatuses.length, partitionToFilesMap.get(basePath + "/" + partition).length); + assertEquals(fsStatuses.length, partitionToFilesMap.get(partitionPath.toString()).length); // File sizes should be valid Arrays.stream(metaStatuses).forEach(s -> assertTrue(s.getLen() > 0)); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java index 85f869f78..7a49dafee 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java @@ -72,6 +72,10 @@ public class TestHoodieMetadataBase extends HoodieClientTestHarness { } public void init(HoodieTableType tableType, boolean enableMetadataTable) throws IOException { + init(tableType, enableMetadataTable, true); + } + + public void init(HoodieTableType tableType, boolean enableMetadataTable, boolean enableFullScan) throws IOException { this.tableType = tableType; initPath(); initSparkContexts("TestHoodieMetadata"); @@ -80,7 +84,8 @@ public class TestHoodieMetadataBase extends HoodieClientTestHarness { initMetaClient(tableType); initTestDataGenerator(); metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(basePath); - writeConfig = getWriteConfig(true, enableMetadataTable); + writeConfig = getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, enableMetadataTable, false, + enableFullScan).build(); initWriteConfigAndMetatableWriter(writeConfig, enableMetadataTable); } @@ -256,7 +261,13 @@ public class TestHoodieMetadataBase extends HoodieClientTestHarness { return getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, autoCommit, useFileListingMetadata, enableMetrics); } - protected HoodieWriteConfig.Builder getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy policy, boolean autoCommit, boolean useFileListingMetadata, boolean enableMetrics) { + protected HoodieWriteConfig.Builder getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy policy, boolean autoCommit, boolean useFileListingMetadata, + boolean enableMetrics) { + return getWriteConfigBuilder(policy, autoCommit, useFileListingMetadata, enableMetrics, true); + } + + protected HoodieWriteConfig.Builder getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy policy, boolean autoCommit, boolean useFileListingMetadata, + boolean enableMetrics, boolean enableFullScan) { return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA) .withParallelism(2, 2).withDeleteParallelism(2).withRollbackParallelism(2).withFinalizeWriteParallelism(2) .withAutoCommit(autoCommit) @@ -271,6 +282,7 @@ public class TestHoodieMetadataBase extends HoodieClientTestHarness { .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()) .withMetadataConfig(HoodieMetadataConfig.newBuilder() .enable(useFileListingMetadata) + .enableFullScan(enableFullScan) .enableMetrics(enableMetrics).build()) .withMetricsConfig(HoodieMetricsConfig.newBuilder().on(enableMetrics) .withExecutorMetrics(true).build()) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java index b74a17ca8..d52629440 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java @@ -115,6 +115,20 @@ public final class HoodieMetadataConfig extends HoodieConfig { .sinceVersion("0.7.0") .withDocumentation("Parallelism to use, when listing the table on lake storage."); + public static final ConfigProperty ENABLE_INLINE_READING = ConfigProperty + .key(METADATA_PREFIX + ".enable.inline.reading") + .defaultValue(true) + .sinceVersion("0.10.0") + .withDocumentation("Enable inline reading of Log files. By default log block contents are read as byte[] using regular input stream and records " + + "are deserialized from it. Enabling this will read each log block as an inline file and read records from the same. For instance, " + + "for HFileDataBlock, a inline file will be read using HFileReader."); + + public static final ConfigProperty ENABLE_FULL_SCAN_LOG_FILES = ConfigProperty + .key(METADATA_PREFIX + ".enable.full.scan.log.files") + .defaultValue(true) + .sinceVersion("0.10.0") + .withDocumentation("Enable full scanning of log files while reading log records. If disabled, hudi does look up of only interested entries."); + private HoodieMetadataConfig() { super(); } @@ -143,6 +157,10 @@ public final class HoodieMetadataConfig extends HoodieConfig { return getString(DIR_FILTER_REGEX); } + public boolean enableFullScan() { + return getBoolean(ENABLE_FULL_SCAN_LOG_FILES); + } + public static class Builder { private final HoodieMetadataConfig metadataConfig = new HoodieMetadataConfig(); @@ -210,6 +228,11 @@ public final class HoodieMetadataConfig extends HoodieConfig { return this; } + public Builder enableFullScan(boolean enableFullScan) { + metadataConfig.setValue(ENABLE_FULL_SCAN_LOG_FILES, String.valueOf(enableFullScan)); + return this; + } + public HoodieMetadataConfig build() { metadataConfig.setDefaults(HoodieMetadataConfig.class.getName()); return metadataConfig; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java similarity index 83% rename from hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java rename to hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java index 868c7cb89..e2e76ad7d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java @@ -47,6 +47,7 @@ import org.apache.log4j.Logger; import java.io.IOException; import java.util.ArrayDeque; +import java.util.ArrayList; import java.util.Arrays; import java.util.Deque; import java.util.HashSet; @@ -71,9 +72,9 @@ import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlo *

* This results in two I/O passes over the log file. */ -public abstract class AbstractHoodieLogRecordScanner { +public abstract class AbstractHoodieLogRecordReader { - private static final Logger LOG = LogManager.getLogger(AbstractHoodieLogRecordScanner.class); + private static final Logger LOG = LogManager.getLogger(AbstractHoodieLogRecordReader.class); // Reader schema for the records protected final Schema readerSchema; @@ -114,12 +115,23 @@ public abstract class AbstractHoodieLogRecordScanner { private AtomicLong totalCorruptBlocks = new AtomicLong(0); // Store the last instant log blocks (needed to implement rollback) private Deque currentInstantLogBlocks = new ArrayDeque<>(); + // Enables full scan of log records + protected final boolean enableFullScan; + private int totalScannedLogFiles; // Progress private float progress = 0.0f; - protected AbstractHoodieLogRecordScanner(FileSystem fs, String basePath, List logFilePaths, Schema readerSchema, - String latestInstantTime, boolean readBlocksLazily, boolean reverseReader, - int bufferSize, Option instantRange, boolean withOperationField) { + protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List logFilePaths, Schema readerSchema, + String latestInstantTime, boolean readBlocksLazily, boolean reverseReader, + int bufferSize, Option instantRange, boolean withOperationField) { + this(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize, instantRange, withOperationField, + true); + } + + protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List logFilePaths, Schema readerSchema, + String latestInstantTime, boolean readBlocksLazily, boolean reverseReader, + int bufferSize, Option instantRange, boolean withOperationField, + boolean enableFullScan) { this.readerSchema = readerSchema; this.latestInstantTime = latestInstantTime; this.hoodieTableMetaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(basePath).build(); @@ -132,18 +144,27 @@ public abstract class AbstractHoodieLogRecordScanner { } this.totalLogFiles.addAndGet(logFilePaths.size()); this.logFilePaths = logFilePaths; - this.readBlocksLazily = readBlocksLazily; this.reverseReader = reverseReader; + this.readBlocksLazily = readBlocksLazily; this.fs = fs; this.bufferSize = bufferSize; this.instantRange = instantRange; this.withOperationField = withOperationField; + this.enableFullScan = enableFullScan; } - /** - * Scan Log files. - */ public void scan() { + scan(Option.empty()); + } + + public void scan(Option> keys) { + currentInstantLogBlocks = new ArrayDeque<>(); + progress = 0.0f; + totalLogFiles = new AtomicLong(0); + totalRollbacks = new AtomicLong(0); + totalCorruptBlocks = new AtomicLong(0); + totalLogBlocks = new AtomicLong(0); + totalLogRecords = new AtomicLong(0); HoodieLogFormatReader logFormatReaderWrapper = null; HoodieTimeline commitsTimeline = this.hoodieTableMetaClient.getCommitsTimeline(); HoodieTimeline completedInstantsTimeline = commitsTimeline.filterCompletedInstants(); @@ -152,7 +173,7 @@ public abstract class AbstractHoodieLogRecordScanner { // iterate over the paths logFormatReaderWrapper = new HoodieLogFormatReader(fs, logFilePaths.stream().map(logFile -> new HoodieLogFile(new Path(logFile))).collect(Collectors.toList()), - readerSchema, readBlocksLazily, reverseReader, bufferSize); + readerSchema, readBlocksLazily, reverseReader, bufferSize, !enableFullScan); Set scannedLogFiles = new HashSet<>(); while (logFormatReaderWrapper.hasNext()) { HoodieLogFile logFile = logFormatReaderWrapper.getLogFile(); @@ -160,16 +181,16 @@ public abstract class AbstractHoodieLogRecordScanner { scannedLogFiles.add(logFile); totalLogFiles.set(scannedLogFiles.size()); // Use the HoodieLogFileReader to iterate through the blocks in the log file - HoodieLogBlock r = logFormatReaderWrapper.next(); - final String instantTime = r.getLogBlockHeader().get(INSTANT_TIME); + HoodieLogBlock logBlock = logFormatReaderWrapper.next(); + final String instantTime = logBlock.getLogBlockHeader().get(INSTANT_TIME); totalLogBlocks.incrementAndGet(); - if (r.getBlockType() != CORRUPT_BLOCK - && !HoodieTimeline.compareTimestamps(r.getLogBlockHeader().get(INSTANT_TIME), HoodieTimeline.LESSER_THAN_OR_EQUALS, this.latestInstantTime + if (logBlock.getBlockType() != CORRUPT_BLOCK + && !HoodieTimeline.compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME), HoodieTimeline.LESSER_THAN_OR_EQUALS, this.latestInstantTime )) { // hit a block with instant time greater than should be processed, stop processing further break; } - if (r.getBlockType() != CORRUPT_BLOCK && r.getBlockType() != COMMAND_BLOCK) { + if (logBlock.getBlockType() != CORRUPT_BLOCK && logBlock.getBlockType() != COMMAND_BLOCK) { if (!completedInstantsTimeline.containsOrBeforeTimelineStarts(instantTime) || inflightInstantsTimeline.containsInstant(instantTime)) { // hit an uncommitted block possibly from a failed write, move to the next one and skip processing this one @@ -180,28 +201,28 @@ public abstract class AbstractHoodieLogRecordScanner { continue; } } - switch (r.getBlockType()) { + switch (logBlock.getBlockType()) { case HFILE_DATA_BLOCK: case AVRO_DATA_BLOCK: LOG.info("Reading a data block from file " + logFile.getPath() + " at instant " - + r.getLogBlockHeader().get(INSTANT_TIME)); - if (isNewInstantBlock(r) && !readBlocksLazily) { + + logBlock.getLogBlockHeader().get(INSTANT_TIME)); + if (isNewInstantBlock(logBlock) && !readBlocksLazily) { // If this is an avro data block belonging to a different commit/instant, // then merge the last blocks and records into the main result - processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size()); + processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keys); } // store the current block - currentInstantLogBlocks.push(r); + currentInstantLogBlocks.push(logBlock); break; case DELETE_BLOCK: LOG.info("Reading a delete block from file " + logFile.getPath()); - if (isNewInstantBlock(r) && !readBlocksLazily) { + if (isNewInstantBlock(logBlock) && !readBlocksLazily) { // If this is a delete data block belonging to a different commit/instant, // then merge the last blocks and records into the main result - processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size()); + processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keys); } // store deletes so can be rolled back - currentInstantLogBlocks.push(r); + currentInstantLogBlocks.push(logBlock); break; case COMMAND_BLOCK: // Consider the following scenario @@ -218,9 +239,9 @@ public abstract class AbstractHoodieLogRecordScanner { // both B1 & B2 LOG.info("Reading a command block from file " + logFile.getPath()); // This is a command block - take appropriate action based on the command - HoodieCommandBlock commandBlock = (HoodieCommandBlock) r; + HoodieCommandBlock commandBlock = (HoodieCommandBlock) logBlock; String targetInstantForCommandBlock = - r.getLogBlockHeader().get(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME); + logBlock.getLogBlockHeader().get(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME); switch (commandBlock.getType()) { // there can be different types of command blocks case ROLLBACK_PREVIOUS_BLOCK: // Rollback the last read log block @@ -264,7 +285,7 @@ public abstract class AbstractHoodieLogRecordScanner { LOG.info("Found a corrupt block in " + logFile.getPath()); totalCorruptBlocks.incrementAndGet(); // If there is a corrupt block - we will assume that this was the next data block - currentInstantLogBlocks.push(r); + currentInstantLogBlocks.push(logBlock); break; default: throw new UnsupportedOperationException("Block type not supported yet"); @@ -273,7 +294,7 @@ public abstract class AbstractHoodieLogRecordScanner { // merge the last read block when all the blocks are done reading if (!currentInstantLogBlocks.isEmpty()) { LOG.info("Merging the final data blocks"); - processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size()); + processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keys); } // Done progress = 1.0f; @@ -308,9 +329,14 @@ public abstract class AbstractHoodieLogRecordScanner { * Iterate over the GenericRecord in the block, read the hoodie key and partition path and call subclass processors to * handle it. */ - private void processDataBlock(HoodieDataBlock dataBlock) throws Exception { + private void processDataBlock(HoodieDataBlock dataBlock, Option> keys) throws Exception { // TODO (NA) - Implement getRecordItr() in HoodieAvroDataBlock and use that here - List recs = dataBlock.getRecords(); + List recs = new ArrayList<>(); + if (!keys.isPresent()) { + recs = dataBlock.getRecords(); + } else { + recs = dataBlock.getRecords(keys.get()); + } totalLogRecords.addAndGet(recs.size()); for (IndexedRecord rec : recs) { processNextRecord(createHoodieRecord(rec)); @@ -342,17 +368,18 @@ public abstract class AbstractHoodieLogRecordScanner { /** * Process the set of log blocks belonging to the last instant which is read fully. */ - private void processQueuedBlocksForInstant(Deque lastBlocks, int numLogFilesSeen) throws Exception { - while (!lastBlocks.isEmpty()) { - LOG.info("Number of remaining logblocks to merge " + lastBlocks.size()); + private void processQueuedBlocksForInstant(Deque logBlocks, int numLogFilesSeen, + Option> keys) throws Exception { + while (!logBlocks.isEmpty()) { + LOG.info("Number of remaining logblocks to merge " + logBlocks.size()); // poll the element at the bottom of the stack since that's the order it was inserted - HoodieLogBlock lastBlock = lastBlocks.pollLast(); + HoodieLogBlock lastBlock = logBlocks.pollLast(); switch (lastBlock.getBlockType()) { case AVRO_DATA_BLOCK: - processDataBlock((HoodieAvroDataBlock) lastBlock); + processDataBlock((HoodieAvroDataBlock) lastBlock, keys); break; case HFILE_DATA_BLOCK: - processDataBlock((HoodieHFileDataBlock) lastBlock); + processDataBlock((HoodieHFileDataBlock) lastBlock, keys); break; case DELETE_BLOCK: Arrays.stream(((HoodieDeleteBlock) lastBlock).getKeysToDelete()).forEach(this::processNextDeletedKey); @@ -432,6 +459,6 @@ public abstract class AbstractHoodieLogRecordScanner { throw new UnsupportedOperationException(); } - public abstract AbstractHoodieLogRecordScanner build(); + public abstract AbstractHoodieLogRecordReader build(); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java index f0f3842e9..cdf306558 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java @@ -70,17 +70,24 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader { private long reverseLogFilePosition; private long lastReverseLogFilePosition; private boolean reverseReader; + private boolean enableInlineReading; private boolean closed = false; private transient Thread shutdownThread = null; public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize, boolean readBlockLazily, boolean reverseReader) throws IOException { + this(fs, logFile, readerSchema, bufferSize, readBlockLazily, reverseReader, false); + } + + public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize, + boolean readBlockLazily, boolean reverseReader, boolean enableInlineReading) throws IOException { FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(), bufferSize); this.logFile = logFile; this.inputStream = getFSDataInputStream(fsDataInputStream, fs, bufferSize); this.readerSchema = readerSchema; this.readBlockLazily = readBlockLazily; this.reverseReader = reverseReader; + this.enableInlineReading = enableInlineReading; if (this.reverseReader) { this.reverseLogFilePosition = this.lastReverseLogFilePosition = fs.getFileStatus(logFile.getPath()).getLen(); } @@ -248,7 +255,7 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader { } case HFILE_DATA_BLOCK: return new HoodieHFileDataBlock(logFile, inputStream, Option.ofNullable(content), readBlockLazily, - contentPosition, contentLength, blockEndPos, readerSchema, header, footer); + contentPosition, contentLength, blockEndPos, readerSchema, header, footer, enableInlineReading); case DELETE_BLOCK: return HoodieDeleteBlock.getBlock(logFile, inputStream, Option.ofNullable(content), readBlockLazily, contentPosition, contentLength, blockEndPos, header, footer); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java index 72672278b..36fa187aa 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java @@ -49,7 +49,12 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader { private static final Logger LOG = LogManager.getLogger(HoodieLogFormatReader.class); HoodieLogFormatReader(FileSystem fs, List logFiles, Schema readerSchema, boolean readBlocksLazily, - boolean reverseLogReader, int bufferSize) throws IOException { + boolean reverseLogReader, int bufferSize) throws IOException { + this(fs, logFiles, readerSchema, readBlocksLazily, reverseLogReader, bufferSize, false); + } + + HoodieLogFormatReader(FileSystem fs, List logFiles, Schema readerSchema, boolean readBlocksLazily, + boolean reverseLogReader, int bufferSize, boolean enableInlineReading) throws IOException { this.logFiles = logFiles; this.fs = fs; this.readerSchema = readerSchema; @@ -59,7 +64,7 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader { this.prevReadersInOpenState = new ArrayList<>(); if (logFiles.size() > 0) { HoodieLogFile nextLogFile = logFiles.remove(0); - this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false); + this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false, enableInlineReading); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java index 18b267294..a8d97ac1b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java @@ -54,7 +54,7 @@ import java.util.Map; * This results in two I/O passes over the log file. */ -public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner +public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader implements Iterable> { private static final Logger LOG = LogManager.getLogger(HoodieMergedLogRecordScanner.class); @@ -77,8 +77,9 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner boolean reverseReader, int bufferSize, String spillableMapBasePath, Option instantRange, boolean autoScan, ExternalSpillableMap.DiskMapType diskMapType, boolean isBitCaskDiskMapCompressionEnabled, - boolean withOperationField) { - super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize, instantRange, withOperationField); + boolean withOperationField, boolean enableFullScan) { + super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize, instantRange, withOperationField, + enableFullScan); try { // Store merged records for all versions for this log file, set the in-memory footprint to maxInMemoryMapSize this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator(), @@ -166,7 +167,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner /** * Builder used to build {@code HoodieUnMergedLogRecordScanner}. */ - public static class Builder extends AbstractHoodieLogRecordScanner.Builder { + public static class Builder extends AbstractHoodieLogRecordReader.Builder { protected FileSystem fs; protected String basePath; protected List logFilePaths; @@ -276,7 +277,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner return new HoodieMergedLogRecordScanner(fs, basePath, logFilePaths, readerSchema, latestInstantTime, maxMemorySizeInBytes, readBlocksLazily, reverseReader, bufferSize, spillableMapBasePath, instantRange, autoScan, - diskMapType, isBitCaskDiskMapCompressionEnabled, withOperationField); + diskMapType, isBitCaskDiskMapCompressionEnabled, withOperationField, true); } } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java index 8b26f7257..f781a148a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java @@ -31,7 +31,7 @@ import java.util.List; /** * A scanner used to scan hoodie unmerged log records. */ -public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordScanner { +public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordReader { private final LogRecordScannerCallback callback; @@ -72,7 +72,7 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordScann /** * Builder used to build {@code HoodieUnMergedLogRecordScanner}. */ - public static class Builder extends AbstractHoodieLogRecordScanner.Builder { + public static class Builder extends AbstractHoodieLogRecordReader.Builder { private FileSystem fs; private String basePath; private List logFilePaths; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java index 8f5b741f3..2e4338ef7 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java @@ -111,6 +111,17 @@ public abstract class HoodieDataBlock extends HoodieLogBlock { return records; } + /** + * Batch get of keys of interest. Implementation can choose to either do full scan and return matched entries or + * do a seek based parsing and return matched entries. + * @param keys keys of interest. + * @return List of IndexedRecords for the keys of interest. + * @throws IOException + */ + public List getRecords(List keys) throws IOException { + throw new UnsupportedOperationException("On demand batch get based on interested keys not supported"); + } + public Schema getSchema() { // if getSchema was invoked before converting byte [] to records if (records == null) { @@ -119,7 +130,7 @@ public abstract class HoodieDataBlock extends HoodieLogBlock { return schema; } - private void createRecordsFromContentBytes() throws IOException { + protected void createRecordsFromContentBytes() throws IOException { if (readBlockLazily && !getContent().isPresent()) { // read log block contents from disk inflate(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java index 6d2682a4f..a1e0c1298 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java @@ -19,12 +19,16 @@ package org.apache.hudi.common.table.log.block; import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.common.fs.inline.InLineFSUtils; +import org.apache.hudi.common.fs.inline.InLineFileSystem; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.io.storage.HoodieHFileReader; + +import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -44,6 +48,7 @@ import org.apache.hadoop.hbase.util.Pair; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -61,6 +66,7 @@ public class HoodieHFileDataBlock extends HoodieDataBlock { private static final Logger LOG = LogManager.getLogger(HoodieHFileDataBlock.class); private static Compression.Algorithm compressionAlgorithm = Compression.Algorithm.GZ; private static int blockSize = 1 * 1024 * 1024; + private boolean enableInlineReading = false; public HoodieHFileDataBlock(@Nonnull Map logBlockHeader, @Nonnull Map logBlockFooter, @@ -71,10 +77,11 @@ public class HoodieHFileDataBlock extends HoodieDataBlock { public HoodieHFileDataBlock(HoodieLogFile logFile, FSDataInputStream inputStream, Option content, boolean readBlockLazily, long position, long blockSize, long blockEndpos, Schema readerSchema, - Map header, Map footer) { + Map header, Map footer, boolean enableInlineReading) { super(content, inputStream, readBlockLazily, Option.of(new HoodieLogBlockContentLocation(logFile, position, blockSize, blockEndpos)), readerSchema, header, footer); + this.enableInlineReading = enableInlineReading; } public HoodieHFileDataBlock(@Nonnull List records, @Nonnull Map header) { @@ -141,6 +148,50 @@ public class HoodieHFileDataBlock extends HoodieDataBlock { return baos.toByteArray(); } + @Override + protected void createRecordsFromContentBytes() throws IOException { + if (enableInlineReading) { + getRecords(Collections.emptyList()); + } else { + super.createRecordsFromContentBytes(); + } + } + + @Override + public List getRecords(List keys) throws IOException { + readWithInlineFS(keys); + return records; + } + + private void readWithInlineFS(List keys) throws IOException { + boolean enableFullScan = keys.isEmpty(); + // Get schema from the header + Schema writerSchema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA)); + // If readerSchema was not present, use writerSchema + if (schema == null) { + schema = writerSchema; + } + Configuration conf = new Configuration(); + CacheConfig cacheConf = new CacheConfig(conf); + Configuration inlineConf = new Configuration(); + inlineConf.set("fs." + InLineFileSystem.SCHEME + ".impl", InLineFileSystem.class.getName()); + + Path inlinePath = InLineFSUtils.getInlineFilePath( + getBlockContentLocation().get().getLogFile().getPath(), + getBlockContentLocation().get().getLogFile().getPath().getFileSystem(conf).getScheme(), + getBlockContentLocation().get().getContentPositionInLogFile(), + getBlockContentLocation().get().getBlockSize()); + if (!enableFullScan) { + // HFile read will be efficient if keys are sorted, since on storage, records are sorted by key. This will avoid unnecessary seeks. + Collections.sort(keys); + } + HoodieHFileReader reader = new HoodieHFileReader(inlineConf, inlinePath, cacheConf, inlinePath.getFileSystem(inlineConf)); + List> logRecords = enableFullScan ? reader.readAllRecords(writerSchema, schema) : + reader.readRecords(keys, schema); + reader.close(); + this.records = logRecords.stream().map(t -> t.getSecond()).collect(Collectors.toList()); + } + @Override protected void deserializeRecords() throws IOException { // Get schema from the header diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java index b954e57e7..7b80d1a58 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java @@ -21,6 +21,7 @@ package org.apache.hudi.io.storage; import java.io.ByteArrayInputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; import java.util.Iterator; @@ -33,6 +34,7 @@ import org.apache.avro.Schema; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PositionedReadable; import org.apache.hadoop.fs.Seekable; @@ -55,6 +57,7 @@ public class HoodieHFileReader implements HoodieFileRea private Path path; private Configuration conf; private HFile.Reader reader; + private FSDataInputStream fsDataInputStream; private Schema schema; // Scanner used to read individual keys. This is cached to prevent the overhead of opening the scanner for each // key retrieval. @@ -72,6 +75,13 @@ public class HoodieHFileReader implements HoodieFileRea this.reader = HFile.createReader(FSUtils.getFs(path.toString(), configuration), path, cacheConfig, conf); } + public HoodieHFileReader(Configuration configuration, Path path, CacheConfig cacheConfig, FileSystem inlineFs) throws IOException { + this.conf = configuration; + this.path = path; + this.fsDataInputStream = inlineFs.open(path); + this.reader = HFile.createReader(inlineFs, path, cacheConfig, configuration); + } + public HoodieHFileReader(byte[] content) throws IOException { Configuration conf = new Configuration(); Path path = new Path("hoodie"); @@ -164,6 +174,25 @@ public class HoodieHFileReader implements HoodieFileRea return readAllRecords(schema, schema); } + public List> readRecords(List keys) throws IOException { + reader.loadFileInfo(); + Schema schema = new Schema.Parser().parse(new String(reader.loadFileInfo().get(KEY_SCHEMA.getBytes()))); + return readRecords(keys, schema); + } + + public List> readRecords(List keys, Schema schema) throws IOException { + this.schema = schema; + reader.loadFileInfo(); + List> records = new ArrayList<>(); + for (String key: keys) { + Option value = getRecordByKey(key, schema); + if (value.isPresent()) { + records.add(new Pair(key, value.get())); + } + } + return records; + } + @Override public Iterator getRecordIterator(Schema readerSchema) throws IOException { final HFileScanner scanner = reader.getScanner(false, false); @@ -217,7 +246,7 @@ public class HoodieHFileReader implements HoodieFileRea synchronized (this) { if (keyScanner == null) { - keyScanner = reader.getScanner(true, true); + keyScanner = reader.getScanner(false, true); } if (keyScanner.seekTo(kv) == 0) { @@ -250,6 +279,9 @@ public class HoodieHFileReader implements HoodieFileRea try { reader.close(); reader = null; + if (fsDataInputStream != null) { + fsDataInputStream.close(); + } keyScanner = null; } catch (IOException e) { throw new HoodieIOException("Error closing the hfile reader", e); diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java index 1690c9a6b..b560b7694 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java @@ -30,6 +30,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieMetadataException; import org.apache.hadoop.fs.FileStatus; @@ -38,10 +39,13 @@ import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; public abstract class BaseTableMetadata implements HoodieTableMetadata { @@ -126,15 +130,12 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata { } @Override - public Map getAllFilesInPartitions(List partitionPaths) + public Map getAllFilesInPartitions(List partitions) throws IOException { if (enabled) { - Map partitionsFilesMap = new HashMap<>(); - try { - for (String partitionPath : partitionPaths) { - partitionsFilesMap.put(partitionPath, fetchAllFilesInPartition(new Path(partitionPath))); - } + List partitionPaths = partitions.stream().map(entry -> new Path(entry)).collect(Collectors.toList()); + Map partitionsFilesMap = fetchAllFilesInPartitionPaths(partitionPaths); return partitionsFilesMap; } catch (Exception e) { throw new HoodieMetadataException("Failed to retrieve files in partition from metadata", e); @@ -142,7 +143,7 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata { } return new FileSystemBackedTableMetadata(getEngineContext(), hadoopConf, dataBasePath, metadataConfig.shouldAssumeDatePartitioning()) - .getAllFilesInPartitions(partitionPaths); + .getAllFilesInPartitions(partitions); } /** @@ -150,7 +151,7 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata { */ protected List fetchAllPartitionPaths() throws IOException { HoodieTimer timer = new HoodieTimer().startTimer(); - Option> hoodieRecord = getRecordByKeyFromMetadata(RECORDKEY_PARTITION_LIST, MetadataPartitionType.FILES.partitionPath()); + Option> hoodieRecord = getRecordByKey(RECORDKEY_PARTITION_LIST, MetadataPartitionType.FILES.partitionPath()); metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_PARTITIONS_STR, timer.endTimer())); List partitions = Collections.emptyList(); @@ -184,7 +185,7 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata { } HoodieTimer timer = new HoodieTimer().startTimer(); - Option> hoodieRecord = getRecordByKeyFromMetadata(partitionName, MetadataPartitionType.FILES.partitionPath()); + Option> hoodieRecord = getRecordByKey(partitionName, MetadataPartitionType.FILES.partitionPath()); metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_FILES_STR, timer.endTimer())); FileStatus[] statuses = {}; @@ -200,7 +201,48 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata { return statuses; } - protected abstract Option> getRecordByKeyFromMetadata(String key, String partitionName); + Map fetchAllFilesInPartitionPaths(List partitionPaths) throws IOException { + Map partitionInfo = new HashMap<>(); + boolean foundNonPartitionedPath = false; + for (Path partitionPath: partitionPaths) { + String partitionName = FSUtils.getRelativePartitionPath(new Path(dataBasePath), partitionPath); + if (partitionName.isEmpty()) { + if (partitionInfo.size() > 1) { + throw new HoodieMetadataException("Found mix of partitioned and non partitioned paths while fetching data from metadata table"); + } + partitionInfo.put(NON_PARTITIONED_NAME, partitionPath); + foundNonPartitionedPath = true; + } else { + if (foundNonPartitionedPath) { + throw new HoodieMetadataException("Found mix of partitioned and non partitioned paths while fetching data from metadata table"); + } + partitionInfo.put(partitionName, partitionPath); + } + } + + HoodieTimer timer = new HoodieTimer().startTimer(); + List>>> partitionsFileStatus = + getRecordsByKeys(new ArrayList<>(partitionInfo.keySet()), MetadataPartitionType.FILES.partitionPath()); + metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_FILES_STR, timer.endTimer())); + Map result = new HashMap<>(); + + for (Pair>> entry: partitionsFileStatus) { + if (entry.getValue().isPresent()) { + if (!entry.getValue().get().getData().getDeletions().isEmpty()) { + throw new HoodieMetadataException("Metadata record for partition " + entry.getKey() + " is inconsistent: " + + entry.getValue().get().getData()); + } + result.put(partitionInfo.get(entry.getKey()).toString(), entry.getValue().get().getData().getFileStatuses(hadoopConf.get(), partitionInfo.get(entry.getKey()))); + } + } + + LOG.info("Listed files in partitions from metadata: partition list =" + Arrays.toString(partitionPaths.toArray())); + return result; + } + + protected abstract Option> getRecordByKey(String key, String partitionName); + + protected abstract List>>> getRecordsByKeys(List key, String partitionName); protected HoodieEngineContext getEngineContext() { return engineContext != null ? engineContext : new HoodieLocalEngineContext(hadoopConf.get()); diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java index b0940a7f3..bf0cf9219 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java @@ -81,7 +81,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata { private final boolean reuse; // Readers for latest file slice corresponding to file groups in the metadata partition of interest - private Map> partitionReaders = new ConcurrentHashMap<>(); + private Map> partitionReaders = new ConcurrentHashMap<>(); public HoodieBackedTableMetadata(HoodieEngineContext engineContext, HoodieMetadataConfig metadataConfig, String datasetBasePath, String spillableMapDirectory) { @@ -120,48 +120,25 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata { } @Override - protected Option> getRecordByKeyFromMetadata(String key, String partitionName) { - Pair readers = openReadersIfNeeded(key, partitionName); + protected Option> getRecordByKey(String key, String partitionName) { + return getRecordsByKeys(Collections.singletonList(key), partitionName).get(0).getValue(); + } + + protected List>>> getRecordsByKeys(List keys, String partitionName) { + Pair readers = openReadersIfNeeded(keys.get(0), partitionName); try { List timings = new ArrayList<>(); - HoodieTimer timer = new HoodieTimer().startTimer(); HoodieFileReader baseFileReader = readers.getKey(); - HoodieMetadataMergedLogRecordScanner logRecordScanner = readers.getRight(); + HoodieMetadataMergedLogRecordReader logRecordScanner = readers.getRight(); - // Retrieve record from base file - HoodieRecord hoodieRecord = null; - if (baseFileReader != null) { - HoodieTimer readTimer = new HoodieTimer().startTimer(); - Option baseRecord = baseFileReader.getRecordByKey(key); - if (baseRecord.isPresent()) { - hoodieRecord = metadataTableConfig.populateMetaFields() - ? SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(), metadataTableConfig.getPayloadClass(), metadataTableConfig.getPreCombineField(), false) - : SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(), metadataTableConfig.getPayloadClass(), metadataTableConfig.getPreCombineField(), - Pair.of(metadataTableConfig.getRecordKeyFieldProp(), metadataTableConfig.getPartitionFieldProp()), false); - metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BASEFILE_READ_STR, readTimer.endTimer())); - } - } - timings.add(timer.endTimer()); - - // Retrieve record from log file - timer.startTimer(); - if (logRecordScanner != null) { - Option> logHoodieRecord = logRecordScanner.getRecordByKey(key); - if (logHoodieRecord.isPresent()) { - if (hoodieRecord != null) { - // Merge the payloads - HoodieRecordPayload mergedPayload = logHoodieRecord.get().getData().preCombine(hoodieRecord.getData()); - hoodieRecord = new HoodieRecord(hoodieRecord.getKey(), mergedPayload); - } else { - hoodieRecord = logHoodieRecord.get(); - } - } - } - timings.add(timer.endTimer()); - LOG.info(String.format("Metadata read for key %s took [baseFileRead, logMerge] %s ms", key, timings)); - return Option.ofNullable(hoodieRecord); + // local map to assist in merging with base file records + Map>> logRecords = readLogRecords(logRecordScanner, keys, timings); + List>>> result = readFromBaseAndMergeWithLogRecords(baseFileReader, + keys, logRecords, timings); + LOG.info(String.format("Metadata read for %s keys took [baseFileRead, logMerge] %s ms", keys.size(), timings)); + return result; } catch (IOException ioe) { - throw new HoodieIOException("Error merging records from metadata table for key :" + key, ioe); + throw new HoodieIOException("Error merging records from metadata table for " + keys.size() + " key : ", ioe); } finally { if (!reuse) { close(partitionName); @@ -169,16 +146,88 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata { } } + private Map>> readLogRecords(HoodieMetadataMergedLogRecordReader logRecordScanner, + List keys, List timings) { + HoodieTimer timer = new HoodieTimer().startTimer(); + Map>> logRecords = new HashMap<>(); + // Retrieve records from log file + timer.startTimer(); + if (logRecordScanner != null) { + if (metadataConfig.enableFullScan()) { + // path which does full scan of log files + for (String key : keys) { + logRecords.put(key, logRecordScanner.getRecordByKey(key).get(0).getValue()); + } + } else { + // this path will do seeks pertaining to the keys passed in + List>>> logRecordsList = logRecordScanner.getRecordsByKeys(keys); + for (Pair>> entry : logRecordsList) { + logRecords.put(entry.getKey(), entry.getValue()); + } + } + } + timings.add(timer.endTimer()); + return logRecords; + } + + private List>>> readFromBaseAndMergeWithLogRecords(HoodieFileReader baseFileReader, + List keys, Map>> logRecords, + List timings) throws IOException { + List>>> result = new ArrayList<>(); + // merge with base records + HoodieTimer timer = new HoodieTimer().startTimer(); + timer.startTimer(); + HoodieRecord hoodieRecord = null; + // Retrieve record from base file + if (baseFileReader != null) { + HoodieTimer readTimer = new HoodieTimer().startTimer(); + for (String key : keys) { + Option baseRecord = baseFileReader.getRecordByKey(key); + if (baseRecord.isPresent()) { + hoodieRecord = metadataTableConfig.populateMetaFields() + ? SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(), metadataTableConfig.getPayloadClass(), metadataTableConfig.getPreCombineField(), false) + : SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(), metadataTableConfig.getPayloadClass(), metadataTableConfig.getPreCombineField(), + Pair.of(metadataTableConfig.getRecordKeyFieldProp(), metadataTableConfig.getPartitionFieldProp()), false); + metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BASEFILE_READ_STR, readTimer.endTimer())); + // merge base file record w/ log record if present + if (logRecords.containsKey(key) && logRecords.get(key).isPresent()) { + HoodieRecordPayload mergedPayload = logRecords.get(key).get().getData().preCombine(hoodieRecord.getData()); + result.add(Pair.of(key, Option.of(new HoodieRecord(hoodieRecord.getKey(), mergedPayload)))); + } else { + // only base record + result.add(Pair.of(key, Option.of(hoodieRecord))); + } + } else { + // only log record + if (logRecords.containsKey(key) && logRecords.get(key).isPresent()) { + HoodieRecordPayload mergedPayload = logRecords.get(key).get().getData().preCombine(hoodieRecord.getData()); + result.add(Pair.of(key, Option.of(new HoodieRecord(hoodieRecord.getKey(), mergedPayload)))); + } else { // not found in both base file and log files + result.add(Pair.of(key, Option.empty())); + } + } + } + timings.add(timer.endTimer()); + } else { + // no base file at all + timings.add(timer.endTimer()); + for (Map.Entry>> entry : logRecords.entrySet()) { + result.add(Pair.of(entry.getKey(), entry.getValue())); + } + } + return result; + } + /** * Returns a new pair of readers to the base and log files. */ - private Pair openReadersIfNeeded(String key, String partitionName) { + private Pair openReadersIfNeeded(String key, String partitionName) { return partitionReaders.computeIfAbsent(partitionName, k -> { try { final long baseFileOpenMs; final long logScannerOpenMs; HoodieFileReader baseFileReader = null; - HoodieMetadataMergedLogRecordScanner logRecordScanner = null; + HoodieMetadataMergedLogRecordReader logRecordScanner = null; // Metadata is in sync till the latest completed instant on the dataset HoodieTimer timer = new HoodieTimer().startTimer(); @@ -192,7 +241,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata { baseFileOpenMs = baseFileReaderOpenTimePair.getValue(); // Open the log record scanner using the log files from the latest file slice - Pair logRecordScannerOpenTimePair = getLogRecordScanner(slice); + Pair logRecordScannerOpenTimePair = getLogRecordScanner(slice); logRecordScanner = logRecordScannerOpenTimePair.getKey(); logScannerOpenMs = logRecordScannerOpenTimePair.getValue(); @@ -244,7 +293,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata { return validInstantTimestamps; } - private Pair getLogRecordScanner(FileSlice slice) { + private Pair getLogRecordScanner(FileSlice slice) { HoodieTimer timer = new HoodieTimer().startTimer(); List logFilePaths = slice.getLogFiles() .sorted(HoodieLogFile.getLogFileComparator()) @@ -261,7 +310,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata { // Load the schema Schema schema = HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema()); HoodieCommonConfig commonConfig = HoodieCommonConfig.newBuilder().fromProperties(metadataConfig.getProps()).build(); - HoodieMetadataMergedLogRecordScanner logRecordScanner = HoodieMetadataMergedLogRecordScanner.newBuilder() + HoodieMetadataMergedLogRecordReader logRecordScanner = HoodieMetadataMergedLogRecordReader.newBuilder() .withFileSystem(metadataMetaClient.getFs()) .withBasePath(metadataBasePath) .withLogFilePaths(logFilePaths) @@ -273,6 +322,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata { .withDiskMapType(commonConfig.getSpillableDiskMapType()) .withBitCaskDiskMapCompressionEnabled(commonConfig.isBitCaskDiskMapCompressionEnabled()) .withLogBlockTimestamps(validInstantTimestamps) + .enableFullScan(metadataConfig.enableFullScan()) .build(); Long logScannerOpenMs = timer.endTimer(); @@ -319,7 +369,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata { } private synchronized void close(String partitionName) { - Pair readers = partitionReaders.remove(partitionName); + Pair readers = partitionReaders.remove(partitionName); if (readers != null) { try { if (readers.getKey() != null) { diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java similarity index 76% rename from hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordScanner.java rename to hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java index 3132ea634..131ca3b91 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java @@ -19,12 +19,16 @@ package org.apache.hudi.metadata; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Set; import org.apache.avro.Schema; import org.apache.hadoop.fs.FileSystem; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; @@ -32,26 +36,30 @@ import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; import org.apache.hudi.common.table.log.InstantRange; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.ExternalSpillableMap; +import org.apache.hudi.common.util.collection.Pair; /** * A {@code HoodieMergedLogRecordScanner} implementation which only merged records matching providing keys. This is * useful in limiting memory usage when only a small subset of updates records are to be read. */ -public class HoodieMetadataMergedLogRecordScanner extends HoodieMergedLogRecordScanner { +public class HoodieMetadataMergedLogRecordReader extends HoodieMergedLogRecordScanner { + + private static final Logger LOG = LogManager.getLogger(HoodieMetadataMergedLogRecordReader.class); // Set of all record keys that are to be read in memory private Set mergeKeyFilter; - private HoodieMetadataMergedLogRecordScanner(FileSystem fs, String basePath, List logFilePaths, + private HoodieMetadataMergedLogRecordReader(FileSystem fs, String basePath, List logFilePaths, Schema readerSchema, String latestInstantTime, Long maxMemorySizeInBytes, int bufferSize, String spillableMapBasePath, Set mergeKeyFilter, ExternalSpillableMap.DiskMapType diskMapType, boolean isBitCaskDiskMapCompressionEnabled, - Option instantRange) { + Option instantRange, boolean enableFullScan) { super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, maxMemorySizeInBytes, false, false, bufferSize, - spillableMapBasePath, instantRange, false, diskMapType, isBitCaskDiskMapCompressionEnabled, false); + spillableMapBasePath, instantRange, false, diskMapType, isBitCaskDiskMapCompressionEnabled, false, enableFullScan); this.mergeKeyFilter = mergeKeyFilter; - - performScan(); + if (enableFullScan) { + performScan(); + } } @Override @@ -71,8 +79,8 @@ public class HoodieMetadataMergedLogRecordScanner extends HoodieMergedLogRecordS /** * Returns the builder for {@code HoodieMetadataMergedLogRecordScanner}. */ - public static HoodieMetadataMergedLogRecordScanner.Builder newBuilder() { - return new HoodieMetadataMergedLogRecordScanner.Builder(); + public static HoodieMetadataMergedLogRecordReader.Builder newBuilder() { + return new HoodieMetadataMergedLogRecordReader.Builder(); } /** @@ -81,8 +89,22 @@ public class HoodieMetadataMergedLogRecordScanner extends HoodieMergedLogRecordS * @param key Key of the record to retrieve * @return {@code HoodieRecord} if key was found else {@code Option.empty()} */ - public Option> getRecordByKey(String key) { - return Option.ofNullable((HoodieRecord) records.get(key)); + public List>>> getRecordByKey(String key) { + return Collections.singletonList(Pair.of(key, Option.ofNullable((HoodieRecord) records.get(key)))); + } + + public List>>> getRecordsByKeys(List keys) { + records.clear(); + scan(Option.of(keys)); + List>>> metadataRecords = new ArrayList<>(); + keys.forEach(entry -> { + if (records.containsKey(entry)) { + metadataRecords.add(Pair.of(entry, Option.ofNullable((HoodieRecord) records.get(entry)))); + } else { + metadataRecords.add(Pair.of(entry, Option.empty())); + } + }); + return metadataRecords; } /** @@ -90,6 +112,8 @@ public class HoodieMetadataMergedLogRecordScanner extends HoodieMergedLogRecordS */ public static class Builder extends HoodieMergedLogRecordScanner.Builder { private Set mergeKeyFilter = Collections.emptySet(); + private boolean enableFullScan; + private boolean enableInlineReading; @Override public Builder withFileSystem(FileSystem fs) { @@ -171,11 +195,16 @@ public class HoodieMetadataMergedLogRecordScanner extends HoodieMergedLogRecordS return this; } + public Builder enableFullScan(boolean enableFullScan) { + this.enableFullScan = enableFullScan; + return this; + } + @Override - public HoodieMetadataMergedLogRecordScanner build() { - return new HoodieMetadataMergedLogRecordScanner(fs, basePath, logFilePaths, readerSchema, + public HoodieMetadataMergedLogRecordReader build() { + return new HoodieMetadataMergedLogRecordReader(fs, basePath, logFilePaths, readerSchema, latestInstantTime, maxMemorySizeInBytes, bufferSize, spillableMapBasePath, mergeKeyFilter, - diskMapType, isBitCaskDiskMapCompressionEnabled, instantRange); + diskMapType, isBitCaskDiskMapCompressionEnabled, instantRange, enableFullScan); } } diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java index a647da9b9..1771db056 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java @@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.table.log.HoodieLogFormat.Writer; +import org.apache.hudi.common.table.log.block.HoodieLogBlock; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.testutils.FileCreateUtils; import org.apache.hudi.common.testutils.HoodieTestUtils; @@ -137,6 +138,24 @@ public class TestHoodieRealtimeRecordReader { public void testReader(ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled, boolean partitioned) throws Exception { + testReaderInternal(diskMapType, isCompressionEnabled, partitioned); + } + + @Test + public void testHFileInlineReader() throws Exception { + testReaderInternal(ExternalSpillableMap.DiskMapType.BITCASK, false, false, + HoodieLogBlock.HoodieLogBlockType.HFILE_DATA_BLOCK); + } + + private void testReaderInternal(ExternalSpillableMap.DiskMapType diskMapType, + boolean isCompressionEnabled, + boolean partitioned) throws Exception { + testReaderInternal(diskMapType, isCompressionEnabled, partitioned, HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK); + } + + private void testReaderInternal(ExternalSpillableMap.DiskMapType diskMapType, + boolean isCompressionEnabled, + boolean partitioned, HoodieLogBlock.HoodieLogBlockType logBlockType) throws Exception { // initial commit Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema()); HoodieTestUtils.init(hadoopConf, basePath.toString(), HoodieTableType.MERGE_ON_READ); @@ -175,7 +194,7 @@ public class TestHoodieRealtimeRecordReader { } else { writer = InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, "fileid0", baseInstant, - instantTime, 120, 0, logVersion); + instantTime, 120, 0, logVersion, logBlockType); } long size = writer.getCurrentSize(); writer.close(); diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java index d10ccfca9..13d921979 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java @@ -27,6 +27,8 @@ import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock; import org.apache.hudi.common.table.log.block.HoodieCommandBlock; +import org.apache.hudi.common.table.log.block.HoodieDataBlock; +import org.apache.hudi.common.table.log.block.HoodieHFileDataBlock; import org.apache.hudi.common.table.log.block.HoodieLogBlock; import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.testutils.SchemaTestUtil; @@ -301,7 +303,14 @@ public class InputFormatTestUtil { public static HoodieLogFormat.Writer writeDataBlockToLogFile(File partitionDir, FileSystem fs, Schema schema, String fileId, - String baseCommit, String newCommit, int numberOfRecords, int offset, int logVersion) + String baseCommit, String newCommit, int numberOfRecords, int offset, int logVersion) throws IOException, InterruptedException { + return writeDataBlockToLogFile(partitionDir, fs, schema, fileId, baseCommit, newCommit, numberOfRecords, offset, logVersion, HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK); + } + + public static HoodieLogFormat.Writer writeDataBlockToLogFile(File partitionDir, FileSystem fs, Schema schema, String + fileId, + String baseCommit, String newCommit, int numberOfRecords, int offset, int logVersion, + HoodieLogBlock.HoodieLogBlockType logBlockType) throws InterruptedException, IOException { HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(new Path(partitionDir.getPath())) .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(fileId).withLogVersion(logVersion) @@ -314,7 +323,8 @@ public class InputFormatTestUtil { Map header = new HashMap<>(); header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, newCommit); header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, writeSchema.toString()); - HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, header); + HoodieDataBlock dataBlock = (logBlockType == HoodieLogBlock.HoodieLogBlockType.HFILE_DATA_BLOCK) ? new HoodieHFileDataBlock(records, header) : + new HoodieAvroDataBlock(records, header); writer.appendBlock(dataBlock); return writer; }