1
0

Using BufferedFsInputStream to wrap FSInputStream for FSDataInputStream

This commit is contained in:
Nishith Agarwal
2018-04-02 22:53:28 -07:00
committed by vinoth chandar
parent 720e42f52a
commit c3c205fc02
15 changed files with 221 additions and 116 deletions

View File

@@ -92,6 +92,9 @@ public class HoodieLogFileCommand implements CommandMarker {
if (n instanceof HoodieCorruptBlock) { if (n instanceof HoodieCorruptBlock) {
try { try {
instantTime = n.getLogBlockHeader().get(HeaderMetadataType.INSTANT_TIME); instantTime = n.getLogBlockHeader().get(HeaderMetadataType.INSTANT_TIME);
if (instantTime == null) {
throw new Exception("Invalid instant time " + instantTime);
}
} catch (Exception e) { } catch (Exception e) {
numCorruptBlocks++; numCorruptBlocks++;
instantTime = "corrupt_block_" + numCorruptBlocks; instantTime = "corrupt_block_" + numCorruptBlocks;
@@ -172,7 +175,8 @@ public class HoodieLogFileCommand implements CommandMarker {
.getTimestamp(), .getTimestamp(),
Long.valueOf(HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES), Long.valueOf(HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES),
Boolean.valueOf(HoodieCompactionConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED), Boolean.valueOf(HoodieCompactionConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED),
Boolean.valueOf(HoodieCompactionConfig.DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED)); Boolean.valueOf(HoodieCompactionConfig.DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED),
Integer.valueOf(HoodieMemoryConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE));
for (HoodieRecord<? extends HoodieRecordPayload> hoodieRecord : scanner) { for (HoodieRecord<? extends HoodieRecordPayload> hoodieRecord : scanner) {
Optional<IndexedRecord> record = hoodieRecord.getData().getInsertValue(readerSchema); Optional<IndexedRecord> record = hoodieRecord.getData().getInsertValue(readerSchema);
if (allRecords.size() >= limit) { if (allRecords.size() >= limit) {

View File

@@ -45,6 +45,9 @@ public class HoodieMemoryConfig extends DefaultHoodieConfig {
public static final String MAX_MEMORY_FOR_MERGE_PROP = "hoodie.memory.merge.max.size"; public static final String MAX_MEMORY_FOR_MERGE_PROP = "hoodie.memory.merge.max.size";
// Property to set the max memory for compaction // Property to set the max memory for compaction
public static final String MAX_MEMORY_FOR_COMPACTION_PROP = "hoodie.memory.compaction.max.size"; public static final String MAX_MEMORY_FOR_COMPACTION_PROP = "hoodie.memory.compaction.max.size";
// Property to set the max memory for dfs inputstream buffer size
public static final String MAX_DFS_STREAM_BUFFER_SIZE_PROP = "hoodie.memory.dfs.buffer.max.size";
public static final int DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE = 16 * 1024 * 1024; // 16MB
private HoodieMemoryConfig(Properties props) { private HoodieMemoryConfig(Properties props) {
@@ -86,6 +89,12 @@ public class HoodieMemoryConfig extends DefaultHoodieConfig {
return this; return this;
} }
public Builder withMaxDFSStreamBufferSize(int maxStreamBufferSize) {
props.setProperty(MAX_DFS_STREAM_BUFFER_SIZE_PROP,
String.valueOf(maxStreamBufferSize));
return this;
}
/** /**
* Dynamic calculation of max memory to use for for spillable map. user.available.memory = spark.executor.memory * * Dynamic calculation of max memory to use for for spillable map. user.available.memory = spark.executor.memory *
* (1 - spark.memory.fraction) spillable.available.memory = user.available.memory * hoodie.memory.fraction. Anytime * (1 - spark.memory.fraction) spillable.available.memory = user.available.memory * hoodie.memory.fraction. Anytime
@@ -143,6 +152,9 @@ public class HoodieMemoryConfig extends DefaultHoodieConfig {
!props.containsKey(MAX_MEMORY_FOR_COMPACTION_PROP), !props.containsKey(MAX_MEMORY_FOR_COMPACTION_PROP),
MAX_MEMORY_FOR_COMPACTION_PROP, String.valueOf( MAX_MEMORY_FOR_COMPACTION_PROP, String.valueOf(
getMaxMemoryAllowedForMerge(props.getProperty(MAX_MEMORY_FRACTION_FOR_COMPACTION_PROP)))); getMaxMemoryAllowedForMerge(props.getProperty(MAX_MEMORY_FRACTION_FOR_COMPACTION_PROP))));
setDefaultOnCondition(props,
!props.containsKey(MAX_DFS_STREAM_BUFFER_SIZE_PROP),
MAX_DFS_STREAM_BUFFER_SIZE_PROP, String.valueOf(DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE));
return config; return config;
} }
} }

View File

@@ -369,6 +369,12 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
props.getProperty(HoodieMemoryConfig.MAX_MEMORY_FOR_COMPACTION_PROP)); props.getProperty(HoodieMemoryConfig.MAX_MEMORY_FOR_COMPACTION_PROP));
} }
public int getMaxDFSStreamBufferSize() {
return Integer
.valueOf(
props.getProperty(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP));
}
public static class Builder { public static class Builder {
private final Properties props = new Properties(); private final Properties props = new Properties();
@@ -469,6 +475,12 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return this; return this;
} }
public Builder withMemoryConfig(HoodieMemoryConfig memoryConfig) {
props.putAll(memoryConfig.getProps());
isMemoryConfigSet = true;
return this;
}
public Builder withAutoCommit(boolean autoCommit) { public Builder withAutoCommit(boolean autoCommit) {
props.setProperty(HOODIE_AUTO_COMMIT_PROP, String.valueOf(autoCommit)); props.setProperty(HOODIE_AUTO_COMMIT_PROP, String.valueOf(autoCommit));
return this; return this;

View File

@@ -105,7 +105,7 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs,
metaClient.getBasePath(), operation.getDeltaFilePaths(), readerSchema, maxInstantTime, metaClient.getBasePath(), operation.getDeltaFilePaths(), readerSchema, maxInstantTime,
config.getMaxMemoryPerCompaction(), config.getCompactionLazyBlockReadEnabled(), config.getMaxMemoryPerCompaction(), config.getCompactionLazyBlockReadEnabled(),
config.getCompactionReverseLogReadEnabled()); config.getCompactionReverseLogReadEnabled(), config.getMaxDFSStreamBufferSize());
if (!scanner.iterator().hasNext()) { if (!scanner.iterator().hasNext()) {
return Lists.<WriteStatus>newArrayList(); return Lists.<WriteStatus>newArrayList();
} }

View File

@@ -38,6 +38,7 @@ import java.util.UUID;
import org.apache.avro.Schema; import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@@ -116,9 +117,14 @@ public class HoodieTestDataGenerator {
} }
public static void createCommitFile(String basePath, String commitTime) throws IOException { public static void createCommitFile(String basePath, String commitTime) throws IOException {
createCommitFile(basePath, commitTime, HoodieTestUtils.getDefaultHadoopConf());
}
public static void createCommitFile(String basePath, String commitTime, Configuration configuration)
throws IOException {
Path commitFile = new Path( Path commitFile = new Path(
basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline.makeCommitFileName(commitTime)); basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline.makeCommitFileName(commitTime));
FileSystem fs = FSUtils.getFs(basePath, HoodieTestUtils.getDefaultHadoopConf()); FileSystem fs = FSUtils.getFs(basePath, configuration);
FSDataOutputStream os = fs.create(commitFile, true); FSDataOutputStream os = fs.create(commitFile, true);
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(); HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
try { try {
@@ -130,9 +136,14 @@ public class HoodieTestDataGenerator {
} }
public static void createSavepointFile(String basePath, String commitTime) throws IOException { public static void createSavepointFile(String basePath, String commitTime) throws IOException {
createSavepointFile(basePath, commitTime, HoodieTestUtils.getDefaultHadoopConf());
}
public static void createSavepointFile(String basePath, String commitTime, Configuration configuration)
throws IOException {
Path commitFile = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME Path commitFile = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME
+ "/" + HoodieTimeline.makeSavePointFileName(commitTime)); + "/" + HoodieTimeline.makeSavePointFileName(commitTime));
FileSystem fs = FSUtils.getFs(basePath, HoodieTestUtils.getDefaultHadoopConf()); FileSystem fs = FSUtils.getFs(basePath, configuration);
FSDataOutputStream os = fs.create(commitFile, true); FSDataOutputStream os = fs.create(commitFile, true);
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(); HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
try { try {

View File

@@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.uber.hoodie.avro.model.HoodieArchivedMetaEntry; import com.uber.hoodie.avro.model.HoodieArchivedMetaEntry;
import com.uber.hoodie.common.HoodieTestDataGenerator; import com.uber.hoodie.common.HoodieTestDataGenerator;
import com.uber.hoodie.common.minicluster.HdfsTestService;
import com.uber.hoodie.common.model.HoodieLogFile; import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.model.HoodieTestUtils; import com.uber.hoodie.common.model.HoodieTestUtils;
import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTableMetaClient;
@@ -29,7 +30,6 @@ import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.log.HoodieLogFormat; import com.uber.hoodie.common.table.log.HoodieLogFormat;
import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock; import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
import com.uber.hoodie.common.table.timeline.HoodieInstant; import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.config.HoodieCompactionConfig; import com.uber.hoodie.config.HoodieCompactionConfig;
import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.config.HoodieWriteConfig;
import java.io.IOException; import java.io.IOException;
@@ -42,23 +42,57 @@ import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.AfterClass;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.TemporaryFolder; import org.junit.rules.TemporaryFolder;
public class TestHoodieCommitArchiveLog { public class TestHoodieCommitArchiveLog {
private String basePath; private String basePath;
private FileSystem fs;
private Configuration hadoopConf; private Configuration hadoopConf;
//NOTE : Be careful in using DFS (FileSystem.class) vs LocalFs(RawLocalFileSystem.class)
//The implementation and guarantees of many APIs differ, for example check rename(src,dst)
// We need to use DFS here instead of LocalFs since the FsDataInputStream.getWrappedStream() returns a
// FsDataInputStream instead of an FSInputStream and thus throws java.lang.ClassCastException:
// org.apache.hadoop.fs.FSDataInputStream cannot be cast to org.apache.hadoop.fs.FSInputStream
private static MiniDFSCluster dfsCluster;
private static DistributedFileSystem dfs;
private static HdfsTestService hdfsTestService;
@AfterClass
public static void cleanUp() throws Exception {
if (hdfsTestService != null) {
hdfsTestService.stop();
dfsCluster.shutdown();
}
// Need to closeAll to clear FileSystem.Cache, required because DFS and LocalFS used in the
// same JVM
FileSystem.closeAll();
}
@BeforeClass
public static void setUpDFS() throws IOException {
// Need to closeAll to clear FileSystem.Cache, required because DFS and LocalFS used in the
// same JVM
FileSystem.closeAll();
if (hdfsTestService == null) {
hdfsTestService = new HdfsTestService();
dfsCluster = hdfsTestService.start(true);
// Create a temp folder as the base path
dfs = dfsCluster.getFileSystem();
}
}
@Before @Before
public void init() throws Exception { public void init() throws Exception {
TemporaryFolder folder = new TemporaryFolder(); TemporaryFolder folder = new TemporaryFolder();
folder.create(); folder.create();
basePath = folder.getRoot().getAbsolutePath(); basePath = folder.getRoot().getAbsolutePath();
hadoopConf = HoodieTestUtils.getDefaultHadoopConf(); hadoopConf = dfs.getConf();
fs = FSUtils.getFs(basePath, hadoopConf);
HoodieTestUtils.init(hadoopConf, basePath); HoodieTestUtils.init(hadoopConf, basePath);
} }
@@ -68,7 +102,7 @@ public class TestHoodieCommitArchiveLog {
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.forTable("test-trip-table").build(); .forTable("test-trip-table").build();
HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg,
new HoodieTableMetaClient(fs.getConf(), cfg.getBasePath(), true)); new HoodieTableMetaClient(dfs.getConf(), cfg.getBasePath(), true));
boolean result = archiveLog.archiveIfRequired(); boolean result = archiveLog.archiveIfRequired();
assertTrue(result); assertTrue(result);
} }
@@ -81,26 +115,26 @@ public class TestHoodieCommitArchiveLog {
HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 4).build()) HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 4).build())
.forTable("test-trip-table").build(); .forTable("test-trip-table").build();
HoodieTestUtils.init(hadoopConf, basePath); HoodieTestUtils.init(hadoopConf, basePath);
HoodieTestDataGenerator.createCommitFile(basePath, "100"); HoodieTestDataGenerator.createCommitFile(basePath, "100", dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "101"); HoodieTestDataGenerator.createCommitFile(basePath, "101", dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "102"); HoodieTestDataGenerator.createCommitFile(basePath, "102", dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "103"); HoodieTestDataGenerator.createCommitFile(basePath, "103", dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "104"); HoodieTestDataGenerator.createCommitFile(basePath, "104", dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "105"); HoodieTestDataGenerator.createCommitFile(basePath, "105", dfs.getConf());
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(dfs.getConf(), basePath);
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
assertEquals("Loaded 6 commits and the count should match", 6, timeline.countInstants()); assertEquals("Loaded 6 commits and the count should match", 6, timeline.countInstants());
HoodieTestUtils.createCleanFiles(basePath, "100"); HoodieTestUtils.createCleanFiles(basePath, "100", dfs.getConf());
HoodieTestUtils.createInflightCleanFiles(basePath, "101"); HoodieTestUtils.createInflightCleanFiles(basePath, dfs.getConf(), "101");
HoodieTestUtils.createCleanFiles(basePath, "101"); HoodieTestUtils.createCleanFiles(basePath, "101", dfs.getConf());
HoodieTestUtils.createCleanFiles(basePath, "102"); HoodieTestUtils.createCleanFiles(basePath, "102", dfs.getConf());
HoodieTestUtils.createCleanFiles(basePath, "103"); HoodieTestUtils.createCleanFiles(basePath, "103", dfs.getConf());
HoodieTestUtils.createCleanFiles(basePath, "104"); HoodieTestUtils.createCleanFiles(basePath, "104", dfs.getConf());
HoodieTestUtils.createCleanFiles(basePath, "105"); HoodieTestUtils.createCleanFiles(basePath, "105", dfs.getConf());
HoodieTestUtils.createInflightCleanFiles(basePath, "106", "107"); HoodieTestUtils.createInflightCleanFiles(basePath, dfs.getConf(), "106", "107");
//reload the timeline and get all the commits before archive //reload the timeline and get all the commits before archive
timeline = metaClient.getActiveTimeline().reload().getAllCommitsTimeline().filterCompletedInstants(); timeline = metaClient.getActiveTimeline().reload().getAllCommitsTimeline().filterCompletedInstants();
@@ -112,7 +146,7 @@ public class TestHoodieCommitArchiveLog {
verifyInflightInstants(metaClient, 3); verifyInflightInstants(metaClient, 3);
HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg,
new HoodieTableMetaClient(fs.getConf(), basePath, true)); new HoodieTableMetaClient(dfs.getConf(), basePath, true));
assertTrue(archiveLog.archiveIfRequired()); assertTrue(archiveLog.archiveIfRequired());
@@ -121,7 +155,7 @@ public class TestHoodieCommitArchiveLog {
originalCommits.removeAll(timeline.getInstants().collect(Collectors.toList())); originalCommits.removeAll(timeline.getInstants().collect(Collectors.toList()));
//read the file //read the file
HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(fs, HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(dfs,
new HoodieLogFile(new Path(basePath + "/.hoodie/.commits_.archive.1")), new HoodieLogFile(new Path(basePath + "/.hoodie/.commits_.archive.1")),
HoodieArchivedMetaEntry.getClassSchema()); HoodieArchivedMetaEntry.getClassSchema());
@@ -156,12 +190,12 @@ public class TestHoodieCommitArchiveLog {
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.forTable("test-trip-table").withCompactionConfig( .forTable("test-trip-table").withCompactionConfig(
HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 5).build()).build(); HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 5).build()).build();
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(dfs.getConf(), basePath);
HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient); HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
HoodieTestDataGenerator.createCommitFile(basePath, "100"); HoodieTestDataGenerator.createCommitFile(basePath, "100", dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "101"); HoodieTestDataGenerator.createCommitFile(basePath, "101", dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "102"); HoodieTestDataGenerator.createCommitFile(basePath, "102", dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "103"); HoodieTestDataGenerator.createCommitFile(basePath, "103", dfs.getConf());
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
assertEquals("Loaded 4 commits and the count should match", 4, timeline.countInstants()); assertEquals("Loaded 4 commits and the count should match", 4, timeline.countInstants());
@@ -177,14 +211,14 @@ public class TestHoodieCommitArchiveLog {
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.forTable("test-trip-table").withCompactionConfig( .forTable("test-trip-table").withCompactionConfig(
HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 5).build()).build(); HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 5).build()).build();
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(dfs.getConf(), basePath);
HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient); HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
HoodieTestDataGenerator.createCommitFile(basePath, "100"); HoodieTestDataGenerator.createCommitFile(basePath, "100", dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "101"); HoodieTestDataGenerator.createCommitFile(basePath, "101", dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "102"); HoodieTestDataGenerator.createCommitFile(basePath, "102", dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "103"); HoodieTestDataGenerator.createCommitFile(basePath, "103", dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "104"); HoodieTestDataGenerator.createCommitFile(basePath, "104", dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "105"); HoodieTestDataGenerator.createCommitFile(basePath, "105", dfs.getConf());
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
assertEquals("Loaded 6 commits and the count should match", 6, timeline.countInstants()); assertEquals("Loaded 6 commits and the count should match", 6, timeline.countInstants());
@@ -203,15 +237,15 @@ public class TestHoodieCommitArchiveLog {
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.forTable("test-trip-table").withCompactionConfig( .forTable("test-trip-table").withCompactionConfig(
HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 5).build()).build(); HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 5).build()).build();
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(dfs.getConf(), basePath);
HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient); HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
HoodieTestDataGenerator.createCommitFile(basePath, "100"); HoodieTestDataGenerator.createCommitFile(basePath, "100", dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "101"); HoodieTestDataGenerator.createCommitFile(basePath, "101", dfs.getConf());
HoodieTestDataGenerator.createSavepointFile(basePath, "101"); HoodieTestDataGenerator.createSavepointFile(basePath, "101", dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "102"); HoodieTestDataGenerator.createCommitFile(basePath, "102", dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "103"); HoodieTestDataGenerator.createCommitFile(basePath, "103", dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "104"); HoodieTestDataGenerator.createCommitFile(basePath, "104", dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "105"); HoodieTestDataGenerator.createCommitFile(basePath, "105", dfs.getConf());
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
assertEquals("Loaded 6 commits and the count should match", 6, timeline.countInstants()); assertEquals("Loaded 6 commits and the count should match", 6, timeline.countInstants());

View File

@@ -32,6 +32,7 @@ import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.config.HoodieCompactionConfig; import com.uber.hoodie.config.HoodieCompactionConfig;
import com.uber.hoodie.config.HoodieIndexConfig; import com.uber.hoodie.config.HoodieIndexConfig;
import com.uber.hoodie.config.HoodieMemoryConfig;
import com.uber.hoodie.config.HoodieStorageConfig; import com.uber.hoodie.config.HoodieStorageConfig;
import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.index.HoodieIndex; import com.uber.hoodie.index.HoodieIndex;
@@ -97,6 +98,7 @@ public class TestHoodieCompactor {
.withParallelism(2, 2).withCompactionConfig( .withParallelism(2, 2).withCompactionConfig(
HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).withInlineCompaction(false) HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).withInlineCompaction(false)
.build()).withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build()) .build()).withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build())
.withMemoryConfig(HoodieMemoryConfig.newBuilder().withMaxDFSStreamBufferSize(1 * 1024 * 1024).build())
.forTable("test-trip-table") .forTable("test-trip-table")
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()); .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build());
} }

View File

@@ -87,7 +87,7 @@ public class HoodieCompactedLogRecordScanner implements
public HoodieCompactedLogRecordScanner(FileSystem fs, String basePath, List<String> logFilePaths, public HoodieCompactedLogRecordScanner(FileSystem fs, String basePath, List<String> logFilePaths,
Schema readerSchema, String latestInstantTime, Long maxMemorySizeInBytes, Schema readerSchema, String latestInstantTime, Long maxMemorySizeInBytes,
boolean readBlocksLazily, boolean reverseReader) { boolean readBlocksLazily, boolean reverseReader, int bufferSize) {
this.readerSchema = readerSchema; this.readerSchema = readerSchema;
this.latestInstantTime = latestInstantTime; this.latestInstantTime = latestInstantTime;
this.hoodieTableMetaClient = new HoodieTableMetaClient(fs.getConf(), basePath); this.hoodieTableMetaClient = new HoodieTableMetaClient(fs.getConf(), basePath);
@@ -102,7 +102,7 @@ public class HoodieCompactedLogRecordScanner implements
HoodieLogFormatReader logFormatReaderWrapper = HoodieLogFormatReader logFormatReaderWrapper =
new HoodieLogFormatReader(fs, new HoodieLogFormatReader(fs,
logFilePaths.stream().map(logFile -> new HoodieLogFile(new Path(logFile))) logFilePaths.stream().map(logFile -> new HoodieLogFile(new Path(logFile)))
.collect(Collectors.toList()), readerSchema, readBlocksLazily, reverseReader); .collect(Collectors.toList()), readerSchema, readBlocksLazily, reverseReader, bufferSize);
while (logFormatReaderWrapper.hasNext()) { while (logFormatReaderWrapper.hasNext()) {
HoodieLogFile logFile = logFormatReaderWrapper.getLogFile(); HoodieLogFile logFile = logFormatReaderWrapper.getLogFile();
log.info("Scanning log file " + logFile); log.info("Scanning log file " + logFile);

View File

@@ -35,7 +35,9 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import org.apache.avro.Schema; import org.apache.avro.Schema;
import org.apache.hadoop.fs.BufferedFSInputStream;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
@@ -47,7 +49,7 @@ import org.apache.log4j.Logger;
*/ */
class HoodieLogFileReader implements HoodieLogFormat.Reader { class HoodieLogFileReader implements HoodieLogFormat.Reader {
private static final int DEFAULT_BUFFER_SIZE = 4096; public static final int DEFAULT_BUFFER_SIZE = 16 * 1024 * 1024; // 16 MB
private static final Logger log = LogManager.getLogger(HoodieLogFileReader.class); private static final Logger log = LogManager.getLogger(HoodieLogFileReader.class);
private final FSDataInputStream inputStream; private final FSDataInputStream inputStream;
@@ -63,7 +65,9 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader {
HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize, HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
boolean readBlockLazily, boolean reverseReader) throws IOException { boolean readBlockLazily, boolean reverseReader) throws IOException {
this.inputStream = fs.open(logFile.getPath(), bufferSize); this.inputStream = new FSDataInputStream(
new BufferedFSInputStream((FSInputStream) fs.open(logFile.getPath(), bufferSize).getWrappedStream(),
bufferSize));
this.logFile = logFile; this.logFile = logFile;
this.readerSchema = readerSchema; this.readerSchema = readerSchema;
this.readBlockLazily = readBlockLazily; this.readBlockLazily = readBlockLazily;
@@ -116,7 +120,6 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader {
Map<HeaderMetadataType, String> header = null; Map<HeaderMetadataType, String> header = null;
try { try {
if (isOldMagic()) { if (isOldMagic()) {
// 1 Read the block type for a log block // 1 Read the block type for a log block
type = inputStream.readInt(); type = inputStream.readInt();
@@ -131,8 +134,7 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader {
// 1 Read the total size of the block // 1 Read the total size of the block
blocksize = (int) inputStream.readLong(); blocksize = (int) inputStream.readLong();
} }
} catch (EOFException | CorruptedLogFileException e) {
} catch (Exception e) {
// An exception reading any of the above indicates a corrupt block // An exception reading any of the above indicates a corrupt block
// Create a corrupt block by finding the next OLD_MAGIC marker or EOF // Create a corrupt block by finding the next OLD_MAGIC marker or EOF
return createCorruptBlock(); return createCorruptBlock();
@@ -237,6 +239,10 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader {
inputStream.seek(currentPos + blocksize); inputStream.seek(currentPos + blocksize);
} catch (EOFException e) { } catch (EOFException e) {
// this is corrupt // this is corrupt
// This seek is required because the contract of seek() is different for naked DFSInputStream vs BufferedFSInputStream
// release-3.1.0-RC1/DFSInputStream.java#L1455
// release-3.1.0-RC1/BufferedFSInputStream.java#L73
inputStream.seek(currentPos);
return true; return true;
} }
@@ -256,11 +262,15 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader {
while (true) { while (true) {
long currentPos = inputStream.getPos(); long currentPos = inputStream.getPos();
try { try {
boolean isEOF = readMagic(); boolean hasNextMagic = hasNextMagic();
return isEOF ? inputStream.getPos() : currentPos; if (hasNextMagic) {
} catch (CorruptedLogFileException e) { return currentPos;
// No luck - advance and try again } else {
inputStream.seek(currentPos + 1); // No luck - advance and try again
inputStream.seek(currentPos + 1);
}
} catch (EOFException e) {
return inputStream.getPos();
} }
} }
} }
@@ -276,12 +286,7 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader {
*/ */
public boolean hasNext() { public boolean hasNext() {
try { try {
boolean isEOF = readMagic(); return readMagic();
if (isEOF) {
return false;
}
// If not hasNext(), we either we reach EOF or throw an exception on invalid magic header
return true;
} catch (IOException e) { } catch (IOException e) {
throw new HoodieIOException("IOException when reading logfile " + logFile, e); throw new HoodieIOException("IOException when reading logfile " + logFile, e);
} }
@@ -307,27 +312,35 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader {
private boolean readMagic() throws IOException { private boolean readMagic() throws IOException {
try { try {
long pos = inputStream.getPos(); boolean hasMagic = hasNextMagic();
// 1. Read magic header from the start of the block if (!hasMagic) {
inputStream.readFully(magicBuffer, 0, 6); throw new CorruptedLogFileException(
if (!Arrays.equals(magicBuffer, HoodieLogFormat.MAGIC)) { logFile
inputStream.seek(pos); + "could not be read. Did not find the magic bytes at the start of the block");
// 1. Read old magic header from the start of the block
// (for backwards compatibility of older log files written without log version)
inputStream.readFully(oldMagicBuffer, 0, 4);
if (!Arrays.equals(oldMagicBuffer, HoodieLogFormat.OLD_MAGIC)) {
throw new CorruptedLogFileException(
logFile
+ "could not be read. Did not find the magic bytes at the start of the block");
}
} }
return false; return hasMagic;
} catch (EOFException e) { } catch (EOFException e) {
// We have reached the EOF // We have reached the EOF
return true; return false;
} }
} }
private boolean hasNextMagic() throws IOException {
long pos = inputStream.getPos();
// 1. Read magic header from the start of the block
inputStream.readFully(magicBuffer, 0, 6);
if (!Arrays.equals(magicBuffer, HoodieLogFormat.MAGIC)) {
inputStream.seek(pos);
// 1. Read old magic header from the start of the block
// (for backwards compatibility of older log files written without log version)
inputStream.readFully(oldMagicBuffer, 0, 4);
if (!Arrays.equals(oldMagicBuffer, HoodieLogFormat.OLD_MAGIC)) {
return false;
}
}
return true;
}
@Override @Override
public HoodieLogBlock next() { public HoodieLogBlock next() {
try { try {

View File

@@ -212,7 +212,7 @@ public interface HoodieLogFormat {
static HoodieLogFormat.Reader newReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema) static HoodieLogFormat.Reader newReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema)
throws IOException { throws IOException {
return new HoodieLogFileReader(fs, logFile, readerSchema, false, false); return new HoodieLogFileReader(fs, logFile, readerSchema, HoodieLogFileReader.DEFAULT_BUFFER_SIZE, false, false);
} }
/** /**

View File

@@ -34,28 +34,24 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
private final Schema readerSchema; private final Schema readerSchema;
private final boolean readBlocksLazily; private final boolean readBlocksLazily;
private final boolean reverseLogReader; private final boolean reverseLogReader;
private int bufferSize;
private static final Logger log = LogManager.getLogger(HoodieLogFormatReader.class); private static final Logger log = LogManager.getLogger(HoodieLogFormatReader.class);
HoodieLogFormatReader(FileSystem fs, List<HoodieLogFile> logFiles, HoodieLogFormatReader(FileSystem fs, List<HoodieLogFile> logFiles,
Schema readerSchema, boolean readBlocksLazily, boolean reverseLogReader) throws IOException { Schema readerSchema, boolean readBlocksLazily, boolean reverseLogReader, int bufferSize) throws IOException {
this.logFiles = logFiles; this.logFiles = logFiles;
this.fs = fs; this.fs = fs;
this.readerSchema = readerSchema; this.readerSchema = readerSchema;
this.readBlocksLazily = readBlocksLazily; this.readBlocksLazily = readBlocksLazily;
this.reverseLogReader = reverseLogReader; this.reverseLogReader = reverseLogReader;
this.bufferSize = bufferSize;
if (logFiles.size() > 0) { if (logFiles.size() > 0) {
HoodieLogFile nextLogFile = logFiles.remove(0); HoodieLogFile nextLogFile = logFiles.remove(0);
this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, readBlocksLazily, this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false);
false);
} }
} }
HoodieLogFormatReader(FileSystem fs, List<HoodieLogFile> logFiles,
Schema readerSchema) throws IOException {
this(fs, logFiles, readerSchema, false, false);
}
@Override @Override
public void close() throws IOException { public void close() throws IOException {
if (currentReader != null) { if (currentReader != null) {
@@ -73,8 +69,7 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
} else if (logFiles.size() > 0) { } else if (logFiles.size() > 0) {
try { try {
HoodieLogFile nextLogFile = logFiles.remove(0); HoodieLogFile nextLogFile = logFiles.remove(0);
this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily,
readBlocksLazily,
false); false);
} catch (IOException io) { } catch (IOException io) {
throw new HoodieIOException("unable to initialize read with log file ", io); throw new HoodieIOException("unable to initialize read with log file ", io);

View File

@@ -122,14 +122,21 @@ public class HoodieTestUtils {
} }
} }
public static final void createInflightCleanFiles(String basePath, String... commitTimes) throws IOException { public static final void createInflightCleanFiles(String basePath, Configuration configuration, String... commitTimes)
throws IOException {
for (String commitTime : commitTimes) { for (String commitTime : commitTimes) {
new File(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline Path commitFile = new Path((basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline
.makeInflightCleanerFileName( .makeInflightCleanerFileName(
commitTime)).createNewFile(); commitTime)));
FileSystem fs = FSUtils.getFs(basePath, configuration);
FSDataOutputStream os = fs.create(commitFile, true);
} }
} }
public static final void createInflightCleanFiles(String basePath, String... commitTimes) throws IOException {
createInflightCleanFiles(basePath, HoodieTestUtils.getDefaultHadoopConf(), commitTimes);
}
public static final String createNewDataFile(String basePath, String partitionPath, String commitTime) public static final String createNewDataFile(String basePath, String partitionPath, String commitTime)
throws IOException { throws IOException {
String fileID = UUID.randomUUID().toString(); String fileID = UUID.randomUUID().toString();
@@ -214,10 +221,11 @@ public class HoodieTestUtils {
return instant + TEST_EXTENSION + HoodieTimeline.INFLIGHT_EXTENSION; return instant + TEST_EXTENSION + HoodieTimeline.INFLIGHT_EXTENSION;
} }
public static void createCleanFiles(String basePath, String commitTime) throws IOException { public static void createCleanFiles(String basePath, String commitTime, Configuration configuration)
throws IOException {
Path commitFile = new Path( Path commitFile = new Path(
basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline.makeCleanerFileName(commitTime)); basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline.makeCleanerFileName(commitTime));
FileSystem fs = FSUtils.getFs(basePath, HoodieTestUtils.getDefaultHadoopConf()); FileSystem fs = FSUtils.getFs(basePath, configuration);
FSDataOutputStream os = fs.create(commitFile, true); FSDataOutputStream os = fs.create(commitFile, true);
try { try {
HoodieCleanStat cleanStats = new HoodieCleanStat(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS, HoodieCleanStat cleanStats = new HoodieCleanStat(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS,
@@ -233,6 +241,10 @@ public class HoodieTestUtils {
} }
} }
public static void createCleanFiles(String basePath, String commitTime) throws IOException {
createCleanFiles(basePath, commitTime, HoodieTestUtils.getDefaultHadoopConf());
}
public static String makeTestFileName(String instant) { public static String makeTestFileName(String instant) {
return instant + TEST_EXTENSION; return instant + TEST_EXTENSION;
} }

View File

@@ -36,6 +36,7 @@ import com.uber.hoodie.common.table.log.block.HoodieCommandBlock;
import com.uber.hoodie.common.table.log.block.HoodieCorruptBlock; import com.uber.hoodie.common.table.log.block.HoodieCorruptBlock;
import com.uber.hoodie.common.table.log.block.HoodieDeleteBlock; import com.uber.hoodie.common.table.log.block.HoodieDeleteBlock;
import com.uber.hoodie.common.table.log.block.HoodieLogBlock; import com.uber.hoodie.common.table.log.block.HoodieLogBlock;
import com.uber.hoodie.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
import com.uber.hoodie.common.table.log.block.HoodieLogBlock.HoodieLogBlockType; import com.uber.hoodie.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.HoodieAvroUtils; import com.uber.hoodie.common.util.HoodieAvroUtils;
@@ -75,6 +76,7 @@ public class HoodieLogFormatTest {
private FileSystem fs; private FileSystem fs;
private Path partitionPath; private Path partitionPath;
private static String basePath; private static String basePath;
private int bufferSize = 4096;
private Boolean readBlocksLazily = true; private Boolean readBlocksLazily = true;
@@ -399,7 +401,7 @@ public class HoodieLogFormatTest {
// scan all log blocks (across multiple log files) // scan all log blocks (across multiple log files)
HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath,
logFiles.stream().map(logFile -> logFile.getPath().toString()).collect(Collectors.toList()), schema, "100", logFiles.stream().map(logFile -> logFile.getPath().toString()).collect(Collectors.toList()), schema, "100",
10240L, readBlocksLazily, false); 10240L, readBlocksLazily, false, bufferSize);
List<IndexedRecord> scannedRecords = new ArrayList<>(); List<IndexedRecord> scannedRecords = new ArrayList<>();
for (HoodieRecord record : scanner) { for (HoodieRecord record : scanner) {
@@ -525,7 +527,7 @@ public class HoodieLogFormatTest {
"100").map(s -> s.getPath().toString()).collect(Collectors.toList()); "100").map(s -> s.getPath().toString()).collect(Collectors.toList());
HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles, schema, HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles, schema,
"100", 10240L, readBlocksLazily, false); "100", 10240L, readBlocksLazily, false, bufferSize);
assertEquals("", 200, scanner.getTotalLogRecords()); assertEquals("", 200, scanner.getTotalLogRecords());
Set<String> readKeys = new HashSet<>(200); Set<String> readKeys = new HashSet<>(200);
scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey())); scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
@@ -585,7 +587,7 @@ public class HoodieLogFormatTest {
"100").map(s -> s.getPath().toString()).collect(Collectors.toList()); "100").map(s -> s.getPath().toString()).collect(Collectors.toList());
HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles, schema, HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles, schema,
"102", 10240L, readBlocksLazily, false); "102", 10240L, readBlocksLazily, false, bufferSize);
assertEquals("We read 200 records from 2 write batches", 200, scanner.getTotalLogRecords()); assertEquals("We read 200 records from 2 write batches", 200, scanner.getTotalLogRecords());
Set<String> readKeys = new HashSet<>(200); Set<String> readKeys = new HashSet<>(200);
scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey())); scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
@@ -663,7 +665,7 @@ public class HoodieLogFormatTest {
"100").map(s -> s.getPath().toString()).collect(Collectors.toList()); "100").map(s -> s.getPath().toString()).collect(Collectors.toList());
HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles, schema, HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles, schema,
"103", 10240L, true, false); "103", 10240L, true, false, bufferSize);
assertEquals("We would read 200 records", 200, scanner.getTotalLogRecords()); assertEquals("We would read 200 records", 200, scanner.getTotalLogRecords());
Set<String> readKeys = new HashSet<>(200); Set<String> readKeys = new HashSet<>(200);
scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey())); scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
@@ -717,7 +719,7 @@ public class HoodieLogFormatTest {
"100").map(s -> s.getPath().toString()).collect(Collectors.toList()); "100").map(s -> s.getPath().toString()).collect(Collectors.toList());
HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles, schema, HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles, schema,
"102", 10240L, readBlocksLazily, false); "102", 10240L, readBlocksLazily, false, bufferSize);
assertEquals("We still would read 200 records", 200, scanner.getTotalLogRecords()); assertEquals("We still would read 200 records", 200, scanner.getTotalLogRecords());
final List<String> readKeys = new ArrayList<>(200); final List<String> readKeys = new ArrayList<>(200);
scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey())); scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
@@ -737,7 +739,7 @@ public class HoodieLogFormatTest {
readKeys.clear(); readKeys.clear();
scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles, schema, "101", 10240L, readBlocksLazily, scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles, schema, "101", 10240L, readBlocksLazily,
false); false, bufferSize);
scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey())); scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
assertEquals("Stream collect should return all 200 records after rollback of delete", 200, readKeys.size()); assertEquals("Stream collect should return all 200 records after rollback of delete", 200, readKeys.size());
} }
@@ -798,7 +800,7 @@ public class HoodieLogFormatTest {
// all data must be rolled back before merge // all data must be rolled back before merge
HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles, schema, HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles, schema,
"100", 10240L, readBlocksLazily, false); "100", 10240L, readBlocksLazily, false, bufferSize);
assertEquals("We would have scanned 0 records because of rollback", 0, scanner.getTotalLogRecords()); assertEquals("We would have scanned 0 records because of rollback", 0, scanner.getTotalLogRecords());
final List<String> readKeys = new ArrayList<>(); final List<String> readKeys = new ArrayList<>();
@@ -847,7 +849,7 @@ public class HoodieLogFormatTest {
"100").map(s -> s.getPath().toString()).collect(Collectors.toList()); "100").map(s -> s.getPath().toString()).collect(Collectors.toList());
HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles, schema, HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles, schema,
"100", 10240L, readBlocksLazily, false); "100", 10240L, readBlocksLazily, false, bufferSize);
assertEquals("We would read 0 records", 0, scanner.getTotalLogRecords()); assertEquals("We would read 0 records", 0, scanner.getTotalLogRecords());
} }
@@ -879,7 +881,7 @@ public class HoodieLogFormatTest {
"100").map(s -> s.getPath().toString()).collect(Collectors.toList()); "100").map(s -> s.getPath().toString()).collect(Collectors.toList());
HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles, schema, HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles, schema,
"100", 10240L, readBlocksLazily, false); "100", 10240L, readBlocksLazily, false, bufferSize);
assertEquals("We still would read 100 records", 100, scanner.getTotalLogRecords()); assertEquals("We still would read 100 records", 100, scanner.getTotalLogRecords());
final List<String> readKeys = new ArrayList<>(100); final List<String> readKeys = new ArrayList<>(100);
scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey())); scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
@@ -929,7 +931,7 @@ public class HoodieLogFormatTest {
"100").map(s -> s.getPath().toString()).collect(Collectors.toList()); "100").map(s -> s.getPath().toString()).collect(Collectors.toList());
HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles, schema, HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles, schema,
"101", 10240L, readBlocksLazily, false); "101", 10240L, readBlocksLazily, false, bufferSize);
assertEquals("We would read 0 records", 0, scanner.getTotalLogRecords()); assertEquals("We would read 0 records", 0, scanner.getTotalLogRecords());
} }
@@ -948,7 +950,6 @@ public class HoodieLogFormatTest {
List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100); List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap(); Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100"); header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString()); header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, header); HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, header);
writer = writer.appendBlock(dataBlock); writer = writer.appendBlock(dataBlock);
@@ -956,7 +957,7 @@ public class HoodieLogFormatTest {
writer = writer.appendBlock(dataBlock); writer = writer.appendBlock(dataBlock);
writer.close(); writer.close();
// Append some arbit byte[] to thee end of the log (mimics a partially written commit) // Append some arbit byte[] to the end of the log (mimics a partially written commit)
fs = FSUtils.getFs(fs.getUri().toString(), fs.getConf()); fs = FSUtils.getFs(fs.getUri().toString(), fs.getConf());
FSDataOutputStream outputStream = fs.append(writer.getLogFile().getPath()); FSDataOutputStream outputStream = fs.append(writer.getLogFile().getPath());
// create a block with // create a block with
@@ -969,7 +970,7 @@ public class HoodieLogFormatTest {
outputStream.flush(); outputStream.flush();
outputStream.close(); outputStream.close();
// Append some arbit byte[] to thee end of the log (mimics a partially written commit) // Append some arbit byte[] to the end of the log (mimics a partially written commit)
fs = FSUtils.getFs(fs.getUri().toString(), fs.getConf()); fs = FSUtils.getFs(fs.getUri().toString(), fs.getConf());
outputStream = fs.append(writer.getLogFile().getPath()); outputStream = fs.append(writer.getLogFile().getPath());
// create a block with // create a block with
@@ -989,7 +990,7 @@ public class HoodieLogFormatTest {
writer = writer.appendBlock(dataBlock); writer = writer.appendBlock(dataBlock);
writer.close(); writer.close();
// Append some arbit byte[] to thee end of the log (mimics a partially written commit) // Append some arbit byte[] to the end of the log (mimics a partially written commit)
fs = FSUtils.getFs(fs.getUri().toString(), fs.getConf()); fs = FSUtils.getFs(fs.getUri().toString(), fs.getConf());
outputStream = fs.append(writer.getLogFile().getPath()); outputStream = fs.append(writer.getLogFile().getPath());
// create a block with // create a block with
@@ -1005,18 +1006,20 @@ public class HoodieLogFormatTest {
writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath) writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1").overBaseCommit("100") .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1").overBaseCommit("100")
.withFs(fs).build(); .withFs(fs).build();
// Write 1 rollback block for a failed write // Write 1 rollback block for the last commit instant
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101"); header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
header.put(HeaderMetadataType.TARGET_INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE,
String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal())); String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
HoodieCommandBlock commandBlock = new HoodieCommandBlock(header); HoodieCommandBlock commandBlock = new HoodieCommandBlock(header);
writer = writer.appendBlock(commandBlock); writer = writer.appendBlock(commandBlock);
writer.close();
List<String> allLogFiles = FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1", HoodieLogFile.DELTA_EXTENSION, List<String> allLogFiles = FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1", HoodieLogFile.DELTA_EXTENSION,
"100").map(s -> s.getPath().toString()).collect(Collectors.toList()); "100").map(s -> s.getPath().toString()).collect(Collectors.toList());
HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles, schema, HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles, schema,
"101", 10240L, readBlocksLazily, false); "101", 10240L, readBlocksLazily, false, bufferSize);
assertEquals("We would read 0 records", 0, scanner.getTotalLogRecords()); assertEquals("We would read 0 records", 0, scanner.getTotalLogRecords());
} }
@@ -1136,7 +1139,7 @@ public class HoodieLogFormatTest {
writer.close(); writer.close();
HoodieLogFileReader reader = new HoodieLogFileReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema(), HoodieLogFileReader reader = new HoodieLogFileReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema(),
readBlocksLazily, true); bufferSize, readBlocksLazily, true);
assertTrue("Last block should be available", reader.hasPrev()); assertTrue("Last block should be available", reader.hasPrev());
HoodieLogBlock prevBlock = reader.prev(); HoodieLogBlock prevBlock = reader.prev();
@@ -1207,7 +1210,8 @@ public class HoodieLogFormatTest {
writer.close(); writer.close();
// First round of reads - we should be able to read the first block and then EOF // First round of reads - we should be able to read the first block and then EOF
HoodieLogFileReader reader = new HoodieLogFileReader(fs, writer.getLogFile(), schema, readBlocksLazily, true); HoodieLogFileReader reader = new HoodieLogFileReader(fs, writer.getLogFile(), schema, bufferSize,
readBlocksLazily, true);
assertTrue("Last block should be available", reader.hasPrev()); assertTrue("Last block should be available", reader.hasPrev());
HoodieLogBlock block = reader.prev(); HoodieLogBlock block = reader.prev();
@@ -1261,7 +1265,7 @@ public class HoodieLogFormatTest {
writer.close(); writer.close();
HoodieLogFileReader reader = new HoodieLogFileReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema(), HoodieLogFileReader reader = new HoodieLogFileReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema(),
readBlocksLazily, true); bufferSize, readBlocksLazily, true);
assertTrue("Third block should be available", reader.hasPrev()); assertTrue("Third block should be available", reader.hasPrev());
reader.moveToPrev(); reader.moveToPrev();

View File

@@ -76,9 +76,14 @@ public class HoodieRealtimeRecordReader implements RecordReader<Void, ArrayWrita
// Depending on outputfile size and memory provided, choose true to avoid OOM for large file // Depending on outputfile size and memory provided, choose true to avoid OOM for large file
// size + small memory // size + small memory
public static final String COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP = public static final String COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP =
"compaction.lazy.block" + ".read.enabled"; "compaction.lazy.block.read.enabled";
public static final String DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED = "true"; public static final String DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED = "true";
// Property to set the max memory for dfs inputstream buffer size
public static final String MAX_DFS_STREAM_BUFFER_SIZE_PROP = "hoodie.memory.dfs.buffer.max.size";
// Setting this to lower value of 1 MB since no control over how many RecordReaders will be started in a mapper
public static final int DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE = 1 * 1024 * 1024; // 1 MB
public static final Log LOG = LogFactory.getLog(HoodieRealtimeRecordReader.class); public static final Log LOG = LogFactory.getLog(HoodieRealtimeRecordReader.class);
private final HashMap<String, ArrayWritable> deltaRecordMap; private final HashMap<String, ArrayWritable> deltaRecordMap;
@@ -136,7 +141,7 @@ public class HoodieRealtimeRecordReader implements RecordReader<Void, ArrayWrita
.valueOf(jobConf.get(COMPACTION_MEMORY_FRACTION_PROP, DEFAULT_COMPACTION_MEMORY_FRACTION)) .valueOf(jobConf.get(COMPACTION_MEMORY_FRACTION_PROP, DEFAULT_COMPACTION_MEMORY_FRACTION))
* jobConf.getMemoryForMapTask()), Boolean.valueOf(jobConf * jobConf.getMemoryForMapTask()), Boolean.valueOf(jobConf
.get(COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)), .get(COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)),
false); false, jobConf.getInt(MAX_DFS_STREAM_BUFFER_SIZE_PROP, DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE));
// NOTE: HoodieCompactedLogRecordScanner will not return records for an in-flight commit // NOTE: HoodieCompactedLogRecordScanner will not return records for an in-flight commit
// but can return records for completed commits > the commit we are trying to read (if using // but can return records for completed commits > the commit we are trying to read (if using
// readCommit() API) // readCommit() API)

View File

@@ -71,6 +71,7 @@ public class HoodieRealtimeRecordReaderTest {
@Before @Before
public void setUp() { public void setUp() {
jobConf = new JobConf(); jobConf = new JobConf();
jobConf.set(HoodieRealtimeRecordReader.MAX_DFS_STREAM_BUFFER_SIZE_PROP, String.valueOf(1 * 1024 * 1024));
hadoopConf = HoodieTestUtils.getDefaultHadoopConf(); hadoopConf = HoodieTestUtils.getDefaultHadoopConf();
fs = FSUtils.getFs(basePath.getRoot().getAbsolutePath(), hadoopConf); fs = FSUtils.getFs(basePath.getRoot().getAbsolutePath(), hadoopConf);
} }