diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/inline/InLineFsDataInputStream.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/inline/InLineFsDataInputStream.java
index 1d316cd6a..4e8701244 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/inline/InLineFsDataInputStream.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/inline/InLineFsDataInputStream.java
@@ -37,15 +37,19 @@ public class InLineFsDataInputStream extends FSDataInputStream {
   private final FSDataInputStream outerStream;
   private final int length;
 
-  public InLineFsDataInputStream(int startOffset, FSDataInputStream outerStream, int length) {
+  public InLineFsDataInputStream(int startOffset, FSDataInputStream outerStream, int length) throws IOException {
     super(outerStream.getWrappedStream());
     this.startOffset = startOffset;
     this.outerStream = outerStream;
     this.length = length;
+    outerStream.seek(startOffset);
   }
 
   @Override
   public void seek(long desired) throws IOException {
+    if (desired > length) {
+      throw new IOException("Attempting to seek past inline content");
+    }
     outerStream.seek(startOffset + desired);
   }
 
@@ -56,24 +60,35 @@ public class InLineFsDataInputStream extends FSDataInputStream {
 
   @Override
   public int read(long position, byte[] buffer, int offset, int length) throws IOException {
+    // Bound the read by where it lands in the inline content; "offset" only indexes into buffer.
+    if (position < 0 || (position + length) > this.length) {
+      throw new IOException("Attempting to read past inline content");
+    }
     return outerStream.read(startOffset + position, buffer, offset, length);
   }
 
   @Override
   public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
+    // Bound the read by where it lands in the inline content; "offset" only indexes into buffer.
+    if (position < 0 || (position + length) > this.length) {
+      throw new IOException("Attempting to read past inline content");
+    }
     outerStream.readFully(startOffset + position, buffer, offset, length);
   }
 
   @Override
   public void readFully(long position, byte[] buffer)
       throws IOException {
-    outerStream.readFully(startOffset + position, buffer, 0, buffer.length);
+    // Delegate so the whole-buffer variant goes through the same bounds check.
+    readFully(position, buffer, 0, buffer.length);
   }
 
   @Override
   public boolean seekToNewSource(long targetPos) throws IOException {
-    boolean toReturn = outerStream.seekToNewSource(startOffset + targetPos);
-    return toReturn;
+    if (targetPos > this.length) {
+      throw new IOException("Attempting to seek past inline content");
+    }
+    return outerStream.seekToNewSource(startOffset + targetPos);
   }
 
   @Override
@@ -88,6 +103,10 @@ public class InLineFsDataInputStream extends FSDataInputStream {
 
   @Override
   public void setReadahead(Long readahead) throws IOException, UnsupportedOperationException {
+    // readahead is a boxed Long: skip the comparison for null instead of failing on unboxing.
+    if (readahead != null && readahead > this.length) {
+      throw new IOException("Attempting to set read ahead past inline content");
+    }
     outerStream.setReadahead(readahead);
   }
 
@@ -99,6 +118,9 @@ public class InLineFsDataInputStream extends FSDataInputStream {
   @Override
   public ByteBuffer read(ByteBufferPool bufferPool, int maxLength, EnumSet opts)
       throws IOException, UnsupportedOperationException {
+    if (maxLength > this.length) {
+      throw new IOException("Attempting to read max length beyond inline content");
+    }
     return outerStream.read(bufferPool, maxLength, opts);
   }
 
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/fs/inline/TestInLineFileSystem.java b/hudi-common/src/test/java/org/apache/hudi/common/fs/inline/TestInLineFileSystem.java
index 32394cb19..4553aa5a9 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/fs/inline/TestInLineFileSystem.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/fs/inline/TestInLineFileSystem.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 import java.io.File;
@@ -122,7 +121,6 @@ public class TestInLineFileSystem {
   }
 
   @Test
-  @Disabled("Disabling flaky test for now https://issues.apache.org/jira/browse/HUDI-786")
   public void testFileSystemApis() throws IOException {
     OuterPathInfo outerPathInfo = generateOuterFileAndGetInfo(1000);
     Path inlinePath = FileSystemTestUtils.getPhantomFile(outerPathInfo.outerPath, outerPathInfo.startOffset, outerPathInfo.length);
@@ -130,10 +128,22 @@ public class TestInLineFileSystem {
     final FSDataInputStream fsDataInputStream = inlineFileSystem.open(inlinePath);
     byte[] actualBytes = new byte[outerPathInfo.expectedBytes.length];
     // verify pos
-    assertEquals(0 - outerPathInfo.startOffset, fsDataInputStream.getPos());
+    assertEquals(0, fsDataInputStream.getPos());
     fsDataInputStream.readFully(0, actualBytes);
     assertArrayEquals(outerPathInfo.expectedBytes, actualBytes);
 
+    // test seek
+    int[] validPositions = {1, 100, 290, 520, 990, 999, 1000};
+    for (int pos : validPositions) {
+      fsDataInputStream.seek(pos);
+    }
+    int[] invalidPositions = {1001, 1100, 10000};
+    for (int pos : invalidPositions) {
+      assertThrows(IOException.class, () -> {
+        fsDataInputStream.seek(pos);
+      }, "Should have thrown IOException");
+    }
+
     // read partial data
     // test read(long position, byte[] buffer, int offset, int length)
     actualBytes = new byte[100];
@@ -143,9 +153,13 @@ public class TestInLineFileSystem {
     fsDataInputStream.read(25, actualBytes, 100, 210);
     verifyArrayEquality(outerPathInfo.expectedBytes, 25, 210, actualBytes, 100, 210);
     // give length to read > than actual inline content
-    assertThrows(IndexOutOfBoundsException.class, () -> {
+    assertThrows(IOException.class, () -> {
       fsDataInputStream.read(0, new byte[1100], 0, 1101);
-    }, "Should have thrown IndexOutOfBoundsException");
+    }, "Should have thrown IOException");
+    // start inside the inline content but run one byte past its end
+    assertThrows(IOException.class, () -> {
+      fsDataInputStream.read(991, new byte[10], 0, 10);
+    }, "Should have thrown IOException");
 
     // test readFully(long position, byte[] buffer, int offset, int length)
     actualBytes = new byte[100];
@@ -155,9 +169,13 @@ public class TestInLineFileSystem {
     fsDataInputStream.readFully(25, actualBytes, 100, 210);
     verifyArrayEquality(outerPathInfo.expectedBytes, 25, 210, actualBytes, 100, 210);
     // give length to read > than actual inline content
-    assertThrows(IndexOutOfBoundsException.class, () -> {
+    assertThrows(IOException.class, () -> {
       fsDataInputStream.readFully(0, new byte[1100], 0, 1101);
-    }, "Should have thrown IndexOutOfBoundsException");
+    }, "Should have thrown IOException");
+    // start inside the inline content but run ten bytes past its end
+    assertThrows(IOException.class, () -> {
+      fsDataInputStream.readFully(910, new byte[100], 0, 100);
+    }, "Should have thrown IOException");
 
     // test readFully(long position, byte[] buffer)
     actualBytes = new byte[100];
@@ -167,19 +185,10 @@ public class TestInLineFileSystem {
     fsDataInputStream.readFully(25, actualBytes);
     verifyArrayEquality(outerPathInfo.expectedBytes, 25, 210, actualBytes, 0, 210);
     // give length to read > than actual inline content
-    actualBytes = new byte[1100];
-    fsDataInputStream.readFully(0, actualBytes);
-    verifyArrayEquality(outerPathInfo.expectedBytes, 0, 1000, actualBytes, 0, 1000);
+    assertThrows(IOException.class, () -> {
+      fsDataInputStream.readFully(0, new byte[1100]);
+    }, "Should have thrown IOException");
 
-    // TODO. seek does not move the position. need to investigate.
-    // test seekToNewSource(long targetPos)
-    /* fsDataInputStream.seekToNewSource(75);
-    Assert.assertEquals(outerPathInfo.startOffset + 75, fsDataInputStream.getPos());
-    fsDataInputStream.seekToNewSource(180);
-    Assert.assertEquals(outerPathInfo.startOffset + 180, fsDataInputStream.getPos());
-    fsDataInputStream.seekToNewSource(910);
-    Assert.assertEquals(outerPathInfo.startOffset + 910, fsDataInputStream.getPos());
-    */
     // test read(ByteBuffer buf)
     ByteBuffer actualByteBuffer = ByteBuffer.allocate(100);
     assertThrows(UnsupportedOperationException.class, () -> {
@@ -197,11 +206,6 @@ public class TestInLineFileSystem {
       fsDataInputStream.setDropBehind(true);
     }, "Should have thrown exception");
 
-    // yet to test
-    // read(ByteBufferPool bufferPool, int maxLength, EnumSet opts)
-    // releaseBuffer(ByteBuffer buffer)
-    // unbuffer()
-
     fsDataInputStream.close();
   }
 