From f715cf607f0692e6500a295a0ab5604f3979df3a Mon Sep 17 00:00:00 2001 From: Manoj Govindassamy Date: Wed, 17 Nov 2021 10:59:38 -0800 Subject: [PATCH] [HUDI-2716] InLineFS support for S3FS logs (#3977) --- .../hudi/common/fs/inline/InLineFSUtils.java | 65 ++++++++++++------- .../common/fs/inline/InLineFileSystem.java | 4 +- .../table/log/HoodieLogFormatReader.java | 5 +- .../fs/inline/TestInLineFileSystem.java | 58 +++++++++++++++++ 4 files changed, 104 insertions(+), 28 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/inline/InLineFSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/inline/InLineFSUtils.java index e4570f942..a2c60bc31 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/inline/InLineFSUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/inline/InLineFSUtils.java @@ -19,6 +19,9 @@ package org.apache.hudi.common.fs.inline; import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.util.ValidationUtils; + +import java.io.File; /** * Utils to parse InLineFileSystem paths. @@ -29,46 +32,58 @@ import org.apache.hadoop.fs.Path; public class InLineFSUtils { private static final String START_OFFSET_STR = "start_offset"; private static final String LENGTH_STR = "length"; + private static final String PATH_SEPARATOR = "/"; + private static final String SCHEME_SEPARATOR = ":"; private static final String EQUALS_STR = "="; + private static final String LOCAL_FILESYSTEM_SCHEME = "file"; /** - * Fetch inline file path from outer path. - * Eg - * Input: - * Path = s3a://file1, origScheme: file, startOffset = 20, length = 40 - * Output: "inlinefs:/file1/s3a/?start_offset=20&length=40" + * Get the InlineFS Path for a given schema and its Path. + *

+ * Examples: + * Input Path: s3a://file1, origScheme: file, startOffset = 20, length = 40 + * Output: "inlinefs://file1/s3a/?start_offset=20&length=40" * - * @param outerPath - * @param origScheme - * @param inLineStartOffset - * @param inLineLength - * @return + * @param outerPath The outer file Path + * @param origScheme The file schema + * @param inLineStartOffset Start offset for the inline file + * @param inLineLength Length for the inline file + * @return InlineFS Path for the requested outer path and schema */ public static Path getInlineFilePath(Path outerPath, String origScheme, long inLineStartOffset, long inLineLength) { - String subPath = outerPath.toString().substring(outerPath.toString().indexOf(":") + 1); + final String subPath = new File(outerPath.toString().substring(outerPath.toString().indexOf(":") + 1)).getPath(); return new Path( - InLineFileSystem.SCHEME + "://" + subPath + "/" + origScheme - + "/" + "?" + START_OFFSET_STR + EQUALS_STR + inLineStartOffset + InLineFileSystem.SCHEME + SCHEME_SEPARATOR + PATH_SEPARATOR + subPath + PATH_SEPARATOR + origScheme + + PATH_SEPARATOR + "?" + START_OFFSET_STR + EQUALS_STR + inLineStartOffset + "&" + LENGTH_STR + EQUALS_STR + inLineLength ); } /** - * Inline file format - * "inlinefs:////?start_offset=start_offset>&length=" - * Outer File format - * "://" + * InlineFS Path format: + * "inlinefs://path/to/outer/file/outer_file_schema/?start_offset=start_offset>&length=" *

- * Eg input : "inlinefs://file1/sa3/?start_offset=20&length=40". - * Output : "sa3://file1" + * Outer File Path format: + * "outer_file_schema://path/to/outer/file" + *

+ * Example + * Input: "inlinefs://file1/s3a/?start_offset=20&length=40". + * Output: "s3a://file1" * - * @param inlinePath inline file system path - * @return + * @param inlineFSPath InLineFS Path to get the outer file Path + * @return Outer file Path from the InLineFS Path */ - public static Path getOuterfilePathFromInlinePath(Path inlinePath) { - String scheme = inlinePath.getParent().getName(); - Path basePath = inlinePath.getParent().getParent(); - return new Path(basePath.toString().replaceFirst(InLineFileSystem.SCHEME, scheme)); + public static Path getOuterFilePathFromInlinePath(Path inlineFSPath) { + final String scheme = inlineFSPath.getParent().getName(); + final Path basePath = inlineFSPath.getParent().getParent(); + ValidationUtils.checkArgument(basePath.toString().contains(SCHEME_SEPARATOR), + "Invalid InLineFSPath: " + inlineFSPath); + + final String pathExceptScheme = basePath.toString().substring(basePath.toString().indexOf(SCHEME_SEPARATOR) + 1); + final String fullPath = scheme + SCHEME_SEPARATOR + + (scheme.equals(LOCAL_FILESYSTEM_SCHEME) ? PATH_SEPARATOR : "") + + pathExceptScheme; + return new Path(fullPath); } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/inline/InLineFileSystem.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/inline/InLineFileSystem.java index 4c693c5c5..712b6c7ff 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/inline/InLineFileSystem.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/inline/InLineFileSystem.java @@ -63,7 +63,7 @@ public class InLineFileSystem extends FileSystem { @Override public FSDataInputStream open(Path inlinePath, int bufferSize) throws IOException { - Path outerPath = InLineFSUtils.getOuterfilePathFromInlinePath(inlinePath); + Path outerPath = InLineFSUtils.getOuterFilePathFromInlinePath(inlinePath); FileSystem outerFs = outerPath.getFileSystem(conf); FSDataInputStream outerStream = outerFs.open(outerPath, bufferSize); return new InLineFsDataInputStream(InLineFSUtils.startOffset(inlinePath), outerStream, InLineFSUtils.length(inlinePath)); @@ -80,7 +80,7 @@ public class InLineFileSystem extends FileSystem { @Override public FileStatus getFileStatus(Path inlinePath) throws IOException { - Path outerPath = InLineFSUtils.getOuterfilePathFromInlinePath(inlinePath); + Path outerPath = InLineFSUtils.getOuterFilePathFromInlinePath(inlinePath); FileSystem outerFs = outerPath.getFileSystem(conf); FileStatus status = outerFs.getFileStatus(outerPath); FileStatus toReturn = new FileStatus(InLineFSUtils.length(inlinePath), status.isDirectory(), status.getReplication(), status.getBlockSize(), diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java index 36fa187aa..febdbf806 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java @@ -44,6 +44,7 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader { private final Schema readerSchema; private final boolean readBlocksLazily; private final boolean reverseLogReader; + private final boolean enableInLineReading; private int bufferSize; private static final Logger LOG = LogManager.getLogger(HoodieLogFormatReader.class); @@ -62,6 +63,7 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader { this.reverseLogReader = reverseLogReader; this.bufferSize = bufferSize; this.prevReadersInOpenState = new ArrayList<>(); + this.enableInLineReading = enableInlineReading; if (logFiles.size() > 0) { HoodieLogFile nextLogFile = logFiles.remove(0); this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false, enableInlineReading); @@ -104,7 +106,8 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader { this.prevReadersInOpenState.add(currentReader); } this.currentReader = - new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false); + new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false, + this.enableInLineReading); } catch (IOException io) { throw new HoodieIOException("unable to initialize read with log file ", io); } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/fs/inline/TestInLineFileSystem.java b/hudi-common/src/test/java/org/apache/hudi/common/fs/inline/TestInLineFileSystem.java index 4553aa5a9..92f83aad7 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/fs/inline/TestInLineFileSystem.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/fs/inline/TestInLineFileSystem.java @@ -296,6 +296,64 @@ public class TestInLineFileSystem { }, "Should have thrown exception"); } + static class TestFSPath { + final Path inputPath; + final Path expectedInLineFSPath; + final Path transformedInputPath; + + TestFSPath(final Path inputPath, final Path expectedInLineFSPath, final Path transformedInputPath) { + this.inputPath = inputPath; + this.expectedInLineFSPath = expectedInLineFSPath; + this.transformedInputPath = transformedInputPath; + } + } + + @Test + public void testInLineFSPathConversions() { + final List expectedInLinePaths = Arrays.asList( + new TestFSPath( + new Path("/zero/524bae7e-f01d-47ae-b7cd-910400a81336"), + new Path("inlinefs://zero/524bae7e-f01d-47ae-b7cd-910400a81336/file/?start_offset=10&length=10"), + new Path("file:/zero/524bae7e-f01d-47ae-b7cd-910400a81336")), + new TestFSPath( + new Path("file:/one/524bae7e-f01d-47ae-b7cd-910400a81336"), + new Path("inlinefs://one/524bae7e-f01d-47ae-b7cd-910400a81336/file/?start_offset=10&length=10"), + new Path("file:/one/524bae7e-f01d-47ae-b7cd-910400a81336")), + new TestFSPath( + new Path("file://two/524bae7e-f01d-47ae-b7cd-910400a81336"), + new Path("inlinefs://two/524bae7e-f01d-47ae-b7cd-910400a81336/file/?start_offset=10&length=10"), + new Path("file:/two/524bae7e-f01d-47ae-b7cd-910400a81336")), + new TestFSPath( + new Path("hdfs://three/524bae7e-f01d-47ae-b7cd-910400a81336"), + new Path("inlinefs://three/524bae7e-f01d-47ae-b7cd-910400a81336/hdfs/?start_offset=10&length=10"), + new Path("hdfs://three/524bae7e-f01d-47ae-b7cd-910400a81336")), + new TestFSPath( + new Path("s3://four/524bae7e-f01d-47ae-b7cd-910400a81336"), + new Path("inlinefs://four/524bae7e-f01d-47ae-b7cd-910400a81336/s3/?start_offset=10&length=10"), + new Path("s3://four/524bae7e-f01d-47ae-b7cd-910400a81336")), + new TestFSPath( + new Path("s3a://five/524bae7e-f01d-47ae-b7cd-910400a81336"), + new Path("inlinefs://five/524bae7e-f01d-47ae-b7cd-910400a81336/s3a/?start_offset=10&length=10"), + new Path("s3a://five/524bae7e-f01d-47ae-b7cd-910400a81336")) + ); + + for (TestFSPath entry : expectedInLinePaths) { + final Path inputPath = entry.inputPath; + final Path expectedInLineFSPath = entry.expectedInLineFSPath; + final Path expectedTransformedInputPath = entry.transformedInputPath; + + String scheme = "file"; + if (inputPath.toString().contains(":")) { + scheme = inputPath.toString().split(":")[0]; + } + final Path actualInLineFSPath = InLineFSUtils.getInlineFilePath(inputPath, scheme, 10, 10); + assertEquals(expectedInLineFSPath, actualInLineFSPath); + + final Path actualOuterFilePath = InLineFSUtils.getOuterFilePathFromInlinePath(actualInLineFSPath); + assertEquals(expectedTransformedInputPath, actualOuterFilePath); + } + } + @Test public void testExists() throws IOException { Path inlinePath = getRandomInlinePath();