1
0

HUDI-140 : GCS: Log File Reading not working due to difference in seek() behavior for EOF

This commit is contained in:
Balaji Varadarajan
2019-07-15 17:29:34 -07:00
committed by Balaji Varadarajan
parent 9857c4b21c
commit 0b451b3a58
3 changed files with 39 additions and 4 deletions

View File

@@ -27,6 +27,7 @@ import com.uber.hoodie.common.table.log.block.HoodieDeleteBlock;
import com.uber.hoodie.common.table.log.block.HoodieLogBlock; import com.uber.hoodie.common.table.log.block.HoodieLogBlock;
import com.uber.hoodie.common.table.log.block.HoodieLogBlock.HeaderMetadataType; import com.uber.hoodie.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
import com.uber.hoodie.common.table.log.block.HoodieLogBlock.HoodieLogBlockType; import com.uber.hoodie.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.exception.CorruptedLogFileException; import com.uber.hoodie.exception.CorruptedLogFileException;
import com.uber.hoodie.exception.HoodieIOException; import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.exception.HoodieNotSupportedException; import com.uber.hoodie.exception.HoodieNotSupportedException;
@@ -234,7 +235,11 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader {
private boolean isBlockCorrupt(int blocksize) throws IOException { private boolean isBlockCorrupt(int blocksize) throws IOException {
long currentPos = inputStream.getPos(); long currentPos = inputStream.getPos();
try { try {
inputStream.seek(currentPos + blocksize); if (FSUtils.isGCSInputStream(inputStream)) {
inputStream.seek(currentPos + blocksize - 1);
} else {
inputStream.seek(currentPos + blocksize);
}
} catch (EOFException e) { } catch (EOFException e) {
// this is corrupt // this is corrupt
// This seek is required because contract of seek() is different for naked DFSInputStream vs BufferedFSInputStream // This seek is required because contract of seek() is different for naked DFSInputStream vs BufferedFSInputStream

View File

@@ -21,6 +21,7 @@ package com.uber.hoodie.common.table.log.block;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.uber.hoodie.common.model.HoodieLogFile; import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.table.log.HoodieMergedLogRecordScanner; import com.uber.hoodie.common.table.log.HoodieMergedLogRecordScanner;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.exception.HoodieException; import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.exception.HoodieIOException; import com.uber.hoodie.exception.HoodieIOException;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
@@ -233,7 +234,7 @@ public abstract class HoodieLogBlock {
inputStream.readFully(content, 0, contentLength); inputStream.readFully(content, 0, contentLength);
} else { } else {
// Seek to the end of the content block // Seek to the end of the content block
inputStream.seek(inputStream.getPos() + contentLength); safeSeek(inputStream, inputStream.getPos() + contentLength);
} }
return content; return content;
} }
@@ -245,9 +246,9 @@ public abstract class HoodieLogBlock {
try { try {
content = Optional.of(new byte[(int) this.getBlockContentLocation().get().getBlockSize()]); content = Optional.of(new byte[(int) this.getBlockContentLocation().get().getBlockSize()]);
inputStream.seek(this.getBlockContentLocation().get().getContentPositionInLogFile()); safeSeek(inputStream, this.getBlockContentLocation().get().getContentPositionInLogFile());
inputStream.readFully(content.get(), 0, content.get().length); inputStream.readFully(content.get(), 0, content.get().length);
inputStream.seek(this.getBlockContentLocation().get().getBlockEndPos()); safeSeek(inputStream, this.getBlockContentLocation().get().getBlockEndPos());
} catch (IOException e) { } catch (IOException e) {
try { try {
// TODO : fs.open() and return inputstream again, need to pass FS configuration // TODO : fs.open() and return inputstream again, need to pass FS configuration
@@ -268,4 +269,21 @@ public abstract class HoodieLogBlock {
content = Optional.empty(); content = Optional.empty();
} }
/**
* Handles difference in seek behavior for GCS and non-GCS input stream
* @param inputStream Input Stream
* @param pos Position to seek
* @throws IOException
*/
private static void safeSeek(FSDataInputStream inputStream, long pos) throws IOException {
try {
inputStream.seek(pos);
} catch (EOFException e) {
if (FSUtils.isGCSInputStream(inputStream)) {
inputStream.seek(pos - 1);
} else {
throw e;
}
}
}
} }

View File

@@ -45,6 +45,7 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.LocatedFileStatus;
@@ -552,4 +553,15 @@ public class FSUtils {
return ((partitionPath == null) || (partitionPath.isEmpty())) ? basePath : return ((partitionPath == null) || (partitionPath.isEmpty())) ? basePath :
new Path(basePath, partitionPath); new Path(basePath, partitionPath);
} }
/**
* This is due to HUDI-140 GCS has a different behavior for detecting EOF during seek().
* @param inputStream FSDataInputStream
* @return true if the inputstream or the wrapped one is of type GoogleHadoopFSInputStream
*/
public static boolean isGCSInputStream(FSDataInputStream inputStream) {
return inputStream.getClass().getCanonicalName().equals("com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream")
|| inputStream.getWrappedStream().getClass().getCanonicalName()
.equals("com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream");
}
} }