[HUDI-1532] Fixed suboptimal implementation of a magic sequence search (#2440)
* Fixed suboptimal implementation of a magic sequence search on GCS. * Fix comparison. * Added buffered reader around plugged storage plugin such as GCS. * 1. Corrected some comments 2. Refactored GCS input stream check Co-authored-by: volodymyr.burenin <volodymyr.burenin@cloudkitchens.com> Co-authored-by: Nishith Agarwal <nagarwal@uber.com>
This commit is contained in:
committed by
GitHub
parent
684e12e9fc
commit
a38612b10f
@@ -18,6 +18,15 @@
|
|||||||
|
|
||||||
package org.apache.hudi.common.fs;
|
package org.apache.hudi.common.fs;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.LocatedFileStatus;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.PathFilter;
|
||||||
|
import org.apache.hadoop.fs.RemoteIterator;
|
||||||
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
import org.apache.hudi.common.config.SerializableConfiguration;
|
import org.apache.hudi.common.config.SerializableConfiguration;
|
||||||
import org.apache.hudi.common.engine.HoodieEngineContext;
|
import org.apache.hudi.common.engine.HoodieEngineContext;
|
||||||
import org.apache.hudi.common.model.HoodieFileFormat;
|
import org.apache.hudi.common.model.HoodieFileFormat;
|
||||||
@@ -31,16 +40,6 @@ import org.apache.hudi.exception.HoodieException;
|
|||||||
import org.apache.hudi.exception.HoodieIOException;
|
import org.apache.hudi.exception.HoodieIOException;
|
||||||
import org.apache.hudi.exception.InvalidHoodiePathException;
|
import org.apache.hudi.exception.InvalidHoodiePathException;
|
||||||
import org.apache.hudi.metadata.HoodieTableMetadata;
|
import org.apache.hudi.metadata.HoodieTableMetadata;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
|
||||||
import org.apache.hadoop.fs.LocatedFileStatus;
|
|
||||||
import org.apache.hadoop.fs.Path;
|
|
||||||
import org.apache.hadoop.fs.PathFilter;
|
|
||||||
import org.apache.hadoop.fs.RemoteIterator;
|
|
||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
|
||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
|
|
||||||
@@ -50,11 +49,10 @@ import java.io.IOException;
|
|||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Set;
|
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
import java.util.regex.Matcher;
|
import java.util.regex.Matcher;
|
||||||
|
|||||||
@@ -40,6 +40,7 @@ import org.apache.hadoop.fs.BufferedFSInputStream;
|
|||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
import org.apache.hadoop.fs.FSInputStream;
|
import org.apache.hadoop.fs.FSInputStream;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
|
|
||||||
@@ -57,6 +58,7 @@ import java.util.Objects;
|
|||||||
public class HoodieLogFileReader implements HoodieLogFormat.Reader {
|
public class HoodieLogFileReader implements HoodieLogFormat.Reader {
|
||||||
|
|
||||||
public static final int DEFAULT_BUFFER_SIZE = 16 * 1024 * 1024; // 16 MB
|
public static final int DEFAULT_BUFFER_SIZE = 16 * 1024 * 1024; // 16 MB
|
||||||
|
private static final int BLOCK_SCAN_READ_BUFFER_SIZE = 1024 * 1024; // 1 MB
|
||||||
private static final Logger LOG = LogManager.getLogger(HoodieLogFileReader.class);
|
private static final Logger LOG = LogManager.getLogger(HoodieLogFileReader.class);
|
||||||
|
|
||||||
private final FSDataInputStream inputStream;
|
private final FSDataInputStream inputStream;
|
||||||
@@ -71,9 +73,13 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
|
|||||||
private transient Thread shutdownThread = null;
|
private transient Thread shutdownThread = null;
|
||||||
|
|
||||||
public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
|
public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
|
||||||
boolean readBlockLazily, boolean reverseReader) throws IOException {
|
boolean readBlockLazily, boolean reverseReader) throws IOException {
|
||||||
FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(), bufferSize);
|
FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(), bufferSize);
|
||||||
if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
|
if (FSUtils.isGCSInputStream(fsDataInputStream)) {
|
||||||
|
this.inputStream = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
|
||||||
|
new BufferedFSInputStream((FSInputStream) ((
|
||||||
|
(FSDataInputStream) fsDataInputStream.getWrappedStream()).getWrappedStream()), bufferSize)));
|
||||||
|
} else if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
|
||||||
this.inputStream = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
|
this.inputStream = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
|
||||||
new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
|
new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
|
||||||
} else {
|
} else {
|
||||||
@@ -274,19 +280,25 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private long scanForNextAvailableBlockOffset() throws IOException {
|
private long scanForNextAvailableBlockOffset() throws IOException {
|
||||||
|
// Make buffer large enough to scan through the file as quick as possible especially if it is on S3/GCS.
|
||||||
|
byte[] dataBuf = new byte[BLOCK_SCAN_READ_BUFFER_SIZE];
|
||||||
|
boolean eof = false;
|
||||||
while (true) {
|
while (true) {
|
||||||
long currentPos = inputStream.getPos();
|
long currentPos = inputStream.getPos();
|
||||||
try {
|
try {
|
||||||
boolean hasNextMagic = hasNextMagic();
|
Arrays.fill(dataBuf, (byte) 0);
|
||||||
if (hasNextMagic) {
|
inputStream.readFully(dataBuf, 0, dataBuf.length);
|
||||||
return currentPos;
|
|
||||||
} else {
|
|
||||||
// No luck - advance and try again
|
|
||||||
inputStream.seek(currentPos + 1);
|
|
||||||
}
|
|
||||||
} catch (EOFException e) {
|
} catch (EOFException e) {
|
||||||
|
eof = true;
|
||||||
|
}
|
||||||
|
long pos = Bytes.indexOf(dataBuf, HoodieLogFormat.MAGIC);
|
||||||
|
if (pos >= 0) {
|
||||||
|
return currentPos + pos;
|
||||||
|
}
|
||||||
|
if (eof) {
|
||||||
return inputStream.getPos();
|
return inputStream.getPos();
|
||||||
}
|
}
|
||||||
|
inputStream.seek(currentPos + dataBuf.length - HoodieLogFormat.MAGIC.length);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user