diff --git a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/HoodieLogFileCommand.java b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/HoodieLogFileCommand.java
index b22d45d72..7115b67a2 100644
--- a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/HoodieLogFileCommand.java
+++ b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/HoodieLogFileCommand.java
@@ -82,6 +82,7 @@ public class HoodieLogFileCommand implements CommandMarker {
         commitCountAndMetadata = Maps.newHashMap();
     int totalEntries = 0;
     int numCorruptBlocks = 0;
+    int dummyInstantTimeCount = 0;

     for (String logFilePath : logFilePaths) {
       FileStatus[] fsStatus = fs.listStatus(new Path(logFilePath));
@@ -108,6 +109,11 @@ public class HoodieLogFileCommand implements CommandMarker {
           }
         } else {
           instantTime = n.getLogBlockHeader().get(HeaderMetadataType.INSTANT_TIME);
+          if (instantTime == null) {
+            // This can happen when reading archived commit files since they were written without any instant time
+            dummyInstantTimeCount++;
+            instantTime = "dummy_instant_time_" + dummyInstantTimeCount;
+          }
           if (n instanceof HoodieAvroDataBlock) {
             recordCount = ((HoodieAvroDataBlock) n).getRecords().size();
           }
@@ -188,7 +194,8 @@ public class HoodieLogFileCommand implements CommandMarker {
         Long.valueOf(HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES),
         Boolean.valueOf(HoodieCompactionConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED),
         Boolean.valueOf(HoodieCompactionConfig.DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED),
-        Integer.valueOf(HoodieMemoryConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE));
+        Integer.valueOf(HoodieMemoryConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE),
+        HoodieMemoryConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH);
     for (HoodieRecord hoodieRecord : scanner) {
       Optional<IndexedRecord> record = hoodieRecord.getData().getInsertValue(readerSchema);
       if (allRecords.size() >= limit) {
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieMemoryConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieMemoryConfig.java
index 3fd6d6c4e..6efa70b0d 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieMemoryConfig.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieMemoryConfig.java
@@ -48,7 +48,9 @@ public class HoodieMemoryConfig extends DefaultHoodieConfig {
   // Property to set the max memory for dfs inputstream buffer size
   public static final String MAX_DFS_STREAM_BUFFER_SIZE_PROP = "hoodie.memory.dfs.buffer.max.size";
   public static final int DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE = 16 * 1024 * 1024; // 16MB
-
+  public static final String SPILLABLE_MAP_BASE_PATH_PROP = "hoodie.memory.spillable.map.path";
+  // Default file path prefix for spillable file
+  public static final String DEFAULT_SPILLABLE_MAP_BASE_PATH = "/tmp/";

   private HoodieMemoryConfig(Properties props) {
     super(props);
@@ -77,13 +79,13 @@ public class HoodieMemoryConfig extends DefaultHoodieConfig {
       return this;
     }

-    public Builder withMaxMemoryFractionPerPartitionMerge(long maxMemoryFractionPerPartitionMerge) {
+    public Builder withMaxMemoryFractionPerPartitionMerge(double maxMemoryFractionPerPartitionMerge) {
       props.setProperty(MAX_MEMORY_FRACTION_FOR_MERGE_PROP, String.valueOf(maxMemoryFractionPerPartitionMerge));
       return this;
     }

-    public Builder withMaxMemoryFractionPerCompaction(long maxMemoryFractionPerCompaction) {
+    public Builder withMaxMemoryFractionPerCompaction(double maxMemoryFractionPerCompaction) {
       props.setProperty(MAX_MEMORY_FRACTION_FOR_COMPACTION_PROP, String.valueOf(maxMemoryFractionPerCompaction));
       return this;
@@ -155,6 +157,9 @@ public class HoodieMemoryConfig extends DefaultHoodieConfig {
       setDefaultOnCondition(props, !props.containsKey(MAX_DFS_STREAM_BUFFER_SIZE_PROP),
          MAX_DFS_STREAM_BUFFER_SIZE_PROP, String.valueOf(DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE));
+      setDefaultOnCondition(props,
+          !props.containsKey(SPILLABLE_MAP_BASE_PATH_PROP),
+          SPILLABLE_MAP_BASE_PATH_PROP, DEFAULT_SPILLABLE_MAP_BASE_PATH);
       return config;
     }
   }
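The two hunks above introduce `hoodie.memory.spillable.map.path` (back-filled to `/tmp/` by `setDefaultOnCondition` in `Builder.build()`) and fix the memory-fraction builder arguments from `long` to `double`. A minimal sketch of supplying the new property; the key and default come from this diff, while the standalone wrapper class is purely illustrative:

```java
import java.util.Properties;

// Illustrative only: how a caller would override the new spill path property.
public class SpillablePathExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    // When this key is absent, HoodieMemoryConfig.Builder.build() back-fills
    // DEFAULT_SPILLABLE_MAP_BASE_PATH ("/tmp/") via setDefaultOnCondition().
    props.setProperty("hoodie.memory.spillable.map.path", "/data/tmp/hoodie-spill/");
    System.out.println(props.getProperty("hoodie.memory.spillable.map.path"));
  }
}
```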
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java
index 633e5695b..e0fe99379 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java
@@ -375,6 +375,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
         props.getProperty(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP));
   }

+  public String getSpillableMapBasePath() {
+    return props.getProperty(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH_PROP);
+  }
+
   public static class Builder {

     private final Properties props = new Properties();
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java
index d5c6a90de..8e28b8d98 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java
@@ -141,8 +141,9 @@ public class HoodieMergeHandle extends HoodieIOHa
     try {
       // Load the new records in a map
       logger.info("MaxMemoryPerPartitionMerge => " + config.getMaxMemoryPerPartitionMerge());
-      this.keyToNewRecords = new ExternalSpillableMap<>(config.getMaxMemoryPerPartitionMerge(), Optional.empty(),
-          new StringConverter(), new HoodieRecordConverter(schema, config.getPayloadClass()));
+      this.keyToNewRecords = new ExternalSpillableMap<>(config.getMaxMemoryPerPartitionMerge(),
+          config.getSpillableMapBasePath(), new StringConverter(),
+          new HoodieRecordConverter(schema, config.getPayloadClass()));
     } catch (IOException io) {
       throw new HoodieIOException("Cannot instantiate an ExternalSpillableMap", io);
     }
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java
index 8983605c5..cbf06d936 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java
@@ -118,7 +118,8 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
     HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs,
         metaClient.getBasePath(), operation.getDeltaFilePaths(), readerSchema, maxInstantTime,
         config.getMaxMemoryPerCompaction(), config.getCompactionLazyBlockReadEnabled(),
-        config.getCompactionReverseLogReadEnabled(), config.getMaxDFSStreamBufferSize());
+        config.getCompactionReverseLogReadEnabled(), config.getMaxDFSStreamBufferSize(),
+        config.getSpillableMapBasePath());
     if (!scanner.iterator().hasNext()) {
       return Lists.newArrayList();
     }
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieCompactedLogRecordScanner.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieCompactedLogRecordScanner.java
index 6e9699442..86da05b73 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieCompactedLogRecordScanner.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieCompactedLogRecordScanner.java
@@ -42,7 +42,6 @@ import java.util.Deque;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import org.apache.avro.Schema;
@@ -98,7 +97,7 @@ public class HoodieCompactedLogRecordScanner implements
   public HoodieCompactedLogRecordScanner(FileSystem fs, String basePath, List<String> logFilePaths,
       Schema readerSchema, String latestInstantTime, Long maxMemorySizeInBytes,
-      boolean readBlocksLazily, boolean reverseReader, int bufferSize) {
+      boolean readBlocksLazily, boolean reverseReader, int bufferSize, String spillableMapBasePath) {
     this.readerSchema = readerSchema;
     this.latestInstantTime = latestInstantTime;
     this.hoodieTableMetaClient = new HoodieTableMetaClient(fs.getConf(), basePath);
@@ -109,7 +108,7 @@
     try {
       // Store merged records for all versions for this log file, set the in-memory footprint to maxInMemoryMapSize
-      this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, Optional.empty(),
+      this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath,
           new StringConverter(), new HoodieRecordConverter(readerSchema, payloadClassFQN));
       // iterate over the paths
       HoodieLogFormatReader logFormatReaderWrapper =
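Callers of the scanner now supply the spill directory as the final argument. A sketch of the new call shape, mirroring the updated tests further down in this diff; `fs`, `basePath`, `logFilePaths`, and `readerSchema` are assumed to be in scope:

```java
// New call shape after this change; the trailing "/tmp/" is spillableMapBasePath.
HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(
    fs,            // FileSystem
    basePath,      // Hoodie table base path
    logFilePaths,  // List<String> of log file paths to merge
    readerSchema,  // Avro reader schema
    "100",         // latest instant time to merge up to
    10240L,        // max in-memory bytes before the map spills
    true,          // readBlocksLazily
    false,         // reverseReader
    4096,          // DFS stream buffer size
    "/tmp/");      // spillableMapBasePath (new)
```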
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatWriter.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatWriter.java
index fa17711ba..e0644d69f 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatWriter.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatWriter.java
@@ -22,12 +22,14 @@ import com.uber.hoodie.common.table.log.HoodieLogFormat.WriterBuilder;
 import com.uber.hoodie.common.table.log.block.HoodieLogBlock;
 import com.uber.hoodie.common.util.FSUtils;
 import com.uber.hoodie.exception.HoodieException;
+import com.uber.hoodie.exception.HoodieIOException;
 import java.io.IOException;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
+import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -70,35 +72,11 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
       try {
         this.output = fs.append(path, bufferSize);
       } catch (RemoteException e) {
-        if (e.getMessage().contains(APPEND_UNAVAILABLE_EXCEPTION_MESSAGE)) {
-          // This issue happens when all replicas for a file are down and/or being decommissioned.
-          // The fs.append() API could append to the last block for a file. If the last block is full, a new block is
-          // appended to. In a scenario when a lot of DN's are decommissioned, it can happen that DN's holding all
-          // replicas for a block/file are decommissioned together. During this process, all these blocks will start to
-          // get replicated to other active DataNodes but this process might take time (can be of the order of few
-          // hours). During this time, if a fs.append() API is invoked for a file whose last block is eligible to be
-          // appended to, then the NN will throw an exception saying that it couldn't find any active replica with the
-          // last block. Find more information here : https://issues.apache.org/jira/browse/HDFS-6325
-          log.warn("Failed to open an append stream to the log file. Opening a new log file..", e);
-          createNewFile();
-        }
-        // this happens when either another task executor writing to this file died or
-        // data node is going down
-        if (e.getClassName().equals(AlreadyBeingCreatedException.class.getName())
-            && fs instanceof DistributedFileSystem) {
-          log.warn("Trying to recover log on path " + path);
-          if (FSUtils.recoverDFSFileLease((DistributedFileSystem) fs, path)) {
-            log.warn("Recovered lease on path " + path);
-            // try again
-            this.output = fs.append(path, bufferSize);
-          } else {
-            log.warn("Failed to recover lease on path " + path);
-            throw new HoodieException(e);
-          }
-        }
+        handleAppendExceptionOrRecoverLease(path, e);
       } catch (IOException ioe) {
         if (ioe.getMessage().equalsIgnoreCase("Not supported")) {
           log.info("Append not supported. Opening a new log file..");
+          this.logFile = logFile.rollOver(fs);
           createNewFile();
         } else {
           throw ioe;
@@ -107,8 +85,7 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
     } else {
       log.info(logFile + " does not exist. Create a new file");
       // Block size does not matter as we will always manually autoflush
-      this.output = fs.create(path, false, bufferSize, replication,
-          WriterBuilder.DEFAULT_SIZE_THRESHOLD, null);
+      createNewFile();
       // TODO - append a file level meta block
     }
   }
@@ -204,7 +181,6 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
   }

   private void createNewFile() throws IOException {
-    this.logFile = logFile.rollOver(fs);
     this.output = fs.create(this.logFile.getPath(), false, bufferSize, replication,
         WriterBuilder.DEFAULT_SIZE_THRESHOLD, null);
   }
@@ -221,7 +197,9 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
       return; // Presume closed
     }
     output.flush();
-    output.hflush();
+    // NOTE : the following API call makes sure that the data is flushed to disk on DataNodes (akin to POSIX fsync())
+    // See more details here : https://issues.apache.org/jira/browse/HDFS-744
+    output.hsync();
   }

   public long getCurrentSize() throws IOException {
@@ -232,4 +210,38 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
     return output.getPos();
   }

+  private void handleAppendExceptionOrRecoverLease(Path path, RemoteException e)
+      throws IOException, InterruptedException {
+    if (e.getMessage().contains(APPEND_UNAVAILABLE_EXCEPTION_MESSAGE)) {
+      // This issue happens when all replicas for a file are down and/or being decommissioned.
+      // The fs.append() API could append to the last block for a file. If the last block is full, a new block is
+      // appended to. In a scenario when a lot of DN's are decommissioned, it can happen that DN's holding all
+      // replicas for a block/file are decommissioned together. During this process, all these blocks will start to
+      // get replicated to other active DataNodes but this process might take time (can be of the order of few
+      // hours). During this time, if a fs.append() API is invoked for a file whose last block is eligible to be
+      // appended to, then the NN will throw an exception saying that it couldn't find any active replica with the
+      // last block. Find more information here : https://issues.apache.org/jira/browse/HDFS-6325
+      log.warn("Failed to open an append stream to the log file. Opening a new log file..", e);
+      // Rollover the current log file (since cannot get a stream handle) and create new one
+      this.logFile = logFile.rollOver(fs);
+      createNewFile();
+    } else if ((e.getClassName().contentEquals(AlreadyBeingCreatedException.class.getName()) || e.getClassName()
+        .contentEquals(RecoveryInProgressException.class.getName())) && (fs instanceof DistributedFileSystem)) {
+      // this happens when either another task executor writing to this file died or
+      // data node is going down. Note that we can only try to recover lease for a DistributedFileSystem.
+      // ViewFileSystem unfortunately does not support this operation
+      log.warn("Trying to recover log on path " + path);
+      if (FSUtils.recoverDFSFileLease((DistributedFileSystem) fs, path)) {
+        log.warn("Recovered lease on path " + path);
+        // try again
+        this.output = fs.append(path, bufferSize);
+      } else {
+        log.warn("Failed to recover lease on path " + path);
+        throw new HoodieException(e);
+      }
+    } else {
+      throw new HoodieIOException("Failed to open an append stream ", e);
+    }
+  }
+
 }
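Two behavioral details in the writer change are easy to miss: `createNewFile()` no longer performs the rollover itself, so each caller now rolls the log file explicitly first, and the flush path uses `hsync()` instead of `hflush()`. A sketch of that durability difference against the stock Hadoop `FSDataOutputStream` API (the demo path is illustrative):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch: hflush() vs hsync() on an HDFS output stream.
public class FlushVsSyncDemo {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    try (FSDataOutputStream out = fs.create(new Path("/tmp/flush-vs-sync-demo"))) {
      out.write("a log block".getBytes("UTF-8"));
      out.hflush(); // data visible to new readers, but may still sit in DataNode OS buffers
      out.hsync();  // additionally forces the bytes to disk on the DataNodes (HDFS-744)
    }
  }
}
```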
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/DiskBasedMap.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/DiskBasedMap.java
index 7ae620ef6..598025471 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/DiskBasedMap.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/DiskBasedMap.java
@@ -34,7 +34,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
@@ -49,8 +48,6 @@ import org.apache.log4j.Logger;
 public final class DiskBasedMap<T, R> implements Map<T, R> {

   private static final Logger log = LogManager.getLogger(DiskBasedMap.class);
-  // Default file path prefix to put the spillable file
-  private static String DEFAULT_BASE_FILE_PATH = "/tmp/";
   // Stores the key and corresponding value's latest metadata spilled to disk
   private final Map valueMetadataMap;
   // Key converter to convert key type to bytes
@@ -70,17 +67,12 @@ public final class DiskBasedMap<T, R> implements Map<T, R> {

   private String filePath;

-  protected DiskBasedMap(Optional<String> baseFilePath,
+  protected DiskBasedMap(String baseFilePath,
       Converter<T> keyConverter, Converter<R> valueConverter) throws IOException {
     this.valueMetadataMap = new HashMap<>();
-
-    if (!baseFilePath.isPresent()) {
-      baseFilePath = Optional.of(DEFAULT_BASE_FILE_PATH);
-    }
-    this.filePath = baseFilePath.get() + UUID.randomUUID().toString();
-    File writeOnlyFileHandle = new File(filePath);
+    File writeOnlyFileHandle = new File(baseFilePath, UUID.randomUUID().toString());
+    this.filePath = writeOnlyFileHandle.getPath();
     initFile(writeOnlyFileHandle);
-
     this.fileOutputStream = new FileOutputStream(writeOnlyFileHandle, true);
     this.writeOnlyFileHandle = new SizeAwareDataOutputStream(fileOutputStream);
     this.filePosition = new AtomicLong(0L);
@@ -93,8 +85,10 @@ public final class DiskBasedMap<T, R> implements Map<T, R> {
     if (writeOnlyFileHandle.exists()) {
       writeOnlyFileHandle.delete();
     }
+    if (!writeOnlyFileHandle.getParentFile().exists()) {
+      writeOnlyFileHandle.getParentFile().mkdir();
+    }
     writeOnlyFileHandle.createNewFile();
-
     log.info(
         "Spilling to file location " + writeOnlyFileHandle.getAbsolutePath() + " in host ("
             + InetAddress.getLocalHost().getHostAddress() + ") with hostname ("
             + InetAddress.getLocalHost().getHostName() + ")");
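The constructor now derives the spill file with `new File(baseFilePath, uuid)` instead of string concatenation, and `initFile` creates a missing parent directory. A small self-contained sketch of why the `File(parent, child)` form is safer (paths are illustrative):

```java
import java.io.File;
import java.util.UUID;

// Sketch: File(parent, child) inserts the separator that naive concatenation drops.
public class SpillFilePathDemo {
  public static void main(String[] args) {
    String base = "/var/tmp/hoodie-spill"; // note: no trailing "/"
    String name = UUID.randomUUID().toString();
    System.out.println(base + name);                    // ...hoodie-spill3f9c... (broken)
    System.out.println(new File(base, name).getPath()); // ...hoodie-spill/3f9c... (correct)
  }
}
```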
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/ExternalSpillableMap.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/ExternalSpillableMap.java
index b3ff515c0..e061bf444 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/ExternalSpillableMap.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/ExternalSpillableMap.java
@@ -25,7 +25,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -64,7 +63,7 @@ public class ExternalSpillableMap<T, R> implements Map<T, R> {
   // Flag to determine whether to stop re-estimating payload size
   private boolean shouldEstimatePayloadSize = true;

-  public ExternalSpillableMap(Long maxInMemorySizeInBytes, Optional<String> baseFilePath,
+  public ExternalSpillableMap(Long maxInMemorySizeInBytes, String baseFilePath,
       Converter<T> keyConverter, Converter<R> valueConverter) throws IOException {
     this.inMemoryMap = new HashMap<>();
     this.diskBasedMap = new DiskBasedMap<>(baseFilePath, keyConverter, valueConverter);
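With `Optional<String>` gone from the signature, the default is resolved once in the config layer and every caller passes a concrete path. The call shape, as exercised by the updated tests below (schema, payload class, and converter setup assumed in scope; the generic parameters follow the test declarations in this diff):

```java
// A 16-byte in-memory budget forces nearly every entry to spill under "/tmp/".
ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> records =
    new ExternalSpillableMap<>(16L, "/tmp/", new StringConverter(),
        new HoodieRecordConverter(schema, payloadClazz));
```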
diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java b/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java
index 93eed4279..192e6a51d 100644
--- a/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java
+++ b/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java
@@ -77,6 +77,7 @@ public class HoodieLogFormatTest {
   private Path partitionPath;
   private static String basePath;
   private int bufferSize = 4096;
+  private static final String BASE_OUTPUT_PATH = "/tmp/";
   private Boolean readBlocksLazily = true;
@@ -401,7 +402,7 @@
     // scan all log blocks (across multiple log files)
     HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath,
         logFiles.stream().map(logFile -> logFile.getPath().toString()).collect(Collectors.toList()), schema, "100",
-        10240L, readBlocksLazily, false, bufferSize);
+        10240L, readBlocksLazily, false, bufferSize, BASE_OUTPUT_PATH);

     List<HoodieRecord> scannedRecords = new ArrayList<>();
     for (HoodieRecord record : scanner) {
@@ -527,7 +528,7 @@
         "100").map(s -> s.getPath().toString()).collect(Collectors.toList());

     HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles, schema,
-        "100", 10240L, readBlocksLazily, false, bufferSize);
+        "100", 10240L, readBlocksLazily, false, bufferSize, BASE_OUTPUT_PATH);
     assertEquals("", 200, scanner.getTotalLogRecords());
     Set<String> readKeys = new HashSet<>(200);
     scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
@@ -587,7 +588,7 @@
         "100").map(s -> s.getPath().toString()).collect(Collectors.toList());

     HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles, schema,
-        "102", 10240L, readBlocksLazily, false, bufferSize);
+        "102", 10240L, readBlocksLazily, false, bufferSize, BASE_OUTPUT_PATH);
     assertEquals("We read 200 records from 2 write batches", 200, scanner.getTotalLogRecords());
     Set<String> readKeys = new HashSet<>(200);
     scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
@@ -665,7 +666,7 @@ public class HoodieLogFormatTest {
         "100").map(s -> s.getPath().toString()).collect(Collectors.toList());

     HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles, schema,
-        "103", 10240L, true, false, bufferSize);
+        "103", 10240L, true, false, bufferSize, BASE_OUTPUT_PATH);
     assertEquals("We would read 200 records", 200, scanner.getTotalLogRecords());
     Set<String> readKeys = new HashSet<>(200);
     scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
@@ -719,7 +720,7 @@
         "100").map(s -> s.getPath().toString()).collect(Collectors.toList());

     HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles, schema,
-        "102", 10240L, readBlocksLazily, false, bufferSize);
+        "102", 10240L, readBlocksLazily, false, bufferSize, BASE_OUTPUT_PATH);
     assertEquals("We still would read 200 records", 200, scanner.getTotalLogRecords());
     final List<String> readKeys = new ArrayList<>(200);
     scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
@@ -739,7 +740,7 @@
     readKeys.clear();
     scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles, schema, "101", 10240L, readBlocksLazily,
-        false, bufferSize);
+        false, bufferSize, BASE_OUTPUT_PATH);
     scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
     assertEquals("Stream collect should return all 200 records after rollback of delete", 200, readKeys.size());
   }
@@ -800,7 +801,7 @@
     // all data must be rolled back before merge
     HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles, schema,
-        "100", 10240L, readBlocksLazily, false, bufferSize);
+        "100", 10240L, readBlocksLazily, false, bufferSize, BASE_OUTPUT_PATH);
     assertEquals("We would have scanned 0 records because of rollback", 0, scanner.getTotalLogRecords());

     final List<String> readKeys = new ArrayList<>();
@@ -849,7 +850,7 @@
         "100").map(s -> s.getPath().toString()).collect(Collectors.toList());

     HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles, schema,
-        "100", 10240L, readBlocksLazily, false, bufferSize);
+        "100", 10240L, readBlocksLazily, false, bufferSize, BASE_OUTPUT_PATH);
     assertEquals("We would read 0 records", 0, scanner.getTotalLogRecords());
   }
@@ -881,7 +882,7 @@
         "100").map(s -> s.getPath().toString()).collect(Collectors.toList());

     HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles, schema,
-        "100", 10240L, readBlocksLazily, false, bufferSize);
+        "100", 10240L, readBlocksLazily, false, bufferSize, BASE_OUTPUT_PATH);
     assertEquals("We still would read 100 records", 100, scanner.getTotalLogRecords());
     final List<String> readKeys = new ArrayList<>(100);
     scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
@@ -931,7 +932,7 @@
         "100").map(s -> s.getPath().toString()).collect(Collectors.toList());

     HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles, schema,
-        "101", 10240L, readBlocksLazily, false, bufferSize);
+        "101", 10240L, readBlocksLazily, false, bufferSize, BASE_OUTPUT_PATH);
     assertEquals("We would read 0 records", 0, scanner.getTotalLogRecords());
   }
@@ -1019,7 +1020,7 @@
         "100").map(s -> s.getPath().toString()).collect(Collectors.toList());

     HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles, schema,
-        "101", 10240L, readBlocksLazily, false, bufferSize);
+        "101", 10240L, readBlocksLazily, false, bufferSize, BASE_OUTPUT_PATH);
     assertEquals("We would read 0 records", 0, scanner.getTotalLogRecords());
   }
diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/util/collection/TestDiskBasedMap.java b/hoodie-common/src/test/java/com/uber/hoodie/common/util/collection/TestDiskBasedMap.java
index ee2c77b5e..fdf6ba7cd 100644
--- a/hoodie-common/src/test/java/com/uber/hoodie/common/util/collection/TestDiskBasedMap.java
+++ b/hoodie-common/src/test/java/com/uber/hoodie/common/util/collection/TestDiskBasedMap.java
@@ -51,12 +51,14 @@ import org.junit.Test;

 public class TestDiskBasedMap {

+  private static final String BASE_OUTPUT_PATH = "/tmp/";
+
   @Test
   public void testSimpleInsert() throws IOException, URISyntaxException {
     Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
     String payloadClazz = HoodieAvroPayload.class.getName();

-    DiskBasedMap<String, HoodieRecord> records = new DiskBasedMap<>(Optional.empty(),
+    DiskBasedMap<String, HoodieRecord> records = new DiskBasedMap<>(BASE_OUTPUT_PATH,
         new StringConverter(), new HoodieRecordConverter(schema, payloadClazz));
     List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
     ((GenericRecord) iRecords.get(0)).get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
@@ -78,7 +80,7 @@
     Schema schema = getSimpleSchema();
     String payloadClazz = HoodieAvroPayload.class.getName();

-    DiskBasedMap<String, HoodieRecord> records = new DiskBasedMap<>(Optional.empty(),
+    DiskBasedMap<String, HoodieRecord> records = new DiskBasedMap<>(BASE_OUTPUT_PATH,
         new StringConverter(), new HoodieRecordConverter(schema, payloadClazz));
     List<HoodieRecord> hoodieRecords = SchemaTestUtil
         .generateHoodieTestRecordsWithoutHoodieMetadata(0, 1000);
@@ -105,7 +107,7 @@
     Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
     String payloadClazz = HoodieAvroPayload.class.getName();

-    DiskBasedMap<String, HoodieRecord> records = new DiskBasedMap<>(Optional.empty(),
+    DiskBasedMap<String, HoodieRecord> records = new DiskBasedMap<>(BASE_OUTPUT_PATH,
         new StringConverter(), new HoodieRecordConverter(schema, payloadClazz));
     List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
@@ -191,9 +193,7 @@ public class TestDiskBasedMap {
   }

   /**
-   * vb - Disabled this test after talking to Nishanth as this relies on timing and sometimes fails in my laptop.
-   * This specific test sometime takes more than 100 ms (In one case, saw 122 ms),
-   * @na: TODO: Please check if this can be removed.
+   * @na: Leaving this test here for a quick performance test
    */
   @Ignore
   @Test
diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/util/collection/TestExternalSpillableMap.java b/hoodie-common/src/test/java/com/uber/hoodie/common/util/collection/TestExternalSpillableMap.java
index bb385dc1a..419cd1b41 100644
--- a/hoodie-common/src/test/java/com/uber/hoodie/common/util/collection/TestExternalSpillableMap.java
+++ b/hoodie-common/src/test/java/com/uber/hoodie/common/util/collection/TestExternalSpillableMap.java
@@ -30,6 +30,7 @@ import com.uber.hoodie.common.util.SchemaTestUtil;
 import com.uber.hoodie.common.util.SpillableMapTestUtils;
 import com.uber.hoodie.common.util.collection.converter.HoodieRecordConverter;
 import com.uber.hoodie.common.util.collection.converter.StringConverter;
+import java.io.File;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.net.URISyntaxException;
@@ -40,6 +41,7 @@ import java.util.Optional;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
+import org.junit.BeforeClass;
 import org.junit.FixMethodOrder;
 import org.junit.Test;
 import org.junit.runners.MethodSorters;
@@ -48,13 +50,22 @@ import org.junit.runners.MethodSorters;
 public class TestExternalSpillableMap {

   private static final String FAILURE_OUTPUT_PATH = "/tmp/test_fail";
+  private static final String BASE_OUTPUT_PATH = "/tmp/";
+
+  @BeforeClass
+  public static void cleanUp() {
+    File file = new File(BASE_OUTPUT_PATH);
+    file.delete();
+    file = new File(FAILURE_OUTPUT_PATH);
+    file.delete();
+  }

   @Test
   public void simpleInsertTest() throws IOException, URISyntaxException {
     Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
     String payloadClazz = HoodieAvroPayload.class.getName();
     ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> records =
-        new ExternalSpillableMap<>(16L, Optional.empty(), new StringConverter(),
+        new ExternalSpillableMap<>(16L, BASE_OUTPUT_PATH, new StringConverter(),
             new HoodieRecordConverter(schema, payloadClazz)); //16B

     List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
@@ -76,7 +87,7 @@
     String payloadClazz = HoodieAvroPayload.class.getName();

     ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> records =
-        new ExternalSpillableMap<>(16L, Optional.of(FAILURE_OUTPUT_PATH), new StringConverter(),
+        new ExternalSpillableMap<>(16L, BASE_OUTPUT_PATH, new StringConverter(),
             new HoodieRecordConverter(schema, payloadClazz)); //16B

     List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
@@ -114,7 +125,7 @@
     String payloadClazz = HoodieAvroPayload.class.getName();

     ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> records =
-        new ExternalSpillableMap<>(16L, Optional.empty(), new StringConverter(),
+        new ExternalSpillableMap<>(16L, BASE_OUTPUT_PATH, new StringConverter(),
             new HoodieRecordConverter(schema, payloadClazz)); //16B

     List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
@@ -169,7 +180,7 @@
     String payloadClazz = HoodieAvroPayload.class.getName();

     ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> records =
-        new ExternalSpillableMap<>(16L, Optional.of(FAILURE_OUTPUT_PATH), new StringConverter(),
+        new ExternalSpillableMap<>(16L, FAILURE_OUTPUT_PATH, new StringConverter(),
             new HoodieRecordConverter(schema, payloadClazz)); //16B

     List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
@@ -188,7 +199,7 @@ public class TestExternalSpillableMap {
     String payloadClazz = HoodieAvroPayload.class.getName();

     ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> records =
-        new ExternalSpillableMap<>(16L, Optional.of(FAILURE_OUTPUT_PATH), new StringConverter(),
+        new ExternalSpillableMap<>(16L, BASE_OUTPUT_PATH, new StringConverter(),
             new HoodieRecordConverter(schema, payloadClazz)); //16B

     List<String> recordKeys = new ArrayList<>();
@@ -241,7 +252,7 @@ public class TestExternalSpillableMap {
     String payloadClazz = HoodieAvroPayload.class.getName();

     ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> records =
-        new ExternalSpillableMap<>(16L, Optional.of(FAILURE_OUTPUT_PATH), new StringConverter(),
+        new ExternalSpillableMap<>(16L, BASE_OUTPUT_PATH, new StringConverter(),
             new HoodieRecordConverter(schema, payloadClazz)); //16B

     List<String> recordKeys = new ArrayList<>();
diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReader.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReader.java
index f96060b21..163cbe8f8 100644
--- a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReader.java
+++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReader.java
@@ -83,6 +83,10 @@ public class HoodieRealtimeRecordReader implements RecordReader
     // the commit we are trying to read (if using
     // readCommit() API)