From c3c205fc02155d398e52d4395527c03c710c2bdb Mon Sep 17 00:00:00 2001 From: Nishith Agarwal Date: Mon, 2 Apr 2018 22:53:28 -0700 Subject: [PATCH] Using BufferedFSInputStream to wrap FSInputStream for FSDataInputStream --- .../cli/commands/HoodieLogFileCommand.java | 6 +- .../hoodie/config/HoodieMemoryConfig.java | 12 ++ .../uber/hoodie/config/HoodieWriteConfig.java | 12 ++ .../compact/HoodieRealtimeTableCompactor.java | 2 +- .../common/HoodieTestDataGenerator.java | 15 ++- .../hoodie/io/TestHoodieCommitArchiveLog.java | 118 +++++++++++------- .../uber/hoodie/io/TestHoodieCompactor.java | 2 + .../log/HoodieCompactedLogRecordScanner.java | 4 +- .../common/table/log/HoodieLogFileReader.java | 75 ++++++----- .../common/table/log/HoodieLogFormat.java | 2 +- .../table/log/HoodieLogFormatReader.java | 15 +-- .../hoodie/common/model/HoodieTestUtils.java | 22 +++- .../common/table/log/HoodieLogFormatTest.java | 42 ++++--- .../realtime/HoodieRealtimeRecordReader.java | 9 +- .../HoodieRealtimeRecordReaderTest.java | 1 + 15 files changed, 221 insertions(+), 116 deletions(-) diff --git a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/HoodieLogFileCommand.java b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/HoodieLogFileCommand.java index 9d9790bba..78bd834db 100644 --- a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/HoodieLogFileCommand.java +++ b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/HoodieLogFileCommand.java @@ -92,6 +92,9 @@ public class HoodieLogFileCommand implements CommandMarker { if (n instanceof HoodieCorruptBlock) { try { instantTime = n.getLogBlockHeader().get(HeaderMetadataType.INSTANT_TIME); + if (instantTime == null) { + throw new Exception("Invalid instant time " + instantTime); + } } catch (Exception e) { numCorruptBlocks++; instantTime = "corrupt_block_" + numCorruptBlocks; @@ -172,7 +175,8 @@ public class HoodieLogFileCommand implements CommandMarker { .getTimestamp(), Long.valueOf(HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES), Boolean.valueOf(HoodieCompactionConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED), - Boolean.valueOf(HoodieCompactionConfig.DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED)); + Boolean.valueOf(HoodieCompactionConfig.DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED), + Integer.valueOf(HoodieMemoryConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE)); for (HoodieRecord hoodieRecord : scanner) { Optional record = hoodieRecord.getData().getInsertValue(readerSchema); if (allRecords.size() >= limit) { diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieMemoryConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieMemoryConfig.java index aa5f28cc7..3fd6d6c4e 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieMemoryConfig.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieMemoryConfig.java @@ -45,6 +45,9 @@ public class HoodieMemoryConfig extends DefaultHoodieConfig { public static final String MAX_MEMORY_FOR_MERGE_PROP = "hoodie.memory.merge.max.size"; // Property to set the max memory for compaction public static final String MAX_MEMORY_FOR_COMPACTION_PROP = "hoodie.memory.compaction.max.size"; + // Property to set the max buffer size for the DFS input stream + public static final String MAX_DFS_STREAM_BUFFER_SIZE_PROP = "hoodie.memory.dfs.buffer.max.size"; + public static final int DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE = 16 * 1024 * 1024; // 16 MB private HoodieMemoryConfig(Properties props) { @@ -86,6 +89,12 @@ public class HoodieMemoryConfig extends
DefaultHoodieConfig { return this; } + public Builder withMaxDFSStreamBufferSize(int maxStreamBufferSize) { + props.setProperty(MAX_DFS_STREAM_BUFFER_SIZE_PROP, + String.valueOf(maxStreamBufferSize)); + return this; + } + /** * Dynamic calculation of max memory to use for spillable map. user.available.memory = spark.executor.memory * * (1 - spark.memory.fraction) spillable.available.memory = user.available.memory * hoodie.memory.fraction. Anytime @@ -143,6 +152,9 @@ public class HoodieMemoryConfig extends DefaultHoodieConfig { !props.containsKey(MAX_MEMORY_FOR_COMPACTION_PROP), MAX_MEMORY_FOR_COMPACTION_PROP, String.valueOf( getMaxMemoryAllowedForMerge(props.getProperty(MAX_MEMORY_FRACTION_FOR_COMPACTION_PROP)))); + setDefaultOnCondition(props, + !props.containsKey(MAX_DFS_STREAM_BUFFER_SIZE_PROP), + MAX_DFS_STREAM_BUFFER_SIZE_PROP, String.valueOf(DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE)); return config; } } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java index 683ec952d..633e5695b 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java @@ -369,6 +369,12 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { props.getProperty(HoodieMemoryConfig.MAX_MEMORY_FOR_COMPACTION_PROP)); } + public int getMaxDFSStreamBufferSize() { + return Integer + .valueOf( + props.getProperty(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP)); + } + public static class Builder { private final Properties props = new Properties(); @@ -469,6 +475,12 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return this; } + public Builder withMemoryConfig(HoodieMemoryConfig memoryConfig) { + props.putAll(memoryConfig.getProps()); + isMemoryConfigSet = true; + return this; + } + public Builder withAutoCommit(boolean autoCommit) { props.setProperty(HOODIE_AUTO_COMMIT_PROP, String.valueOf(autoCommit)); return this; diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java index 8d4f9b7a3..9d68fe1ba 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java @@ -105,7 +105,7 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor { HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, metaClient.getBasePath(), operation.getDeltaFilePaths(), readerSchema, maxInstantTime, config.getMaxMemoryPerCompaction(), config.getCompactionLazyBlockReadEnabled(), - config.getCompactionReverseLogReadEnabled()); + config.getCompactionReverseLogReadEnabled(), config.getMaxDFSStreamBufferSize()); if (!scanner.iterator().hasNext()) { return Lists.newArrayList(); } diff --git a/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java index b6dc2871d..99af0e2e3 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java @@ -38,6 +38,7 @@ import java.util.UUID; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import
org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -116,9 +117,14 @@ public class HoodieTestDataGenerator { } public static void createCommitFile(String basePath, String commitTime) throws IOException { + createCommitFile(basePath, commitTime, HoodieTestUtils.getDefaultHadoopConf()); + } + + public static void createCommitFile(String basePath, String commitTime, Configuration configuration) + throws IOException { Path commitFile = new Path( basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline.makeCommitFileName(commitTime)); - FileSystem fs = FSUtils.getFs(basePath, HoodieTestUtils.getDefaultHadoopConf()); + FileSystem fs = FSUtils.getFs(basePath, configuration); FSDataOutputStream os = fs.create(commitFile, true); HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(); try { @@ -130,9 +136,14 @@ public class HoodieTestDataGenerator { } public static void createSavepointFile(String basePath, String commitTime) throws IOException { + createSavepointFile(basePath, commitTime, HoodieTestUtils.getDefaultHadoopConf()); + } + + public static void createSavepointFile(String basePath, String commitTime, Configuration configuration) + throws IOException { Path commitFile = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline.makeSavePointFileName(commitTime)); - FileSystem fs = FSUtils.getFs(basePath, HoodieTestUtils.getDefaultHadoopConf()); + FileSystem fs = FSUtils.getFs(basePath, configuration); FSDataOutputStream os = fs.create(commitFile, true); HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(); try { diff --git a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCommitArchiveLog.java b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCommitArchiveLog.java index b099d8873..0b30faf74 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCommitArchiveLog.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCommitArchiveLog.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue; import com.google.common.collect.Sets; import com.uber.hoodie.avro.model.HoodieArchivedMetaEntry; import com.uber.hoodie.common.HoodieTestDataGenerator; +import com.uber.hoodie.common.minicluster.HdfsTestService; import com.uber.hoodie.common.model.HoodieLogFile; import com.uber.hoodie.common.model.HoodieTestUtils; import com.uber.hoodie.common.table.HoodieTableMetaClient; @@ -29,7 +30,6 @@ import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.log.HoodieLogFormat; import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock; import com.uber.hoodie.common.table.timeline.HoodieInstant; -import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.config.HoodieCompactionConfig; import com.uber.hoodie.config.HoodieWriteConfig; import java.io.IOException; @@ -42,23 +42,57 @@ import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.junit.AfterClass; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import org.junit.rules.TemporaryFolder; public class TestHoodieCommitArchiveLog { private String basePath; - private FileSystem fs; private 
Configuration hadoopConf; + // NOTE: Be careful when using DFS (FileSystem.class) vs LocalFS (RawLocalFileSystem.class) // The implementation and guarantees of many APIs differ, for example check rename(src, dst) // We need to use DFS here instead of LocalFS since FSDataInputStream.getWrappedStream() returns an // FSDataInputStream instead of an FSInputStream and thus throws java.lang.ClassCastException: // org.apache.hadoop.fs.FSDataInputStream cannot be cast to org.apache.hadoop.fs.FSInputStream + private static MiniDFSCluster dfsCluster; + private static DistributedFileSystem dfs; + private static HdfsTestService hdfsTestService; + + @AfterClass + public static void cleanUp() throws Exception { + if (hdfsTestService != null) { + hdfsTestService.stop(); + dfsCluster.shutdown(); + } + // Need to closeAll to clear FileSystem.Cache, required because DFS and LocalFS are used in the + // same JVM + FileSystem.closeAll(); + } + + @BeforeClass + public static void setUpDFS() throws IOException { + // Need to closeAll to clear FileSystem.Cache, required because DFS and LocalFS are used in the + // same JVM + FileSystem.closeAll(); + if (hdfsTestService == null) { + hdfsTestService = new HdfsTestService(); + dfsCluster = hdfsTestService.start(true); + // Create a temp folder as the base path + dfs = dfsCluster.getFileSystem(); + } + } @Before public void init() throws Exception { TemporaryFolder folder = new TemporaryFolder(); folder.create(); basePath = folder.getRoot().getAbsolutePath(); - hadoopConf = HoodieTestUtils.getDefaultHadoopConf(); - fs = FSUtils.getFs(basePath, hadoopConf); + hadoopConf = dfs.getConf(); HoodieTestUtils.init(hadoopConf, basePath); } @@ -68,7 +102,7 @@ public class TestHoodieCommitArchiveLog { .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) .forTable("test-trip-table").build(); HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, - new HoodieTableMetaClient(fs.getConf(), cfg.getBasePath(), true)); + new HoodieTableMetaClient(dfs.getConf(), cfg.getBasePath(), true)); boolean result = archiveLog.archiveIfRequired(); assertTrue(result); } @@ -81,26 +115,26 @@ public class TestHoodieCommitArchiveLog { HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 4).build()) .forTable("test-trip-table").build(); HoodieTestUtils.init(hadoopConf, basePath); - HoodieTestDataGenerator.createCommitFile(basePath, "100"); - HoodieTestDataGenerator.createCommitFile(basePath, "101"); - HoodieTestDataGenerator.createCommitFile(basePath, "102"); - HoodieTestDataGenerator.createCommitFile(basePath, "103"); - HoodieTestDataGenerator.createCommitFile(basePath, "104"); - HoodieTestDataGenerator.createCommitFile(basePath, "105"); + HoodieTestDataGenerator.createCommitFile(basePath, "100", dfs.getConf()); + HoodieTestDataGenerator.createCommitFile(basePath, "101", dfs.getConf()); + HoodieTestDataGenerator.createCommitFile(basePath, "102", dfs.getConf()); + HoodieTestDataGenerator.createCommitFile(basePath, "103", dfs.getConf()); + HoodieTestDataGenerator.createCommitFile(basePath, "104", dfs.getConf()); + HoodieTestDataGenerator.createCommitFile(basePath, "105", dfs.getConf()); - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath); + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(dfs.getConf(), basePath); HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); assertEquals("Loaded 6 commits and the count should match", 6, timeline.countInstants()); -
HoodieTestUtils.createCleanFiles(basePath, "100"); - HoodieTestUtils.createInflightCleanFiles(basePath, "101"); - HoodieTestUtils.createCleanFiles(basePath, "101"); - HoodieTestUtils.createCleanFiles(basePath, "102"); - HoodieTestUtils.createCleanFiles(basePath, "103"); - HoodieTestUtils.createCleanFiles(basePath, "104"); - HoodieTestUtils.createCleanFiles(basePath, "105"); - HoodieTestUtils.createInflightCleanFiles(basePath, "106", "107"); + HoodieTestUtils.createCleanFiles(basePath, "100", dfs.getConf()); + HoodieTestUtils.createInflightCleanFiles(basePath, dfs.getConf(), "101"); + HoodieTestUtils.createCleanFiles(basePath, "101", dfs.getConf()); + HoodieTestUtils.createCleanFiles(basePath, "102", dfs.getConf()); + HoodieTestUtils.createCleanFiles(basePath, "103", dfs.getConf()); + HoodieTestUtils.createCleanFiles(basePath, "104", dfs.getConf()); + HoodieTestUtils.createCleanFiles(basePath, "105", dfs.getConf()); + HoodieTestUtils.createInflightCleanFiles(basePath, dfs.getConf(), "106", "107"); // reload the timeline and get all the commits before archive timeline = metaClient.getActiveTimeline().reload().getAllCommitsTimeline().filterCompletedInstants(); @@ -112,7 +146,7 @@ public class TestHoodieCommitArchiveLog { verifyInflightInstants(metaClient, 3); HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, - new HoodieTableMetaClient(fs.getConf(), basePath, true)); + new HoodieTableMetaClient(dfs.getConf(), basePath, true)); assertTrue(archiveLog.archiveIfRequired()); @@ -121,7 +155,7 @@ public class TestHoodieCommitArchiveLog { originalCommits.removeAll(timeline.getInstants().collect(Collectors.toList())); // read the file - HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(fs, + HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(dfs, new HoodieLogFile(new Path(basePath + "/.hoodie/.commits_.archive.1")), HoodieArchivedMetaEntry.getClassSchema()); @@ -156,12 +190,12 @@ public class TestHoodieCommitArchiveLog { .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) .forTable("test-trip-table").withCompactionConfig( HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 5).build()).build(); - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath); + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(dfs.getConf(), basePath); HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient); - HoodieTestDataGenerator.createCommitFile(basePath, "100"); - HoodieTestDataGenerator.createCommitFile(basePath, "101"); - HoodieTestDataGenerator.createCommitFile(basePath, "102"); - HoodieTestDataGenerator.createCommitFile(basePath, "103"); + HoodieTestDataGenerator.createCommitFile(basePath, "100", dfs.getConf()); + HoodieTestDataGenerator.createCommitFile(basePath, "101", dfs.getConf()); + HoodieTestDataGenerator.createCommitFile(basePath, "102", dfs.getConf()); + HoodieTestDataGenerator.createCommitFile(basePath, "103", dfs.getConf()); HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); assertEquals("Loaded 4 commits and the count should match", 4, timeline.countInstants()); @@ -177,14 +211,14 @@ public class TestHoodieCommitArchiveLog { .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) .forTable("test-trip-table").withCompactionConfig( HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 5).build()).build(); - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath); +
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(dfs.getConf(), basePath); HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient); - HoodieTestDataGenerator.createCommitFile(basePath, "100"); - HoodieTestDataGenerator.createCommitFile(basePath, "101"); - HoodieTestDataGenerator.createCommitFile(basePath, "102"); - HoodieTestDataGenerator.createCommitFile(basePath, "103"); - HoodieTestDataGenerator.createCommitFile(basePath, "104"); - HoodieTestDataGenerator.createCommitFile(basePath, "105"); + HoodieTestDataGenerator.createCommitFile(basePath, "100", dfs.getConf()); + HoodieTestDataGenerator.createCommitFile(basePath, "101", dfs.getConf()); + HoodieTestDataGenerator.createCommitFile(basePath, "102", dfs.getConf()); + HoodieTestDataGenerator.createCommitFile(basePath, "103", dfs.getConf()); + HoodieTestDataGenerator.createCommitFile(basePath, "104", dfs.getConf()); + HoodieTestDataGenerator.createCommitFile(basePath, "105", dfs.getConf()); HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); assertEquals("Loaded 6 commits and the count should match", 6, timeline.countInstants()); @@ -203,15 +237,15 @@ public class TestHoodieCommitArchiveLog { .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) .forTable("test-trip-table").withCompactionConfig( HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 5).build()).build(); - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath); + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(dfs.getConf(), basePath); HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient); - HoodieTestDataGenerator.createCommitFile(basePath, "100"); - HoodieTestDataGenerator.createCommitFile(basePath, "101"); - HoodieTestDataGenerator.createSavepointFile(basePath, "101"); - HoodieTestDataGenerator.createCommitFile(basePath, "102"); - HoodieTestDataGenerator.createCommitFile(basePath, "103"); - HoodieTestDataGenerator.createCommitFile(basePath, "104"); - HoodieTestDataGenerator.createCommitFile(basePath, "105"); + HoodieTestDataGenerator.createCommitFile(basePath, "100", dfs.getConf()); + HoodieTestDataGenerator.createCommitFile(basePath, "101", dfs.getConf()); + HoodieTestDataGenerator.createSavepointFile(basePath, "101", dfs.getConf()); + HoodieTestDataGenerator.createCommitFile(basePath, "102", dfs.getConf()); + HoodieTestDataGenerator.createCommitFile(basePath, "103", dfs.getConf()); + HoodieTestDataGenerator.createCommitFile(basePath, "104", dfs.getConf()); + HoodieTestDataGenerator.createCommitFile(basePath, "105", dfs.getConf()); HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); assertEquals("Loaded 6 commits and the count should match", 6, timeline.countInstants()); diff --git a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java index 2c93f3d18..83c005a9c 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java @@ -32,6 +32,7 @@ import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.config.HoodieCompactionConfig; import com.uber.hoodie.config.HoodieIndexConfig; +import com.uber.hoodie.config.HoodieMemoryConfig; import 
com.uber.hoodie.config.HoodieStorageConfig; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.index.HoodieIndex; @@ -97,6 +98,7 @@ public class TestHoodieCompactor { .withParallelism(2, 2).withCompactionConfig( HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).withInlineCompaction(false) .build()).withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build()) + .withMemoryConfig(HoodieMemoryConfig.newBuilder().withMaxDFSStreamBufferSize(1 * 1024 * 1024).build()) .forTable("test-trip-table") .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()); } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieCompactedLogRecordScanner.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieCompactedLogRecordScanner.java index 7cd4ee307..d23792fc7 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieCompactedLogRecordScanner.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieCompactedLogRecordScanner.java @@ -87,7 +87,7 @@ public class HoodieCompactedLogRecordScanner implements public HoodieCompactedLogRecordScanner(FileSystem fs, String basePath, List logFilePaths, Schema readerSchema, String latestInstantTime, Long maxMemorySizeInBytes, - boolean readBlocksLazily, boolean reverseReader) { + boolean readBlocksLazily, boolean reverseReader, int bufferSize) { this.readerSchema = readerSchema; this.latestInstantTime = latestInstantTime; this.hoodieTableMetaClient = new HoodieTableMetaClient(fs.getConf(), basePath); @@ -102,7 +102,7 @@ public class HoodieCompactedLogRecordScanner implements HoodieLogFormatReader logFormatReaderWrapper = new HoodieLogFormatReader(fs, logFilePaths.stream().map(logFile -> new HoodieLogFile(new Path(logFile))) - .collect(Collectors.toList()), readerSchema, readBlocksLazily, reverseReader); + .collect(Collectors.toList()), readerSchema, readBlocksLazily, reverseReader, bufferSize); while (logFormatReaderWrapper.hasNext()) { HoodieLogFile logFile = logFormatReaderWrapper.getLogFile(); log.info("Scanning log file " + logFile); diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFileReader.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFileReader.java index 1ccf43cfa..dd0348b90 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFileReader.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFileReader.java @@ -35,7 +35,9 @@ import java.util.HashMap; import java.util.Map; import java.util.Optional; import org.apache.avro.Schema; +import org.apache.hadoop.fs.BufferedFSInputStream; import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -47,7 +49,7 @@ import org.apache.log4j.Logger; */ class HoodieLogFileReader implements HoodieLogFormat.Reader { - private static final int DEFAULT_BUFFER_SIZE = 4096; + public static final int DEFAULT_BUFFER_SIZE = 16 * 1024 * 1024; // 16 MB private static final Logger log = LogManager.getLogger(HoodieLogFileReader.class); private final FSDataInputStream inputStream; @@ -63,7 +65,9 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader { HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize, boolean readBlockLazily, boolean 
reverseReader) throws IOException { - this.inputStream = fs.open(logFile.getPath(), bufferSize); + this.inputStream = new FSDataInputStream( + new BufferedFSInputStream((FSInputStream) fs.open(logFile.getPath(), bufferSize).getWrappedStream(), + bufferSize)); this.logFile = logFile; this.readerSchema = readerSchema; this.readBlockLazily = readBlockLazily; @@ -116,7 +120,6 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader { Map header = null; try { - if (isOldMagic()) { // 1 Read the block type for a log block type = inputStream.readInt(); @@ -131,8 +134,7 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader { // 1 Read the total size of the block blocksize = (int) inputStream.readLong(); } - - } catch (Exception e) { + } catch (EOFException | CorruptedLogFileException e) { // An exception reading any of the above indicates a corrupt block // Create a corrupt block by finding the next OLD_MAGIC marker or EOF return createCorruptBlock(); @@ -237,6 +239,10 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader { inputStream.seek(currentPos + blocksize); } catch (EOFException e) { // this is corrupt + // This seek is required because the contract of seek() differs between a naked DFSInputStream and a BufferedFSInputStream + // release-3.1.0-RC1/DFSInputStream.java#L1455 + // release-3.1.0-RC1/BufferedFSInputStream.java#L73 + inputStream.seek(currentPos); return true; } @@ -256,11 +262,15 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader { while (true) { long currentPos = inputStream.getPos(); try { - boolean isEOF = readMagic(); - return isEOF ? inputStream.getPos() : currentPos; - } catch (CorruptedLogFileException e) { - // No luck - advance and try again - inputStream.seek(currentPos + 1); + boolean hasNextMagic = hasNextMagic(); + if (hasNextMagic) { + return currentPos; + } else { + // No luck - advance and try again + inputStream.seek(currentPos + 1); + } + } catch (EOFException e) { + return inputStream.getPos(); } } } @@ -276,12 +286,7 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader { */ public boolean hasNext() { try { - boolean isEOF = readMagic(); - if (isEOF) { - return false; - } - // If not hasNext(), we either we reach EOF or throw an exception on invalid magic header - return true; + return readMagic(); } catch (IOException e) { throw new HoodieIOException("IOException when reading logfile " + logFile, e); } @@ -307,27 +312,35 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader { private boolean readMagic() throws IOException { try { - long pos = inputStream.getPos(); - // 1. Read magic header from the start of the block - inputStream.readFully(magicBuffer, 0, 6); - if (!Arrays.equals(magicBuffer, HoodieLogFormat.MAGIC)) { - inputStream.seek(pos); - // 1. Read old magic header from the start of the block - // (for backwards compatibility of older log files written without log version) - inputStream.readFully(oldMagicBuffer, 0, 4); - if (!Arrays.equals(oldMagicBuffer, HoodieLogFormat.OLD_MAGIC)) { - throw new CorruptedLogFileException( - logFile - + "could not be read. Did not find the magic bytes at the start of the block"); - } - } - return false; + boolean hasMagic = hasNextMagic(); + if (!hasMagic) { + throw new CorruptedLogFileException( + logFile + + " could not be read.
Did not find the magic bytes at the start of the block"); } - return false; + return hasMagic; } catch (EOFException e) { // We have reached the EOF - return true; + return false; } } + private boolean hasNextMagic() throws IOException { + long pos = inputStream.getPos(); + // 1. Read magic header from the start of the block + inputStream.readFully(magicBuffer, 0, 6); + if (!Arrays.equals(magicBuffer, HoodieLogFormat.MAGIC)) { + inputStream.seek(pos); + // 1. Read old magic header from the start of the block + // (for backwards compatibility of older log files written without log version) + inputStream.readFully(oldMagicBuffer, 0, 4); + if (!Arrays.equals(oldMagicBuffer, HoodieLogFormat.OLD_MAGIC)) { + return false; + } + } + return true; + } + @Override public HoodieLogBlock next() { try { diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormat.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormat.java index cb12f9ef8..30d0093d8 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormat.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormat.java @@ -212,7 +212,7 @@ public interface HoodieLogFormat { static HoodieLogFormat.Reader newReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema) throws IOException { - return new HoodieLogFileReader(fs, logFile, readerSchema, false, false); + return new HoodieLogFileReader(fs, logFile, readerSchema, HoodieLogFileReader.DEFAULT_BUFFER_SIZE, false, false); } /** diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatReader.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatReader.java index 00a3a7498..8e8033b5b 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatReader.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatReader.java @@ -34,28 +34,24 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader { private final Schema readerSchema; private final boolean readBlocksLazily; private final boolean reverseLogReader; + private int bufferSize; private static final Logger log = LogManager.getLogger(HoodieLogFormatReader.class); HoodieLogFormatReader(FileSystem fs, List logFiles, - Schema readerSchema, boolean readBlocksLazily, boolean reverseLogReader) throws IOException { + Schema readerSchema, boolean readBlocksLazily, boolean reverseLogReader, int bufferSize) throws IOException { this.logFiles = logFiles; this.fs = fs; this.readerSchema = readerSchema; this.readBlocksLazily = readBlocksLazily; this.reverseLogReader = reverseLogReader; + this.bufferSize = bufferSize; if (logFiles.size() > 0) { HoodieLogFile nextLogFile = logFiles.remove(0); - this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, readBlocksLazily, - false); + this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false); } } - HoodieLogFormatReader(FileSystem fs, List logFiles, - Schema readerSchema) throws IOException { - this(fs, logFiles, readerSchema, false, false); - } - @Override public void close() throws IOException { if (currentReader != null) { @@ -73,8 +69,7 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader { } else if (logFiles.size() > 0) { try { HoodieLogFile nextLogFile = logFiles.remove(0); - this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, - readBlocksLazily, + 
this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false); } catch (IOException io) { throw new HoodieIOException("unable to initialize read with log file ", io); diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java b/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java index 742b05b3f..6aab1e1cd 100644 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java @@ -122,14 +122,21 @@ public class HoodieTestUtils { } } - public static final void createInflightCleanFiles(String basePath, String... commitTimes) throws IOException { + public static final void createInflightCleanFiles(String basePath, Configuration configuration, String... commitTimes) + throws IOException { for (String commitTime : commitTimes) { - new File(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline + Path commitFile = new Path((basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline .makeInflightCleanerFileName( - commitTime)).createNewFile(); + commitTime))); + FileSystem fs = FSUtils.getFs(basePath, configuration); + FSDataOutputStream os = fs.create(commitFile, true); } } + public static final void createInflightCleanFiles(String basePath, String... commitTimes) throws IOException { + createInflightCleanFiles(basePath, HoodieTestUtils.getDefaultHadoopConf(), commitTimes); + } + public static final String createNewDataFile(String basePath, String partitionPath, String commitTime) throws IOException { String fileID = UUID.randomUUID().toString(); @@ -214,10 +221,11 @@ public class HoodieTestUtils { return instant + TEST_EXTENSION + HoodieTimeline.INFLIGHT_EXTENSION; } - public static void createCleanFiles(String basePath, String commitTime) throws IOException { + public static void createCleanFiles(String basePath, String commitTime, Configuration configuration) + throws IOException { Path commitFile = new Path( basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline.makeCleanerFileName(commitTime)); - FileSystem fs = FSUtils.getFs(basePath, HoodieTestUtils.getDefaultHadoopConf()); + FileSystem fs = FSUtils.getFs(basePath, configuration); FSDataOutputStream os = fs.create(commitFile, true); try { HoodieCleanStat cleanStats = new HoodieCleanStat(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS, @@ -233,6 +241,10 @@ public class HoodieTestUtils { } } + public static void createCleanFiles(String basePath, String commitTime) throws IOException { + createCleanFiles(basePath, commitTime, HoodieTestUtils.getDefaultHadoopConf()); + } + public static String makeTestFileName(String instant) { return instant + TEST_EXTENSION; } diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java b/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java index 3d37e6816..93eed4279 100644 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java @@ -36,6 +36,7 @@ import com.uber.hoodie.common.table.log.block.HoodieCommandBlock; import com.uber.hoodie.common.table.log.block.HoodieCorruptBlock; import com.uber.hoodie.common.table.log.block.HoodieDeleteBlock; import com.uber.hoodie.common.table.log.block.HoodieLogBlock; +import 
com.uber.hoodie.common.table.log.block.HoodieLogBlock.HeaderMetadataType; import com.uber.hoodie.common.table.log.block.HoodieLogBlock.HoodieLogBlockType; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.common.util.HoodieAvroUtils; @@ -75,6 +76,7 @@ public class HoodieLogFormatTest { private FileSystem fs; private Path partitionPath; private static String basePath; + private int bufferSize = 4096; private Boolean readBlocksLazily = true; @@ -399,7 +401,7 @@ public class HoodieLogFormatTest { // scan all log blocks (across multiple log files) HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, logFiles.stream().map(logFile -> logFile.getPath().toString()).collect(Collectors.toList()), schema, "100", - 10240L, readBlocksLazily, false); + 10240L, readBlocksLazily, false, bufferSize); List scannedRecords = new ArrayList<>(); for (HoodieRecord record : scanner) { @@ -525,7 +527,7 @@ public class HoodieLogFormatTest { "100").map(s -> s.getPath().toString()).collect(Collectors.toList()); HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles, schema, - "100", 10240L, readBlocksLazily, false); + "100", 10240L, readBlocksLazily, false, bufferSize); assertEquals("", 200, scanner.getTotalLogRecords()); Set readKeys = new HashSet<>(200); scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey())); @@ -585,7 +587,7 @@ public class HoodieLogFormatTest { "100").map(s -> s.getPath().toString()).collect(Collectors.toList()); HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles, schema, - "102", 10240L, readBlocksLazily, false); + "102", 10240L, readBlocksLazily, false, bufferSize); assertEquals("We read 200 records from 2 write batches", 200, scanner.getTotalLogRecords()); Set readKeys = new HashSet<>(200); scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey())); @@ -663,7 +665,7 @@ public class HoodieLogFormatTest { "100").map(s -> s.getPath().toString()).collect(Collectors.toList()); HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles, schema, - "103", 10240L, true, false); + "103", 10240L, true, false, bufferSize); assertEquals("We would read 200 records", 200, scanner.getTotalLogRecords()); Set readKeys = new HashSet<>(200); scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey())); @@ -717,7 +719,7 @@ public class HoodieLogFormatTest { "100").map(s -> s.getPath().toString()).collect(Collectors.toList()); HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles, schema, - "102", 10240L, readBlocksLazily, false); + "102", 10240L, readBlocksLazily, false, bufferSize); assertEquals("We still would read 200 records", 200, scanner.getTotalLogRecords()); final List readKeys = new ArrayList<>(200); scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey())); @@ -737,7 +739,7 @@ public class HoodieLogFormatTest { readKeys.clear(); scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles, schema, "101", 10240L, readBlocksLazily, - false); + false, bufferSize); scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey())); assertEquals("Stream collect should return all 200 records after rollback of delete", 200, readKeys.size()); } @@ -798,7 +800,7 @@ public class HoodieLogFormatTest { // all data must be rolled back before merge HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, 
allLogFiles, schema, - "100", 10240L, readBlocksLazily, false); + "100", 10240L, readBlocksLazily, false, bufferSize); assertEquals("We would have scanned 0 records because of rollback", 0, scanner.getTotalLogRecords()); final List readKeys = new ArrayList<>(); @@ -847,7 +849,7 @@ "100").map(s -> s.getPath().toString()).collect(Collectors.toList()); HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles, schema, - "100", 10240L, readBlocksLazily, false); + "100", 10240L, readBlocksLazily, false, bufferSize); assertEquals("We would read 0 records", 0, scanner.getTotalLogRecords()); } @@ -879,7 +881,7 @@ "100").map(s -> s.getPath().toString()).collect(Collectors.toList()); HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles, schema, - "100", 10240L, readBlocksLazily, false); + "100", 10240L, readBlocksLazily, false, bufferSize); assertEquals("We still would read 100 records", 100, scanner.getTotalLogRecords()); final List readKeys = new ArrayList<>(100); scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey())); @@ -929,7 +931,7 @@ "100").map(s -> s.getPath().toString()).collect(Collectors.toList()); HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles, schema, - "101", 10240L, readBlocksLazily, false); + "101", 10240L, readBlocksLazily, false, bufferSize); assertEquals("We would read 0 records", 0, scanner.getTotalLogRecords()); } @@ -948,7 +950,6 @@ public class HoodieLogFormatTest { List records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100); Map header = Maps.newHashMap(); header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100"); - header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "100"); header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString()); HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, header); writer = writer.appendBlock(dataBlock); @@ -956,7 +957,7 @@ writer = writer.appendBlock(dataBlock); writer.close(); - // Append some arbit byte[] to thee end of the log (mimics a partially written commit) + // Append some arbitrary byte[] to the end of the log (mimics a partially written commit) fs = FSUtils.getFs(fs.getUri().toString(), fs.getConf()); FSDataOutputStream outputStream = fs.append(writer.getLogFile().getPath()); // create a block with @@ -969,7 +970,7 @@ outputStream.flush(); outputStream.close(); - // Append some arbit byte[] to thee end of the log (mimics a partially written commit) + // Append some arbitrary byte[] to the end of the log (mimics a partially written commit) fs = FSUtils.getFs(fs.getUri().toString(), fs.getConf()); outputStream = fs.append(writer.getLogFile().getPath()); // create a block with @@ -989,7 +990,7 @@ writer = writer.appendBlock(dataBlock); writer.close(); - // Append some arbit byte[] to thee end of the log (mimics a partially written commit) + // Append some arbitrary byte[] to the end of the log (mimics a partially written commit) fs = FSUtils.getFs(fs.getUri().toString(), fs.getConf()); outputStream = fs.append(writer.getLogFile().getPath()); // create a block with @@ -1005,18 +1006,20 @@
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1").overBaseCommit("100") .withFs(fs).build(); - // Write 1 rollback block for a failed write + // Write 1 rollback block for the last commit instant header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101"); + header.put(HeaderMetadataType.TARGET_INSTANT_TIME, "100"); header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal())); HoodieCommandBlock commandBlock = new HoodieCommandBlock(header); writer = writer.appendBlock(commandBlock); + writer.close(); List allLogFiles = FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1", HoodieLogFile.DELTA_EXTENSION, "100").map(s -> s.getPath().toString()).collect(Collectors.toList()); HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles, schema, - "101", 10240L, readBlocksLazily, false); + "101", 10240L, readBlocksLazily, false, bufferSize); assertEquals("We would read 0 records", 0, scanner.getTotalLogRecords()); } @@ -1136,7 +1139,7 @@ public class HoodieLogFormatTest { writer.close(); HoodieLogFileReader reader = new HoodieLogFileReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema(), - readBlocksLazily, true); + bufferSize, readBlocksLazily, true); assertTrue("Last block should be available", reader.hasPrev()); HoodieLogBlock prevBlock = reader.prev(); @@ -1207,7 +1210,8 @@ public class HoodieLogFormatTest { writer.close(); // First round of reads - we should be able to read the first block and then EOF - HoodieLogFileReader reader = new HoodieLogFileReader(fs, writer.getLogFile(), schema, readBlocksLazily, true); + HoodieLogFileReader reader = new HoodieLogFileReader(fs, writer.getLogFile(), schema, bufferSize, + readBlocksLazily, true); assertTrue("Last block should be available", reader.hasPrev()); HoodieLogBlock block = reader.prev(); @@ -1261,7 +1265,7 @@ public class HoodieLogFormatTest { writer.close(); HoodieLogFileReader reader = new HoodieLogFileReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema(), - readBlocksLazily, true); + bufferSize, readBlocksLazily, true); assertTrue("Third block should be available", reader.hasPrev()); reader.moveToPrev(); diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReader.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReader.java index 357023b4b..f96060b21 100644 --- a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReader.java +++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReader.java @@ -76,9 +76,14 @@ public class HoodieRealtimeRecordReader implements RecordReader deltaRecordMap; @@ -136,7 +141,7 @@ public class HoodieRealtimeRecordReader implements RecordReader the commit we are trying to read (if using // readCommit() API) diff --git a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java index 8e43aa25e..ea9016bb9 100644 --- a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java +++ b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java @@ -71,6 +71,7 @@ public class HoodieRealtimeRecordReaderTest { @Before public void setUp() { jobConf = new 
JobConf(); + jobConf.set(HoodieRealtimeRecordReader.MAX_DFS_STREAM_BUFFER_SIZE_PROP, String.valueOf(1 * 1024 * 1024)); hadoopConf = HoodieTestUtils.getDefaultHadoopConf(); fs = FSUtils.getFs(basePath.getRoot().getAbsolutePath(), hadoopConf); }
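Notes (the sketches below are illustrative companions to the patch, not part of the diff above).

The core of this change is re-wrapping the stream returned by FileSystem.open() so that reads and seeks go through a BufferedFSInputStream with a configurable buffer size. A minimal sketch of the pattern, assuming a DFS-backed FileSystem; the class and method names here are hypothetical:

import java.io.IOException;
import org.apache.hadoop.fs.BufferedFSInputStream;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class BufferedLogStreamSketch {

  // Re-wraps the stream underlying an FSDataInputStream in a BufferedFSInputStream,
  // mirroring the HoodieLogFileReader constructor change above. On a DFS-backed
  // FileSystem, getWrappedStream() returns an FSInputStream; on some local file
  // systems it returns another FSDataInputStream, so the cast below throws a
  // ClassCastException, which is why TestHoodieCommitArchiveLog now runs against
  // a MiniDFSCluster.
  public static FSDataInputStream openBuffered(FileSystem fs, Path path, int bufferSize)
      throws IOException {
    FSDataInputStream raw = fs.open(path, bufferSize);
    return new FSDataInputStream(
        new BufferedFSInputStream((FSInputStream) raw.getWrappedStream(), bufferSize));
  }
}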
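The buffer size is plumbed through the config layer via the new hoodie.memory.dfs.buffer.max.size property. A usage sketch based on the builders added in this patch; basePath and schemaStr are placeholders and the wrapper class is hypothetical:

import com.uber.hoodie.config.HoodieMemoryConfig;
import com.uber.hoodie.config.HoodieWriteConfig;

public class WriteConfigSketch {

  // Overrides the 16 MB default DFS stream buffer with a 1 MB buffer, the same
  // value TestHoodieCompactor uses. The compactor later forwards
  // cfg.getMaxDFSStreamBufferSize() down to HoodieCompactedLogRecordScanner.
  static HoodieWriteConfig buildConfig(String basePath, String schemaStr) {
    return HoodieWriteConfig.newBuilder().withPath(basePath)
        .withSchema(schemaStr).withParallelism(2, 2)
        .withMemoryConfig(HoodieMemoryConfig.newBuilder()
            .withMaxDFSStreamBufferSize(1024 * 1024)
            .build())
        .forTable("test-trip-table").build();
  }
}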
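The extra inputStream.seek(currentPos) in the EOFException handler exists because a bare DFSInputStream leaves its position untouched when a seek past EOF throws, while BufferedFSInputStream invalidates its read buffer before delegating, so the reported position can move even though the seek failed. A sketch of the probing pattern under that assumption; the method and variable names are illustrative, not the reader's actual internals:

import java.io.EOFException;
import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;

public class SeekContractSketch {

  // Probes whether a full block of `blocksize` bytes is present after
  // `currentPos`, restoring the stream position in every case.
  static boolean isBlockTruncated(FSDataInputStream in, long currentPos, long blocksize)
      throws IOException {
    try {
      in.seek(currentPos + blocksize); // lands past EOF if the block was partially written
    } catch (EOFException e) {
      // With a buffered stream the position may have moved; restore it before
      // the caller scans for the next magic marker.
      in.seek(currentPos);
      return true;
    }
    in.seek(currentPos); // rewind so the caller can read the block contents
    return false;
  }
}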
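Every call site of HoodieCompactedLogRecordScanner and HoodieLogFileReader gains a trailing bufferSize argument, as the test updates above show. For reference, a sketch of the updated scanner construction; the wrapper class and the concrete argument values are illustrative:

import com.uber.hoodie.common.table.log.HoodieCompactedLogRecordScanner;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;

public class ScannerCallSketch {

  static HoodieCompactedLogRecordScanner open(FileSystem fs, String basePath,
      List<String> logFilePaths, Schema schema) {
    return new HoodieCompactedLogRecordScanner(fs, basePath, logFilePaths, schema,
        "100",  // latestInstantTime: scan blocks up to this instant
        10240L, // maxMemorySizeInBytes for the spillable map
        true,   // readBlocksLazily
        false,  // reverseReader
        4096);  // bufferSize: the new trailing argument
  }
}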