[HUDI-2716] InLineFS support for S3FS logs (#3977)
This commit is contained in:
committed by
GitHub
parent
1ee12cfa6f
commit
f715cf607f
@@ -19,6 +19,9 @@
|
|||||||
package org.apache.hudi.common.fs.inline;
|
package org.apache.hudi.common.fs.inline;
|
||||||
|
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hudi.common.util.ValidationUtils;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Utils to parse InLineFileSystem paths.
|
* Utils to parse InLineFileSystem paths.
|
||||||
@@ -29,46 +32,58 @@ import org.apache.hadoop.fs.Path;
|
|||||||
public class InLineFSUtils {
|
public class InLineFSUtils {
|
||||||
private static final String START_OFFSET_STR = "start_offset";
|
private static final String START_OFFSET_STR = "start_offset";
|
||||||
private static final String LENGTH_STR = "length";
|
private static final String LENGTH_STR = "length";
|
||||||
|
private static final String PATH_SEPARATOR = "/";
|
||||||
|
private static final String SCHEME_SEPARATOR = ":";
|
||||||
private static final String EQUALS_STR = "=";
|
private static final String EQUALS_STR = "=";
|
||||||
|
private static final String LOCAL_FILESYSTEM_SCHEME = "file";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Fetch inline file path from outer path.
|
* Get the InlineFS Path for a given schema and its Path.
|
||||||
* Eg
|
* <p>
|
||||||
* Input:
|
* Examples:
|
||||||
* Path = s3a://file1, origScheme: file, startOffset = 20, length = 40
|
* Input Path: s3a://file1, origScheme: file, startOffset = 20, length = 40
|
||||||
* Output: "inlinefs:/file1/s3a/?start_offset=20&length=40"
|
* Output: "inlinefs://file1/s3a/?start_offset=20&length=40"
|
||||||
*
|
*
|
||||||
* @param outerPath
|
* @param outerPath The outer file Path
|
||||||
* @param origScheme
|
* @param origScheme The file schema
|
||||||
* @param inLineStartOffset
|
* @param inLineStartOffset Start offset for the inline file
|
||||||
* @param inLineLength
|
* @param inLineLength Length for the inline file
|
||||||
* @return
|
* @return InlineFS Path for the requested outer path and schema
|
||||||
*/
|
*/
|
||||||
public static Path getInlineFilePath(Path outerPath, String origScheme, long inLineStartOffset, long inLineLength) {
|
public static Path getInlineFilePath(Path outerPath, String origScheme, long inLineStartOffset, long inLineLength) {
|
||||||
String subPath = outerPath.toString().substring(outerPath.toString().indexOf(":") + 1);
|
final String subPath = new File(outerPath.toString().substring(outerPath.toString().indexOf(":") + 1)).getPath();
|
||||||
return new Path(
|
return new Path(
|
||||||
InLineFileSystem.SCHEME + "://" + subPath + "/" + origScheme
|
InLineFileSystem.SCHEME + SCHEME_SEPARATOR + PATH_SEPARATOR + subPath + PATH_SEPARATOR + origScheme
|
||||||
+ "/" + "?" + START_OFFSET_STR + EQUALS_STR + inLineStartOffset
|
+ PATH_SEPARATOR + "?" + START_OFFSET_STR + EQUALS_STR + inLineStartOffset
|
||||||
+ "&" + LENGTH_STR + EQUALS_STR + inLineLength
|
+ "&" + LENGTH_STR + EQUALS_STR + inLineLength
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Inline file format
|
* InlineFS Path format:
|
||||||
* "inlinefs://<path_to_outer_file>/<outer_file_scheme>/?start_offset=start_offset>&length=<length>"
|
* "inlinefs://path/to/outer/file/outer_file_schema/?start_offset=start_offset>&length=<length>"
|
||||||
* Outer File format
|
|
||||||
* "<outer_file_scheme>://<path_to_outer_file>"
|
|
||||||
* <p>
|
* <p>
|
||||||
* Eg input : "inlinefs://file1/sa3/?start_offset=20&length=40".
|
* Outer File Path format:
|
||||||
* Output : "sa3://file1"
|
* "outer_file_schema://path/to/outer/file"
|
||||||
|
* <p>
|
||||||
|
* Example
|
||||||
|
* Input: "inlinefs://file1/s3a/?start_offset=20&length=40".
|
||||||
|
* Output: "s3a://file1"
|
||||||
*
|
*
|
||||||
* @param inlinePath inline file system path
|
* @param inlineFSPath InLineFS Path to get the outer file Path
|
||||||
* @return
|
* @return Outer file Path from the InLineFS Path
|
||||||
*/
|
*/
|
||||||
public static Path getOuterfilePathFromInlinePath(Path inlinePath) {
|
public static Path getOuterFilePathFromInlinePath(Path inlineFSPath) {
|
||||||
String scheme = inlinePath.getParent().getName();
|
final String scheme = inlineFSPath.getParent().getName();
|
||||||
Path basePath = inlinePath.getParent().getParent();
|
final Path basePath = inlineFSPath.getParent().getParent();
|
||||||
return new Path(basePath.toString().replaceFirst(InLineFileSystem.SCHEME, scheme));
|
ValidationUtils.checkArgument(basePath.toString().contains(SCHEME_SEPARATOR),
|
||||||
|
"Invalid InLineFSPath: " + inlineFSPath);
|
||||||
|
|
||||||
|
final String pathExceptScheme = basePath.toString().substring(basePath.toString().indexOf(SCHEME_SEPARATOR) + 1);
|
||||||
|
final String fullPath = scheme + SCHEME_SEPARATOR
|
||||||
|
+ (scheme.equals(LOCAL_FILESYSTEM_SCHEME) ? PATH_SEPARATOR : "")
|
||||||
|
+ pathExceptScheme;
|
||||||
|
return new Path(fullPath);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -63,7 +63,7 @@ public class InLineFileSystem extends FileSystem {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public FSDataInputStream open(Path inlinePath, int bufferSize) throws IOException {
|
public FSDataInputStream open(Path inlinePath, int bufferSize) throws IOException {
|
||||||
Path outerPath = InLineFSUtils.getOuterfilePathFromInlinePath(inlinePath);
|
Path outerPath = InLineFSUtils.getOuterFilePathFromInlinePath(inlinePath);
|
||||||
FileSystem outerFs = outerPath.getFileSystem(conf);
|
FileSystem outerFs = outerPath.getFileSystem(conf);
|
||||||
FSDataInputStream outerStream = outerFs.open(outerPath, bufferSize);
|
FSDataInputStream outerStream = outerFs.open(outerPath, bufferSize);
|
||||||
return new InLineFsDataInputStream(InLineFSUtils.startOffset(inlinePath), outerStream, InLineFSUtils.length(inlinePath));
|
return new InLineFsDataInputStream(InLineFSUtils.startOffset(inlinePath), outerStream, InLineFSUtils.length(inlinePath));
|
||||||
@@ -80,7 +80,7 @@ public class InLineFileSystem extends FileSystem {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public FileStatus getFileStatus(Path inlinePath) throws IOException {
|
public FileStatus getFileStatus(Path inlinePath) throws IOException {
|
||||||
Path outerPath = InLineFSUtils.getOuterfilePathFromInlinePath(inlinePath);
|
Path outerPath = InLineFSUtils.getOuterFilePathFromInlinePath(inlinePath);
|
||||||
FileSystem outerFs = outerPath.getFileSystem(conf);
|
FileSystem outerFs = outerPath.getFileSystem(conf);
|
||||||
FileStatus status = outerFs.getFileStatus(outerPath);
|
FileStatus status = outerFs.getFileStatus(outerPath);
|
||||||
FileStatus toReturn = new FileStatus(InLineFSUtils.length(inlinePath), status.isDirectory(), status.getReplication(), status.getBlockSize(),
|
FileStatus toReturn = new FileStatus(InLineFSUtils.length(inlinePath), status.isDirectory(), status.getReplication(), status.getBlockSize(),
|
||||||
|
|||||||
@@ -44,6 +44,7 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
|
|||||||
private final Schema readerSchema;
|
private final Schema readerSchema;
|
||||||
private final boolean readBlocksLazily;
|
private final boolean readBlocksLazily;
|
||||||
private final boolean reverseLogReader;
|
private final boolean reverseLogReader;
|
||||||
|
private final boolean enableInLineReading;
|
||||||
private int bufferSize;
|
private int bufferSize;
|
||||||
|
|
||||||
private static final Logger LOG = LogManager.getLogger(HoodieLogFormatReader.class);
|
private static final Logger LOG = LogManager.getLogger(HoodieLogFormatReader.class);
|
||||||
@@ -62,6 +63,7 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
|
|||||||
this.reverseLogReader = reverseLogReader;
|
this.reverseLogReader = reverseLogReader;
|
||||||
this.bufferSize = bufferSize;
|
this.bufferSize = bufferSize;
|
||||||
this.prevReadersInOpenState = new ArrayList<>();
|
this.prevReadersInOpenState = new ArrayList<>();
|
||||||
|
this.enableInLineReading = enableInlineReading;
|
||||||
if (logFiles.size() > 0) {
|
if (logFiles.size() > 0) {
|
||||||
HoodieLogFile nextLogFile = logFiles.remove(0);
|
HoodieLogFile nextLogFile = logFiles.remove(0);
|
||||||
this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false, enableInlineReading);
|
this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false, enableInlineReading);
|
||||||
@@ -104,7 +106,8 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
|
|||||||
this.prevReadersInOpenState.add(currentReader);
|
this.prevReadersInOpenState.add(currentReader);
|
||||||
}
|
}
|
||||||
this.currentReader =
|
this.currentReader =
|
||||||
new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false);
|
new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false,
|
||||||
|
this.enableInLineReading);
|
||||||
} catch (IOException io) {
|
} catch (IOException io) {
|
||||||
throw new HoodieIOException("unable to initialize read with log file ", io);
|
throw new HoodieIOException("unable to initialize read with log file ", io);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -296,6 +296,64 @@ public class TestInLineFileSystem {
|
|||||||
}, "Should have thrown exception");
|
}, "Should have thrown exception");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static class TestFSPath {
|
||||||
|
final Path inputPath;
|
||||||
|
final Path expectedInLineFSPath;
|
||||||
|
final Path transformedInputPath;
|
||||||
|
|
||||||
|
TestFSPath(final Path inputPath, final Path expectedInLineFSPath, final Path transformedInputPath) {
|
||||||
|
this.inputPath = inputPath;
|
||||||
|
this.expectedInLineFSPath = expectedInLineFSPath;
|
||||||
|
this.transformedInputPath = transformedInputPath;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInLineFSPathConversions() {
|
||||||
|
final List<TestFSPath> expectedInLinePaths = Arrays.asList(
|
||||||
|
new TestFSPath(
|
||||||
|
new Path("/zero/524bae7e-f01d-47ae-b7cd-910400a81336"),
|
||||||
|
new Path("inlinefs://zero/524bae7e-f01d-47ae-b7cd-910400a81336/file/?start_offset=10&length=10"),
|
||||||
|
new Path("file:/zero/524bae7e-f01d-47ae-b7cd-910400a81336")),
|
||||||
|
new TestFSPath(
|
||||||
|
new Path("file:/one/524bae7e-f01d-47ae-b7cd-910400a81336"),
|
||||||
|
new Path("inlinefs://one/524bae7e-f01d-47ae-b7cd-910400a81336/file/?start_offset=10&length=10"),
|
||||||
|
new Path("file:/one/524bae7e-f01d-47ae-b7cd-910400a81336")),
|
||||||
|
new TestFSPath(
|
||||||
|
new Path("file://two/524bae7e-f01d-47ae-b7cd-910400a81336"),
|
||||||
|
new Path("inlinefs://two/524bae7e-f01d-47ae-b7cd-910400a81336/file/?start_offset=10&length=10"),
|
||||||
|
new Path("file:/two/524bae7e-f01d-47ae-b7cd-910400a81336")),
|
||||||
|
new TestFSPath(
|
||||||
|
new Path("hdfs://three/524bae7e-f01d-47ae-b7cd-910400a81336"),
|
||||||
|
new Path("inlinefs://three/524bae7e-f01d-47ae-b7cd-910400a81336/hdfs/?start_offset=10&length=10"),
|
||||||
|
new Path("hdfs://three/524bae7e-f01d-47ae-b7cd-910400a81336")),
|
||||||
|
new TestFSPath(
|
||||||
|
new Path("s3://four/524bae7e-f01d-47ae-b7cd-910400a81336"),
|
||||||
|
new Path("inlinefs://four/524bae7e-f01d-47ae-b7cd-910400a81336/s3/?start_offset=10&length=10"),
|
||||||
|
new Path("s3://four/524bae7e-f01d-47ae-b7cd-910400a81336")),
|
||||||
|
new TestFSPath(
|
||||||
|
new Path("s3a://five/524bae7e-f01d-47ae-b7cd-910400a81336"),
|
||||||
|
new Path("inlinefs://five/524bae7e-f01d-47ae-b7cd-910400a81336/s3a/?start_offset=10&length=10"),
|
||||||
|
new Path("s3a://five/524bae7e-f01d-47ae-b7cd-910400a81336"))
|
||||||
|
);
|
||||||
|
|
||||||
|
for (TestFSPath entry : expectedInLinePaths) {
|
||||||
|
final Path inputPath = entry.inputPath;
|
||||||
|
final Path expectedInLineFSPath = entry.expectedInLineFSPath;
|
||||||
|
final Path expectedTransformedInputPath = entry.transformedInputPath;
|
||||||
|
|
||||||
|
String scheme = "file";
|
||||||
|
if (inputPath.toString().contains(":")) {
|
||||||
|
scheme = inputPath.toString().split(":")[0];
|
||||||
|
}
|
||||||
|
final Path actualInLineFSPath = InLineFSUtils.getInlineFilePath(inputPath, scheme, 10, 10);
|
||||||
|
assertEquals(expectedInLineFSPath, actualInLineFSPath);
|
||||||
|
|
||||||
|
final Path actualOuterFilePath = InLineFSUtils.getOuterFilePathFromInlinePath(actualInLineFSPath);
|
||||||
|
assertEquals(expectedTransformedInputPath, actualOuterFilePath);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testExists() throws IOException {
|
public void testExists() throws IOException {
|
||||||
Path inlinePath = getRandomInlinePath();
|
Path inlinePath = getRandomInlinePath();
|
||||||
|
|||||||
Reference in New Issue
Block a user