diff --git a/hudi-client/pom.xml b/hudi-client/pom.xml index 3cab43a63..765055532 100644 --- a/hudi-client/pom.xml +++ b/hudi-client/pom.xml @@ -183,6 +183,7 @@ org.apache.hbase hbase-client + ${hbase.version} diff --git a/hudi-common/pom.xml b/hudi-common/pom.xml index 45534e733..71904aaac 100644 --- a/hudi-common/pom.xml +++ b/hudi-common/pom.xml @@ -177,5 +177,20 @@ + + + + org.apache.hbase + hbase-client + ${hbase.version} + test + + + org.apache.hbase + hbase-server + ${hbase.version} + test + + diff --git a/hudi-common/src/main/java/org/apache/hudi/common/inline/fs/InLineFSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/inline/fs/InLineFSUtils.java new file mode 100644 index 000000000..dbc64dd64 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/inline/fs/InLineFSUtils.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.inline.fs; + +import org.apache.hadoop.fs.Path; + +/** + * Utils to parse InLineFileSystem paths. 
+ * Inline FS format: + * "inlinefs:////?start_offset=start_offset>&length=" + * Eg: "inlinefs:///s3a/?start_offset=20&length=40" + */ +public class InLineFSUtils { + private static final String START_OFFSET_STR = "start_offset"; + private static final String LENGTH_STR = "length"; + private static final String EQUALS_STR = "="; + + /** + * Fetch inline file path from outer path. + * Eg + * Input: + * Path = s3a://file1, origScheme: file, startOffset = 20, length = 40 + * Output: "inlinefs:/file1/s3a/?start_offset=20&length=40" + * + * @param outerPath + * @param origScheme + * @param inLineStartOffset + * @param inLineLength + * @return + */ + public static Path getInlineFilePath(Path outerPath, String origScheme, long inLineStartOffset, long inLineLength) { + String subPath = outerPath.toString().substring(outerPath.toString().indexOf(":") + 1); + return new Path( + InLineFileSystem.SCHEME + "://" + subPath + "/" + origScheme + + "/" + "?" + START_OFFSET_STR + EQUALS_STR + inLineStartOffset + + "&" + LENGTH_STR + EQUALS_STR + inLineLength + ); + } + + /** + * Inline file format + * "inlinefs:////?start_offset=start_offset>&length=" + * Outer File format + * "://" + *

+ * Eg input : "inlinefs://file1/sa3/?start_offset=20&length=40". + * Output : "sa3://file1" + * + * @param inlinePath inline file system path + * @return + */ + public static Path getOuterfilePathFromInlinePath(Path inlinePath) { + String scheme = inlinePath.getParent().getName(); + Path basePath = inlinePath.getParent().getParent(); + return new Path(basePath.toString().replaceFirst(InLineFileSystem.SCHEME, scheme)); + } + + /** + * Eg input : "inlinefs://file1/s3a/?start_offset=20&length=40". + * output: 20 + * + * @param inlinePath + * @return + */ + public static int startOffset(Path inlinePath) { + String[] slices = inlinePath.toString().split("[?&=]"); + return Integer.parseInt(slices[slices.length - 3]); + } + + /** + * Eg input : "inlinefs:/file1/s3a/?start_offset=20&length=40". + * Output: 40 + * + * @param inlinePath + * @return + */ + public static int length(Path inlinePath) { + String[] slices = inlinePath.toString().split("[?&=]"); + return Integer.parseInt(slices[slices.length - 1]); + } + +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/inline/fs/InLineFileSystem.java b/hudi-common/src/main/java/org/apache/hudi/common/inline/fs/InLineFileSystem.java new file mode 100644 index 000000000..480a126e6 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/inline/fs/InLineFileSystem.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.inline.fs; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.util.Progressable; + +import java.io.IOException; +import java.net.URI; + +/** + * Enables reading any inline file at a given offset and length. This {@link FileSystem} is used only in read path and does not support + * any write apis. + *

+ * - Reading an inlined file at a given offset, length, read it out as if it were an independent file of that length + * - Inlined path is of the form "inlinefs:///path/to/outer/file//?start_offset=&length= + *

+ * TODO: The reader/writer may try to use relative paths based on the inlinepath and it may not work. Need to handle + * this gracefully eg. the parquet summary metadata reading. TODO: If this shows promise, also support directly writing + * the inlined file to the underneath file without buffer + */ +public class InLineFileSystem extends FileSystem { + + public static final String SCHEME = "inlinefs"; + private Configuration conf = null; + + @Override + public void initialize(URI name, Configuration conf) throws IOException { + super.initialize(name, conf); + this.conf = conf; + } + + @Override + public URI getUri() { + return URI.create(getScheme()); + } + + public String getScheme() { + return SCHEME; + } + + @Override + public FSDataInputStream open(Path inlinePath, int bufferSize) throws IOException { + Path outerPath = InLineFSUtils.getOuterfilePathFromInlinePath(inlinePath); + FileSystem outerFs = outerPath.getFileSystem(conf); + FSDataInputStream outerStream = outerFs.open(outerPath, bufferSize); + return new InLineFsDataInputStream(InLineFSUtils.startOffset(inlinePath), outerStream, InLineFSUtils.length(inlinePath)); + } + + @Override + public boolean exists(Path f) { + try { + return getFileStatus(f) != null; + } catch (Exception e) { + return false; + } + } + + @Override + public FileStatus getFileStatus(Path inlinePath) throws IOException { + Path outerPath = InLineFSUtils.getOuterfilePathFromInlinePath(inlinePath); + FileSystem outerFs = outerPath.getFileSystem(conf); + FileStatus status = outerFs.getFileStatus(outerPath); + FileStatus toReturn = new FileStatus(InLineFSUtils.length(inlinePath), status.isDirectory(), status.getReplication(), status.getBlockSize(), + status.getModificationTime(), status.getAccessTime(), status.getPermission(), status.getOwner(), + status.getGroup(), inlinePath); + return toReturn; + } + + @Override + public FSDataOutputStream create(Path path, FsPermission fsPermission, boolean b, int i, short i1, long l, + Progressable 
progressable) throws IOException {
+    // InLineFileSystem is read-only: all mutating operations are rejected.
+    throw new UnsupportedOperationException("Can't create files");
+  }
+
+  @Override
+  public FSDataOutputStream append(Path path, int i, Progressable progressable) throws IOException {
+    throw new UnsupportedOperationException("Can't append files");
+  }
+
+  @Override
+  public boolean rename(Path path, Path path1) throws IOException {
+    throw new UnsupportedOperationException("Can't rename files");
+  }
+
+  @Override
+  public boolean delete(Path path, boolean b) throws IOException {
+    throw new UnsupportedOperationException("Can't delete files");
+  }
+
+  @Override
+  public FileStatus[] listStatus(Path inlinePath) throws IOException {
+    // An inline path denotes exactly one phantom file; report only itself.
+    return new FileStatus[] {getFileStatus(inlinePath)};
+  }
+
+  @Override
+  public void setWorkingDirectory(Path path) {
+    throw new UnsupportedOperationException("Can't set working directory");
+  }
+
+  @Override
+  public Path getWorkingDirectory() {
+    throw new UnsupportedOperationException("Can't get working directory");
+  }
+
+  @Override
+  public boolean mkdirs(Path path, FsPermission fsPermission) throws IOException {
+    throw new UnsupportedOperationException("Can't mkdir");
+  }
+
+}
\ No newline at end of file
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/inline/fs/InLineFsDataInputStream.java b/hudi-common/src/main/java/org/apache/hudi/common/inline/fs/InLineFsDataInputStream.java
new file mode 100644
index 000000000..cd5ca0993
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/inline/fs/InLineFsDataInputStream.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.inline.fs; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.ReadOption; +import org.apache.hadoop.io.ByteBufferPool; + +import java.io.FileDescriptor; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.EnumSet; + +/** + * Inline {@link FSDataInputStream}. A startOffset that is passed in is assumed to be the start of the InputStream. + * All operations are handled having the {@code startOffset} as starting point. + */ +public class InLineFsDataInputStream extends FSDataInputStream { + + private final int startOffset; + private final FSDataInputStream outerStream; + private final int length; + + public InLineFsDataInputStream(int startOffset, FSDataInputStream outerStream, int length) { + super(outerStream.getWrappedStream()); + this.startOffset = startOffset; + this.outerStream = outerStream; + this.length = length; + } + + @Override + public void seek(long desired) throws IOException { + outerStream.seek(startOffset + desired); + } + + @Override + public long getPos() throws IOException { + return outerStream.getPos() - startOffset; + } + + @Override + public int read(long position, byte[] buffer, int offset, int length) throws IOException { + return outerStream.read(startOffset + position, buffer, offset, length); + } + + @Override + public void readFully(long position, byte[] buffer, int offset, int length) throws IOException { + outerStream.readFully(startOffset + position, buffer, offset, length); + } + + @Override + public void readFully(long position, 
byte[] buffer) + throws IOException { + outerStream.readFully(startOffset + position, buffer, 0, buffer.length); + } + + @Override + public boolean seekToNewSource(long targetPos) throws IOException { + boolean toReturn = outerStream.seekToNewSource(startOffset + targetPos); + return toReturn; + } + + @Override + public int read(ByteBuffer buf) throws IOException { + return outerStream.read(buf); + } + + @Override + public FileDescriptor getFileDescriptor() throws IOException { + return outerStream.getFileDescriptor(); + } + + @Override + public void setReadahead(Long readahead) throws IOException, UnsupportedOperationException { + outerStream.setReadahead(readahead); + } + + @Override + public void setDropBehind(Boolean dropBehind) throws IOException, UnsupportedOperationException { + outerStream.setDropBehind(dropBehind); + } + + @Override + public ByteBuffer read(ByteBufferPool bufferPool, int maxLength, EnumSet opts) + throws IOException, UnsupportedOperationException { + return outerStream.read(bufferPool, maxLength, opts); + } + + @Override + public void releaseBuffer(ByteBuffer buffer) { + outerStream.releaseBuffer(buffer); + } + + @Override + public void unbuffer() { + outerStream.unbuffer(); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/inline/fs/InMemoryFileSystem.java b/hudi-common/src/main/java/org/apache/hudi/common/inline/fs/InMemoryFileSystem.java new file mode 100644 index 000000000..c1f33a7c5 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/inline/fs/InMemoryFileSystem.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.inline.fs; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.util.Progressable; + +import java.io.ByteArrayOutputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; + +/** + * A FileSystem which stores all content in memory and returns a byte[] when {@link #getFileAsBytes()} is called + * This FileSystem is used only in write path. Does not support any read apis except {@link #getFileAsBytes()}. 
+ */ +public class InMemoryFileSystem extends FileSystem { + + // TODO: this needs to be per path to support num_cores > 1, and we should release the buffer once done + private ByteArrayOutputStream bos; + private Configuration conf = null; + public static final String SCHEME = "inmemfs"; + private URI uri; + + InMemoryFileSystem() { + } + + @Override + public void initialize(URI name, Configuration conf) throws IOException { + super.initialize(name, conf); + this.conf = conf; + this.uri = name; + } + + @Override + public URI getUri() { + return uri; + } + + public String getScheme() { + return SCHEME; + } + + @Override + public FSDataInputStream open(Path inlinePath, int bufferSize) { + return null; + } + + @Override + public FSDataOutputStream create(Path path, FsPermission fsPermission, boolean b, int i, short i1, long l, + Progressable progressable) throws IOException { + bos = new ByteArrayOutputStream(); + try { + this.uri = new URI(path.toString()); + } catch (URISyntaxException e) { + throw new IllegalArgumentException("Path not parsable as URI " + path); + } + return new FSDataOutputStream(bos, new Statistics(getScheme())); + } + + public byte[] getFileAsBytes() { + return bos.toByteArray(); + } + + @Override + public FSDataOutputStream append(Path path, int i, Progressable progressable) throws IOException { + return null; + } + + @Override + public boolean rename(Path path, Path path1) throws IOException { + throw new UnsupportedOperationException("Can't rename files"); + } + + @Override + public boolean delete(Path path, boolean b) throws IOException { + throw new UnsupportedOperationException("Can't delete files"); + } + + @Override + public FileStatus[] listStatus(Path inlinePath) throws FileNotFoundException, IOException { + throw new UnsupportedOperationException("No support for listStatus"); + } + + @Override + public void setWorkingDirectory(Path path) { + throw new UnsupportedOperationException("Can't set working directory"); + } + + @Override + 
public Path getWorkingDirectory() {
+    return null;
+  }
+
+  @Override
+  public boolean mkdirs(Path path, FsPermission fsPermission) throws IOException {
+    throw new UnsupportedOperationException("Can't mkdir");
+  }
+
+  @Override
+  public boolean exists(Path f) throws IOException {
+    throw new UnsupportedOperationException("Can't check for exists");
+  }
+
+  @Override
+  public FileStatus getFileStatus(Path inlinePath) throws IOException {
+    throw new UnsupportedOperationException("No support for getFileStatus");
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+    // bos is assigned only in create(); guard against close() before any write.
+    if (bos != null) bos.close();
+  }
+}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/inline/fs/FileSystemTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/inline/fs/FileSystemTestUtils.java
new file mode 100644
index 000000000..5d0b6a4a7
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/common/inline/fs/FileSystemTestUtils.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.inline.fs;
+
+import org.apache.hadoop.fs.Path;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+import java.util.UUID;
+
+/**
+ * Utils class to assist in testing {@link InMemoryFileSystem} and {@link InLineFileSystem}.
+ */
+public class FileSystemTestUtils {
+
+  public static final String TEMP = "tmp";
+  public static final String FORWARD_SLASH = "/";
+  public static final String FILE_SCHEME = "file";
+  public static final String COLON = ":";
+  static final Random RANDOM = new Random();
+
+  public static Path getRandomOuterInMemPath() {
+    String randomFileName = UUID.randomUUID().toString();
+    String fileSuffix = COLON + FORWARD_SLASH + TEMP + FORWARD_SLASH + randomFileName;
+    return new Path(InMemoryFileSystem.SCHEME + fileSuffix);
+  }
+
+  static Path getRandomOuterFSPath() {
+    String randomFileName = UUID.randomUUID().toString();
+    String fileSuffix = COLON + FORWARD_SLASH + TEMP + FORWARD_SLASH + randomFileName;
+    return new Path(FILE_SCHEME + fileSuffix);
+  }
+
+  public static Path getPhantomFile(Path outerPath, long startOffset, long inlineLength) {
+    // Generate phantom inline file
+    return InLineFSUtils.getInlineFilePath(outerPath, FILE_SCHEME, startOffset, inlineLength);
+  }
+
+  public static void deleteFile(File fileToDelete) throws IOException {
+    if (!fileToDelete.exists()) {
+      return;
+    }
+    if (!fileToDelete.delete()) {
+      String message =
+          "Unable to delete file " + fileToDelete + ".";
+      throw new IOException(message);
+    }
+  }
+}
\ No newline at end of file
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/inline/fs/TestHFileInLining.java b/hudi-common/src/test/java/org/apache/hudi/common/inline/fs/TestHFileInLining.java
new file mode 100644
index 000000000..391a2390a
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/common/inline/fs/TestHFileInLining.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.inline.fs; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; +import org.apache.hadoop.hbase.io.hfile.HFileScanner; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; + +import static org.apache.hudi.common.inline.fs.FileSystemTestUtils.FILE_SCHEME; +import static org.apache.hudi.common.inline.fs.FileSystemTestUtils.RANDOM; +import static org.apache.hudi.common.inline.fs.FileSystemTestUtils.getPhantomFile; +import static org.apache.hudi.common.inline.fs.FileSystemTestUtils.getRandomOuterInMemPath; + +/** + * Tests {@link 
InLineFileSystem} to inline HFile. + */ +public class TestHFileInLining { + + private final Configuration inMemoryConf; + private final Configuration inlineConf; + private final int minBlockSize = 1024; + private static final String LOCAL_FORMATTER = "%010d"; + private int maxRows = 100 + RANDOM.nextInt(1000); + private Path generatedPath; + + public TestHFileInLining() { + inMemoryConf = new Configuration(); + inMemoryConf.set("fs." + InMemoryFileSystem.SCHEME + ".impl", InMemoryFileSystem.class.getName()); + inlineConf = new Configuration(); + inlineConf.set("fs." + InLineFileSystem.SCHEME + ".impl", InLineFileSystem.class.getName()); + } + + @After + public void teardown() throws IOException { + if (generatedPath != null) { + File filePath = new File(generatedPath.toString().substring(generatedPath.toString().indexOf(':') + 1)); + if (filePath.exists()) { + FileSystemTestUtils.deleteFile(filePath); + } + } + } + + @Test + public void testSimpleInlineFileSystem() throws IOException { + Path outerInMemFSPath = getRandomOuterInMemPath(); + Path outerPath = new Path(FILE_SCHEME + outerInMemFSPath.toString().substring(outerInMemFSPath.toString().indexOf(':'))); + generatedPath = outerPath; + CacheConfig cacheConf = new CacheConfig(inMemoryConf); + FSDataOutputStream fout = createFSOutput(outerInMemFSPath, inMemoryConf); + HFileContext meta = new HFileContextBuilder() + .withBlockSize(minBlockSize) + .build(); + HFile.Writer writer = HFile.getWriterFactory(inMemoryConf, cacheConf) + .withOutputStream(fout) + .withFileContext(meta) + .withComparator(new KeyValue.KVComparator()) + .create(); + + writeRecords(writer); + fout.close(); + + byte[] inlineBytes = getBytesToInline(outerInMemFSPath); + long startOffset = generateOuterFile(outerPath, inlineBytes); + + long inlineLength = inlineBytes.length; + + // Generate phantom inline file + Path inlinePath = getPhantomFile(outerPath, startOffset, inlineLength); + + InLineFileSystem inlineFileSystem = (InLineFileSystem) 
inlinePath.getFileSystem(inlineConf); + FSDataInputStream fin = inlineFileSystem.open(inlinePath); + + HFile.Reader reader = HFile.createReader(inlineFileSystem, inlinePath, cacheConf, inlineConf); + // Load up the index. + reader.loadFileInfo(); + // Get a scanner that caches and that does not use pread. + HFileScanner scanner = reader.getScanner(true, false); + // Align scanner at start of the file. + scanner.seekTo(); + readAllRecords(scanner); + + Set rowIdsToSearch = getRandomValidRowIds(10); + for (int rowId : rowIdsToSearch) { + Assert.assertTrue("location lookup failed", + scanner.seekTo(KeyValue.createKeyValueFromKey(getSomeKey(rowId))) == 0); + // read the key and see if it matches + ByteBuffer readKey = scanner.getKey(); + Assert.assertTrue("seeked key does not match", Arrays.equals(getSomeKey(rowId), + Bytes.toBytes(readKey))); + scanner.seekTo(KeyValue.createKeyValueFromKey(getSomeKey(rowId))); + ByteBuffer val1 = scanner.getValue(); + scanner.seekTo(KeyValue.createKeyValueFromKey(getSomeKey(rowId))); + ByteBuffer val2 = scanner.getValue(); + Assert.assertTrue(Arrays.equals(Bytes.toBytes(val1), Bytes.toBytes(val2))); + } + + int[] invalidRowIds = {-4, maxRows, maxRows + 1, maxRows + 120, maxRows + 160, maxRows + 1000}; + for (int rowId : invalidRowIds) { + Assert.assertFalse("location lookup should have failed", + scanner.seekTo(KeyValue.createKeyValueFromKey(getSomeKey(rowId))) == 0); + } + reader.close(); + fin.close(); + outerPath.getFileSystem(inMemoryConf).delete(outerPath, true); + } + + private Set getRandomValidRowIds(int count) { + Set rowIds = new HashSet<>(); + while (rowIds.size() < count) { + int index = RANDOM.nextInt(maxRows); + if (!rowIds.contains(index)) { + rowIds.add(index); + } + } + return rowIds; + } + + private byte[] getSomeKey(int rowId) { + KeyValue kv = new KeyValue(String.format(LOCAL_FORMATTER, Integer.valueOf(rowId)).getBytes(), + Bytes.toBytes("family"), Bytes.toBytes("qual"), HConstants.LATEST_TIMESTAMP, 
KeyValue.Type.Put); + return kv.getKey(); + } + + private FSDataOutputStream createFSOutput(Path name, Configuration conf) throws IOException { + return name.getFileSystem(conf).create(name); + } + + private void writeRecords(HFile.Writer writer) throws IOException { + writeSomeRecords(writer); + writer.close(); + } + + private int writeSomeRecords(HFile.Writer writer) + throws IOException { + String value = "value"; + KeyValue kv; + for (int i = 0; i < (maxRows); i++) { + String key = String.format(LOCAL_FORMATTER, Integer.valueOf(i)); + kv = new KeyValue(Bytes.toBytes(key), Bytes.toBytes("family"), Bytes.toBytes("qual"), + Bytes.toBytes(value + key)); + writer.append(kv); + } + return (maxRows); + } + + private void readAllRecords(HFileScanner scanner) throws IOException { + readAndCheckbytes(scanner, 0, maxRows); + } + + // read the records and check + private int readAndCheckbytes(HFileScanner scanner, int start, int n) + throws IOException { + String value = "value"; + int i = start; + for (; i < (start + n); i++) { + ByteBuffer key = scanner.getKey(); + ByteBuffer val = scanner.getValue(); + String keyStr = String.format(LOCAL_FORMATTER, Integer.valueOf(i)); + String valStr = value + keyStr; + KeyValue kv = new KeyValue(Bytes.toBytes(keyStr), Bytes.toBytes("family"), + Bytes.toBytes("qual"), Bytes.toBytes(valStr)); + byte[] keyBytes = new KeyValue.KeyOnlyKeyValue(Bytes.toBytes(key), 0, + Bytes.toBytes(key).length).getKey(); + Assert.assertTrue("bytes for keys do not match " + keyStr + " " + + Bytes.toString(Bytes.toBytes(key)), Arrays.equals(kv.getKey(), keyBytes)); + byte[] valBytes = Bytes.toBytes(val); + Assert.assertTrue("bytes for vals do not match " + valStr + " " + + Bytes.toString(valBytes), Arrays.equals(Bytes.toBytes(valStr), valBytes)); + if (!scanner.next()) { + break; + } + } + Assert.assertEquals(i, start + n - 1); + return (start + n); + } + + private long generateOuterFile(Path outerPath, byte[] inlineBytes) throws IOException { + 
FSDataOutputStream wrappedOut = outerPath.getFileSystem(inMemoryConf).create(outerPath, true); + // write random bytes + writeRandomBytes(wrappedOut, 10); + + // save position for start offset + long startOffset = wrappedOut.getPos(); + // embed inline file + wrappedOut.write(inlineBytes); + + // write random bytes + writeRandomBytes(wrappedOut, 5); + wrappedOut.hsync(); + wrappedOut.close(); + return startOffset; + } + + private byte[] getBytesToInline(Path outerInMemFSPath) throws IOException { + InMemoryFileSystem inMemoryFileSystem = (InMemoryFileSystem) outerInMemFSPath.getFileSystem(inMemoryConf); + return inMemoryFileSystem.getFileAsBytes(); + } + + private void writeRandomBytes(FSDataOutputStream writer, int count) throws IOException { + for (int i = 0; i < count; i++) { + writer.writeUTF(UUID.randomUUID().toString()); + } + } +} + diff --git a/hudi-common/src/test/java/org/apache/hudi/common/inline/fs/TestInLineFileSystem.java b/hudi-common/src/test/java/org/apache/hudi/common/inline/fs/TestInLineFileSystem.java new file mode 100644 index 000000000..2c6d15dba --- /dev/null +++ b/hudi-common/src/test/java/org/apache/hudi/common/inline/fs/TestInLineFileSystem.java @@ -0,0 +1,356 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.inline.fs; + +import org.apache.hudi.common.util.collection.Pair; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.apache.hudi.common.inline.fs.FileSystemTestUtils.RANDOM; +import static org.apache.hudi.common.inline.fs.FileSystemTestUtils.getRandomOuterFSPath; + +/** + * Tests {@link InLineFileSystem}. + */ +public class TestInLineFileSystem { + private Configuration conf; + private List listOfGeneratedPaths; + + public TestInLineFileSystem() { + conf = new Configuration(); + conf.set("fs." 
+ InLineFileSystem.SCHEME + ".impl", InLineFileSystem.class.getName()); + this.listOfGeneratedPaths = new ArrayList<>(); + } + + @After + public void teardown() throws IOException { + for (Path pathToDelete : listOfGeneratedPaths) { + File filePath = new File(pathToDelete.toString().substring(pathToDelete.toString().indexOf(':') + 1)); + if (filePath.exists()) { + FileSystemTestUtils.deleteFile(filePath); + } + } + } + + @Test + public void testReadInlineFile() throws IOException { + Path outerPath = getRandomOuterFSPath(); + listOfGeneratedPaths.add(outerPath); + + int totalSlices = 5; // embed n slices so that we can test N inline seqPaths + List> startOffsetLengthPairs = new ArrayList<>(); + List expectedByteArrays = new ArrayList<>(); + + FSDataOutputStream wrappedOut = outerPath.getFileSystem(conf).create(outerPath, true); + for (int i = 0; i < totalSlices; i++) { + // append random bytes + byte[] randomBytes = new byte[RANDOM.nextInt(1000)]; + RANDOM.nextBytes(randomBytes); + wrappedOut.write(randomBytes); + long startOffset = wrappedOut.getPos(); + // add inline content + byte[] embeddedInlineBytes = new byte[RANDOM.nextInt(1000)]; + RANDOM.nextBytes(embeddedInlineBytes); + wrappedOut.write(embeddedInlineBytes); + expectedByteArrays.add(embeddedInlineBytes); + startOffsetLengthPairs.add(Pair.of(startOffset, embeddedInlineBytes.length)); + } + // suffix random bytes + byte[] randomBytes = new byte[RANDOM.nextInt(1000)]; + RANDOM.nextBytes(randomBytes); + wrappedOut.write(randomBytes); + wrappedOut.flush(); + wrappedOut.close(); + FileStatus expectedFileStatus = outerPath.getFileSystem(conf).getFileStatus(outerPath); + + for (int i = 0; i < totalSlices; i++) { + Pair startOffsetLengthPair = startOffsetLengthPairs.get(i); + byte[] expectedBytes = expectedByteArrays.get(i); + Path inlinePath = FileSystemTestUtils.getPhantomFile(outerPath, startOffsetLengthPair.getLeft(), startOffsetLengthPair.getRight()); + InLineFileSystem inlineFileSystem = (InLineFileSystem) 
inlinePath.getFileSystem(conf); + FSDataInputStream fsDataInputStream = inlineFileSystem.open(inlinePath); + Assert.assertTrue(inlineFileSystem.exists(inlinePath)); + verifyFileStatus(expectedFileStatus, inlinePath, startOffsetLengthPair.getRight(), inlineFileSystem.getFileStatus(inlinePath)); + FileStatus[] actualFileStatuses = inlineFileSystem.listStatus(inlinePath); + Assert.assertEquals(1, actualFileStatuses.length); + verifyFileStatus(expectedFileStatus, inlinePath, startOffsetLengthPair.getRight(), actualFileStatuses[0]); + byte[] actualBytes = new byte[expectedBytes.length]; + fsDataInputStream.readFully(0, actualBytes); + Assert.assertArrayEquals(expectedBytes, actualBytes); + fsDataInputStream.close(); + Assert.assertEquals(InLineFileSystem.SCHEME, inlineFileSystem.getScheme()); + Assert.assertEquals(URI.create(InLineFileSystem.SCHEME), inlineFileSystem.getUri()); + } + } + + @Test + public void testFileSystemApis() throws IOException { + OuterPathInfo outerPathInfo = generateOuterFileAndGetInfo(1000); + Path inlinePath = FileSystemTestUtils.getPhantomFile(outerPathInfo.outerPath, outerPathInfo.startOffset, outerPathInfo.length); + InLineFileSystem inlineFileSystem = (InLineFileSystem) inlinePath.getFileSystem(conf); + FSDataInputStream fsDataInputStream = inlineFileSystem.open(inlinePath); + byte[] actualBytes = new byte[outerPathInfo.expectedBytes.length]; + // verify pos + Assert.assertEquals(0 - outerPathInfo.startOffset, fsDataInputStream.getPos()); + fsDataInputStream.readFully(0, actualBytes); + Assert.assertArrayEquals(outerPathInfo.expectedBytes, actualBytes); + + // read partial data + // test read(long position, byte[] buffer, int offset, int length) + actualBytes = new byte[100]; + fsDataInputStream.read(0, actualBytes, 10, 10); + verifyArrayEquality(outerPathInfo.expectedBytes, 0, 10, actualBytes, 10, 10); + actualBytes = new byte[310]; + fsDataInputStream.read(25, actualBytes, 100, 210); + verifyArrayEquality(outerPathInfo.expectedBytes, 25, 
210, actualBytes, 100, 210); + // give length to read > than actual inline content + actualBytes = new byte[1100]; + try { + fsDataInputStream.read(0, actualBytes, 0, 1101); + Assert.fail("Should have thrown IndexOutOfBoundsException"); + } catch (IndexOutOfBoundsException e) { + // no op + } + + // test readFully(long position, byte[] buffer, int offset, int length) + actualBytes = new byte[100]; + fsDataInputStream.readFully(0, actualBytes, 10, 20); + verifyArrayEquality(outerPathInfo.expectedBytes, 0, 10, actualBytes, 10, 10); + actualBytes = new byte[310]; + fsDataInputStream.readFully(25, actualBytes, 100, 210); + verifyArrayEquality(outerPathInfo.expectedBytes, 25, 210, actualBytes, 100, 210); + // give length to read > than actual inline content + actualBytes = new byte[1100]; + try { + fsDataInputStream.readFully(0, actualBytes, 0, 1101); + Assert.fail("Should have thrown IndexOutOfBoundsException"); + } catch (IndexOutOfBoundsException e) { + // no op + } + + // test readFully(long position, byte[] buffer) + actualBytes = new byte[100]; + fsDataInputStream.readFully(0, actualBytes); + verifyArrayEquality(outerPathInfo.expectedBytes, 0, 100, actualBytes, 0, 100); + actualBytes = new byte[310]; + fsDataInputStream.readFully(25, actualBytes); + verifyArrayEquality(outerPathInfo.expectedBytes, 25, 210, actualBytes, 0, 210); + // give length to read > than actual inline content + actualBytes = new byte[1100]; + fsDataInputStream.readFully(0, actualBytes); + verifyArrayEquality(outerPathInfo.expectedBytes, 0, 1000, actualBytes, 0, 1000); + + // TODO. seek does not move the position. need to investigate. 
+ // test seekToNewSource(long targetPos) + /* fsDataInputStream.seekToNewSource(75); + Assert.assertEquals(outerPathInfo.startOffset + 75, fsDataInputStream.getPos()); + fsDataInputStream.seekToNewSource(180); + Assert.assertEquals(outerPathInfo.startOffset + 180, fsDataInputStream.getPos()); + fsDataInputStream.seekToNewSource(910); + Assert.assertEquals(outerPathInfo.startOffset + 910, fsDataInputStream.getPos()); + */ + // test read(ByteBuffer buf) + ByteBuffer actualByteBuffer = ByteBuffer.allocate(100); + try { + fsDataInputStream.read(actualByteBuffer); + Assert.fail("Should have thrown"); + } catch (UnsupportedOperationException e) { + // ignore + } + + Assert.assertEquals(outerPathInfo.outerPath.getFileSystem(conf).open(outerPathInfo.outerPath).getFileDescriptor(), fsDataInputStream.getFileDescriptor()); + + try { + fsDataInputStream.setReadahead(10L); + Assert.fail("Should have thrown exception"); + } catch (UnsupportedOperationException e) { + // ignore + } + + try { + fsDataInputStream.setDropBehind(true); + Assert.fail("Should have thrown exception"); + } catch (UnsupportedOperationException e) { + // ignore + } + + // yet to test + // read(ByteBufferPool bufferPool, int maxLength, EnumSet opts) + // releaseBuffer(ByteBuffer buffer) + // unbuffer() + + fsDataInputStream.close(); + } + + private void verifyArrayEquality(byte[] expected, int expectedOffset, int expectedLength, + byte[] actual, int actualOffset, int actualLength) { + Assert.assertArrayEquals(Arrays.copyOfRange(expected, expectedOffset, expectedOffset + expectedLength), Arrays.copyOfRange(actual, actualOffset, actualOffset + actualLength)); + } + + private OuterPathInfo generateOuterFileAndGetInfo(int inlineContentSize) throws IOException { + OuterPathInfo toReturn = new OuterPathInfo(); + Path outerPath = getRandomOuterFSPath(); + listOfGeneratedPaths.add(outerPath); + toReturn.outerPath = outerPath; + FSDataOutputStream wrappedOut = outerPath.getFileSystem(conf).create(outerPath, true); 
+ // append random bytes + byte[] randomBytes = new byte[RANDOM.nextInt(1000)]; + RANDOM.nextBytes(randomBytes); + wrappedOut.write(randomBytes); + toReturn.startOffset = wrappedOut.getPos(); + // add inline content + byte[] embeddedInlineBytes = new byte[inlineContentSize]; + RANDOM.nextBytes(embeddedInlineBytes); + wrappedOut.write(embeddedInlineBytes); + toReturn.expectedBytes = embeddedInlineBytes; + toReturn.length = embeddedInlineBytes.length; + // suffix random bytes + randomBytes = new byte[RANDOM.nextInt(1000)]; + RANDOM.nextBytes(randomBytes); + wrappedOut.write(randomBytes); + wrappedOut.flush(); + wrappedOut.close(); + return toReturn; + } + + @Test + public void testOpen() throws IOException { + Path inlinePath = getRandomInlinePath(); + // open non existant path + try { + inlinePath.getFileSystem(conf).open(inlinePath); + Assert.fail("Should have thrown exception"); + } catch (FileNotFoundException e) { + // ignore + } + } + + @Test + public void testCreate() throws IOException { + Path inlinePath = getRandomInlinePath(); + try { + inlinePath.getFileSystem(conf).create(inlinePath, true); + Assert.fail("Should have thrown exception"); + } catch (UnsupportedOperationException e) { + // ignore + } + } + + @Test + public void testAppend() throws IOException { + Path inlinePath = getRandomInlinePath(); + try { + inlinePath.getFileSystem(conf).append(inlinePath); + Assert.fail("Should have thrown exception"); + } catch (UnsupportedOperationException e) { + // ignore + } + } + + @Test + public void testRename() throws IOException { + Path inlinePath = getRandomInlinePath(); + try { + inlinePath.getFileSystem(conf).rename(inlinePath, inlinePath); + Assert.fail("Should have thrown exception"); + } catch (UnsupportedOperationException e) { + // ignore + } + } + + @Test + public void testDelete() throws IOException { + Path inlinePath = getRandomInlinePath(); + try { + inlinePath.getFileSystem(conf).delete(inlinePath, true); + Assert.fail("Should have thrown 
exception"); + } catch (UnsupportedOperationException e) { + // ignore + } + } + + @Test + public void testgetWorkingDir() throws IOException { + Path inlinePath = getRandomInlinePath(); + try { + inlinePath.getFileSystem(conf).getWorkingDirectory(); + Assert.fail("Should have thrown exception"); + } catch (UnsupportedOperationException e) { + // ignore + } + } + + @Test + public void testsetWorkingDirectory() throws IOException { + Path inlinePath = getRandomInlinePath(); + try { + inlinePath.getFileSystem(conf).setWorkingDirectory(inlinePath); + Assert.fail("Should have thrown exception"); + } catch (UnsupportedOperationException e) { + // ignore + } + } + + @Test + public void testExists() throws IOException { + Path inlinePath = getRandomInlinePath(); + Assert.assertFalse(inlinePath.getFileSystem(conf).exists(inlinePath)); + } + + private Path getRandomInlinePath() { + Path outerPath = getRandomOuterFSPath(); + listOfGeneratedPaths.add(outerPath); + return FileSystemTestUtils.getPhantomFile(outerPath, 100, 100); + } + + private void verifyFileStatus(FileStatus expected, Path inlinePath, long expectedLength, FileStatus actual) { + Assert.assertEquals(inlinePath, actual.getPath()); + Assert.assertEquals(expectedLength, actual.getLen()); + Assert.assertEquals(expected.getAccessTime(), actual.getAccessTime()); + Assert.assertEquals(expected.getBlockSize(), actual.getBlockSize()); + Assert.assertEquals(expected.getGroup(), actual.getGroup()); + Assert.assertEquals(expected.getModificationTime(), actual.getModificationTime()); + Assert.assertEquals(expected.getOwner(), actual.getOwner()); + Assert.assertEquals(expected.getPermission(), actual.getPermission()); + Assert.assertEquals(expected.getReplication(), actual.getReplication()); + } + + class OuterPathInfo { + Path outerPath; + long startOffset; + int length; + byte[] expectedBytes; + } +} diff --git a/hudi-common/src/test/java/org/apache/hudi/common/inline/fs/TestInMemoryFileSystem.java 
b/hudi-common/src/test/java/org/apache/hudi/common/inline/fs/TestInMemoryFileSystem.java new file mode 100644 index 000000000..9d739148f --- /dev/null +++ b/hudi-common/src/test/java/org/apache/hudi/common/inline/fs/TestInMemoryFileSystem.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.inline.fs; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.net.URI; + +import static org.apache.hudi.common.inline.fs.FileSystemTestUtils.RANDOM; +import static org.apache.hudi.common.inline.fs.FileSystemTestUtils.getRandomOuterInMemPath; + +/** + * Unit tests {@link InMemoryFileSystem}. + */ +public class TestInMemoryFileSystem { + + private Configuration conf; + + public TestInMemoryFileSystem() { + conf = new Configuration(); + conf.set("fs." 
+ InMemoryFileSystem.SCHEME + ".impl", InMemoryFileSystem.class.getName()); + } + + @Test + public void testCreateWriteGetFileAsBytes() throws IOException { + Path outerInMemFSPath = getRandomOuterInMemPath(); + FSDataOutputStream out = outerInMemFSPath.getFileSystem(conf).create(outerInMemFSPath, true); + // write random bytes + byte[] randomBytes = new byte[RANDOM.nextInt(1000)]; + RANDOM.nextBytes(randomBytes); + out.write(randomBytes); + out.close(); + InMemoryFileSystem inMemoryFileSystem = (InMemoryFileSystem) outerInMemFSPath.getFileSystem(conf); + byte[] bytesRead = inMemoryFileSystem.getFileAsBytes(); + Assert.assertArrayEquals(randomBytes, bytesRead); + Assert.assertEquals(InMemoryFileSystem.SCHEME, inMemoryFileSystem.getScheme()); + Assert.assertEquals(URI.create(outerInMemFSPath.toString()), inMemoryFileSystem.getUri()); + } + + @Test + public void testOpen() throws IOException { + Path outerInMemFSPath = getRandomOuterInMemPath(); + Assert.assertNull(outerInMemFSPath.getFileSystem(conf).open(outerInMemFSPath)); + } + + @Test + public void testAppend() throws IOException { + Path outerInMemFSPath = getRandomOuterInMemPath(); + Assert.assertNull(outerInMemFSPath.getFileSystem(conf).append(outerInMemFSPath)); + } + + @Test + public void testRename() throws IOException { + Path outerInMemFSPath = getRandomOuterInMemPath(); + try { + outerInMemFSPath.getFileSystem(conf).rename(outerInMemFSPath, outerInMemFSPath); + Assert.fail("Should have thrown exception"); + } catch (UnsupportedOperationException e) { + // ignore + } + } + + @Test + public void testDelete() throws IOException { + Path outerInMemFSPath = getRandomOuterInMemPath(); + try { + outerInMemFSPath.getFileSystem(conf).delete(outerInMemFSPath, true); + Assert.fail("Should have thrown exception"); + } catch (UnsupportedOperationException e) { + // ignore + } + } + + @Test + public void testgetWorkingDir() throws IOException { + Path outerInMemFSPath = getRandomOuterInMemPath(); + 
Assert.assertNull(outerInMemFSPath.getFileSystem(conf).getWorkingDirectory()); + } + + @Test + public void testsetWorkingDirectory() throws IOException { + Path outerInMemFSPath = getRandomOuterInMemPath(); + try { + outerInMemFSPath.getFileSystem(conf).setWorkingDirectory(outerInMemFSPath); + Assert.fail("Should have thrown exception"); + } catch (UnsupportedOperationException e) { + // ignore + } + } + + @Test + public void testExists() throws IOException { + Path outerInMemFSPath = getRandomOuterInMemPath(); + try { + outerInMemFSPath.getFileSystem(conf).exists(outerInMemFSPath); + Assert.fail("Should have thrown exception"); + } catch (UnsupportedOperationException e) { + // ignore + } + } + + @Test + public void testFileStatus() throws IOException { + Path outerInMemFSPath = getRandomOuterInMemPath(); + try { + outerInMemFSPath.getFileSystem(conf).getFileStatus(outerInMemFSPath); + Assert.fail("Should have thrown exception"); + } catch (UnsupportedOperationException e) { + // ignore + } + } + + @Test + public void testListStatus() throws IOException { + Path outerInMemFSPath = getRandomOuterInMemPath(); + try { + outerInMemFSPath.getFileSystem(conf).listStatus(outerInMemFSPath); + Assert.fail("Should have thrown exception"); + } catch (UnsupportedOperationException e) { + // ignore + } + } + +} diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/inline/fs/TestParquetInLining.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/inline/fs/TestParquetInLining.java new file mode 100644 index 000000000..9479e284b --- /dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/inline/fs/TestParquetInLining.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.inline.fs; + +import org.apache.hudi.common.HoodieTestDataGenerator; +import org.apache.hudi.common.inline.fs.FileSystemTestUtils; +import org.apache.hudi.common.inline.fs.InLineFileSystem; +import org.apache.hudi.common.inline.fs.InMemoryFileSystem; +import org.apache.hudi.common.model.HoodieRecord; + +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.avro.AvroParquetReader; +import org.apache.parquet.avro.AvroParquetWriter; +import org.apache.parquet.hadoop.ParquetReader; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import static org.apache.hudi.common.inline.fs.FileSystemTestUtils.FILE_SCHEME; +import static org.apache.hudi.common.inline.fs.FileSystemTestUtils.getPhantomFile; +import static org.apache.hudi.common.inline.fs.FileSystemTestUtils.getRandomOuterInMemPath; + +/** + * Tests {@link InLineFileSystem} with Parquet writer and reader. hudi-common can't access HoodieTestDataGenerator. 
+ * Hence keeping this test in hudi-utilities. + */ +public class TestParquetInLining { + + private final Configuration inMemoryConf; + private final Configuration inlineConf; + private Path generatedPath; + + public TestParquetInLining() { + inMemoryConf = new Configuration(); + inMemoryConf.set("fs." + InMemoryFileSystem.SCHEME + ".impl", InMemoryFileSystem.class.getName()); + inlineConf = new Configuration(); + inlineConf.set("fs." + InLineFileSystem.SCHEME + ".impl", InLineFileSystem.class.getName()); + } + + @After + public void teardown() throws IOException { + if (generatedPath != null) { + File filePath = new File(generatedPath.toString().substring(generatedPath.toString().indexOf(':') + 1)); + if (filePath.exists()) { + FileSystemTestUtils.deleteFile(filePath); + } + } + } + + @Test + public void testSimpleInlineFileSystem() throws IOException { + Path outerInMemFSPath = getRandomOuterInMemPath(); + Path outerPath = new Path(FILE_SCHEME + outerInMemFSPath.toString().substring(outerInMemFSPath.toString().indexOf(':'))); + generatedPath = outerPath; + ParquetWriter inlineWriter = new AvroParquetWriter(outerInMemFSPath, HoodieTestDataGenerator.AVRO_SCHEMA, + CompressionCodecName.GZIP, 100 * 1024 * 1024, 1024 * 1024, true, inMemoryConf); + // write few records + List recordsToWrite = getParquetHoodieRecords(); + for (GenericRecord rec : recordsToWrite) { + inlineWriter.write(rec); + } + inlineWriter.close(); + byte[] inlineBytes = getBytesToInline(outerInMemFSPath); + long startOffset = generateOuterFile(outerPath, inlineBytes); + + long inlineLength = inlineBytes.length; + + // Generate phantom inline file + Path inlinePath = getPhantomFile(outerPath, startOffset, inlineLength); + + // instantiate Parquet reader + ParquetReader inLineReader = AvroParquetReader.builder(inlinePath).withConf(inlineConf).build(); + List records = readParquetGenericRecords(inLineReader); + Assert.assertArrayEquals(recordsToWrite.toArray(), records.toArray()); + 
inLineReader.close(); + } + + private long generateOuterFile(Path outerPath, byte[] inlineBytes) throws IOException { + FSDataOutputStream wrappedOut = outerPath.getFileSystem(inMemoryConf).create(outerPath, true); + // write random bytes + writeRandomBytes(wrappedOut, 10); + + // save position for start offset + long startOffset = wrappedOut.getPos(); + // embed inline file + wrappedOut.write(inlineBytes); + + // write random bytes + writeRandomBytes(wrappedOut, 5); + wrappedOut.hsync(); + wrappedOut.close(); + return startOffset; + } + + private byte[] getBytesToInline(Path outerInMemFSPath) throws IOException { + InMemoryFileSystem inMemoryFileSystem = (InMemoryFileSystem) outerInMemFSPath.getFileSystem(inMemoryConf); + return inMemoryFileSystem.getFileAsBytes(); + } + + static List readParquetGenericRecords(ParquetReader reader) throws IOException { + List toReturn = new ArrayList<>(); + Object obj = reader.read(); + while (obj instanceof GenericRecord) { + toReturn.add((GenericRecord) obj); + obj = reader.read(); + } + return toReturn; + } + + private void writeRandomBytes(FSDataOutputStream writer, int count) throws IOException { + for (int i = 0; i < count; i++) { + writer.writeUTF(UUID.randomUUID().toString()); + } + } + + static List getParquetHoodieRecords() throws IOException { + HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); + String commitTime = "001"; + List hoodieRecords = dataGenerator.generateInsertsWithHoodieAvroPayload(commitTime, 10); + List toReturn = new ArrayList<>(); + for (HoodieRecord record : hoodieRecords) { + toReturn.add((GenericRecord) record.getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA).get()); + } + return toReturn; + } +} diff --git a/pom.xml b/pom.xml index 2274147a8..9db40e4c4 100644 --- a/pom.xml +++ b/pom.xml @@ -683,13 +683,6 @@ - - - org.apache.hbase - hbase-client - ${hbase.version} - - ${hive.groupid}