[HUDI-786] Fixing read beyond inline length in InlineFS (#1616)
This commit is contained in:
committed by
GitHub
parent
3574a89232
commit
5a0d3f1cf9
@@ -37,15 +37,19 @@ public class InLineFsDataInputStream extends FSDataInputStream {
|
|||||||
private final FSDataInputStream outerStream;
|
private final FSDataInputStream outerStream;
|
||||||
private final int length;
|
private final int length;
|
||||||
|
|
||||||
public InLineFsDataInputStream(int startOffset, FSDataInputStream outerStream, int length) {
|
public InLineFsDataInputStream(int startOffset, FSDataInputStream outerStream, int length) throws IOException {
|
||||||
super(outerStream.getWrappedStream());
|
super(outerStream.getWrappedStream());
|
||||||
this.startOffset = startOffset;
|
this.startOffset = startOffset;
|
||||||
this.outerStream = outerStream;
|
this.outerStream = outerStream;
|
||||||
this.length = length;
|
this.length = length;
|
||||||
|
outerStream.seek(startOffset);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void seek(long desired) throws IOException {
|
public void seek(long desired) throws IOException {
|
||||||
|
if (desired > length) {
|
||||||
|
throw new IOException("Attempting to seek past inline content");
|
||||||
|
}
|
||||||
outerStream.seek(startOffset + desired);
|
outerStream.seek(startOffset + desired);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -56,24 +60,32 @@ public class InLineFsDataInputStream extends FSDataInputStream {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int read(long position, byte[] buffer, int offset, int length) throws IOException {
|
public int read(long position, byte[] buffer, int offset, int length) throws IOException {
|
||||||
|
if ((length + offset) > this.length) {
|
||||||
|
throw new IOException("Attempting to read past inline content");
|
||||||
|
}
|
||||||
return outerStream.read(startOffset + position, buffer, offset, length);
|
return outerStream.read(startOffset + position, buffer, offset, length);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
|
public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
|
||||||
|
if ((length + offset) > this.length) {
|
||||||
|
throw new IOException("Attempting to read past inline content");
|
||||||
|
}
|
||||||
outerStream.readFully(startOffset + position, buffer, offset, length);
|
outerStream.readFully(startOffset + position, buffer, offset, length);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void readFully(long position, byte[] buffer)
|
public void readFully(long position, byte[] buffer)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
outerStream.readFully(startOffset + position, buffer, 0, buffer.length);
|
readFully(position, buffer, 0, buffer.length);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean seekToNewSource(long targetPos) throws IOException {
|
public boolean seekToNewSource(long targetPos) throws IOException {
|
||||||
boolean toReturn = outerStream.seekToNewSource(startOffset + targetPos);
|
if (targetPos > this.length) {
|
||||||
return toReturn;
|
throw new IOException("Attempting to seek past inline content");
|
||||||
|
}
|
||||||
|
return outerStream.seekToNewSource(startOffset + targetPos);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -88,6 +100,9 @@ public class InLineFsDataInputStream extends FSDataInputStream {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setReadahead(Long readahead) throws IOException, UnsupportedOperationException {
|
public void setReadahead(Long readahead) throws IOException, UnsupportedOperationException {
|
||||||
|
if (readahead > this.length) {
|
||||||
|
throw new IOException("Attempting to set read ahead past inline content");
|
||||||
|
}
|
||||||
outerStream.setReadahead(readahead);
|
outerStream.setReadahead(readahead);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -99,6 +114,9 @@ public class InLineFsDataInputStream extends FSDataInputStream {
|
|||||||
@Override
|
@Override
|
||||||
public ByteBuffer read(ByteBufferPool bufferPool, int maxLength, EnumSet<ReadOption> opts)
|
public ByteBuffer read(ByteBufferPool bufferPool, int maxLength, EnumSet<ReadOption> opts)
|
||||||
throws IOException, UnsupportedOperationException {
|
throws IOException, UnsupportedOperationException {
|
||||||
|
if (maxLength > this.length) {
|
||||||
|
throw new IOException("Attempting to read max length beyond inline content");
|
||||||
|
}
|
||||||
return outerStream.read(bufferPool, maxLength, opts);
|
return outerStream.read(bufferPool, maxLength, opts);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -27,7 +27,6 @@ import org.apache.hadoop.fs.FSDataOutputStream;
|
|||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.junit.jupiter.api.AfterEach;
|
import org.junit.jupiter.api.AfterEach;
|
||||||
import org.junit.jupiter.api.Disabled;
|
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
@@ -122,7 +121,6 @@ public class TestInLineFileSystem {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@Disabled("Disabling flaky test for now https://issues.apache.org/jira/browse/HUDI-786")
|
|
||||||
public void testFileSystemApis() throws IOException {
|
public void testFileSystemApis() throws IOException {
|
||||||
OuterPathInfo outerPathInfo = generateOuterFileAndGetInfo(1000);
|
OuterPathInfo outerPathInfo = generateOuterFileAndGetInfo(1000);
|
||||||
Path inlinePath = FileSystemTestUtils.getPhantomFile(outerPathInfo.outerPath, outerPathInfo.startOffset, outerPathInfo.length);
|
Path inlinePath = FileSystemTestUtils.getPhantomFile(outerPathInfo.outerPath, outerPathInfo.startOffset, outerPathInfo.length);
|
||||||
@@ -130,10 +128,22 @@ public class TestInLineFileSystem {
|
|||||||
final FSDataInputStream fsDataInputStream = inlineFileSystem.open(inlinePath);
|
final FSDataInputStream fsDataInputStream = inlineFileSystem.open(inlinePath);
|
||||||
byte[] actualBytes = new byte[outerPathInfo.expectedBytes.length];
|
byte[] actualBytes = new byte[outerPathInfo.expectedBytes.length];
|
||||||
// verify pos
|
// verify pos
|
||||||
assertEquals(0 - outerPathInfo.startOffset, fsDataInputStream.getPos());
|
assertEquals(0, fsDataInputStream.getPos());
|
||||||
fsDataInputStream.readFully(0, actualBytes);
|
fsDataInputStream.readFully(0, actualBytes);
|
||||||
assertArrayEquals(outerPathInfo.expectedBytes, actualBytes);
|
assertArrayEquals(outerPathInfo.expectedBytes, actualBytes);
|
||||||
|
|
||||||
|
// test seek
|
||||||
|
int[] validPositions = {1, 100, 290, 520, 990, 999, 1000};
|
||||||
|
for (int pos : validPositions) {
|
||||||
|
fsDataInputStream.seek(pos);
|
||||||
|
}
|
||||||
|
int[] invalidPositions = {1001, 1100, 10000};
|
||||||
|
for (int pos : invalidPositions) {
|
||||||
|
assertThrows(IOException.class, () -> {
|
||||||
|
fsDataInputStream.seek(pos);
|
||||||
|
}, "Should have thrown IOException");
|
||||||
|
}
|
||||||
|
|
||||||
// read partial data
|
// read partial data
|
||||||
// test read(long position, byte[] buffer, int offset, int length)
|
// test read(long position, byte[] buffer, int offset, int length)
|
||||||
actualBytes = new byte[100];
|
actualBytes = new byte[100];
|
||||||
@@ -143,9 +153,12 @@ public class TestInLineFileSystem {
|
|||||||
fsDataInputStream.read(25, actualBytes, 100, 210);
|
fsDataInputStream.read(25, actualBytes, 100, 210);
|
||||||
verifyArrayEquality(outerPathInfo.expectedBytes, 25, 210, actualBytes, 100, 210);
|
verifyArrayEquality(outerPathInfo.expectedBytes, 25, 210, actualBytes, 100, 210);
|
||||||
// give length to read > than actual inline content
|
// give length to read > than actual inline content
|
||||||
assertThrows(IndexOutOfBoundsException.class, () -> {
|
assertThrows(IOException.class, () -> {
|
||||||
fsDataInputStream.read(0, new byte[1100], 0, 1101);
|
fsDataInputStream.read(0, new byte[1100], 0, 1101);
|
||||||
}, "Should have thrown IndexOutOfBoundsException");
|
}, "Should have thrown IOException");
|
||||||
|
assertThrows(IOException.class, () -> {
|
||||||
|
fsDataInputStream.read(0, new byte[10], 991, 10);
|
||||||
|
}, "Should have thrown IOException");
|
||||||
|
|
||||||
// test readFully(long position, byte[] buffer, int offset, int length)
|
// test readFully(long position, byte[] buffer, int offset, int length)
|
||||||
actualBytes = new byte[100];
|
actualBytes = new byte[100];
|
||||||
@@ -155,9 +168,12 @@ public class TestInLineFileSystem {
|
|||||||
fsDataInputStream.readFully(25, actualBytes, 100, 210);
|
fsDataInputStream.readFully(25, actualBytes, 100, 210);
|
||||||
verifyArrayEquality(outerPathInfo.expectedBytes, 25, 210, actualBytes, 100, 210);
|
verifyArrayEquality(outerPathInfo.expectedBytes, 25, 210, actualBytes, 100, 210);
|
||||||
// give length to read > than actual inline content
|
// give length to read > than actual inline content
|
||||||
assertThrows(IndexOutOfBoundsException.class, () -> {
|
assertThrows(IOException.class, () -> {
|
||||||
fsDataInputStream.readFully(0, new byte[1100], 0, 1101);
|
fsDataInputStream.readFully(0, new byte[1100], 0, 1101);
|
||||||
}, "Should have thrown IndexOutOfBoundsException");
|
}, "Should have thrown IOException");
|
||||||
|
assertThrows(IOException.class, () -> {
|
||||||
|
fsDataInputStream.readFully(0, new byte[100], 910, 100);
|
||||||
|
}, "Should have thrown IOException");
|
||||||
|
|
||||||
// test readFully(long position, byte[] buffer)
|
// test readFully(long position, byte[] buffer)
|
||||||
actualBytes = new byte[100];
|
actualBytes = new byte[100];
|
||||||
@@ -167,19 +183,10 @@ public class TestInLineFileSystem {
|
|||||||
fsDataInputStream.readFully(25, actualBytes);
|
fsDataInputStream.readFully(25, actualBytes);
|
||||||
verifyArrayEquality(outerPathInfo.expectedBytes, 25, 210, actualBytes, 0, 210);
|
verifyArrayEquality(outerPathInfo.expectedBytes, 25, 210, actualBytes, 0, 210);
|
||||||
// give length to read > than actual inline content
|
// give length to read > than actual inline content
|
||||||
actualBytes = new byte[1100];
|
assertThrows(IOException.class, () -> {
|
||||||
fsDataInputStream.readFully(0, actualBytes);
|
fsDataInputStream.readFully(0, new byte[1100]);
|
||||||
verifyArrayEquality(outerPathInfo.expectedBytes, 0, 1000, actualBytes, 0, 1000);
|
}, "Should have thrown IOException");
|
||||||
|
|
||||||
// TODO. seek does not move the position. need to investigate.
|
|
||||||
// test seekToNewSource(long targetPos)
|
|
||||||
/* fsDataInputStream.seekToNewSource(75);
|
|
||||||
Assert.assertEquals(outerPathInfo.startOffset + 75, fsDataInputStream.getPos());
|
|
||||||
fsDataInputStream.seekToNewSource(180);
|
|
||||||
Assert.assertEquals(outerPathInfo.startOffset + 180, fsDataInputStream.getPos());
|
|
||||||
fsDataInputStream.seekToNewSource(910);
|
|
||||||
Assert.assertEquals(outerPathInfo.startOffset + 910, fsDataInputStream.getPos());
|
|
||||||
*/
|
|
||||||
// test read(ByteBuffer buf)
|
// test read(ByteBuffer buf)
|
||||||
ByteBuffer actualByteBuffer = ByteBuffer.allocate(100);
|
ByteBuffer actualByteBuffer = ByteBuffer.allocate(100);
|
||||||
assertThrows(UnsupportedOperationException.class, () -> {
|
assertThrows(UnsupportedOperationException.class, () -> {
|
||||||
@@ -197,11 +204,6 @@ public class TestInLineFileSystem {
|
|||||||
fsDataInputStream.setDropBehind(true);
|
fsDataInputStream.setDropBehind(true);
|
||||||
}, "Should have thrown exception");
|
}, "Should have thrown exception");
|
||||||
|
|
||||||
// yet to test
|
|
||||||
// read(ByteBufferPool bufferPool, int maxLength, EnumSet<ReadOption> opts)
|
|
||||||
// releaseBuffer(ByteBuffer buffer)
|
|
||||||
// unbuffer()
|
|
||||||
|
|
||||||
fsDataInputStream.close();
|
fsDataInputStream.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user