1
0

[HUDI-1496] Fixing input stream detection of GCS FileSystem (#2500)

* Adding SchemeAwareFSDataInputStream to abstract out the special handling for GCSFileSystem
* Moving wrapping of fsDataInputStream to separate method in HoodieLogFileReader

Co-authored-by: Vinoth Chandar <vinoth@apache.org>
This commit is contained in:
Sivabalan Narayanan
2021-03-14 04:57:57 -04:00
committed by GitHub
parent f5e31be086
commit e93c6a5693
4 changed files with 112 additions and 48 deletions

View File

@@ -19,7 +19,6 @@
package org.apache.hudi.common.fs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
@@ -542,14 +541,12 @@ public class FSUtils {
/**
* Due to HUDI-1496: GCS has different behavior for detecting EOF during seek().
*
* @param inputStream FSDataInputStream
*
* @param fs fileSystem instance.
* @return true if the inputstream or the wrapped one is of type GoogleHadoopFSInputStream
*/
public static boolean isGCSInputStream(FSDataInputStream inputStream) {
return inputStream.getClass().getCanonicalName().equals("com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream")
|| inputStream.getWrappedStream().getClass().getCanonicalName()
.equals("com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream");
/**
 * Checks whether the given filesystem is GCS, by comparing its scheme against
 * the registered GCS storage scheme.
 *
 * @param fs fileSystem instance.
 * @return true if the filesystem's scheme matches GCS.
 */
public static boolean isGCSFileSystem(FileSystem fs) {
  final String scheme = fs.getScheme();
  return scheme.equals(StorageSchemes.GCS.getScheme());
}
public static Configuration registerFileSystem(Path file, Configuration conf) {

View File

@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.common.fs;
import org.apache.hadoop.fs.FSDataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
/**
 * An {@link FSDataInputStream} that knows the scheme of the underlying filesystem,
 * so that seeks can be adjusted for the GCS filesystem.
 */
public class SchemeAwareFSDataInputStream extends FSDataInputStream {

  // whether the underlying filesystem is GCS (drives the EOF-on-seek workaround below)
  private final boolean gcsFileSystem;

  public SchemeAwareFSDataInputStream(InputStream in, boolean isGCSFileSystem) {
    super(in);
    this.gcsFileSystem = isGCSFileSystem;
  }

  /**
   * Seeks to the desired position, retrying one byte earlier when GCS reports EOF.
   *
   * @param desired target position in the stream.
   * @throws IOException on seek failure; an {@link EOFException} is rethrown as-is
   *                     for non-GCS filesystems.
   */
  @Override
  public void seek(long desired) throws IOException {
    try {
      super.seek(desired);
    } catch (EOFException e) {
      // With GCSFileSystem, seeking to the last byte may throw EOFException;
      // back off by one byte in that case only.
      if (!gcsFileSystem) {
        throw e;
      }
      super.seek(desired - 1);
    }
  }
}

View File

@@ -19,6 +19,7 @@
package org.apache.hudi.common.table.log;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.SchemeAwareFSDataInputStream;
import org.apache.hudi.common.fs.TimedFSDataInputStream;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
@@ -75,20 +76,8 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
boolean readBlockLazily, boolean reverseReader) throws IOException {
FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(), bufferSize);
if (FSUtils.isGCSInputStream(fsDataInputStream)) {
this.inputStream = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
new BufferedFSInputStream((FSInputStream) ((
(FSDataInputStream) fsDataInputStream.getWrappedStream()).getWrappedStream()), bufferSize)));
} else if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
this.inputStream = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
} else {
// fsDataInputStream.getWrappedStream() maybe a BufferedFSInputStream
// need to wrap in another BufferedFSInputStream to make bufferSize work?
this.inputStream = fsDataInputStream;
}
this.logFile = logFile;
this.inputStream = getFSDataInputStream(fsDataInputStream, fs, bufferSize);
this.readerSchema = readerSchema;
this.readBlockLazily = readBlockLazily;
this.reverseReader = reverseReader;
@@ -107,6 +96,56 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
this(fs, logFile, readerSchema, DEFAULT_BUFFER_SIZE, false, false);
}
/**
 * Picks the right {@link FSDataInputStream} for the filesystem in use by
 * wrapping the original stream as required.
 * @param fsDataInputStream original instance of {@link FSDataInputStream}.
 * @param fs instance of {@link FileSystem} in use.
 * @param bufferSize buffer size to be used.
 * @return the appropriately wrapped {@link FSDataInputStream}.
 */
private FSDataInputStream getFSDataInputStream(FSDataInputStream fsDataInputStream, FileSystem fs, int bufferSize) {
  if (FSUtils.isGCSFileSystem(fs)) {
    // On GCS, seeks near EOF may throw; intercept them via SchemeAwareFSDataInputStream.
    return new SchemeAwareFSDataInputStream(getFSDataInputStreamForGCS(fsDataInputStream, bufferSize), true);
  }
  FSDataInputStream wrapped = fsDataInputStream;
  if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
    wrapped = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
        new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
  }
  // Otherwise the wrapped stream may already be a BufferedFSInputStream;
  // NOTE(review): wrapping again in a BufferedFSInputStream might be needed
  // to make bufferSize effective — left as-is for now.
  return wrapped;
}
/**
 * GCS FileSystem needs special seek handling, so this method builds the right
 * {@link FSDataInputStream} by unwrapping and re-buffering the underlying stream.
 * @param fsDataInputStream original instance of {@link FSDataInputStream}.
 * @param bufferSize buffer size to be used.
 * @return the appropriately wrapped {@link FSDataInputStream}.
 */
private FSDataInputStream getFSDataInputStreamForGCS(FSDataInputStream fsDataInputStream, int bufferSize) {
  // In case of GCS, two layouts are known:
  //   a. getWrappedStream() is directly an FSInputStream;
  //   b. getWrappedStream() is itself an FSDataInputStream whose own wrapped
  //      stream is an FSInputStream.
  // If neither matches, fall back to the original stream.
  if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
    FSInputStream direct = (FSInputStream) fsDataInputStream.getWrappedStream();
    return new TimedFSDataInputStream(logFile.getPath(),
        new FSDataInputStream(new BufferedFSInputStream(direct, bufferSize)));
  }
  if (fsDataInputStream.getWrappedStream() instanceof FSDataInputStream
      && ((FSDataInputStream) fsDataInputStream.getWrappedStream()).getWrappedStream() instanceof FSInputStream) {
    FSInputStream nested =
        (FSInputStream) ((FSDataInputStream) fsDataInputStream.getWrappedStream()).getWrappedStream();
    return new TimedFSDataInputStream(logFile.getPath(),
        new FSDataInputStream(new BufferedFSInputStream(nested, bufferSize)));
  }
  return fsDataInputStream;
}
@Override
public HoodieLogFile getLogFile() {
return logFile;
@@ -238,11 +277,7 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
private boolean isBlockCorrupt(int blocksize) throws IOException {
long currentPos = inputStream.getPos();
try {
if (FSUtils.isGCSInputStream(inputStream)) {
inputStream.seek(currentPos + blocksize - 1);
} else {
inputStream.seek(currentPos + blocksize);
}
inputStream.seek(currentPos + blocksize);
} catch (EOFException e) {
LOG.info("Found corrupted block in file " + logFile + " with block size(" + blocksize + ") running past EOF");
// this is corrupt

View File

@@ -18,7 +18,6 @@
package org.apache.hudi.common.table.log.block;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.util.Option;
@@ -220,7 +219,7 @@ public abstract class HoodieLogBlock {
inputStream.readFully(content, 0, contentLength);
} else {
// Seek to the end of the content block
safeSeek(inputStream, inputStream.getPos() + contentLength);
inputStream.seek(inputStream.getPos() + contentLength);
}
return content;
}
@@ -232,9 +231,9 @@ public abstract class HoodieLogBlock {
try {
content = Option.of(new byte[(int) this.getBlockContentLocation().get().getBlockSize()]);
safeSeek(inputStream, this.getBlockContentLocation().get().getContentPositionInLogFile());
inputStream.seek(this.getBlockContentLocation().get().getContentPositionInLogFile());
inputStream.readFully(content.get(), 0, content.get().length);
safeSeek(inputStream, this.getBlockContentLocation().get().getBlockEndPos());
inputStream.seek(this.getBlockContentLocation().get().getBlockEndPos());
} catch (IOException e) {
// TODO : fs.open() and return inputstream again, need to pass FS configuration
// because the inputstream might close/timeout for large number of log blocks to be merged
@@ -249,23 +248,4 @@ public abstract class HoodieLogBlock {
protected void deflate() {
content = Option.empty();
}
/**
 * Handles the difference in seek behavior between GCS and non-GCS input streams.
 *
 * @param inputStream input stream to seek on.
 * @param pos position to seek to.
 * @throws IOException on seek failure; an {@link EOFException} is rethrown as-is
 *                     for non-GCS streams.
 */
private static void safeSeek(FSDataInputStream inputStream, long pos) throws IOException {
  try {
    inputStream.seek(pos);
  } catch (EOFException e) {
    // GCS may report EOF when seeking to the last byte; retry one byte earlier.
    if (!FSUtils.isGCSInputStream(inputStream)) {
      throw e;
    }
    inputStream.seek(pos - 1);
  }
}
}