From 0b451b3a58cabe25c0cecd3fd8847a8597e2313a Mon Sep 17 00:00:00 2001 From: Balaji Varadarajan Date: Mon, 15 Jul 2019 17:29:34 -0700 Subject: [PATCH] HUDI-140 : GCS: Log File Reading not working due to difference in seek() behavior for EOF --- .../common/table/log/HoodieLogFileReader.java | 7 +++++- .../table/log/block/HoodieLogBlock.java | 24 ++++++++++++++++--- .../com/uber/hoodie/common/util/FSUtils.java | 12 ++++++++++ 3 files changed, 39 insertions(+), 4 deletions(-) diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFileReader.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFileReader.java index 70afe2208..21ffc6040 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFileReader.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFileReader.java @@ -27,6 +27,7 @@ import com.uber.hoodie.common.table.log.block.HoodieDeleteBlock; import com.uber.hoodie.common.table.log.block.HoodieLogBlock; import com.uber.hoodie.common.table.log.block.HoodieLogBlock.HeaderMetadataType; import com.uber.hoodie.common.table.log.block.HoodieLogBlock.HoodieLogBlockType; +import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.exception.CorruptedLogFileException; import com.uber.hoodie.exception.HoodieIOException; import com.uber.hoodie.exception.HoodieNotSupportedException; @@ -234,7 +235,11 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader { private boolean isBlockCorrupt(int blocksize) throws IOException { long currentPos = inputStream.getPos(); try { - inputStream.seek(currentPos + blocksize); + if (FSUtils.isGCSInputStream(inputStream)) { + inputStream.seek(currentPos + blocksize - 1); + } else { + inputStream.seek(currentPos + blocksize); + } } catch (EOFException e) { // this is corrupt // This seek is required because contract of seek() is different for naked DFSInputStream vs BufferedFSInputStream diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieLogBlock.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieLogBlock.java index 011c83b6e..ebd978a7c 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieLogBlock.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieLogBlock.java @@ -21,6 +21,7 @@ package com.uber.hoodie.common.table.log.block; import com.google.common.collect.Maps; import com.uber.hoodie.common.model.HoodieLogFile; import com.uber.hoodie.common.table.log.HoodieMergedLogRecordScanner; +import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.exception.HoodieException; import com.uber.hoodie.exception.HoodieIOException; import java.io.ByteArrayOutputStream; @@ -233,7 +234,7 @@ public abstract class HoodieLogBlock { inputStream.readFully(content, 0, contentLength); } else { // Seek to the end of the content block - inputStream.seek(inputStream.getPos() + contentLength); + safeSeek(inputStream, inputStream.getPos() + contentLength); } return content; } @@ -245,9 +246,9 @@ public abstract class HoodieLogBlock { try { content = Optional.of(new byte[(int) this.getBlockContentLocation().get().getBlockSize()]); - inputStream.seek(this.getBlockContentLocation().get().getContentPositionInLogFile()); + safeSeek(inputStream, this.getBlockContentLocation().get().getContentPositionInLogFile()); inputStream.readFully(content.get(), 0, content.get().length); - inputStream.seek(this.getBlockContentLocation().get().getBlockEndPos()); + safeSeek(inputStream, this.getBlockContentLocation().get().getBlockEndPos()); } catch (IOException e) { try { // TODO : fs.open() and return inputstream again, need to pass FS configuration @@ -268,4 +269,21 @@ public abstract class HoodieLogBlock { content = Optional.empty(); } + /** + * Handles difference in seek behavior for GCS and non-GCS input stream + * @param inputStream Input Stream + * @param pos Position to seek + * @throws IOException + */ + private static void safeSeek(FSDataInputStream inputStream, long pos) throws IOException { + try { + inputStream.seek(pos); + } catch (EOFException e) { + if (FSUtils.isGCSInputStream(inputStream)) { + inputStream.seek(pos - 1); + } else { + throw e; + } + } + } } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java index d9069a156..0d67d034c 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java @@ -45,6 +45,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Stream; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; @@ -552,4 +553,15 @@ public class FSUtils { return ((partitionPath == null) || (partitionPath.isEmpty())) ? basePath : new Path(basePath, partitionPath); } + + /** + * This is due to HUDI-140 GCS has a different behavior for detecting EOF during seek(). + * @param inputStream FSDataInputStream + * @return true if the inputstream or the wrapped one is of type GoogleHadoopFSInputStream + */ + public static boolean isGCSInputStream(FSDataInputStream inputStream) { + return inputStream.getClass().getCanonicalName().equals("com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream") + || inputStream.getWrappedStream().getClass().getCanonicalName() + .equals("com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream"); + } }