Minor fixes for MergeOnRead MVP release readiness
This commit is contained in:
committed by
vinoth chandar
parent
75df72f575
commit
93f345a032
@@ -42,7 +42,6 @@ import java.util.Deque;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.avro.Schema;
|
||||
@@ -98,7 +97,7 @@ public class HoodieCompactedLogRecordScanner implements
|
||||
|
||||
public HoodieCompactedLogRecordScanner(FileSystem fs, String basePath, List<String> logFilePaths,
|
||||
Schema readerSchema, String latestInstantTime, Long maxMemorySizeInBytes,
|
||||
boolean readBlocksLazily, boolean reverseReader, int bufferSize) {
|
||||
boolean readBlocksLazily, boolean reverseReader, int bufferSize, String spillableMapBasePath) {
|
||||
this.readerSchema = readerSchema;
|
||||
this.latestInstantTime = latestInstantTime;
|
||||
this.hoodieTableMetaClient = new HoodieTableMetaClient(fs.getConf(), basePath);
|
||||
@@ -109,7 +108,7 @@ public class HoodieCompactedLogRecordScanner implements
|
||||
|
||||
try {
|
||||
// Store merged records for all versions for this log file, set the in-memory footprint to maxInMemoryMapSize
|
||||
this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, Optional.empty(),
|
||||
this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath,
|
||||
new StringConverter(), new HoodieRecordConverter(readerSchema, payloadClassFQN));
|
||||
// iterate over the paths
|
||||
HoodieLogFormatReader logFormatReaderWrapper =
|
||||
|
||||
@@ -22,12 +22,14 @@ import com.uber.hoodie.common.table.log.HoodieLogFormat.WriterBuilder;
|
||||
import com.uber.hoodie.common.table.log.block.HoodieLogBlock;
|
||||
import com.uber.hoodie.common.util.FSUtils;
|
||||
import com.uber.hoodie.exception.HoodieException;
|
||||
import com.uber.hoodie.exception.HoodieIOException;
|
||||
import java.io.IOException;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
|
||||
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
@@ -70,35 +72,11 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
|
||||
try {
|
||||
this.output = fs.append(path, bufferSize);
|
||||
} catch (RemoteException e) {
|
||||
if (e.getMessage().contains(APPEND_UNAVAILABLE_EXCEPTION_MESSAGE)) {
|
||||
// This issue happens when all replicas for a file are down and/or being decommissioned.
|
||||
// The fs.append() API could append to the last block for a file. If the last block is full, a new block is
|
||||
// appended to. In a scenario when a lot of DN's are decommissioned, it can happen that DN's holding all
|
||||
// replicas for a block/file are decommissioned together. During this process, all these blocks will start to
|
||||
// get replicated to other active DataNodes but this process might take time (can be of the order of few
|
||||
// hours). During this time, if a fs.append() API is invoked for a file whose last block is eligible to be
|
||||
// appended to, then the NN will throw an exception saying that it couldn't find any active replica with the
|
||||
// last block. Find more information here : https://issues.apache.org/jira/browse/HDFS-6325
|
||||
log.warn("Failed to open an append stream to the log file. Opening a new log file..", e);
|
||||
createNewFile();
|
||||
}
|
||||
// this happens when either another task executor writing to this file died or
|
||||
// data node is going down
|
||||
if (e.getClassName().equals(AlreadyBeingCreatedException.class.getName())
|
||||
&& fs instanceof DistributedFileSystem) {
|
||||
log.warn("Trying to recover log on path " + path);
|
||||
if (FSUtils.recoverDFSFileLease((DistributedFileSystem) fs, path)) {
|
||||
log.warn("Recovered lease on path " + path);
|
||||
// try again
|
||||
this.output = fs.append(path, bufferSize);
|
||||
} else {
|
||||
log.warn("Failed to recover lease on path " + path);
|
||||
throw new HoodieException(e);
|
||||
}
|
||||
}
|
||||
handleAppendExceptionOrRecoverLease(path, e);
|
||||
} catch (IOException ioe) {
|
||||
if (ioe.getMessage().equalsIgnoreCase("Not supported")) {
|
||||
log.info("Append not supported. Opening a new log file..");
|
||||
this.logFile = logFile.rollOver(fs);
|
||||
createNewFile();
|
||||
} else {
|
||||
throw ioe;
|
||||
@@ -107,8 +85,7 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
|
||||
} else {
|
||||
log.info(logFile + " does not exist. Create a new file");
|
||||
// Block size does not matter as we will always manually autoflush
|
||||
this.output = fs.create(path, false, bufferSize, replication,
|
||||
WriterBuilder.DEFAULT_SIZE_THRESHOLD, null);
|
||||
createNewFile();
|
||||
// TODO - append a file level meta block
|
||||
}
|
||||
}
|
||||
@@ -204,7 +181,6 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
|
||||
}
|
||||
|
||||
private void createNewFile() throws IOException {
|
||||
this.logFile = logFile.rollOver(fs);
|
||||
this.output = fs.create(this.logFile.getPath(), false, bufferSize, replication,
|
||||
WriterBuilder.DEFAULT_SIZE_THRESHOLD, null);
|
||||
}
|
||||
@@ -221,7 +197,9 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
|
||||
return; // Presume closed
|
||||
}
|
||||
output.flush();
|
||||
output.hflush();
|
||||
// NOTE : the following API call makes sure that the data is flushed to disk on DataNodes (akin to POSIX fsync())
|
||||
// See more details here : https://issues.apache.org/jira/browse/HDFS-744
|
||||
output.hsync();
|
||||
}
|
||||
|
||||
public long getCurrentSize() throws IOException {
|
||||
@@ -232,4 +210,38 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
|
||||
return output.getPos();
|
||||
}
|
||||
|
||||
private void handleAppendExceptionOrRecoverLease(Path path, RemoteException e) throws IOException,
|
||||
InterruptedException {
|
||||
if (e.getMessage().contains(APPEND_UNAVAILABLE_EXCEPTION_MESSAGE)) {
|
||||
// This issue happens when all replicas for a file are down and/or being decommissioned.
|
||||
// The fs.append() API could append to the last block for a file. If the last block is full, a new block is
|
||||
// appended to. In a scenario when a lot of DN's are decommissioned, it can happen that DN's holding all
|
||||
// replicas for a block/file are decommissioned together. During this process, all these blocks will start to
|
||||
// get replicated to other active DataNodes but this process might take time (can be of the order of few
|
||||
// hours). During this time, if a fs.append() API is invoked for a file whose last block is eligible to be
|
||||
// appended to, then the NN will throw an exception saying that it couldn't find any active replica with the
|
||||
// last block. Find more information here : https://issues.apache.org/jira/browse/HDFS-6325
|
||||
log.warn("Failed to open an append stream to the log file. Opening a new log file..", e);
|
||||
// Rollover the current log file (since cannot get a stream handle) and create new one
|
||||
this.logFile = logFile.rollOver(fs);
|
||||
createNewFile();
|
||||
} else if ((e.getClassName().contentEquals(AlreadyBeingCreatedException.class.getName()) || e.getClassName()
|
||||
.contentEquals(RecoveryInProgressException.class.getName())) && (fs instanceof DistributedFileSystem)) {
|
||||
// this happens when either another task executor writing to this file died or
|
||||
// data node is going down. Note that we can only try to recover lease for a DistributedFileSystem.
|
||||
// ViewFileSystem unfortunately does not support this operation
|
||||
log.warn("Trying to recover log on path " + path);
|
||||
if (FSUtils.recoverDFSFileLease((DistributedFileSystem) fs, path)) {
|
||||
log.warn("Recovered lease on path " + path);
|
||||
// try again
|
||||
this.output = fs.append(path, bufferSize);
|
||||
} else {
|
||||
log.warn("Failed to recover lease on path " + path);
|
||||
throw new HoodieException(e);
|
||||
}
|
||||
} else {
|
||||
throw new HoodieIOException("Failed to open an append stream ", e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -34,7 +34,6 @@ import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
@@ -49,8 +48,6 @@ import org.apache.log4j.Logger;
|
||||
public final class DiskBasedMap<T, R> implements Map<T, R> {
|
||||
|
||||
private static final Logger log = LogManager.getLogger(DiskBasedMap.class);
|
||||
// Default file path prefix to put the spillable file
|
||||
private static String DEFAULT_BASE_FILE_PATH = "/tmp/";
|
||||
// Stores the key and corresponding value's latest metadata spilled to disk
|
||||
private final Map<T, ValueMetadata> valueMetadataMap;
|
||||
// Key converter to convert key type to bytes
|
||||
@@ -70,17 +67,12 @@ public final class DiskBasedMap<T, R> implements Map<T, R> {
|
||||
private String filePath;
|
||||
|
||||
|
||||
protected DiskBasedMap(Optional<String> baseFilePath,
|
||||
protected DiskBasedMap(String baseFilePath,
|
||||
Converter<T> keyConverter, Converter<R> valueConverter) throws IOException {
|
||||
this.valueMetadataMap = new HashMap<>();
|
||||
|
||||
if (!baseFilePath.isPresent()) {
|
||||
baseFilePath = Optional.of(DEFAULT_BASE_FILE_PATH);
|
||||
}
|
||||
this.filePath = baseFilePath.get() + UUID.randomUUID().toString();
|
||||
File writeOnlyFileHandle = new File(filePath);
|
||||
File writeOnlyFileHandle = new File(baseFilePath, UUID.randomUUID().toString());
|
||||
this.filePath = writeOnlyFileHandle.getPath();
|
||||
initFile(writeOnlyFileHandle);
|
||||
|
||||
this.fileOutputStream = new FileOutputStream(writeOnlyFileHandle, true);
|
||||
this.writeOnlyFileHandle = new SizeAwareDataOutputStream(fileOutputStream);
|
||||
this.filePosition = new AtomicLong(0L);
|
||||
@@ -93,8 +85,10 @@ public final class DiskBasedMap<T, R> implements Map<T, R> {
|
||||
if (writeOnlyFileHandle.exists()) {
|
||||
writeOnlyFileHandle.delete();
|
||||
}
|
||||
if (!writeOnlyFileHandle.getParentFile().exists()) {
|
||||
writeOnlyFileHandle.getParentFile().mkdir();
|
||||
}
|
||||
writeOnlyFileHandle.createNewFile();
|
||||
|
||||
log.info(
|
||||
"Spilling to file location " + writeOnlyFileHandle.getAbsolutePath() + " in host (" + InetAddress.getLocalHost()
|
||||
.getHostAddress() + ") with hostname (" + InetAddress.getLocalHost().getHostName() + ")");
|
||||
|
||||
@@ -25,7 +25,6 @@ import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
@@ -64,7 +63,7 @@ public class ExternalSpillableMap<T, R> implements Map<T, R> {
|
||||
// Flag to determine whether to stop re-estimating payload size
|
||||
private boolean shouldEstimatePayloadSize = true;
|
||||
|
||||
public ExternalSpillableMap(Long maxInMemorySizeInBytes, Optional<String> baseFilePath,
|
||||
public ExternalSpillableMap(Long maxInMemorySizeInBytes, String baseFilePath,
|
||||
Converter<T> keyConverter, Converter<R> valueConverter) throws IOException {
|
||||
this.inMemoryMap = new HashMap<>();
|
||||
this.diskBasedMap = new DiskBasedMap<>(baseFilePath, keyConverter, valueConverter);
|
||||
|
||||
Reference in New Issue
Block a user