[HUDI-3341] Fix log file reader for S3 with hadoop-aws 2.7.x (#4897)
This commit is contained in:
@@ -18,13 +18,6 @@
|
|||||||
|
|
||||||
package org.apache.hudi.common.table.log;
|
package org.apache.hudi.common.table.log;
|
||||||
|
|
||||||
import org.apache.avro.Schema;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.fs.BufferedFSInputStream;
|
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
|
||||||
import org.apache.hadoop.fs.FSInputStream;
|
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
|
||||||
import org.apache.hudi.common.fs.FSUtils;
|
import org.apache.hudi.common.fs.FSUtils;
|
||||||
import org.apache.hudi.common.fs.SchemeAwareFSDataInputStream;
|
import org.apache.hudi.common.fs.SchemeAwareFSDataInputStream;
|
||||||
import org.apache.hudi.common.fs.TimedFSDataInputStream;
|
import org.apache.hudi.common.fs.TimedFSDataInputStream;
|
||||||
@@ -43,10 +36,19 @@ import org.apache.hudi.common.util.Option;
|
|||||||
import org.apache.hudi.exception.CorruptedLogFileException;
|
import org.apache.hudi.exception.CorruptedLogFileException;
|
||||||
import org.apache.hudi.exception.HoodieIOException;
|
import org.apache.hudi.exception.HoodieIOException;
|
||||||
import org.apache.hudi.exception.HoodieNotSupportedException;
|
import org.apache.hudi.exception.HoodieNotSupportedException;
|
||||||
|
|
||||||
|
import org.apache.avro.Schema;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.BufferedFSInputStream;
|
||||||
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
|
import org.apache.hadoop.fs.FSInputStream;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
|
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
|
|
||||||
import java.io.EOFException;
|
import java.io.EOFException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
@@ -254,8 +256,17 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
|
|||||||
|
|
||||||
private boolean isBlockCorrupted(int blocksize) throws IOException {
|
private boolean isBlockCorrupted(int blocksize) throws IOException {
|
||||||
long currentPos = inputStream.getPos();
|
long currentPos = inputStream.getPos();
|
||||||
|
long blockSizeFromFooter;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
inputStream.seek(currentPos + blocksize);
|
// check if the blocksize mentioned in the footer is the same as the header;
|
||||||
|
// by seeking and checking the length of a long. We do not seek `currentPos + blocksize`
|
||||||
|
// which can be the file size for the last block in the file, causing EOFException
|
||||||
|
// for some FSDataInputStream implementations
|
||||||
|
inputStream.seek(currentPos + blocksize - Long.BYTES);
|
||||||
|
// Block size in the footer includes the magic header, which the header does not include.
|
||||||
|
// So we have to shorten the footer block size by the size of magic hash
|
||||||
|
blockSizeFromFooter = inputStream.readLong() - magicBuffer.length;
|
||||||
} catch (EOFException e) {
|
} catch (EOFException e) {
|
||||||
LOG.info("Found corrupted block in file " + logFile + " with block size(" + blocksize + ") running past EOF");
|
LOG.info("Found corrupted block in file " + logFile + " with block size(" + blocksize + ") running past EOF");
|
||||||
// this is corrupt
|
// this is corrupt
|
||||||
@@ -266,19 +277,13 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
// check if the blocksize mentioned in the footer is the same as the header; by seeking back the length of a long
|
|
||||||
// the backward seek does not incur additional IO as {@link org.apache.hadoop.hdfs.DFSInputStream#seek()}
|
|
||||||
// only moves the index. actual IO happens on the next read operation
|
|
||||||
inputStream.seek(inputStream.getPos() - Long.BYTES);
|
|
||||||
// Block size in the footer includes the magic header, which the header does not include.
|
|
||||||
// So we have to shorten the footer block size by the size of magic hash
|
|
||||||
long blockSizeFromFooter = inputStream.readLong() - magicBuffer.length;
|
|
||||||
if (blocksize != blockSizeFromFooter) {
|
if (blocksize != blockSizeFromFooter) {
|
||||||
LOG.info("Found corrupted block in file " + logFile + ". Header block size(" + blocksize
|
LOG.info("Found corrupted block in file " + logFile + ". Header block size(" + blocksize
|
||||||
+ ") did not match the footer block size(" + blockSizeFromFooter + ")");
|
+ ") did not match the footer block size(" + blockSizeFromFooter + ")");
|
||||||
inputStream.seek(currentPos);
|
inputStream.seek(currentPos);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
readMagic();
|
readMagic();
|
||||||
// all good - either we found the sync marker or EOF. Reset position and continue
|
// all good - either we found the sync marker or EOF. Reset position and continue
|
||||||
|
|||||||
Reference in New Issue
Block a user