diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/inline/InLineFSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/inline/InLineFSUtils.java index e4570f942..a2c60bc31 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/inline/InLineFSUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/inline/InLineFSUtils.java @@ -19,6 +19,9 @@ package org.apache.hudi.common.fs.inline; import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.util.ValidationUtils; + +import java.io.File; /** * Utils to parse InLineFileSystem paths. @@ -29,46 +32,58 @@ import org.apache.hadoop.fs.Path; public class InLineFSUtils { private static final String START_OFFSET_STR = "start_offset"; private static final String LENGTH_STR = "length"; + private static final String PATH_SEPARATOR = "/"; + private static final String SCHEME_SEPARATOR = ":"; private static final String EQUALS_STR = "="; + private static final String LOCAL_FILESYSTEM_SCHEME = "file"; /** - * Fetch inline file path from outer path. - * Eg - * Input: - * Path = s3a://file1, origScheme: file, startOffset = 20, length = 40 - * Output: "inlinefs:/file1/s3a/?start_offset=20&length=40" + * Get the InlineFS Path for a given schema and its Path. + *
+ * Examples:
+ * Input Path: s3a://file1, origScheme: s3a, startOffset = 20, length = 40
+ * Output: "inlinefs://file1/s3a/?start_offset=20&length=40"
*
- * @param outerPath
- * @param origScheme
- * @param inLineStartOffset
- * @param inLineLength
- * @return
+ * @param outerPath The outer file Path
+ * @param origScheme The outer file's scheme
+ * @param inLineStartOffset Start offset for the inline file
+ * @param inLineLength Length for the inline file
+ * @return InlineFS Path for the requested outer path and scheme
*/
public static Path getInlineFilePath(Path outerPath, String origScheme, long inLineStartOffset, long inLineLength) {
- String subPath = outerPath.toString().substring(outerPath.toString().indexOf(":") + 1);
+ final String subPath = new File(outerPath.toString().substring(outerPath.toString().indexOf(":") + 1)).getPath();
return new Path(
- InLineFileSystem.SCHEME + "://" + subPath + "/" + origScheme
- + "/" + "?" + START_OFFSET_STR + EQUALS_STR + inLineStartOffset
+ InLineFileSystem.SCHEME + SCHEME_SEPARATOR + PATH_SEPARATOR + subPath + PATH_SEPARATOR + origScheme
+ + PATH_SEPARATOR + "?" + START_OFFSET_STR + EQUALS_STR + inLineStartOffset
+ "&" + LENGTH_STR + EQUALS_STR + inLineLength
);
}
/**
- * Inline file format
- * "inlinefs://
- * Eg input : "inlinefs://file1/sa3/?start_offset=20&length=40".
- * Output : "sa3://file1"
+ * Outer File Path format:
+ * "outer_file_scheme://path/to/outer/file"
+ *
+ * Example:
+ * Input: "inlinefs://file1/s3a/?start_offset=20&length=40"
+ * Output: "s3a://file1"
*
- * @param inlinePath inline file system path
- * @return
+ * @param inlineFSPath InLineFS Path to get the outer file Path
+ * @return Outer file Path from the InLineFS Path
*/
- public static Path getOuterfilePathFromInlinePath(Path inlinePath) {
- String scheme = inlinePath.getParent().getName();
- Path basePath = inlinePath.getParent().getParent();
- return new Path(basePath.toString().replaceFirst(InLineFileSystem.SCHEME, scheme));
+ public static Path getOuterFilePathFromInlinePath(Path inlineFSPath) {
+ final String scheme = inlineFSPath.getParent().getName();
+ final Path basePath = inlineFSPath.getParent().getParent();
+ ValidationUtils.checkArgument(basePath.toString().contains(SCHEME_SEPARATOR),
+ "Invalid InLineFSPath: " + inlineFSPath);
+
+ final String pathExceptScheme = basePath.toString().substring(basePath.toString().indexOf(SCHEME_SEPARATOR) + 1);
+ final String fullPath = scheme + SCHEME_SEPARATOR
+ + (scheme.equals(LOCAL_FILESYSTEM_SCHEME) ? PATH_SEPARATOR : "")
+ + pathExceptScheme;
+ return new Path(fullPath);
}
/**
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/inline/InLineFileSystem.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/inline/InLineFileSystem.java
index 4c693c5c5..712b6c7ff 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/inline/InLineFileSystem.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/inline/InLineFileSystem.java
@@ -63,7 +63,7 @@ public class InLineFileSystem extends FileSystem {
@Override
public FSDataInputStream open(Path inlinePath, int bufferSize) throws IOException {
- Path outerPath = InLineFSUtils.getOuterfilePathFromInlinePath(inlinePath);
+ Path outerPath = InLineFSUtils.getOuterFilePathFromInlinePath(inlinePath);
FileSystem outerFs = outerPath.getFileSystem(conf);
FSDataInputStream outerStream = outerFs.open(outerPath, bufferSize);
return new InLineFsDataInputStream(InLineFSUtils.startOffset(inlinePath), outerStream, InLineFSUtils.length(inlinePath));
@@ -80,7 +80,7 @@ public class InLineFileSystem extends FileSystem {
@Override
public FileStatus getFileStatus(Path inlinePath) throws IOException {
- Path outerPath = InLineFSUtils.getOuterfilePathFromInlinePath(inlinePath);
+ Path outerPath = InLineFSUtils.getOuterFilePathFromInlinePath(inlinePath);
FileSystem outerFs = outerPath.getFileSystem(conf);
FileStatus status = outerFs.getFileStatus(outerPath);
FileStatus toReturn = new FileStatus(InLineFSUtils.length(inlinePath), status.isDirectory(), status.getReplication(), status.getBlockSize(),
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java
index 36fa187aa..febdbf806 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java
@@ -44,6 +44,7 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
private final Schema readerSchema;
private final boolean readBlocksLazily;
private final boolean reverseLogReader;
+ private final boolean enableInLineReading;
private int bufferSize;
private static final Logger LOG = LogManager.getLogger(HoodieLogFormatReader.class);
@@ -62,6 +63,7 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
this.reverseLogReader = reverseLogReader;
this.bufferSize = bufferSize;
this.prevReadersInOpenState = new ArrayList<>();
+ this.enableInLineReading = enableInlineReading;
if (logFiles.size() > 0) {
HoodieLogFile nextLogFile = logFiles.remove(0);
this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false, enableInlineReading);
@@ -104,7 +106,8 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
this.prevReadersInOpenState.add(currentReader);
}
this.currentReader =
- new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false);
+ new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false,
+ this.enableInLineReading);
} catch (IOException io) {
throw new HoodieIOException("unable to initialize read with log file ", io);
}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/fs/inline/TestInLineFileSystem.java b/hudi-common/src/test/java/org/apache/hudi/common/fs/inline/TestInLineFileSystem.java
index 4553aa5a9..92f83aad7 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/fs/inline/TestInLineFileSystem.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/fs/inline/TestInLineFileSystem.java
@@ -296,6 +296,64 @@ public class TestInLineFileSystem {
}, "Should have thrown exception");
}
+ static class TestFSPath {
+ final Path inputPath;
+ final Path expectedInLineFSPath;
+ final Path transformedInputPath;
+
+ TestFSPath(final Path inputPath, final Path expectedInLineFSPath, final Path transformedInputPath) {
+ this.inputPath = inputPath;
+ this.expectedInLineFSPath = expectedInLineFSPath;
+ this.transformedInputPath = transformedInputPath;
+ }
+ }
+
+ @Test
+ public void testInLineFSPathConversions() {
+ final List