1
0

[HUDI-430] Adding InlineFileSystem to support embedding any file format as an InlineFile (#1176)

* Adding InlineFileSystem to support embedding any file format (parquet, hfile, etc). Supports reading the embedded file using respective readers.
This commit is contained in:
Sivabalan Narayanan
2020-03-28 12:13:35 -04:00
committed by GitHub
parent 04449f33fe
commit ac73bdcdc3
12 changed files with 1460 additions and 7 deletions

View File

@@ -183,6 +183,7 @@
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
</dependency>
<!-- Hoodie - Tests -->

View File

@@ -177,5 +177,20 @@
</exclusion>
</exclusions>
</dependency>
<!-- HBase -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.common.inline.fs;
import org.apache.hadoop.fs.Path;
/**
 * Utils to parse InLineFileSystem paths.
 * Inline FS format:
 * "inlinefs://<path_to_outer_file>/<outer_file_scheme>/?start_offset=<start_offset>&length=<length>"
 * Eg: "inlinefs://<path_to_outer_file>/s3a/?start_offset=20&length=40"
 */
public class InLineFSUtils {

  private static final String START_OFFSET_STR = "start_offset";
  private static final String LENGTH_STR = "length";
  private static final String EQUALS_STR = "=";

  private InLineFSUtils() {
    // static utility class; not meant to be instantiated
  }

  /**
   * Fetch inline file path from outer path.
   * Eg
   * Input:
   * Path = s3a://file1, origScheme: file, startOffset = 20, length = 40
   * Output: "inlinefs://file1/s3a/?start_offset=20&length=40"
   *
   * @param outerPath         path of the outer file that embeds the inline content
   * @param origScheme        scheme of the outer file system (eg "file", "s3a")
   * @param inLineStartOffset byte offset within the outer file where the inline content begins
   * @param inLineLength      length in bytes of the inline content
   * @return the inline file system {@link Path} addressing the embedded slice
   */
  public static Path getInlineFilePath(Path outerPath, String origScheme, long inLineStartOffset, long inLineLength) {
    // everything after the first ':' is the scheme-less outer path
    String subPath = outerPath.toString().substring(outerPath.toString().indexOf(":") + 1);
    return new Path(
        InLineFileSystem.SCHEME + "://" + subPath + "/" + origScheme
            + "/" + "?" + START_OFFSET_STR + EQUALS_STR + inLineStartOffset
            + "&" + LENGTH_STR + EQUALS_STR + inLineLength
    );
  }

  /**
   * Inline file format
   * "inlinefs://<path_to_outer_file>/<outer_file_scheme>/?start_offset=<start_offset>&length=<length>"
   * Outer File format
   * "<outer_file_scheme>://<path_to_outer_file>"
   * <p>
   * Eg input : "inlinefs://file1/sa3/?start_offset=20&length=40".
   * Output : "sa3://file1"
   *
   * @param inlinePath inline file system path
   * @return the outer file {@link Path} that the inline path refers to
   */
  public static Path getOuterfilePathFromInlinePath(Path inlinePath) {
    // the last path segment (before the query string) carries the original scheme
    String scheme = inlinePath.getParent().getName();
    Path basePath = inlinePath.getParent().getParent();
    return new Path(basePath.toString().replaceFirst(InLineFileSystem.SCHEME, scheme));
  }

  /**
   * Parses the start offset out of an inline path.
   * Eg input : "inlinefs://file1/s3a/?start_offset=20&length=40". Output: 20
   * <p>
   * NOTE: parsed as int, so the inline content must start within the first ~2GB of the
   * outer file.
   *
   * @param inlinePath inline file system path
   * @return start offset of the embedded slice within the outer file
   */
  public static int startOffset(Path inlinePath) {
    // splitting on [?&=] yields [..., "start_offset", <offset>, "length", <length>]
    String[] slices = inlinePath.toString().split("[?&=]");
    return Integer.parseInt(slices[slices.length - 3]);
  }

  /**
   * Parses the length out of an inline path.
   * Eg input : "inlinefs://file1/s3a/?start_offset=20&length=40". Output: 40
   *
   * @param inlinePath inline file system path
   * @return length in bytes of the embedded slice
   */
  public static int length(Path inlinePath) {
    String[] slices = inlinePath.toString().split("[?&=]");
    return Integer.parseInt(slices[slices.length - 1]);
  }
}

View File

@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.common.inline.fs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;
import java.io.IOException;
import java.net.URI;
/**
 * Enables reading any inline file at a given offset and length. This {@link FileSystem} is used only in read path and does not support
 * any write apis.
 * <p>
 * - Reading an inlined file at a given offset, length, read it out as if it were an independent file of that length
 * - Inlined path is of the form "inlinefs:///path/to/outer/file/<outer_file_scheme>/?start_offset=<start_offset>&length=<length>"
 * <p>
 * TODO: The reader/writer may try to use relative paths based on the inlinepath and it may not work. Need to handle
 * this gracefully eg. the parquet summary metadata reading. TODO: If this shows promise, also support directly writing
 * the inlined file to the underneath file without buffer
 */
public class InLineFileSystem extends FileSystem {

  public static final String SCHEME = "inlinefs";
  private Configuration conf = null;

  @Override
  public void initialize(URI name, Configuration conf) throws IOException {
    super.initialize(name, conf);
    this.conf = conf;
  }

  @Override
  public URI getUri() {
    return URI.create(getScheme());
  }

  public String getScheme() {
    return SCHEME;
  }

  /**
   * Opens the embedded slice of the outer file as if it were an independent stream.
   *
   * @param inlinePath inline file system path encoding the outer path, offset and length
   * @param bufferSize buffer size forwarded to the outer file system's open
   * @return a stream positioned/bounded relative to the inline slice
   * @throws IOException if the outer file cannot be opened
   */
  @Override
  public FSDataInputStream open(Path inlinePath, int bufferSize) throws IOException {
    Path outerPath = InLineFSUtils.getOuterfilePathFromInlinePath(inlinePath);
    FileSystem outerFs = outerPath.getFileSystem(conf);
    FSDataInputStream outerStream = outerFs.open(outerPath, bufferSize);
    return new InLineFsDataInputStream(InLineFSUtils.startOffset(inlinePath), outerStream, InLineFSUtils.length(inlinePath));
  }

  @Override
  public boolean exists(Path f) {
    try {
      return getFileStatus(f) != null;
    } catch (Exception e) {
      // any failure (outer file missing, malformed inline path, ...) means "does not exist"
      return false;
    }
  }

  /**
   * Returns the outer file's status with the length and path rewritten to reflect
   * the embedded slice.
   */
  @Override
  public FileStatus getFileStatus(Path inlinePath) throws IOException {
    Path outerPath = InLineFSUtils.getOuterfilePathFromInlinePath(inlinePath);
    FileSystem outerFs = outerPath.getFileSystem(conf);
    FileStatus status = outerFs.getFileStatus(outerPath);
    FileStatus toReturn = new FileStatus(InLineFSUtils.length(inlinePath), status.isDirectory(), status.getReplication(), status.getBlockSize(),
        status.getModificationTime(), status.getAccessTime(), status.getPermission(), status.getOwner(),
        status.getGroup(), inlinePath);
    return toReturn;
  }

  @Override
  public FSDataOutputStream create(Path path, FsPermission fsPermission, boolean b, int i, short i1, long l,
                                   Progressable progressable) throws IOException {
    // read-only file system (message corrected from the original copy-pasted "Can't rename files")
    throw new UnsupportedOperationException("Can't create files");
  }

  @Override
  public FSDataOutputStream append(Path path, int i, Progressable progressable) throws IOException {
    throw new UnsupportedOperationException("Can't append to files");
  }

  @Override
  public boolean rename(Path path, Path path1) throws IOException {
    throw new UnsupportedOperationException("Can't rename files");
  }

  @Override
  public boolean delete(Path path, boolean b) throws IOException {
    throw new UnsupportedOperationException("Can't delete files");
  }

  @Override
  public FileStatus[] listStatus(Path inlinePath) throws IOException {
    // an inline path always "contains" exactly one file: the slice itself
    return new FileStatus[] {getFileStatus(inlinePath)};
  }

  @Override
  public void setWorkingDirectory(Path path) {
    throw new UnsupportedOperationException("Can't set working directory");
  }

  @Override
  public Path getWorkingDirectory() {
    throw new UnsupportedOperationException("Can't get working directory");
  }

  @Override
  public boolean mkdirs(Path path, FsPermission fsPermission) throws IOException {
    // message corrected from the original copy-pasted "Can't set working directory"
    throw new UnsupportedOperationException("Can't mkdir");
  }
}

View File

@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.common.inline.fs;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.io.ByteBufferPool;
import java.io.FileDescriptor;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.EnumSet;
/**
 * Inline {@link FSDataInputStream}. A startOffset that is passed in is assumed to be the start of the InputStream.
 * All operations are handled having the {@code startOffset} as starting point: positions supplied by callers
 * are relative to the inline slice and are translated to absolute positions in the outer stream.
 */
public class InLineFsDataInputStream extends FSDataInputStream {

  // absolute byte offset within the outer stream where the inline content begins
  private final int startOffset;
  // the stream over the outer (containing) file; all calls delegate to it
  private final FSDataInputStream outerStream;
  // length in bytes of the inline content
  // NOTE(review): 'length' is stored but never read in this class — positional reads are NOT
  // bounded by it, so reading past the inline slice returns bytes of the outer file. Confirm
  // callers never depend on an EOF at the slice boundary.
  private final int length;

  public InLineFsDataInputStream(int startOffset, FSDataInputStream outerStream, int length) {
    super(outerStream.getWrappedStream());
    this.startOffset = startOffset;
    this.outerStream = outerStream;
    this.length = length;
  }

  // Translate a slice-relative seek to an absolute seek in the outer stream.
  @Override
  public void seek(long desired) throws IOException {
    outerStream.seek(startOffset + desired);
  }

  // Report the position relative to the start of the inline slice.
  @Override
  public long getPos() throws IOException {
    return outerStream.getPos() - startOffset;
  }

  // Positional read, shifted into the outer stream's coordinates.
  @Override
  public int read(long position, byte[] buffer, int offset, int length) throws IOException {
    return outerStream.read(startOffset + position, buffer, offset, length);
  }

  @Override
  public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
    outerStream.readFully(startOffset + position, buffer, offset, length);
  }

  @Override
  public void readFully(long position, byte[] buffer)
      throws IOException {
    outerStream.readFully(startOffset + position, buffer, 0, buffer.length);
  }

  @Override
  public boolean seekToNewSource(long targetPos) throws IOException {
    boolean toReturn = outerStream.seekToNewSource(startOffset + targetPos);
    return toReturn;
  }

  // Reads from the outer stream's current position; no extra translation needed
  // because the current position is already maintained via seek()/getPos().
  @Override
  public int read(ByteBuffer buf) throws IOException {
    return outerStream.read(buf);
  }

  @Override
  public FileDescriptor getFileDescriptor() throws IOException {
    return outerStream.getFileDescriptor();
  }

  @Override
  public void setReadahead(Long readahead) throws IOException, UnsupportedOperationException {
    outerStream.setReadahead(readahead);
  }

  @Override
  public void setDropBehind(Boolean dropBehind) throws IOException, UnsupportedOperationException {
    outerStream.setDropBehind(dropBehind);
  }

  @Override
  public ByteBuffer read(ByteBufferPool bufferPool, int maxLength, EnumSet<ReadOption> opts)
      throws IOException, UnsupportedOperationException {
    return outerStream.read(bufferPool, maxLength, opts);
  }

  @Override
  public void releaseBuffer(ByteBuffer buffer) {
    outerStream.releaseBuffer(buffer);
  }

  @Override
  public void unbuffer() {
    outerStream.unbuffer();
  }
}

View File

@@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.common.inline.fs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;
import java.io.ByteArrayOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
/**
 * A FileSystem which stores all content in memory and returns a byte[] when {@link #getFileAsBytes()} is called
 * This FileSystem is used only in write path. Does not support any read apis except {@link #getFileAsBytes()}.
 */
public class InMemoryFileSystem extends FileSystem {

  // TODO: this needs to be per path to support num_cores > 1, and we should release the buffer once done
  // buffer holding everything written via create(); remains null until create() is called
  private ByteArrayOutputStream bos;
  private Configuration conf = null;
  public static final String SCHEME = "inmemfs";
  private URI uri;

  InMemoryFileSystem() {
  }

  @Override
  public void initialize(URI name, Configuration conf) throws IOException {
    super.initialize(name, conf);
    this.conf = conf;
    this.uri = name;
  }

  @Override
  public URI getUri() {
    return uri;
  }

  public String getScheme() {
    return SCHEME;
  }

  /**
   * Read path is unsupported; always returns null.
   */
  @Override
  public FSDataInputStream open(Path inlinePath, int bufferSize) {
    return null;
  }

  /**
   * Starts buffering a new in-memory file; only one file at a time is supported
   * (a second create() discards the previous buffer).
   *
   * @throws IllegalArgumentException if the path is not parsable as a URI
   */
  @Override
  public FSDataOutputStream create(Path path, FsPermission fsPermission, boolean b, int i, short i1, long l,
                                   Progressable progressable) throws IOException {
    bos = new ByteArrayOutputStream();
    try {
      this.uri = new URI(path.toString());
    } catch (URISyntaxException e) {
      // preserve the cause (original dropped it)
      throw new IllegalArgumentException("Path not parsable as URI " + path, e);
    }
    return new FSDataOutputStream(bos, new Statistics(getScheme()));
  }

  /**
   * Returns the bytes buffered so far. {@link #create} must have been called first.
   */
  public byte[] getFileAsBytes() {
    return bos.toByteArray();
  }

  // Append is unsupported; returns null (kept for backward compatibility).
  @Override
  public FSDataOutputStream append(Path path, int i, Progressable progressable) throws IOException {
    return null;
  }

  @Override
  public boolean rename(Path path, Path path1) throws IOException {
    throw new UnsupportedOperationException("Can't rename files");
  }

  @Override
  public boolean delete(Path path, boolean b) throws IOException {
    throw new UnsupportedOperationException("Can't delete files");
  }

  @Override
  public FileStatus[] listStatus(Path inlinePath) throws FileNotFoundException, IOException {
    throw new UnsupportedOperationException("No support for listStatus");
  }

  @Override
  public void setWorkingDirectory(Path path) {
    throw new UnsupportedOperationException("Can't set working directory");
  }

  @Override
  public Path getWorkingDirectory() {
    return null;
  }

  @Override
  public boolean mkdirs(Path path, FsPermission fsPermission) throws IOException {
    throw new UnsupportedOperationException("Can't mkdir");
  }

  @Override
  public boolean exists(Path f) throws IOException {
    throw new UnsupportedOperationException("Can't check for exists");
  }

  @Override
  public FileStatus getFileStatus(Path inlinePath) throws IOException {
    throw new UnsupportedOperationException("No support for getFileStatus");
  }

  @Override
  public void close() throws IOException {
    super.close();
    // bos is null when create() was never called; guard to avoid an NPE on close
    if (bos != null) {
      bos.close();
    }
  }
}

View File

@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.common.inline.fs;
import org.apache.hadoop.fs.Path;
import java.io.File;
import java.io.IOException;
import java.util.Random;
import java.util.UUID;
/**
 * Utils class to assist in testing {@link InMemoryFileSystem} and {@link InLineFileSystem}.
 */
public class FileSystemTestUtils {

  public static final String TEMP = "tmp";
  public static final String FORWARD_SLASH = "/";
  public static final String FILE_SCHEME = "file";
  public static final String COLON = ":";
  static final Random RANDOM = new Random();

  /**
   * Returns a random path under /tmp addressed via the in-memory file system scheme.
   */
  public static Path getRandomOuterInMemPath() {
    return new Path(InMemoryFileSystem.SCHEME + randomTempSuffix());
  }

  /**
   * Returns a random path under /tmp addressed via the local "file" scheme.
   */
  static Path getRandomOuterFSPath() {
    return new Path(FILE_SCHEME + randomTempSuffix());
  }

  // builds ":/tmp/<random-uuid>" to append after a scheme
  private static String randomTempSuffix() {
    return COLON + FORWARD_SLASH + TEMP + FORWARD_SLASH + UUID.randomUUID().toString();
  }

  /**
   * Generates a phantom inline file path over the given slice of the outer file.
   */
  public static Path getPhantomFile(Path outerPath, long startOffset, long inlineLength) {
    return InLineFSUtils.getInlineFilePath(outerPath, FILE_SCHEME, startOffset, inlineLength);
  }

  /**
   * Deletes the given file if it exists.
   *
   * @throws IOException if the file exists but could not be deleted
   */
  public static void deleteFile(File fileToDelete) throws IOException {
    if (fileToDelete.exists() && !fileToDelete.delete()) {
      throw new IOException("Unable to delete file " + fileToDelete + ".");
    }
  }
}

View File

@@ -0,0 +1,241 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.common.inline.fs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import static org.apache.hudi.common.inline.fs.FileSystemTestUtils.FILE_SCHEME;
import static org.apache.hudi.common.inline.fs.FileSystemTestUtils.RANDOM;
import static org.apache.hudi.common.inline.fs.FileSystemTestUtils.getPhantomFile;
import static org.apache.hudi.common.inline.fs.FileSystemTestUtils.getRandomOuterInMemPath;
/**
 * Tests {@link InLineFileSystem} to inline HFile: writes an HFile into an
 * {@link InMemoryFileSystem}, embeds its bytes in the middle of a local file,
 * then reads the embedded HFile back through the inline file system.
 */
public class TestHFileInLining {

  // config used when writing the HFile into the in-memory file system
  private final Configuration inMemoryConf;
  // config used when reading the HFile back through the inline file system
  private final Configuration inlineConf;
  private final int minBlockSize = 1024;
  // key format: zero-padded 10-digit row id
  private static final String LOCAL_FORMATTER = "%010d";
  private int maxRows = 100 + RANDOM.nextInt(1000);
  // outer file generated by the test; cleaned up in teardown
  private Path generatedPath;

  public TestHFileInLining() {
    inMemoryConf = new Configuration();
    inMemoryConf.set("fs." + InMemoryFileSystem.SCHEME + ".impl", InMemoryFileSystem.class.getName());
    inlineConf = new Configuration();
    inlineConf.set("fs." + InLineFileSystem.SCHEME + ".impl", InLineFileSystem.class.getName());
  }

  @After
  public void teardown() throws IOException {
    if (generatedPath != null) {
      // strip the scheme prefix to get a local file path
      File filePath = new File(generatedPath.toString().substring(generatedPath.toString().indexOf(':') + 1));
      if (filePath.exists()) {
        FileSystemTestUtils.deleteFile(filePath);
      }
    }
  }

  @Test
  public void testSimpleInlineFileSystem() throws IOException {
    Path outerInMemFSPath = getRandomOuterInMemPath();
    // same path, re-addressed with the local "file" scheme
    Path outerPath = new Path(FILE_SCHEME + outerInMemFSPath.toString().substring(outerInMemFSPath.toString().indexOf(':')));
    generatedPath = outerPath;
    CacheConfig cacheConf = new CacheConfig(inMemoryConf);
    // write an HFile into the in-memory file system first
    FSDataOutputStream fout = createFSOutput(outerInMemFSPath, inMemoryConf);
    HFileContext meta = new HFileContextBuilder()
        .withBlockSize(minBlockSize)
        .build();
    HFile.Writer writer = HFile.getWriterFactory(inMemoryConf, cacheConf)
        .withOutputStream(fout)
        .withFileContext(meta)
        .withComparator(new KeyValue.KVComparator())
        .create();
    writeRecords(writer);
    fout.close();
    // embed the buffered HFile bytes in the middle of a real (local) outer file
    byte[] inlineBytes = getBytesToInline(outerInMemFSPath);
    long startOffset = generateOuterFile(outerPath, inlineBytes);
    long inlineLength = inlineBytes.length;
    // Generate phantom inline file
    Path inlinePath = getPhantomFile(outerPath, startOffset, inlineLength);
    InLineFileSystem inlineFileSystem = (InLineFileSystem) inlinePath.getFileSystem(inlineConf);
    FSDataInputStream fin = inlineFileSystem.open(inlinePath);
    HFile.Reader reader = HFile.createReader(inlineFileSystem, inlinePath, cacheConf, inlineConf);
    // Load up the index.
    reader.loadFileInfo();
    // Get a scanner that caches and that does not use pread.
    HFileScanner scanner = reader.getScanner(true, false);
    // Align scanner at start of the file.
    scanner.seekTo();
    readAllRecords(scanner);
    // point lookups for a random sample of existing rows
    Set<Integer> rowIdsToSearch = getRandomValidRowIds(10);
    for (int rowId : rowIdsToSearch) {
      Assert.assertTrue("location lookup failed",
          scanner.seekTo(KeyValue.createKeyValueFromKey(getSomeKey(rowId))) == 0);
      // read the key and see if it matches
      ByteBuffer readKey = scanner.getKey();
      Assert.assertTrue("seeked key does not match", Arrays.equals(getSomeKey(rowId),
          Bytes.toBytes(readKey)));
      scanner.seekTo(KeyValue.createKeyValueFromKey(getSomeKey(rowId)));
      ByteBuffer val1 = scanner.getValue();
      scanner.seekTo(KeyValue.createKeyValueFromKey(getSomeKey(rowId)));
      ByteBuffer val2 = scanner.getValue();
      // repeated seeks to the same key must return identical values
      Assert.assertTrue(Arrays.equals(Bytes.toBytes(val1), Bytes.toBytes(val2)));
    }
    // lookups outside the written key range must not find an exact match
    int[] invalidRowIds = {-4, maxRows, maxRows + 1, maxRows + 120, maxRows + 160, maxRows + 1000};
    for (int rowId : invalidRowIds) {
      Assert.assertFalse("location lookup should have failed",
          scanner.seekTo(KeyValue.createKeyValueFromKey(getSomeKey(rowId))) == 0);
    }
    reader.close();
    fin.close();
    outerPath.getFileSystem(inMemoryConf).delete(outerPath, true);
  }

  // returns 'count' distinct random row ids in [0, maxRows)
  private Set<Integer> getRandomValidRowIds(int count) {
    Set<Integer> rowIds = new HashSet<>();
    while (rowIds.size() < count) {
      int index = RANDOM.nextInt(maxRows);
      if (!rowIds.contains(index)) {
        rowIds.add(index);
      }
    }
    return rowIds;
  }

  // builds the serialized HFile key for the given row id
  private byte[] getSomeKey(int rowId) {
    KeyValue kv = new KeyValue(String.format(LOCAL_FORMATTER, Integer.valueOf(rowId)).getBytes(),
        Bytes.toBytes("family"), Bytes.toBytes("qual"), HConstants.LATEST_TIMESTAMP, KeyValue.Type.Put);
    return kv.getKey();
  }

  private FSDataOutputStream createFSOutput(Path name, Configuration conf) throws IOException {
    return name.getFileSystem(conf).create(name);
  }

  // writes all records and closes the writer
  private void writeRecords(HFile.Writer writer) throws IOException {
    writeSomeRecords(writer);
    writer.close();
  }

  // appends maxRows KeyValues ("value" + zero-padded key) to the writer; returns rows written
  private int writeSomeRecords(HFile.Writer writer)
      throws IOException {
    String value = "value";
    KeyValue kv;
    for (int i = 0; i < (maxRows); i++) {
      String key = String.format(LOCAL_FORMATTER, Integer.valueOf(i));
      kv = new KeyValue(Bytes.toBytes(key), Bytes.toBytes("family"), Bytes.toBytes("qual"),
          Bytes.toBytes(value + key));
      writer.append(kv);
    }
    return (maxRows);
  }

  private void readAllRecords(HFileScanner scanner) throws IOException {
    readAndCheckbytes(scanner, 0, maxRows);
  }

  // read the records and check
  private int readAndCheckbytes(HFileScanner scanner, int start, int n)
      throws IOException {
    String value = "value";
    int i = start;
    for (; i < (start + n); i++) {
      ByteBuffer key = scanner.getKey();
      ByteBuffer val = scanner.getValue();
      String keyStr = String.format(LOCAL_FORMATTER, Integer.valueOf(i));
      String valStr = value + keyStr;
      KeyValue kv = new KeyValue(Bytes.toBytes(keyStr), Bytes.toBytes("family"),
          Bytes.toBytes("qual"), Bytes.toBytes(valStr));
      byte[] keyBytes = new KeyValue.KeyOnlyKeyValue(Bytes.toBytes(key), 0,
          Bytes.toBytes(key).length).getKey();
      Assert.assertTrue("bytes for keys do not match " + keyStr + " "
          + Bytes.toString(Bytes.toBytes(key)), Arrays.equals(kv.getKey(), keyBytes));
      byte[] valBytes = Bytes.toBytes(val);
      Assert.assertTrue("bytes for vals do not match " + valStr + " "
          + Bytes.toString(valBytes), Arrays.equals(Bytes.toBytes(valStr), valBytes));
      if (!scanner.next()) {
        // scanner exhausted on the last record; loop exits with i == start + n - 1
        break;
      }
    }
    Assert.assertEquals(i, start + n - 1);
    return (start + n);
  }

  // writes random bytes, then the inline payload, then more random bytes;
  // returns the byte offset where the payload starts
  private long generateOuterFile(Path outerPath, byte[] inlineBytes) throws IOException {
    FSDataOutputStream wrappedOut = outerPath.getFileSystem(inMemoryConf).create(outerPath, true);
    // write random bytes
    writeRandomBytes(wrappedOut, 10);
    // save position for start offset
    long startOffset = wrappedOut.getPos();
    // embed inline file
    wrappedOut.write(inlineBytes);
    // write random bytes
    writeRandomBytes(wrappedOut, 5);
    wrappedOut.hsync();
    wrappedOut.close();
    return startOffset;
  }

  // fetches the raw bytes buffered by the in-memory file system
  private byte[] getBytesToInline(Path outerInMemFSPath) throws IOException {
    InMemoryFileSystem inMemoryFileSystem = (InMemoryFileSystem) outerInMemFSPath.getFileSystem(inMemoryConf);
    return inMemoryFileSystem.getFileAsBytes();
  }

  private void writeRandomBytes(FSDataOutputStream writer, int count) throws IOException {
    for (int i = 0; i < count; i++) {
      writer.writeUTF(UUID.randomUUID().toString());
    }
  }
}

View File

@@ -0,0 +1,356 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.common.inline.fs;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import static org.apache.hudi.common.inline.fs.FileSystemTestUtils.RANDOM;
import static org.apache.hudi.common.inline.fs.FileSystemTestUtils.getRandomOuterFSPath;
/**
* Tests {@link InLineFileSystem}.
*/
public class TestInLineFileSystem {
private Configuration conf;
private List<Path> listOfGeneratedPaths;
public TestInLineFileSystem() {
conf = new Configuration();
conf.set("fs." + InLineFileSystem.SCHEME + ".impl", InLineFileSystem.class.getName());
this.listOfGeneratedPaths = new ArrayList<>();
}
@After
public void teardown() throws IOException {
for (Path pathToDelete : listOfGeneratedPaths) {
File filePath = new File(pathToDelete.toString().substring(pathToDelete.toString().indexOf(':') + 1));
if (filePath.exists()) {
FileSystemTestUtils.deleteFile(filePath);
}
}
}
@Test
public void testReadInlineFile() throws IOException {
Path outerPath = getRandomOuterFSPath();
listOfGeneratedPaths.add(outerPath);
int totalSlices = 5; // embed n slices so that we can test N inline seqPaths
List<Pair<Long, Integer>> startOffsetLengthPairs = new ArrayList<>();
List<byte[]> expectedByteArrays = new ArrayList<>();
FSDataOutputStream wrappedOut = outerPath.getFileSystem(conf).create(outerPath, true);
for (int i = 0; i < totalSlices; i++) {
// append random bytes
byte[] randomBytes = new byte[RANDOM.nextInt(1000)];
RANDOM.nextBytes(randomBytes);
wrappedOut.write(randomBytes);
long startOffset = wrappedOut.getPos();
// add inline content
byte[] embeddedInlineBytes = new byte[RANDOM.nextInt(1000)];
RANDOM.nextBytes(embeddedInlineBytes);
wrappedOut.write(embeddedInlineBytes);
expectedByteArrays.add(embeddedInlineBytes);
startOffsetLengthPairs.add(Pair.of(startOffset, embeddedInlineBytes.length));
}
// suffix random bytes
byte[] randomBytes = new byte[RANDOM.nextInt(1000)];
RANDOM.nextBytes(randomBytes);
wrappedOut.write(randomBytes);
wrappedOut.flush();
wrappedOut.close();
FileStatus expectedFileStatus = outerPath.getFileSystem(conf).getFileStatus(outerPath);
for (int i = 0; i < totalSlices; i++) {
Pair<Long, Integer> startOffsetLengthPair = startOffsetLengthPairs.get(i);
byte[] expectedBytes = expectedByteArrays.get(i);
Path inlinePath = FileSystemTestUtils.getPhantomFile(outerPath, startOffsetLengthPair.getLeft(), startOffsetLengthPair.getRight());
InLineFileSystem inlineFileSystem = (InLineFileSystem) inlinePath.getFileSystem(conf);
FSDataInputStream fsDataInputStream = inlineFileSystem.open(inlinePath);
Assert.assertTrue(inlineFileSystem.exists(inlinePath));
verifyFileStatus(expectedFileStatus, inlinePath, startOffsetLengthPair.getRight(), inlineFileSystem.getFileStatus(inlinePath));
FileStatus[] actualFileStatuses = inlineFileSystem.listStatus(inlinePath);
Assert.assertEquals(1, actualFileStatuses.length);
verifyFileStatus(expectedFileStatus, inlinePath, startOffsetLengthPair.getRight(), actualFileStatuses[0]);
byte[] actualBytes = new byte[expectedBytes.length];
fsDataInputStream.readFully(0, actualBytes);
Assert.assertArrayEquals(expectedBytes, actualBytes);
fsDataInputStream.close();
Assert.assertEquals(InLineFileSystem.SCHEME, inlineFileSystem.getScheme());
Assert.assertEquals(URI.create(InLineFileSystem.SCHEME), inlineFileSystem.getUri());
}
}
@Test
public void testFileSystemApis() throws IOException {
  // Exercises InLineFileSystem's read-path stream APIs against a 1000-byte embedded slice.
  OuterPathInfo outerPathInfo = generateOuterFileAndGetInfo(1000);
  Path inlinePath = FileSystemTestUtils.getPhantomFile(outerPathInfo.outerPath, outerPathInfo.startOffset, outerPathInfo.length);
  InLineFileSystem inlineFileSystem = (InLineFileSystem) inlinePath.getFileSystem(conf);
  FSDataInputStream fsDataInputStream = inlineFileSystem.open(inlinePath);
  byte[] actualBytes = new byte[outerPathInfo.expectedBytes.length];
  // verify pos: the freshly opened stream reports position relative to the inline start,
  // i.e. 0 - startOffset of the embedded content within the outer file
  Assert.assertEquals(0 - outerPathInfo.startOffset, fsDataInputStream.getPos());
  fsDataInputStream.readFully(0, actualBytes);
  Assert.assertArrayEquals(outerPathInfo.expectedBytes, actualBytes);
  // read partial data
  // test read(long position, byte[] buffer, int offset, int length)
  actualBytes = new byte[100];
  fsDataInputStream.read(0, actualBytes, 10, 10);
  verifyArrayEquality(outerPathInfo.expectedBytes, 0, 10, actualBytes, 10, 10);
  actualBytes = new byte[310];
  fsDataInputStream.read(25, actualBytes, 100, 210);
  verifyArrayEquality(outerPathInfo.expectedBytes, 25, 210, actualBytes, 100, 210);
  // give length to read > than actual inline content
  actualBytes = new byte[1100];
  try {
    fsDataInputStream.read(0, actualBytes, 0, 1101);
    Assert.fail("Should have thrown IndexOutOfBoundsException");
  } catch (IndexOutOfBoundsException e) {
    // no op
  }
  // test readFully(long position, byte[] buffer, int offset, int length)
  actualBytes = new byte[100];
  fsDataInputStream.readFully(0, actualBytes, 10, 20);
  // fix: readFully above requests 20 bytes; verify all 20 (previously only 10 were checked)
  verifyArrayEquality(outerPathInfo.expectedBytes, 0, 20, actualBytes, 10, 20);
  actualBytes = new byte[310];
  fsDataInputStream.readFully(25, actualBytes, 100, 210);
  verifyArrayEquality(outerPathInfo.expectedBytes, 25, 210, actualBytes, 100, 210);
  // give length to read > than actual inline content
  actualBytes = new byte[1100];
  try {
    fsDataInputStream.readFully(0, actualBytes, 0, 1101);
    Assert.fail("Should have thrown IndexOutOfBoundsException");
  } catch (IndexOutOfBoundsException e) {
    // no op
  }
  // test readFully(long position, byte[] buffer)
  actualBytes = new byte[100];
  fsDataInputStream.readFully(0, actualBytes);
  verifyArrayEquality(outerPathInfo.expectedBytes, 0, 100, actualBytes, 0, 100);
  actualBytes = new byte[310];
  fsDataInputStream.readFully(25, actualBytes);
  // fix: readFully fills the entire 310-byte buffer; verify all of it (previously only 210 were checked)
  verifyArrayEquality(outerPathInfo.expectedBytes, 25, 310, actualBytes, 0, 310);
  // give length to read > than actual inline content.
  // NOTE(review): this reads 1100 bytes from a 1000-byte inline slice without failing,
  // i.e. the stream appears to read past the inline boundary into the outer file — confirm intended.
  actualBytes = new byte[1100];
  fsDataInputStream.readFully(0, actualBytes);
  verifyArrayEquality(outerPathInfo.expectedBytes, 0, 1000, actualBytes, 0, 1000);
  // TODO. seek does not move the position. need to investigate.
  // test seekToNewSource(long targetPos)
  /* fsDataInputStream.seekToNewSource(75);
  Assert.assertEquals(outerPathInfo.startOffset + 75, fsDataInputStream.getPos());
  fsDataInputStream.seekToNewSource(180);
  Assert.assertEquals(outerPathInfo.startOffset + 180, fsDataInputStream.getPos());
  fsDataInputStream.seekToNewSource(910);
  Assert.assertEquals(outerPathInfo.startOffset + 910, fsDataInputStream.getPos());
  */
  // test read(ByteBuffer buf)
  ByteBuffer actualByteBuffer = ByteBuffer.allocate(100);
  try {
    fsDataInputStream.read(actualByteBuffer);
    Assert.fail("Should have thrown");
  } catch (UnsupportedOperationException e) {
    // ignore
  }
  // the inline stream exposes the underlying outer file's descriptor
  Assert.assertEquals(outerPathInfo.outerPath.getFileSystem(conf).open(outerPathInfo.outerPath).getFileDescriptor(), fsDataInputStream.getFileDescriptor());
  try {
    fsDataInputStream.setReadahead(10L);
    Assert.fail("Should have thrown exception");
  } catch (UnsupportedOperationException e) {
    // ignore
  }
  try {
    fsDataInputStream.setDropBehind(true);
    Assert.fail("Should have thrown exception");
  } catch (UnsupportedOperationException e) {
    // ignore
  }
  // yet to test
  // read(ByteBufferPool bufferPool, int maxLength, EnumSet<ReadOption> opts)
  // releaseBuffer(ByteBuffer buffer)
  // unbuffer()
  fsDataInputStream.close();
}
/**
 * Asserts that a slice of {@code expected} equals a slice of {@code actual}.
 */
private void verifyArrayEquality(byte[] expected, int expectedOffset, int expectedLength,
    byte[] actual, int actualOffset, int actualLength) {
  byte[] expectedSlice = Arrays.copyOfRange(expected, expectedOffset, expectedOffset + expectedLength);
  byte[] actualSlice = Arrays.copyOfRange(actual, actualOffset, actualOffset + actualLength);
  Assert.assertArrayEquals(expectedSlice, actualSlice);
}
/**
 * Writes an outer file laid out as [random prefix][inline content][random suffix],
 * recording the outer path, the start offset and length of the inline content, and
 * the exact bytes embedded (for later verification).
 */
private OuterPathInfo generateOuterFileAndGetInfo(int inlineContentSize) throws IOException {
  Path outerPath = getRandomOuterFSPath();
  listOfGeneratedPaths.add(outerPath);
  OuterPathInfo info = new OuterPathInfo();
  info.outerPath = outerPath;
  FSDataOutputStream out = outerPath.getFileSystem(conf).create(outerPath, true);
  // random prefix before the embedded content
  byte[] prefix = new byte[RANDOM.nextInt(1000)];
  RANDOM.nextBytes(prefix);
  out.write(prefix);
  // the embedded content starts at the current write position
  info.startOffset = out.getPos();
  byte[] inlineBytes = new byte[inlineContentSize];
  RANDOM.nextBytes(inlineBytes);
  out.write(inlineBytes);
  info.expectedBytes = inlineBytes;
  info.length = inlineBytes.length;
  // random suffix after the embedded content
  byte[] suffix = new byte[RANDOM.nextInt(1000)];
  RANDOM.nextBytes(suffix);
  out.write(suffix);
  out.flush();
  out.close();
  return info;
}
@Test
public void testOpen() throws IOException {
  // opening an inline path whose backing outer file was never written must fail
  Path path = getRandomInlinePath();
  try {
    path.getFileSystem(conf).open(path);
    Assert.fail("Should have thrown exception");
  } catch (FileNotFoundException ignored) {
    // expected: the backing outer file does not exist
  }
}
@Test
public void testCreate() throws IOException {
  // the inline filesystem is read-only: create() is unsupported
  Path path = getRandomInlinePath();
  try {
    path.getFileSystem(conf).create(path, true);
    Assert.fail("Should have thrown exception");
  } catch (UnsupportedOperationException ignored) {
    // expected
  }
}
@Test
public void testAppend() throws IOException {
  // the inline filesystem is read-only: append() is unsupported
  Path path = getRandomInlinePath();
  try {
    path.getFileSystem(conf).append(path);
    Assert.fail("Should have thrown exception");
  } catch (UnsupportedOperationException ignored) {
    // expected
  }
}
@Test
public void testRename() throws IOException {
  // the inline filesystem is read-only: rename() is unsupported
  Path path = getRandomInlinePath();
  try {
    path.getFileSystem(conf).rename(path, path);
    Assert.fail("Should have thrown exception");
  } catch (UnsupportedOperationException ignored) {
    // expected
  }
}
@Test
public void testDelete() throws IOException {
  // the inline filesystem is read-only: delete() is unsupported
  Path path = getRandomInlinePath();
  try {
    path.getFileSystem(conf).delete(path, true);
    Assert.fail("Should have thrown exception");
  } catch (UnsupportedOperationException ignored) {
    // expected
  }
}
@Test
public void testgetWorkingDir() throws IOException {
  // the inline filesystem has no working-directory concept
  Path path = getRandomInlinePath();
  try {
    path.getFileSystem(conf).getWorkingDirectory();
    Assert.fail("Should have thrown exception");
  } catch (UnsupportedOperationException ignored) {
    // expected
  }
}
@Test
public void testsetWorkingDirectory() throws IOException {
  // the inline filesystem has no working-directory concept
  Path path = getRandomInlinePath();
  try {
    path.getFileSystem(conf).setWorkingDirectory(path);
    Assert.fail("Should have thrown exception");
  } catch (UnsupportedOperationException ignored) {
    // expected
  }
}
@Test
public void testExists() throws IOException {
  // the phantom inline path points at an outer file that was never created
  Path path = getRandomInlinePath();
  Assert.assertFalse(path.getFileSystem(conf).exists(path));
}
/**
 * Builds a phantom inline path (offset 100, length 100) over a fresh outer file path.
 * The outer path is registered for cleanup even though it may never be created.
 */
private Path getRandomInlinePath() {
  Path backingPath = getRandomOuterFSPath();
  listOfGeneratedPaths.add(backingPath);
  return FileSystemTestUtils.getPhantomFile(backingPath, 100, 100);
}
// Verifies that the inline file's status mirrors the outer file's status, except for
// the path and length, which must reflect the embedded inline slice instead.
private void verifyFileStatus(FileStatus expected, Path inlinePath, long expectedLength, FileStatus actual) {
  // path and length are inline-specific
  Assert.assertEquals(inlinePath, actual.getPath());
  Assert.assertEquals(expectedLength, actual.getLen());
  // all remaining attributes are inherited from the outer file's status
  Assert.assertEquals(expected.getAccessTime(), actual.getAccessTime());
  Assert.assertEquals(expected.getBlockSize(), actual.getBlockSize());
  Assert.assertEquals(expected.getGroup(), actual.getGroup());
  Assert.assertEquals(expected.getModificationTime(), actual.getModificationTime());
  Assert.assertEquals(expected.getOwner(), actual.getOwner());
  Assert.assertEquals(expected.getPermission(), actual.getPermission());
  Assert.assertEquals(expected.getReplication(), actual.getReplication());
}
// Simple holder describing an outer file with embedded inline content.
class OuterPathInfo {
  // path of the outer file that embeds the inline content
  Path outerPath;
  // byte offset within the outer file where the inline content begins
  long startOffset;
  // length in bytes of the inline content
  int length;
  // the exact bytes that were embedded, kept for verification
  byte[] expectedBytes;
}
}

View File

@@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.common.inline.fs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.net.URI;
import static org.apache.hudi.common.inline.fs.FileSystemTestUtils.RANDOM;
import static org.apache.hudi.common.inline.fs.FileSystemTestUtils.getRandomOuterInMemPath;
/**
* Unit tests {@link InMemoryFileSystem}.
*/
public class TestInMemoryFileSystem {

  private Configuration conf;

  public TestInMemoryFileSystem() {
    // register the in-memory scheme so Path.getFileSystem resolves to InMemoryFileSystem
    conf = new Configuration();
    conf.set("fs." + InMemoryFileSystem.SCHEME + ".impl", InMemoryFileSystem.class.getName());
  }

  @Test
  public void testCreateWriteGetFileAsBytes() throws IOException {
    // write a random payload and read it back via getFileAsBytes()
    Path path = getRandomOuterInMemPath();
    FSDataOutputStream out = path.getFileSystem(conf).create(path, true);
    byte[] payload = new byte[RANDOM.nextInt(1000)];
    RANDOM.nextBytes(payload);
    out.write(payload);
    out.close();
    InMemoryFileSystem inMemoryFileSystem = (InMemoryFileSystem) path.getFileSystem(conf);
    byte[] readBack = inMemoryFileSystem.getFileAsBytes();
    Assert.assertArrayEquals(payload, readBack);
    Assert.assertEquals(InMemoryFileSystem.SCHEME, inMemoryFileSystem.getScheme());
    Assert.assertEquals(URI.create(path.toString()), inMemoryFileSystem.getUri());
  }

  @Test
  public void testOpen() throws IOException {
    // open() returns null for this write-only filesystem
    Path path = getRandomOuterInMemPath();
    Assert.assertNull(path.getFileSystem(conf).open(path));
  }

  @Test
  public void testAppend() throws IOException {
    // append() likewise returns null
    Path path = getRandomOuterInMemPath();
    Assert.assertNull(path.getFileSystem(conf).append(path));
  }

  @Test
  public void testRename() throws IOException {
    Path path = getRandomOuterInMemPath();
    try {
      path.getFileSystem(conf).rename(path, path);
      Assert.fail("Should have thrown exception");
    } catch (UnsupportedOperationException ignored) {
      // expected
    }
  }

  @Test
  public void testDelete() throws IOException {
    Path path = getRandomOuterInMemPath();
    try {
      path.getFileSystem(conf).delete(path, true);
      Assert.fail("Should have thrown exception");
    } catch (UnsupportedOperationException ignored) {
      // expected
    }
  }

  @Test
  public void testgetWorkingDir() throws IOException {
    // getWorkingDirectory() returns null rather than throwing
    Path path = getRandomOuterInMemPath();
    Assert.assertNull(path.getFileSystem(conf).getWorkingDirectory());
  }

  @Test
  public void testsetWorkingDirectory() throws IOException {
    Path path = getRandomOuterInMemPath();
    try {
      path.getFileSystem(conf).setWorkingDirectory(path);
      Assert.fail("Should have thrown exception");
    } catch (UnsupportedOperationException ignored) {
      // expected
    }
  }

  @Test
  public void testExists() throws IOException {
    Path path = getRandomOuterInMemPath();
    try {
      path.getFileSystem(conf).exists(path);
      Assert.fail("Should have thrown exception");
    } catch (UnsupportedOperationException ignored) {
      // expected
    }
  }

  @Test
  public void testFileStatus() throws IOException {
    Path path = getRandomOuterInMemPath();
    try {
      path.getFileSystem(conf).getFileStatus(path);
      Assert.fail("Should have thrown exception");
    } catch (UnsupportedOperationException ignored) {
      // expected
    }
  }

  @Test
  public void testListStatus() throws IOException {
    Path path = getRandomOuterInMemPath();
    try {
      path.getFileSystem(conf).listStatus(path);
      Assert.fail("Should have thrown exception");
    } catch (UnsupportedOperationException ignored) {
      // expected
    }
  }
}

View File

@@ -0,0 +1,153 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.utilities.inline.fs;
import org.apache.hudi.common.HoodieTestDataGenerator;
import org.apache.hudi.common.inline.fs.FileSystemTestUtils;
import org.apache.hudi.common.inline.fs.InLineFileSystem;
import org.apache.hudi.common.inline.fs.InMemoryFileSystem;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import static org.apache.hudi.common.inline.fs.FileSystemTestUtils.FILE_SCHEME;
import static org.apache.hudi.common.inline.fs.FileSystemTestUtils.getPhantomFile;
import static org.apache.hudi.common.inline.fs.FileSystemTestUtils.getRandomOuterInMemPath;
/**
 * Tests {@link InLineFileSystem} with the Parquet writer and reader. Since hudi-common cannot
 * access {@code HoodieTestDataGenerator}, this test lives in hudi-utilities.
 */
public class TestParquetInLining {

  private final Configuration inMemoryConf;
  private final Configuration inlineConf;
  // outer file created by the test, removed in teardown
  private Path generatedPath;

  public TestParquetInLining() {
    inMemoryConf = new Configuration();
    inMemoryConf.set("fs." + InMemoryFileSystem.SCHEME + ".impl", InMemoryFileSystem.class.getName());
    inlineConf = new Configuration();
    inlineConf.set("fs." + InLineFileSystem.SCHEME + ".impl", InLineFileSystem.class.getName());
  }

  @After
  public void teardown() throws IOException {
    if (generatedPath != null) {
      // strip the scheme prefix (e.g. "file:") to get a local filesystem path
      File filePath = new File(generatedPath.toString().substring(generatedPath.toString().indexOf(':') + 1));
      if (filePath.exists()) {
        FileSystemTestUtils.deleteFile(filePath);
      }
    }
  }

  @Test
  public void testSimpleInlineFileSystem() throws IOException {
    Path outerInMemFSPath = getRandomOuterInMemPath();
    Path outerPath = new Path(FILE_SCHEME + outerInMemFSPath.toString().substring(outerInMemFSPath.toString().indexOf(':')));
    generatedPath = outerPath;
    // fix: parameterize previously-raw ParquetWriter/AvroParquetWriter types
    ParquetWriter<GenericRecord> inlineWriter = new AvroParquetWriter<>(outerInMemFSPath, HoodieTestDataGenerator.AVRO_SCHEMA,
        CompressionCodecName.GZIP, 100 * 1024 * 1024, 1024 * 1024, true, inMemoryConf);
    // write few records
    List<GenericRecord> recordsToWrite = getParquetHoodieRecords();
    for (GenericRecord rec : recordsToWrite) {
      inlineWriter.write(rec);
    }
    inlineWriter.close();
    byte[] inlineBytes = getBytesToInline(outerInMemFSPath);
    long startOffset = generateOuterFile(outerPath, inlineBytes);
    long inlineLength = inlineBytes.length;
    // Generate phantom inline file
    Path inlinePath = getPhantomFile(outerPath, startOffset, inlineLength);
    // instantiate Parquet reader (fix: parameterized instead of raw)
    ParquetReader<GenericRecord> inLineReader = AvroParquetReader.<GenericRecord>builder(inlinePath).withConf(inlineConf).build();
    List<GenericRecord> records = readParquetGenericRecords(inLineReader);
    Assert.assertArrayEquals(recordsToWrite.toArray(), records.toArray());
    inLineReader.close();
  }

  /**
   * Writes an outer file as [random bytes][inlineBytes][random bytes] and returns the
   * byte offset at which {@code inlineBytes} starts.
   */
  private long generateOuterFile(Path outerPath, byte[] inlineBytes) throws IOException {
    FSDataOutputStream wrappedOut = outerPath.getFileSystem(inMemoryConf).create(outerPath, true);
    // write random bytes
    writeRandomBytes(wrappedOut, 10);
    // save position for start offset
    long startOffset = wrappedOut.getPos();
    // embed inline file
    wrappedOut.write(inlineBytes);
    // write random bytes
    writeRandomBytes(wrappedOut, 5);
    wrappedOut.hsync();
    wrappedOut.close();
    return startOffset;
  }

  // Returns the full content buffered by the in-memory filesystem for the given path.
  private byte[] getBytesToInline(Path outerInMemFSPath) throws IOException {
    InMemoryFileSystem inMemoryFileSystem = (InMemoryFileSystem) outerInMemFSPath.getFileSystem(inMemoryConf);
    return inMemoryFileSystem.getFileAsBytes();
  }

  // Drains the reader until it stops producing GenericRecords.
  static List<GenericRecord> readParquetGenericRecords(ParquetReader<GenericRecord> reader) throws IOException {
    List<GenericRecord> toReturn = new ArrayList<>();
    Object obj = reader.read();
    while (obj instanceof GenericRecord) {
      toReturn.add((GenericRecord) obj);
      obj = reader.read();
    }
    return toReturn;
  }

  // Writes `count` random UUID strings to pad the outer file.
  private void writeRandomBytes(FSDataOutputStream writer, int count) throws IOException {
    for (int i = 0; i < count; i++) {
      writer.writeUTF(UUID.randomUUID().toString());
    }
  }

  // Generates 10 insert records for commit "001" and extracts their Avro payloads.
  static List<GenericRecord> getParquetHoodieRecords() throws IOException {
    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
    String commitTime = "001";
    List<HoodieRecord> hoodieRecords = dataGenerator.generateInsertsWithHoodieAvroPayload(commitTime, 10);
    List<GenericRecord> toReturn = new ArrayList<>();
    for (HoodieRecord record : hoodieRecords) {
      toReturn.add((GenericRecord) record.getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA).get());
    }
    return toReturn;
  }
}

View File

@@ -683,13 +683,6 @@
</exclusions>
</dependency>
<!-- HBase -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
</dependency>
<!-- Hive -->
<dependency>
<groupId>${hive.groupid}</groupId>