1
0

[HUDI-1180] Upgrade HBase to 2.4.9 (#5004)

Co-authored-by: Sagar Sumit <sagarsumit09@gmail.com>
This commit is contained in:
Y Ethan Guo
2022-03-24 19:04:53 -07:00
committed by GitHub
parent 5e86cdd1e9
commit eaa4c4f2e2
41 changed files with 3866 additions and 746 deletions

View File

@@ -33,10 +33,12 @@ import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.storage.HoodieHFileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
@@ -178,9 +180,7 @@ public class HFileBootstrapIndex extends BootstrapIndex {
private static HFile.Reader createReader(String hFilePath, Configuration conf, FileSystem fileSystem) {
try {
LOG.info("Opening HFile for reading :" + hFilePath);
HFile.Reader reader = HFile.createReader(fileSystem, new HFilePathForReader(hFilePath),
new CacheConfig(conf), conf);
return reader;
return HoodieHFileUtils.createHFileReader(fileSystem, new HFilePathForReader(hFilePath), new CacheConfig(conf), conf);
} catch (IOException ioe) {
throw new HoodieIOException(ioe.getMessage(), ioe);
}
@@ -259,7 +259,7 @@ public class HFileBootstrapIndex extends BootstrapIndex {
private HoodieBootstrapIndexInfo fetchBootstrapIndexInfo() throws IOException {
return TimelineMetadataUtils.deserializeAvroMetadata(
partitionIndexReader().loadFileInfo().get(INDEX_INFO_KEY),
partitionIndexReader().getHFileInfo().get(INDEX_INFO_KEY),
HoodieBootstrapIndexInfo.class);
}
@@ -306,7 +306,7 @@ public class HFileBootstrapIndex extends BootstrapIndex {
try {
boolean available = scanner.seekTo();
while (available) {
keys.add(converter.apply(getUserKeyFromCellKey(CellUtil.getCellKeyAsString(scanner.getKeyValue()))));
keys.add(converter.apply(getUserKeyFromCellKey(CellUtil.getCellKeyAsString(scanner.getCell()))));
available = scanner.next();
}
} catch (IOException ioe) {
@@ -528,13 +528,13 @@ public class HFileBootstrapIndex extends BootstrapIndex {
@Override
public void begin() {
try {
HFileContext meta = new HFileContextBuilder().build();
HFileContext meta = new HFileContextBuilder().withCellComparator(new HoodieKVComparator()).build();
this.indexByPartitionWriter = HFile.getWriterFactory(metaClient.getHadoopConf(),
new CacheConfig(metaClient.getHadoopConf())).withPath(metaClient.getFs(), indexByPartitionPath)
.withFileContext(meta).withComparator(new HoodieKVComparator()).create();
.withFileContext(meta).create();
this.indexByFileIdWriter = HFile.getWriterFactory(metaClient.getHadoopConf(),
new CacheConfig(metaClient.getHadoopConf())).withPath(metaClient.getFs(), indexByFileIdPath)
.withFileContext(meta).withComparator(new HoodieKVComparator()).create();
.withFileContext(meta).create();
} catch (IOException ioe) {
throw new HoodieIOException(ioe.getMessage(), ioe);
}
@@ -581,6 +581,6 @@ public class HFileBootstrapIndex extends BootstrapIndex {
* This class is explicitly used as Key Comparator to workaround hard coded
* legacy format class names inside HBase. Otherwise we will face issues with shading.
*/
public static class HoodieKVComparator extends KeyValue.KVComparator {
public static class HoodieKVComparator extends CellComparatorImpl {
}
}

View File

@@ -424,6 +424,9 @@ public abstract class AbstractHoodieLogRecordReader {
processDataBlock((HoodieAvroDataBlock) lastBlock, keys);
break;
case HFILE_DATA_BLOCK:
if (!keys.isPresent()) {
keys = Option.of(Collections.emptyList());
}
processDataBlock((HoodieHFileDataBlock) lastBlock, keys);
break;
case PARQUET_DATA_BLOCK:

View File

@@ -208,7 +208,7 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
String.format("HFile block could not be of version (%d)", HoodieLogFormatVersion.DEFAULT_VERSION));
return new HoodieHFileDataBlock(inputStream, content, readBlockLazily, logBlockContentLoc,
Option.ofNullable(readerSchema), header, footer, enableRecordLookups);
Option.ofNullable(readerSchema), header, footer, enableRecordLookups, logFile.getPath());
case PARQUET_DATA_BLOCK:
checkState(nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION,

View File

@@ -18,6 +18,18 @@
package org.apache.hudi.common.table.log.block;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.inline.InLineFSUtils;
import org.apache.hudi.common.fs.inline.InLineFileSystem;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.storage.HoodieHBaseKVComparator;
import org.apache.hudi.io.storage.HoodieHFileReader;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
@@ -30,17 +42,6 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.fs.inline.InLineFSUtils;
import org.apache.hudi.common.fs.inline.InLineFileSystem;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.storage.HoodieHBaseKVComparator;
import org.apache.hudi.io.storage.HoodieHFileReader;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -65,6 +66,9 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
private static final int DEFAULT_BLOCK_SIZE = 1024 * 1024;
private final Option<Compression.Algorithm> compressionAlgorithm;
// This path is used for constructing HFile reader context, which should not be
// interpreted as the actual file path for the HFile data blocks
private final Path pathForReader;
public HoodieHFileDataBlock(FSDataInputStream inputStream,
Option<byte[]> content,
@@ -73,16 +77,20 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
Option<Schema> readerSchema,
Map<HeaderMetadataType, String> header,
Map<HeaderMetadataType, String> footer,
boolean enablePointLookups) {
boolean enablePointLookups,
Path pathForReader) {
super(content, inputStream, readBlockLazily, Option.of(logBlockContentLocation), readerSchema, header, footer, HoodieHFileReader.KEY_FIELD_NAME, enablePointLookups);
this.compressionAlgorithm = Option.empty();
this.pathForReader = pathForReader;
}
public HoodieHFileDataBlock(List<IndexedRecord> records,
Map<HeaderMetadataType, String> header,
Compression.Algorithm compressionAlgorithm) {
Compression.Algorithm compressionAlgorithm,
Path pathForReader) {
super(records, header, new HashMap<>(), HoodieHFileReader.KEY_FIELD_NAME);
this.compressionAlgorithm = Option.of(compressionAlgorithm);
this.pathForReader = pathForReader;
}
@Override
@@ -95,6 +103,7 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
HFileContext context = new HFileContextBuilder()
.withBlockSize(DEFAULT_BLOCK_SIZE)
.withCompression(compressionAlgorithm.get())
.withCellComparator(new HoodieHBaseKVComparator())
.build();
Configuration conf = new Configuration();
@@ -128,7 +137,7 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
}
HFile.Writer writer = HFile.getWriterFactory(conf, cacheConfig)
.withOutputStream(ostream).withFileContext(context).withComparator(new HoodieHBaseKVComparator()).create();
.withOutputStream(ostream).withFileContext(context).create();
// Write the records
sortedRecordsMap.forEach((recordKey, recordBytes) -> {
@@ -155,7 +164,8 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
Schema writerSchema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
// Read the content
HoodieHFileReader<IndexedRecord> reader = new HoodieHFileReader<>(content);
HoodieHFileReader<IndexedRecord> reader = new HoodieHFileReader<>(
FSUtils.getFs(pathForReader.toString(), new Configuration()), pathForReader, content);
// Sets up the writer schema
reader.withSchema(writerSchema);
Iterator<IndexedRecord> recordIterator = reader.getRecordIterator(readerSchema);

View File

@@ -19,11 +19,11 @@
package org.apache.hudi.io.storage;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.CellComparatorImpl;
/**
* This class is explicitly used as Key Comparator to work around the hard coded
* legacy format class names inside HBase. Otherwise, we will face issues with shading.
*/
public class HoodieHBaseKVComparator extends KeyValue.KVComparator {
public class HoodieHBaseKVComparator extends CellComparatorImpl {
}

View File

@@ -18,18 +18,16 @@
package org.apache.hudi.io.storage;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.BloomFilterFactory;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.io.ByteBufferBackedInputStream;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
@@ -41,26 +39,37 @@ import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileInfo;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.BloomFilterFactory;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.io.ByteBufferBackedInputStream;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
public class HoodieHFileReader<R extends IndexedRecord> implements HoodieFileReader<R> {
public static final String KEY_FIELD_NAME = "key";
public static final String KEY_SCHEMA = "schema";
public static final String KEY_BLOOM_FILTER_META_BLOCK = "bloomFilter";
public static final String KEY_BLOOM_FILTER_TYPE_CODE = "bloomFilterTypeCode";
public static final String KEY_MIN_RECORD = "minRecordKey";
public static final String KEY_MAX_RECORD = "maxRecordKey";
private static final Logger LOG = LogManager.getLogger(HoodieHFileReader.class);
private Path path;
private Configuration conf;
private HFile.Reader reader;
@@ -70,55 +79,35 @@ public class HoodieHFileReader<R extends IndexedRecord> implements HoodieFileRea
// key retrieval.
private HFileScanner keyScanner;
public static final String KEY_FIELD_NAME = "key";
public static final String KEY_SCHEMA = "schema";
public static final String KEY_BLOOM_FILTER_META_BLOCK = "bloomFilter";
public static final String KEY_BLOOM_FILTER_TYPE_CODE = "bloomFilterTypeCode";
public static final String KEY_MIN_RECORD = "minRecordKey";
public static final String KEY_MAX_RECORD = "maxRecordKey";
public HoodieHFileReader(Configuration configuration, Path path, CacheConfig cacheConfig) throws IOException {
this.conf = configuration;
this.path = path;
this.reader = HFile.createReader(FSUtils.getFs(path.toString(), configuration), path, cacheConfig, conf);
this.reader = HoodieHFileUtils.createHFileReader(FSUtils.getFs(path.toString(), configuration), path, cacheConfig, conf);
}
public HoodieHFileReader(Configuration configuration, Path path, CacheConfig cacheConfig, FileSystem fs) throws IOException {
this.conf = configuration;
this.path = path;
this.fsDataInputStream = fs.open(path);
this.reader = HFile.createReader(fs, path, cacheConfig, configuration);
this.reader = HoodieHFileUtils.createHFileReader(fs, path, cacheConfig, configuration);
}
public HoodieHFileReader(byte[] content) throws IOException {
Configuration conf = new Configuration();
Path path = new Path("hoodie");
SeekableByteArrayInputStream bis = new SeekableByteArrayInputStream(content);
FSDataInputStream fsdis = new FSDataInputStream(bis);
this.reader = HFile.createReader(FSUtils.getFs("hoodie", conf), path, new FSDataInputStreamWrapper(fsdis),
content.length, new CacheConfig(conf), conf);
public HoodieHFileReader(FileSystem fs, Path dummyPath, byte[] content) throws IOException {
this.reader = HoodieHFileUtils.createHFileReader(fs, dummyPath, content);
}
@Override
public String[] readMinMaxRecordKeys() {
try {
Map<byte[], byte[]> fileInfo = reader.loadFileInfo();
return new String[] { new String(fileInfo.get(KEY_MIN_RECORD.getBytes())),
new String(fileInfo.get(KEY_MAX_RECORD.getBytes()))};
} catch (IOException e) {
throw new HoodieException("Could not read min/max record key out of file information block correctly from path", e);
}
HFileInfo fileInfo = reader.getHFileInfo();
return new String[] {new String(fileInfo.get(KEY_MIN_RECORD.getBytes())),
new String(fileInfo.get(KEY_MAX_RECORD.getBytes()))};
}
@Override
public Schema getSchema() {
if (schema == null) {
try {
Map<byte[], byte[]> fileInfo = reader.loadFileInfo();
schema = new Schema.Parser().parse(new String(fileInfo.get(KEY_SCHEMA.getBytes())));
} catch (IOException e) {
throw new HoodieException("Could not read schema of file from path", e);
}
HFileInfo fileInfo = reader.getHFileInfo();
schema = new Schema.Parser().parse(new String(fileInfo.get(KEY_SCHEMA.getBytes())));
}
return schema;
@@ -133,10 +122,10 @@ public class HoodieHFileReader<R extends IndexedRecord> implements HoodieFileRea
@Override
public BloomFilter readBloomFilter() {
Map<byte[], byte[]> fileInfo;
HFileInfo fileInfo;
try {
fileInfo = reader.loadFileInfo();
ByteBuffer serializedFilter = reader.getMetaBlock(KEY_BLOOM_FILTER_META_BLOCK, false);
fileInfo = reader.getHFileInfo();
ByteBuff serializedFilter = reader.getMetaBlock(KEY_BLOOM_FILTER_META_BLOCK, false).getBufferWithoutHeader();
byte[] filterBytes = new byte[serializedFilter.remaining()];
serializedFilter.get(filterBytes); // read the bytes that were written
return BloomFilterFactory.fromString(new String(filterBytes),
@@ -206,7 +195,7 @@ public class HoodieHFileReader<R extends IndexedRecord> implements HoodieFileRea
final HFileScanner scanner = reader.getScanner(false, false);
if (scanner.seekTo()) {
do {
Cell c = scanner.getKeyValue();
Cell c = scanner.getCell();
final Pair<String, R> keyAndRecordPair = getRecordFromCell(c, writerSchema, readerSchema, keyFieldSchema);
recordList.add(keyAndRecordPair);
} while (scanner.next());
@@ -250,7 +239,6 @@ public class HoodieHFileReader<R extends IndexedRecord> implements HoodieFileRea
*/
public List<Pair<String, R>> readRecords(List<String> keys, Schema schema) throws IOException {
this.schema = schema;
reader.loadFileInfo();
List<Pair<String, R>> records = new ArrayList<>();
for (String key: keys) {
Option<R> value = getRecordByKey(key, schema);
@@ -263,7 +251,6 @@ public class HoodieHFileReader<R extends IndexedRecord> implements HoodieFileRea
public ClosableIterator<R> getRecordIterator(List<String> keys, Schema schema) throws IOException {
this.schema = schema;
reader.loadFileInfo();
Iterator<String> iterator = keys.iterator();
return new ClosableIterator<R>() {
private R next;
@@ -310,7 +297,7 @@ public class HoodieHFileReader<R extends IndexedRecord> implements HoodieFileRea
// To handle when hasNext() is called multiple times for idempotency and/or the first time
if (this.next == null && !this.eof) {
if (!scanner.isSeeked() && scanner.seekTo()) {
final Pair<String, R> keyAndRecordPair = getRecordFromCell(scanner.getKeyValue(), getSchema(), readerSchema, keyFieldSchema);
final Pair<String, R> keyAndRecordPair = getRecordFromCell(scanner.getCell(), getSchema(), readerSchema, keyFieldSchema);
this.next = keyAndRecordPair.getSecond();
}
}
@@ -331,7 +318,7 @@ public class HoodieHFileReader<R extends IndexedRecord> implements HoodieFileRea
}
R retVal = this.next;
if (scanner.next()) {
final Pair<String, R> keyAndRecordPair = getRecordFromCell(scanner.getKeyValue(), getSchema(), readerSchema, keyFieldSchema);
final Pair<String, R> keyAndRecordPair = getRecordFromCell(scanner.getCell(), getSchema(), readerSchema, keyFieldSchema);
this.next = keyAndRecordPair.getSecond();
} else {
this.next = null;
@@ -371,7 +358,7 @@ public class HoodieHFileReader<R extends IndexedRecord> implements HoodieFileRea
}
if (keyScanner.seekTo(kv) == 0) {
Cell c = keyScanner.getKeyValue();
Cell c = keyScanner.getCell();
// Extract the byte value before releasing the lock since we cannot hold on to the returned cell afterwards
value = Arrays.copyOfRange(c.getValueArray(), c.getValueOffset(), c.getValueOffset() + c.getValueLength());
}

View File

@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.io.storage;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileInfo;
import org.apache.hadoop.hbase.io.hfile.ReaderContext;
import org.apache.hadoop.hbase.io.hfile.ReaderContextBuilder;
import java.io.IOException;
/**
* Util class for HFile reading and writing in Hudi
*/
public class HoodieHFileUtils {
// Based on HBase 2.4.9, the primaryReplicaReader is mainly used for constructing
// block cache key, so if we do not use block cache then it is OK to set it as any
// value. We use true here.
private static final boolean USE_PRIMARY_REPLICA_READER = true;
/**
* Creates HFile reader for a file with default `primaryReplicaReader` as true.
*
* @param fs File system.
* @param path Path to file to read.
* @param cacheConfig Cache configuration.
* @param configuration Configuration
* @return HFile reader
* @throws IOException Upon error.
*/
public static HFile.Reader createHFileReader(
FileSystem fs, Path path, CacheConfig cacheConfig, Configuration configuration) throws IOException {
return HFile.createReader(fs, path, cacheConfig, USE_PRIMARY_REPLICA_READER, configuration);
}
/**
* Creates HFile reader for byte array with default `primaryReplicaReader` as true.
*
* @param fs File system.
* @param dummyPath Dummy path to file to read.
* @param content Content in byte array.
* @return HFile reader
* @throws IOException Upon error.
*/
public static HFile.Reader createHFileReader(
FileSystem fs, Path dummyPath, byte[] content) throws IOException {
Configuration conf = new Configuration();
HoodieHFileReader.SeekableByteArrayInputStream bis = new HoodieHFileReader.SeekableByteArrayInputStream(content);
FSDataInputStream fsdis = new FSDataInputStream(bis);
FSDataInputStreamWrapper stream = new FSDataInputStreamWrapper(fsdis);
ReaderContext context = new ReaderContextBuilder()
.withFilePath(dummyPath)
.withInputStreamWrapper(stream)
.withFileSize(content.length)
.withFileSystem(fs)
.withPrimaryReplicaReader(USE_PRIMARY_REPLICA_READER)
.withReaderType(ReaderContext.ReaderType.STREAM)
.build();
HFileInfo fileInfo = new HFileInfo(context, conf);
HFile.Reader reader = HFile.createReader(context, fileInfo, new CacheConfig(conf), conf);
fileInfo.initMetaAndIndex(reader);
return reader;
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -19,12 +19,13 @@
package org.apache.hudi.common.fs.inline;
import org.apache.hudi.common.testutils.FileSystemTestUtils;
import org.apache.hudi.io.storage.HoodieHBaseKVComparator;
import org.apache.hudi.io.storage.HoodieHFileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@@ -39,10 +40,12 @@ import org.junit.jupiter.api.Test;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import static org.apache.hadoop.hbase.CellComparatorImpl.COMPARATOR;
import static org.apache.hudi.common.testutils.FileSystemTestUtils.FILE_SCHEME;
import static org.apache.hudi.common.testutils.FileSystemTestUtils.RANDOM;
import static org.apache.hudi.common.testutils.FileSystemTestUtils.getPhantomFile;
@@ -56,11 +59,12 @@ import static org.junit.jupiter.api.Assertions.assertNotEquals;
*/
public class TestInLineFileSystemHFileInLining {
private static final String LOCAL_FORMATTER = "%010d";
private static final String VALUE_PREFIX = "value";
private static final int MIN_BLOCK_BYTES = 1024;
private final Configuration inMemoryConf;
private final Configuration inlineConf;
private final int minBlockSize = 1024;
private static final String LOCAL_FORMATTER = "%010d";
private int maxRows = 100 + RANDOM.nextInt(1000);
private final int maxRows = 100 + RANDOM.nextInt(1000);
private Path generatedPath;
public TestInLineFileSystemHFileInLining() {
@@ -88,12 +92,11 @@ public class TestInLineFileSystemHFileInLining {
CacheConfig cacheConf = new CacheConfig(inMemoryConf);
FSDataOutputStream fout = createFSOutput(outerInMemFSPath, inMemoryConf);
HFileContext meta = new HFileContextBuilder()
.withBlockSize(minBlockSize)
.withBlockSize(MIN_BLOCK_BYTES).withCellComparator(COMPARATOR)
.build();
HFile.Writer writer = HFile.getWriterFactory(inMemoryConf, cacheConf)
.withOutputStream(fout)
.withFileContext(meta)
.withComparator(new HoodieHBaseKVComparator())
.create();
writeRecords(writer);
@@ -110,9 +113,8 @@ public class TestInLineFileSystemHFileInLining {
InLineFileSystem inlineFileSystem = (InLineFileSystem) inlinePath.getFileSystem(inlineConf);
FSDataInputStream fin = inlineFileSystem.open(inlinePath);
HFile.Reader reader = HFile.createReader(inlineFileSystem, inlinePath, cacheConf, inlineConf);
// Load up the index.
reader.loadFileInfo();
HFile.Reader reader =
HoodieHFileUtils.createHFileReader(inlineFileSystem, inlinePath, cacheConf, inlineConf);
// Get a scanner that caches and that does not use pread.
HFileScanner scanner = reader.getScanner(true, false);
// Align scanner at start of the file.
@@ -121,21 +123,24 @@ public class TestInLineFileSystemHFileInLining {
Set<Integer> rowIdsToSearch = getRandomValidRowIds(10);
for (int rowId : rowIdsToSearch) {
assertEquals(0, scanner.seekTo(KeyValue.createKeyValueFromKey(getSomeKey(rowId))),
KeyValue keyValue = new KeyValue.KeyOnlyKeyValue(getSomeKey(rowId));
assertEquals(0, scanner.seekTo(keyValue),
"location lookup failed");
// read the key and see if it matches
ByteBuffer readKey = scanner.getKey();
assertArrayEquals(getSomeKey(rowId), Bytes.toBytes(readKey), "seeked key does not match");
scanner.seekTo(KeyValue.createKeyValueFromKey(getSomeKey(rowId)));
Cell cell = scanner.getCell();
byte[] key = Arrays.copyOfRange(cell.getRowArray(), cell.getRowOffset(), cell.getRowOffset() + cell.getRowLength());
byte[] expectedKey = Arrays.copyOfRange(keyValue.getRowArray(), keyValue.getRowOffset(), keyValue.getRowOffset() + keyValue.getRowLength());
assertArrayEquals(expectedKey, key, "seeked key does not match");
scanner.seekTo(keyValue);
ByteBuffer val1 = scanner.getValue();
scanner.seekTo(KeyValue.createKeyValueFromKey(getSomeKey(rowId)));
scanner.seekTo(keyValue);
ByteBuffer val2 = scanner.getValue();
assertArrayEquals(Bytes.toBytes(val1), Bytes.toBytes(val2));
}
int[] invalidRowIds = {-4, maxRows, maxRows + 1, maxRows + 120, maxRows + 160, maxRows + 1000};
for (int rowId : invalidRowIds) {
assertNotEquals(0, scanner.seekTo(KeyValue.createKeyValueFromKey(getSomeKey(rowId))),
assertNotEquals(0, scanner.seekTo(new KeyValue.KeyOnlyKeyValue(getSomeKey(rowId))),
"location lookup should have failed");
}
reader.close();
@@ -155,7 +160,7 @@ public class TestInLineFileSystemHFileInLining {
}
private byte[] getSomeKey(int rowId) {
KeyValue kv = new KeyValue(String.format(LOCAL_FORMATTER, Integer.valueOf(rowId)).getBytes(),
KeyValue kv = new KeyValue(String.format(LOCAL_FORMATTER, rowId).getBytes(),
Bytes.toBytes("family"), Bytes.toBytes("qual"), HConstants.LATEST_TIMESTAMP, KeyValue.Type.Put);
return kv.getKey();
}
@@ -169,17 +174,15 @@ public class TestInLineFileSystemHFileInLining {
writer.close();
}
private int writeSomeRecords(HFile.Writer writer)
private void writeSomeRecords(HFile.Writer writer)
throws IOException {
String value = "value";
KeyValue kv;
for (int i = 0; i < (maxRows); i++) {
String key = String.format(LOCAL_FORMATTER, Integer.valueOf(i));
String key = String.format(LOCAL_FORMATTER, i);
kv = new KeyValue(Bytes.toBytes(key), Bytes.toBytes("family"), Bytes.toBytes("qual"),
Bytes.toBytes(value + key));
Bytes.toBytes(VALUE_PREFIX + key));
writer.append(kv);
}
return (maxRows);
}
private void readAllRecords(HFileScanner scanner) throws IOException {
@@ -187,30 +190,31 @@ public class TestInLineFileSystemHFileInLining {
}
// read the records and check
private int readAndCheckbytes(HFileScanner scanner, int start, int n)
private void readAndCheckbytes(HFileScanner scanner, int start, int n)
throws IOException {
String value = "value";
int i = start;
for (; i < (start + n); i++) {
ByteBuffer key = scanner.getKey();
ByteBuffer val = scanner.getValue();
String keyStr = String.format(LOCAL_FORMATTER, Integer.valueOf(i));
String valStr = value + keyStr;
Cell cell = scanner.getCell();
byte[] key = Arrays.copyOfRange(
cell.getRowArray(), cell.getRowOffset(), cell.getRowOffset() + cell.getRowLength());
byte[] val = Arrays.copyOfRange(
cell.getValueArray(), cell.getValueOffset(), cell.getValueOffset() + cell.getValueLength());
String keyStr = String.format(LOCAL_FORMATTER, i);
String valStr = VALUE_PREFIX + keyStr;
KeyValue kv = new KeyValue(Bytes.toBytes(keyStr), Bytes.toBytes("family"),
Bytes.toBytes("qual"), Bytes.toBytes(valStr));
byte[] keyBytes = new KeyValue.KeyOnlyKeyValue(Bytes.toBytes(key), 0,
Bytes.toBytes(key).length).getKey();
assertArrayEquals(kv.getKey(), keyBytes,
"bytes for keys do not match " + keyStr + " " + Bytes.toString(Bytes.toBytes(key)));
byte[] valBytes = Bytes.toBytes(val);
assertArrayEquals(Bytes.toBytes(valStr), valBytes,
"bytes for vals do not match " + valStr + " " + Bytes.toString(valBytes));
byte[] keyBytes = new KeyValue.KeyOnlyKeyValue(key, 0, key.length).getKey();
byte[] expectedKeyBytes = Arrays.copyOfRange(
kv.getRowArray(), kv.getRowOffset(), kv.getRowOffset() + kv.getRowLength());
assertArrayEquals(expectedKeyBytes, keyBytes,
"bytes for keys do not match " + keyStr + " " + Bytes.toString(key));
assertArrayEquals(Bytes.toBytes(valStr), val,
"bytes for vals do not match " + valStr + " " + Bytes.toString(val));
if (!scanner.next()) {
break;
}
}
assertEquals(i, start + n - 1);
return (start + n);
}
private long generateOuterFile(Path outerPath, byte[] inlineBytes) throws IOException {

View File

@@ -1886,11 +1886,16 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
private HoodieDataBlock getDataBlock(HoodieLogBlockType dataBlockType, List<IndexedRecord> records,
Map<HeaderMetadataType, String> header) {
return getDataBlock(dataBlockType, records, header, new Path("dummy_path"));
}
private HoodieDataBlock getDataBlock(HoodieLogBlockType dataBlockType, List<IndexedRecord> records,
Map<HeaderMetadataType, String> header, Path pathForReader) {
switch (dataBlockType) {
case AVRO_DATA_BLOCK:
return new HoodieAvroDataBlock(records, header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
case HFILE_DATA_BLOCK:
return new HoodieHFileDataBlock(records, header, Compression.Algorithm.GZ);
return new HoodieHFileDataBlock(records, header, Compression.Algorithm.GZ, pathForReader);
case PARQUET_DATA_BLOCK:
return new HoodieParquetDataBlock(records, header, HoodieRecord.RECORD_KEY_METADATA_FIELD, CompressionCodecName.GZIP);
default: