[HUDI-2443] Hudi KVComparator for all HFile writer usages (#3889)
* [HUDI-2443] Hudi KVComparator for all HFile writer usages - Hudi relies on custom class shading for Hbase's KeyValue.KVComparator to avoid versioning and class loading issues. There are few places which are still using the Hbase's comparator class directly and version upgrades would make them obsolete. Refactoring the HoodieKVComparator and making all HFile writer creation using the same shaded class. * [HUDI-2443] Hudi KVComparator for all HFile writer usages - Moving HoodieKVComparator from common.bootstrap.index to common.util * [HUDI-2443] Hudi KVComparator for all HFile writer usages - Retaining the old HoodieKVComparatorV2 for boostrap case. Adding the new comparator as HoodieKVComparatorV2 to differentiate from the old one. * [HUDI-2443] Hudi KVComparator for all HFile writer usages - Renamed HoodieKVComparatorV2 to HoodieMetadataKVComparator and moved it under the package org.apache.hudi.metadata. * Make comparator classname configurable * Revert new config and address other review comments Co-authored-by: Sagar Sumit <sagarsumit09@gmail.com>
This commit is contained in:
committed by
GitHub
parent
90f2ea2f12
commit
973f78f5ca
@@ -38,6 +38,10 @@ import java.io.IOException;
|
||||
import static org.apache.hudi.common.model.HoodieFileFormat.HFILE;
|
||||
import static org.apache.hudi.common.model.HoodieFileFormat.ORC;
|
||||
import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
|
||||
import static org.apache.hudi.io.storage.HoodieHFileConfig.CACHE_DATA_IN_L1;
|
||||
import static org.apache.hudi.io.storage.HoodieHFileConfig.DROP_BEHIND_CACHE_COMPACTION;
|
||||
import static org.apache.hudi.io.storage.HoodieHFileConfig.HFILE_COMPARATOR;
|
||||
import static org.apache.hudi.io.storage.HoodieHFileConfig.PREFETCH_ON_OPEN;
|
||||
|
||||
public class HoodieFileWriterFactory {
|
||||
|
||||
@@ -82,7 +86,8 @@ public class HoodieFileWriterFactory {
|
||||
|
||||
BloomFilter filter = createBloomFilter(config);
|
||||
HoodieHFileConfig hfileConfig = new HoodieHFileConfig(hoodieTable.getHadoopConf(),
|
||||
config.getHFileCompressionAlgorithm(), config.getHFileBlockSize(), config.getHFileMaxFileSize(), filter);
|
||||
config.getHFileCompressionAlgorithm(), config.getHFileBlockSize(), config.getHFileMaxFileSize(),
|
||||
PREFETCH_ON_OPEN, CACHE_DATA_IN_L1, DROP_BEHIND_CACHE_COMPACTION, filter, HFILE_COMPARATOR);
|
||||
|
||||
return new HoodieHFileWriter<>(instantTime, path, hfileConfig, schema, taskContextSupplier);
|
||||
}
|
||||
|
||||
@@ -18,35 +18,35 @@
|
||||
|
||||
package org.apache.hudi.io.storage;
|
||||
|
||||
import org.apache.hudi.common.bloom.BloomFilter;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.io.compress.Compression;
|
||||
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||
import org.apache.hudi.common.bloom.BloomFilter;
|
||||
|
||||
public class HoodieHFileConfig {
|
||||
|
||||
private Compression.Algorithm compressionAlgorithm;
|
||||
private int blockSize;
|
||||
private long maxFileSize;
|
||||
private boolean prefetchBlocksOnOpen;
|
||||
private boolean cacheDataInL1;
|
||||
private boolean dropBehindCacheCompaction;
|
||||
private Configuration hadoopConf;
|
||||
private BloomFilter bloomFilter;
|
||||
|
||||
public static final KeyValue.KVComparator HFILE_COMPARATOR = new HoodieHBaseKVComparator();
|
||||
public static final boolean PREFETCH_ON_OPEN = CacheConfig.DEFAULT_PREFETCH_ON_OPEN;
|
||||
public static final boolean CACHE_DATA_IN_L1 = HColumnDescriptor.DEFAULT_CACHE_DATA_IN_L1;
|
||||
// This is private in CacheConfig so have been copied here.
|
||||
private static boolean DROP_BEHIND_CACHE_COMPACTION_DEFAULT = true;
|
||||
public static final boolean DROP_BEHIND_CACHE_COMPACTION = true;
|
||||
|
||||
public HoodieHFileConfig(Configuration hadoopConf, Compression.Algorithm compressionAlgorithm, int blockSize,
|
||||
long maxFileSize, BloomFilter bloomFilter) {
|
||||
this(hadoopConf, compressionAlgorithm, blockSize, maxFileSize, CacheConfig.DEFAULT_PREFETCH_ON_OPEN,
|
||||
HColumnDescriptor.DEFAULT_CACHE_DATA_IN_L1, DROP_BEHIND_CACHE_COMPACTION_DEFAULT, bloomFilter);
|
||||
}
|
||||
private final Compression.Algorithm compressionAlgorithm;
|
||||
private final int blockSize;
|
||||
private final long maxFileSize;
|
||||
private final boolean prefetchBlocksOnOpen;
|
||||
private final boolean cacheDataInL1;
|
||||
private final boolean dropBehindCacheCompaction;
|
||||
private final Configuration hadoopConf;
|
||||
private final BloomFilter bloomFilter;
|
||||
private final KeyValue.KVComparator hfileComparator;
|
||||
|
||||
public HoodieHFileConfig(Configuration hadoopConf, Compression.Algorithm compressionAlgorithm, int blockSize,
|
||||
long maxFileSize, boolean prefetchBlocksOnOpen, boolean cacheDataInL1,
|
||||
boolean dropBehindCacheCompaction, BloomFilter bloomFilter) {
|
||||
boolean dropBehindCacheCompaction, BloomFilter bloomFilter, KeyValue.KVComparator hfileComparator) {
|
||||
this.hadoopConf = hadoopConf;
|
||||
this.compressionAlgorithm = compressionAlgorithm;
|
||||
this.blockSize = blockSize;
|
||||
@@ -55,6 +55,7 @@ public class HoodieHFileConfig {
|
||||
this.cacheDataInL1 = cacheDataInL1;
|
||||
this.dropBehindCacheCompaction = dropBehindCacheCompaction;
|
||||
this.bloomFilter = bloomFilter;
|
||||
this.hfileComparator = hfileComparator;
|
||||
}
|
||||
|
||||
public Configuration getHadoopConf() {
|
||||
@@ -92,4 +93,8 @@ public class HoodieHFileConfig {
|
||||
public BloomFilter getBloomFilter() {
|
||||
return bloomFilter;
|
||||
}
|
||||
|
||||
public KeyValue.KVComparator getHfileComparator() {
|
||||
return hfileComparator;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -25,6 +25,7 @@ import org.apache.hudi.common.fs.FSUtils;
|
||||
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.avro.generic.IndexedRecord;
|
||||
@@ -85,14 +86,18 @@ public class HoodieHFileWriter<T extends HoodieRecordPayload, R extends IndexedR
|
||||
this.taskContextSupplier = taskContextSupplier;
|
||||
|
||||
HFileContext context = new HFileContextBuilder().withBlockSize(hfileConfig.getBlockSize())
|
||||
.withCompression(hfileConfig.getCompressionAlgorithm())
|
||||
.build();
|
||||
.withCompression(hfileConfig.getCompressionAlgorithm())
|
||||
.build();
|
||||
|
||||
conf.set(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, String.valueOf(hfileConfig.shouldPrefetchBlocksOnOpen()));
|
||||
conf.set(HColumnDescriptor.CACHE_DATA_IN_L1, String.valueOf(hfileConfig.shouldCacheDataInL1()));
|
||||
conf.set(DROP_BEHIND_CACHE_COMPACTION_KEY, String.valueOf(hfileConfig.shouldDropBehindCacheCompaction()));
|
||||
CacheConfig cacheConfig = new CacheConfig(conf);
|
||||
this.writer = HFile.getWriterFactory(conf, cacheConfig).withPath(this.fs, this.file).withFileContext(context).create();
|
||||
this.writer = HFile.getWriterFactory(conf, cacheConfig)
|
||||
.withPath(this.fs, this.file)
|
||||
.withFileContext(context)
|
||||
.withComparator(hfileConfig.getHfileComparator())
|
||||
.create();
|
||||
|
||||
writer.appendFileInfo(HoodieHFileReader.KEY_SCHEMA.getBytes(), schema.toString().getBytes());
|
||||
}
|
||||
@@ -145,7 +150,8 @@ public class HoodieHFileWriter<T extends HoodieRecordPayload, R extends IndexedR
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFields(DataInput in) throws IOException { }
|
||||
public void readFields(DataInput in) throws IOException {
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -50,6 +50,10 @@ import java.util.Set;
|
||||
|
||||
import static org.apache.hudi.common.testutils.FileSystemTestUtils.RANDOM;
|
||||
import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
|
||||
import static org.apache.hudi.io.storage.HoodieHFileConfig.CACHE_DATA_IN_L1;
|
||||
import static org.apache.hudi.io.storage.HoodieHFileConfig.DROP_BEHIND_CACHE_COMPACTION;
|
||||
import static org.apache.hudi.io.storage.HoodieHFileConfig.HFILE_COMPARATOR;
|
||||
import static org.apache.hudi.io.storage.HoodieHFileConfig.PREFETCH_ON_OPEN;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
|
||||
public class TestHoodieHFileReaderWriter {
|
||||
@@ -76,7 +80,7 @@ public class TestHoodieHFileReaderWriter {
|
||||
String instantTime = "000";
|
||||
|
||||
HoodieHFileConfig hoodieHFileConfig = new HoodieHFileConfig(conf, Compression.Algorithm.GZ, 1024 * 1024, 120 * 1024 * 1024,
|
||||
filter);
|
||||
PREFETCH_ON_OPEN, CACHE_DATA_IN_L1, DROP_BEHIND_CACHE_COMPACTION, filter, HFILE_COMPARATOR);
|
||||
return new HoodieHFileWriter(instantTime, filePath, hoodieHFileConfig, avroSchema, mockTaskContextSupplier);
|
||||
}
|
||||
|
||||
|
||||
@@ -26,18 +26,16 @@ import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.ValidationUtils;
|
||||
import org.apache.hudi.exception.HoodieIOException;
|
||||
import org.apache.hudi.io.storage.HoodieHBaseKVComparator;
|
||||
import org.apache.hudi.io.storage.HoodieHFileReader;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.Schema.Field;
|
||||
import org.apache.avro.generic.IndexedRecord;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.io.compress.Compression;
|
||||
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||
@@ -45,6 +43,10 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileContext;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
@@ -56,8 +58,6 @@ import java.util.Map;
|
||||
import java.util.TreeMap;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
|
||||
/**
|
||||
* HoodieHFileDataBlock contains a list of records stored inside an HFile format. It is used with the HFile
|
||||
* base file format.
|
||||
@@ -102,7 +102,7 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
|
||||
FSDataOutputStream ostream = new FSDataOutputStream(baos, null);
|
||||
|
||||
HFile.Writer writer = HFile.getWriterFactory(conf, cacheConfig)
|
||||
.withOutputStream(ostream).withFileContext(context).create();
|
||||
.withOutputStream(ostream).withFileContext(context).withComparator(new HoodieHBaseKVComparator()).create();
|
||||
|
||||
// Serialize records into bytes
|
||||
Map<String, byte[]> sortedRecordsMap = new TreeMap<>();
|
||||
|
||||
@@ -0,0 +1,29 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.io.storage;
|
||||
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
|
||||
/**
|
||||
* This class is explicitly used as Key Comparator to work around the hard coded
|
||||
* legacy format class names inside HBase. Otherwise, we will face issues with shading.
|
||||
*/
|
||||
public class HoodieHBaseKVComparator extends KeyValue.KVComparator {
|
||||
}
|
||||
@@ -19,6 +19,7 @@
|
||||
package org.apache.hudi.common.fs.inline;
|
||||
|
||||
import org.apache.hudi.common.testutils.FileSystemTestUtils;
|
||||
import org.apache.hudi.io.storage.HoodieHBaseKVComparator;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
@@ -92,7 +93,7 @@ public class TestInLineFileSystemHFileInLining {
|
||||
HFile.Writer writer = HFile.getWriterFactory(inMemoryConf, cacheConf)
|
||||
.withOutputStream(fout)
|
||||
.withFileContext(meta)
|
||||
.withComparator(new KeyValue.KVComparator())
|
||||
.withComparator(new HoodieHBaseKVComparator())
|
||||
.create();
|
||||
|
||||
writeRecords(writer);
|
||||
|
||||
Reference in New Issue
Block a user