[HUDI-960] Implementation of the HFile base and log file format. (#1804)

* [HUDI-960] Implementation of the HFile base and log file format.

1. Includes HFileWriter and HFileReader
2. Includes HFile input formats for Hive, covering both snapshot and realtime queries
3. Unit tests for the new code
4. Integration test using the HFile format and querying through Hive (Presto and SparkSQL are not supported)

Advantage:
The HFile format stores data as binary key-value pairs. This implementation chooses the following mapping:
1. Key = Hoodie Record Key (as bytes)
2. Value = Avro encoded GenericRecord (as bytes)

HFile allows efficient lookup of a record by key, or by a range of keys. Hence, this base file format is well suited to applications like RFC-15 and RFC-08, which benefit from the ability to look up records by key, or to scan a key range, without having to read the entire data/log file.
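For illustration, a minimal point-lookup sketch against an HFile base file, assuming the HBase 1.x HFile API that this patch depends on; the class and method names here are hypothetical, not part of this commit:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;

public class HFileLookupSketch {
  // Returns the Avro-encoded value bytes for recordKey, or null if absent.
  public static byte[] lookup(Configuration conf, Path path, String recordKey) throws Exception {
    FileSystem fs = path.getFileSystem(conf);
    HFile.Reader reader = HFile.createReader(fs, path, new CacheConfig(conf), conf);
    try {
      HFileScanner scanner = reader.getScanner(true, false);
      // Keys are written as (recordKey, null family, null qualifier), mirroring HoodieHFileWriter below.
      KeyValue lookupKey = new KeyValue(recordKey.getBytes(), null, null, null);
      if (scanner.seekTo(lookupKey) == 0) { // 0 => exact match; the block index makes this O(log n)
        return CellUtil.cloneValue(scanner.getKeyValue());
      }
      return null;
    } finally {
      reader.close();
    }
  }
}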

Limitations:
The HFile storage format has certain limitations when used as a general-purpose data storage format.
1. Does not have an implemented reader for Presto or SparkSQL
2. Is not a columnar file format, and hence may lead to lower compression levels and greater IO on the query side due to the lack of column pruning


Other changes:
 - Remove databricks/avro from pom
 - Fix HoodieClientTestUtils to stop using Scala imports / reflection-based conversions
 - Break up limitFileSize() into per-format parquetMaxFileSize() and hfileMaxFileSize() builder methods (see the usage sketch after this list)
 - Add three new configs to HoodieHFileConfig: prefetchBlocksOnOpen, cacheDataInL1, dropBehindCacheCompaction
 - Throw UnsupportedException in HFileReader.getRecordKeys()
 - Update HoodieCopyOnWriteTable to create the correct merge handle (HoodieSortedMergeHandle for HFile, HoodieMergeHandle otherwise)
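
A hedged sketch of the new storage-config builder methods; the base path and table name are placeholders, while the sizes mirror the defaults added to HoodieStorageConfig in this patch:

import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;

HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
    .withPath("/tmp/hoodie/test-table")                // placeholder base path
    .withStorageConfig(HoodieStorageConfig.newBuilder()
        .parquetMaxFileSize(120 * 1024 * 1024)         // replaces the old limitFileSize()
        .hfileMaxFileSize(120 * 1024 * 1024)           // hoodie.hfile.max.file.size
        .hfileBlockSize(1024 * 1024)                   // hoodie.hfile.block.size
        .hfileCompressionAlgorithm("GZ")               // hoodie.hfile.compression.algorithm
        .build())
    .forTable("test-table")
    .build();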

* Fixing checkstyle

Co-authored-by: Vinoth Chandar <vinoth@apache.org>
Author: Prashant Wason
Date: 2020-08-31 08:05:59 -07:00 (committed by GitHub)
Parent: 6df8f88d86
Commit: 6461927eac
54 changed files with 2224 additions and 295 deletions

View File

@@ -206,6 +206,12 @@
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
<scope>provided</scope>
</dependency>
<!-- Hoodie - Tests -->
<dependency>

View File

@@ -39,6 +39,10 @@ public class HoodieStorageConfig extends DefaultHoodieConfig {
public static final String DEFAULT_PARQUET_BLOCK_SIZE_BYTES = DEFAULT_PARQUET_FILE_MAX_BYTES;
public static final String PARQUET_PAGE_SIZE_BYTES = "hoodie.parquet.page.size";
public static final String DEFAULT_PARQUET_PAGE_SIZE_BYTES = String.valueOf(1 * 1024 * 1024);
public static final String HFILE_FILE_MAX_BYTES = "hoodie.hfile.max.file.size";
public static final String HFILE_BLOCK_SIZE_BYTES = "hoodie.hfile.block.size";
public static final String DEFAULT_HFILE_BLOCK_SIZE_BYTES = String.valueOf(1 * 1024 * 1024);
public static final String DEFAULT_HFILE_FILE_MAX_BYTES = String.valueOf(120 * 1024 * 1024);
// used to size log files
public static final String LOGFILE_SIZE_MAX_BYTES = "hoodie.logfile.max.size";
public static final String DEFAULT_LOGFILE_SIZE_MAX_BYTES = String.valueOf(1024 * 1024 * 1024); // 1 GB
@@ -49,8 +53,10 @@ public class HoodieStorageConfig extends DefaultHoodieConfig {
// Default compression ratio for parquet
public static final String DEFAULT_STREAM_COMPRESSION_RATIO = String.valueOf(0.1);
public static final String PARQUET_COMPRESSION_CODEC = "hoodie.parquet.compression.codec";
public static final String HFILE_COMPRESSION_ALGORITHM = "hoodie.hfile.compression.algorithm";
// Default compression codec for parquet
public static final String DEFAULT_PARQUET_COMPRESSION_CODEC = "gzip";
public static final String DEFAULT_HFILE_COMPRESSION_ALGORITHM = "GZ";
public static final String LOGFILE_TO_PARQUET_COMPRESSION_RATIO = "hoodie.logfile.to.parquet.compression.ratio";
// Default compression ratio for log file to parquet, general 3x
public static final String DEFAULT_LOGFILE_TO_PARQUET_COMPRESSION_RATIO = String.valueOf(0.35);
@@ -79,7 +85,7 @@ public class HoodieStorageConfig extends DefaultHoodieConfig {
return this;
}
public Builder limitFileSize(long maxFileSize) {
public Builder parquetMaxFileSize(long maxFileSize) {
props.setProperty(PARQUET_FILE_MAX_BYTES, String.valueOf(maxFileSize));
return this;
}
@@ -94,6 +100,16 @@ public class HoodieStorageConfig extends DefaultHoodieConfig {
return this;
}
public Builder hfileMaxFileSize(long maxFileSize) {
props.setProperty(HFILE_FILE_MAX_BYTES, String.valueOf(maxFileSize));
return this;
}
public Builder hfileBlockSize(int blockSize) {
props.setProperty(HFILE_BLOCK_SIZE_BYTES, String.valueOf(blockSize));
return this;
}
public Builder logFileDataBlockMaxSize(int dataBlockSize) {
props.setProperty(LOGFILE_DATA_BLOCK_SIZE_MAX_BYTES, String.valueOf(dataBlockSize));
return this;
@@ -114,6 +130,11 @@ public class HoodieStorageConfig extends DefaultHoodieConfig {
return this;
}
public Builder hfileCompressionAlgorithm(String hfileCompressionAlgorithm) {
props.setProperty(HFILE_COMPRESSION_ALGORITHM, hfileCompressionAlgorithm);
return this;
}
public Builder logFileToParquetCompressionRatio(double logFileToParquetCompressionRatio) {
props.setProperty(LOGFILE_TO_PARQUET_COMPRESSION_RATIO, String.valueOf(logFileToParquetCompressionRatio));
return this;
@@ -137,6 +158,14 @@ public class HoodieStorageConfig extends DefaultHoodieConfig {
DEFAULT_PARQUET_COMPRESSION_CODEC);
setDefaultOnCondition(props, !props.containsKey(LOGFILE_TO_PARQUET_COMPRESSION_RATIO),
LOGFILE_TO_PARQUET_COMPRESSION_RATIO, DEFAULT_LOGFILE_TO_PARQUET_COMPRESSION_RATIO);
setDefaultOnCondition(props, !props.containsKey(HFILE_BLOCK_SIZE_BYTES), HFILE_BLOCK_SIZE_BYTES,
DEFAULT_HFILE_BLOCK_SIZE_BYTES);
setDefaultOnCondition(props, !props.containsKey(HFILE_COMPRESSION_ALGORITHM), HFILE_COMPRESSION_ALGORITHM,
DEFAULT_HFILE_COMPRESSION_ALGORITHM);
setDefaultOnCondition(props, !props.containsKey(HFILE_FILE_MAX_BYTES), HFILE_FILE_MAX_BYTES,
DEFAULT_HFILE_FILE_MAX_BYTES);
return config;
}
}

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.config;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.bootstrap.BootstrapMode;
@@ -55,6 +56,8 @@ import java.util.stream.Collectors;
@Immutable
public class HoodieWriteConfig extends DefaultHoodieConfig {
private static final long serialVersionUID = 0L;
public static final String TABLE_NAME = "hoodie.table.name";
public static final String DEFAULT_ROLLBACK_USING_MARKERS = "false";
public static final String ROLLBACK_USING_MARKERS = "hoodie.rollback.using.markers";
@@ -556,6 +559,18 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return Double.parseDouble(props.getProperty(HoodieStorageConfig.LOGFILE_TO_PARQUET_COMPRESSION_RATIO));
}
public long getHFileMaxFileSize() {
return Long.parseLong(props.getProperty(HoodieStorageConfig.HFILE_FILE_MAX_BYTES));
}
public int getHFileBlockSize() {
return Integer.parseInt(props.getProperty(HoodieStorageConfig.HFILE_BLOCK_SIZE_BYTES));
}
public Compression.Algorithm getHFileCompressionAlgorithm() {
return Compression.Algorithm.valueOf(props.getProperty(HoodieStorageConfig.HFILE_COMPRESSION_ALGORITHM));
}
/**
* metrics properties.
*/

View File

@@ -45,6 +45,7 @@ import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWriteHandle<T> {
@@ -55,7 +56,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWri
private long recordsWritten = 0;
private long insertRecordsWritten = 0;
private long recordsDeleted = 0;
private Iterator<HoodieRecord<T>> recordIterator;
private Map<String, HoodieRecord<T>> recordMap;
private boolean useWriterSchema = false;
public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable,
@@ -90,9 +91,10 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWri
* Called by the compactor code path.
*/
public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable,
String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordIterator, SparkTaskContextSupplier sparkTaskContextSupplier) {
String partitionPath, String fileId, Map<String, HoodieRecord<T>> recordMap,
SparkTaskContextSupplier sparkTaskContextSupplier) {
this(config, instantTime, hoodieTable, partitionPath, fileId, sparkTaskContextSupplier);
this.recordIterator = recordIterator;
this.recordMap = recordMap;
this.useWriterSchema = true;
}
@@ -138,9 +140,17 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWri
* Writes all records passed.
*/
public void write() {
Iterator<String> keyIterator;
if (hoodieTable.requireSortedRecords()) {
// Sorting the keys limits the amount of extra memory required for writing sorted records
keyIterator = recordMap.keySet().stream().sorted().iterator();
} else {
keyIterator = recordMap.keySet().stream().iterator();
}
try {
while (recordIterator.hasNext()) {
HoodieRecord<T> record = recordIterator.next();
while (keyIterator.hasNext()) {
final String key = keyIterator.next();
HoodieRecord<T> record = recordMap.get(key);
if (useWriterSchema) {
write(record, record.getData().getInsertValue(writerSchemaWithMetafields));
} else {

View File

@@ -58,16 +58,17 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
private static final Logger LOG = LogManager.getLogger(HoodieMergeHandle.class);
private Map<String, HoodieRecord<T>> keyToNewRecords;
private Set<String> writtenRecordKeys;
protected Map<String, HoodieRecord<T>> keyToNewRecords;
protected Set<String> writtenRecordKeys;
private HoodieFileWriter<IndexedRecord> fileWriter;
private Path newFilePath;
private Path oldFilePath;
private long recordsWritten = 0;
private long recordsDeleted = 0;
private long updatedRecordsWritten = 0;
private long insertRecordsWritten = 0;
private boolean useWriterSchema;
protected long insertRecordsWritten = 0;
protected boolean useWriterSchema;
private HoodieBaseFile baseFileToMerge;
public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable,
@@ -179,7 +180,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
return writeRecord(hoodieRecord, indexedRecord);
}
private boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> indexedRecord) {
protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> indexedRecord) {
Option recordMetadata = hoodieRecord.getData().getMetadata();
if (!partitionPath.equals(hoodieRecord.getPartitionPath())) {
HoodieUpsertException failureEx = new HoodieUpsertException("mismatched partition path, record partition: "

View File

@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.io;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.table.HoodieTable;
import org.apache.avro.generic.GenericRecord;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
/**
* Hoodie merge handle which writes records (new inserts or updates) sorted by their key.
*
* The implementation performs a merge-sort by comparing the key of the record being written to the list of
* keys in newRecordKeys (sorted in-memory).
*/
public class HoodieSortedMergeHandle<T extends HoodieRecordPayload> extends HoodieMergeHandle<T> {
private Queue<String> newRecordKeysSorted = new PriorityQueue<>();
public HoodieSortedMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable,
Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId, SparkTaskContextSupplier sparkTaskContextSupplier) {
super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, sparkTaskContextSupplier);
newRecordKeysSorted.addAll(keyToNewRecords.keySet());
}
/**
* Called by compactor code path.
*/
public HoodieSortedMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable,
Map<String, HoodieRecord<T>> keyToNewRecordsOrig, String partitionPath, String fileId,
HoodieBaseFile dataFileToBeMerged, SparkTaskContextSupplier sparkTaskContextSupplier) {
super(config, instantTime, hoodieTable, keyToNewRecordsOrig, partitionPath, fileId, dataFileToBeMerged,
sparkTaskContextSupplier);
newRecordKeysSorted.addAll(keyToNewRecords.keySet());
}
/**
* Go through an old record. Here if we detect a newer version shows up, we write the new one to the file.
*/
@Override
public void write(GenericRecord oldRecord) {
String key = oldRecord.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
// To maintain overall sorted order across updates and inserts, write any new inserts whose keys are less than
// the oldRecord's key.
while (!newRecordKeysSorted.isEmpty() && newRecordKeysSorted.peek().compareTo(key) <= 0) {
String keyToPreWrite = newRecordKeysSorted.remove();
if (keyToPreWrite.equals(key)) {
// will be handled as an update later
break;
}
// This is a new insert
HoodieRecord<T> hoodieRecord = new HoodieRecord<>(keyToNewRecords.get(keyToPreWrite));
if (writtenRecordKeys.contains(keyToPreWrite)) {
throw new HoodieUpsertException("Insert/Update not in sorted order");
}
try {
if (useWriterSchema) {
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(writerSchemaWithMetafields));
} else {
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(writerSchema));
}
insertRecordsWritten++;
writtenRecordKeys.add(keyToPreWrite);
} catch (IOException e) {
throw new HoodieUpsertException("Failed to write records", e);
}
}
super.write(oldRecord);
}
@Override
public WriteStatus close() {
// write out any pending records (this can happen when inserts are turned into updates)
newRecordKeysSorted.stream().forEach(key -> {
try {
HoodieRecord<T> hoodieRecord = keyToNewRecords.get(key);
if (!writtenRecordKeys.contains(hoodieRecord.getRecordKey())) {
if (useWriterSchema) {
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(writerSchemaWithMetafields));
} else {
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(writerSchema));
}
insertRecordsWritten++;
}
} catch (IOException e) {
throw new HoodieUpsertException("Failed to close UpdateHandle", e);
}
});
newRecordKeysSorted.clear();
keyToNewRecords.clear();
return super.close();
}
}
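
To make the write order concrete, a toy walkthrough (plain Java, not Hudi code) of the invariant HoodieSortedMergeHandle maintains when interleaving old base-file records with new sorted inserts:

import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;

public class SortedMergeWalkthrough {
  public static void main(String[] args) {
    List<String> oldKeys = Arrays.asList("a", "c", "e");                       // sorted keys in the old base file
    Queue<String> newKeys = new PriorityQueue<>(Arrays.asList("b", "c", "f")); // incoming inserts/updates
    for (String oldKey : oldKeys) {
      // Pre-write pending new records whose keys sort at or before the current old key.
      while (!newKeys.isEmpty() && newKeys.peek().compareTo(oldKey) <= 0) {
        String k = newKeys.remove();
        if (k.equals(oldKey)) {
          break; // same key: handled as an update of oldKey below
        }
        System.out.println("insert " + k);
      }
      System.out.println("write  " + oldKey); // carry-over or update
    }
    while (!newKeys.isEmpty()) {
      System.out.println("insert " + newKeys.remove()); // flushed at close()
    }
  }
}

This prints: write a, insert b, write c, write e, insert f. The output order a, b, c, e, f is globally sorted, which is exactly what the HFile writer requires.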

View File

@@ -35,6 +35,7 @@ import org.apache.parquet.avro.AvroSchemaConverter;
import java.io.IOException;
import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
import static org.apache.hudi.common.model.HoodieFileFormat.HFILE;
public class HoodieFileWriterFactory {
@@ -45,16 +46,16 @@ public class HoodieFileWriterFactory {
if (PARQUET.getFileExtension().equals(extension)) {
return newParquetFileWriter(instantTime, path, config, schema, hoodieTable, sparkTaskContextSupplier);
}
if (HFILE.getFileExtension().equals(extension)) {
return newHFileFileWriter(instantTime, path, config, schema, hoodieTable, sparkTaskContextSupplier);
}
throw new UnsupportedOperationException(extension + " format not supported yet.");
}
private static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFileWriter<R> newParquetFileWriter(
String instantTime, Path path, HoodieWriteConfig config, Schema schema, HoodieTable hoodieTable,
SparkTaskContextSupplier sparkTaskContextSupplier) throws IOException {
BloomFilter filter = BloomFilterFactory
.createBloomFilter(config.getBloomFilterNumEntries(), config.getBloomFilterFPP(),
config.getDynamicBloomFilterMaxNumEntries(),
config.getBloomFilterType());
BloomFilter filter = createBloomFilter(config);
HoodieAvroWriteSupport writeSupport =
new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), schema, filter);
@@ -64,4 +65,21 @@ public class HoodieFileWriterFactory {
return new HoodieParquetWriter<>(instantTime, path, parquetConfig, schema, sparkTaskContextSupplier);
}
private static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFileWriter<R> newHFileFileWriter(
String instantTime, Path path, HoodieWriteConfig config, Schema schema, HoodieTable hoodieTable,
SparkTaskContextSupplier sparkTaskContextSupplier) throws IOException {
BloomFilter filter = createBloomFilter(config);
HoodieHFileConfig hfileConfig = new HoodieHFileConfig(hoodieTable.getHadoopConf(),
config.getHFileCompressionAlgorithm(), config.getHFileBlockSize(), config.getHFileMaxFileSize(), filter);
return new HoodieHFileWriter<>(instantTime, path, hfileConfig, schema, sparkTaskContextSupplier);
}
private static BloomFilter createBloomFilter(HoodieWriteConfig config) {
return BloomFilterFactory.createBloomFilter(config.getBloomFilterNumEntries(), config.getBloomFilterFPP(),
config.getDynamicBloomFilterMaxNumEntries(),
config.getBloomFilterType());
}
}

View File

@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.io.storage;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hudi.common.bloom.BloomFilter;
public class HoodieHFileConfig {
private Compression.Algorithm compressionAlgorithm;
private int blockSize;
private long maxFileSize;
private boolean prefetchBlocksOnOpen;
private boolean cacheDataInL1;
private boolean dropBehindCacheCompaction;
private Configuration hadoopConf;
private BloomFilter bloomFilter;
// This is private in CacheConfig so have been copied here.
private static boolean DROP_BEHIND_CACHE_COMPACTION_DEFAULT = true;
public HoodieHFileConfig(Configuration hadoopConf, Compression.Algorithm compressionAlgorithm, int blockSize,
long maxFileSize, BloomFilter bloomFilter) {
this(hadoopConf, compressionAlgorithm, blockSize, maxFileSize, CacheConfig.DEFAULT_PREFETCH_ON_OPEN,
HColumnDescriptor.DEFAULT_CACHE_DATA_IN_L1, DROP_BEHIND_CACHE_COMPACTION_DEFAULT, bloomFilter);
}
public HoodieHFileConfig(Configuration hadoopConf, Compression.Algorithm compressionAlgorithm, int blockSize,
long maxFileSize, boolean prefetchBlocksOnOpen, boolean cacheDataInL1,
boolean dropBehindCacheCompaction, BloomFilter bloomFilter) {
this.hadoopConf = hadoopConf;
this.compressionAlgorithm = compressionAlgorithm;
this.blockSize = blockSize;
this.maxFileSize = maxFileSize;
this.prefetchBlocksOnOpen = prefetchBlocksOnOpen;
this.cacheDataInL1 = cacheDataInL1;
this.dropBehindCacheCompaction = dropBehindCacheCompaction;
this.bloomFilter = bloomFilter;
}
public Configuration getHadoopConf() {
return hadoopConf;
}
public Compression.Algorithm getCompressionAlgorithm() {
return compressionAlgorithm;
}
public int getBlockSize() {
return blockSize;
}
public long getMaxFileSize() {
return maxFileSize;
}
public boolean shouldPrefetchBlocksOnOpen() {
return prefetchBlocksOnOpen;
}
public boolean shouldCacheDataInL1() {
return cacheDataInL1;
}
public boolean shouldDropBehindCacheCompaction() {
return dropBehindCacheCompaction;
}
public boolean useBloomFilter() {
return bloomFilter != null;
}
public BloomFilter getBloomFilter() {
return bloomFilter;
}
}

View File

@@ -0,0 +1,166 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.io.storage;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
/**
* HoodieHFileWriter writes IndexedRecords into an HFile. The record's key is used as the key and the
* AVRO encoded record bytes are saved as the value.
*
* Limitations (compared to columnar formats like Parquet or ORC):
* 1. Records should be added in order of keys
* 2. There are no column stats
*/
public class HoodieHFileWriter<T extends HoodieRecordPayload, R extends IndexedRecord>
implements HoodieFileWriter<R> {
private static AtomicLong recordIndex = new AtomicLong(1);
private final Path file;
private HoodieHFileConfig hfileConfig;
private final HoodieWrapperFileSystem fs;
private final long maxFileSize;
private final String instantTime;
private final SparkTaskContextSupplier sparkTaskContextSupplier;
private HFile.Writer writer;
private String minRecordKey;
private String maxRecordKey;
// This is private in CacheConfig so have been copied here.
private static String DROP_BEHIND_CACHE_COMPACTION_KEY = "hbase.hfile.drop.behind.compaction";
public HoodieHFileWriter(String instantTime, Path file, HoodieHFileConfig hfileConfig, Schema schema,
SparkTaskContextSupplier sparkTaskContextSupplier) throws IOException {
Configuration conf = FSUtils.registerFileSystem(file, hfileConfig.getHadoopConf());
this.file = HoodieWrapperFileSystem.convertToHoodiePath(file, conf);
this.fs = (HoodieWrapperFileSystem) this.file.getFileSystem(conf);
this.hfileConfig = hfileConfig;
// TODO - compute this compression ratio dynamically by looking at the bytes written to the
// stream and the actual file size reported by HDFS
// this.maxFileSize = hfileConfig.getMaxFileSize()
// + Math.round(hfileConfig.getMaxFileSize() * hfileConfig.getCompressionRatio());
this.maxFileSize = hfileConfig.getMaxFileSize();
this.instantTime = instantTime;
this.sparkTaskContextSupplier = sparkTaskContextSupplier;
HFileContext context = new HFileContextBuilder().withBlockSize(hfileConfig.getBlockSize())
.withCompression(hfileConfig.getCompressionAlgorithm())
.build();
conf.set(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, String.valueOf(hfileConfig.shouldPrefetchBlocksOnOpen()));
conf.set(HColumnDescriptor.CACHE_DATA_IN_L1, String.valueOf(hfileConfig.shouldCacheDataInL1()));
conf.set(DROP_BEHIND_CACHE_COMPACTION_KEY, String.valueOf(hfileConfig.shouldDropBehindCacheCompaction()));
CacheConfig cacheConfig = new CacheConfig(conf);
this.writer = HFile.getWriterFactory(conf, cacheConfig).withPath(this.fs, this.file).withFileContext(context).create();
writer.appendFileInfo(HoodieHFileReader.KEY_SCHEMA.getBytes(), schema.toString().getBytes());
}
@Override
public void writeAvroWithMetadata(R avroRecord, HoodieRecord record) throws IOException {
String seqId =
HoodieRecord.generateSequenceId(instantTime, sparkTaskContextSupplier.getPartitionIdSupplier().get(), recordIndex.getAndIncrement());
HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord, record.getRecordKey(), record.getPartitionPath(),
file.getName());
HoodieAvroUtils.addCommitMetadataToRecord((GenericRecord) avroRecord, instantTime, seqId);
writeAvro(record.getRecordKey(), (IndexedRecord)avroRecord);
}
@Override
public boolean canWrite() {
return fs.getBytesWritten(file) < maxFileSize;
}
@Override
public void writeAvro(String recordKey, IndexedRecord object) throws IOException {
byte[] value = HoodieAvroUtils.avroToBytes((GenericRecord)object);
KeyValue kv = new KeyValue(recordKey.getBytes(), null, null, value);
writer.append(kv);
if (hfileConfig.useBloomFilter()) {
hfileConfig.getBloomFilter().add(recordKey);
if (minRecordKey != null) {
minRecordKey = minRecordKey.compareTo(recordKey) <= 0 ? minRecordKey : recordKey;
} else {
minRecordKey = recordKey;
}
if (maxRecordKey != null) {
maxRecordKey = maxRecordKey.compareTo(recordKey) >= 0 ? maxRecordKey : recordKey;
} else {
maxRecordKey = recordKey;
}
}
}
@Override
public void close() throws IOException {
if (hfileConfig.useBloomFilter()) {
final BloomFilter bloomFilter = hfileConfig.getBloomFilter();
if (minRecordKey == null) {
minRecordKey = "";
}
if (maxRecordKey == null) {
maxRecordKey = "";
}
writer.appendFileInfo(HoodieHFileReader.KEY_MIN_RECORD.getBytes(), minRecordKey.getBytes());
writer.appendFileInfo(HoodieHFileReader.KEY_MAX_RECORD.getBytes(), maxRecordKey.getBytes());
writer.appendFileInfo(HoodieHFileReader.KEY_BLOOM_FILTER_TYPE_CODE.getBytes(),
bloomFilter.getBloomFilterTypeCode().toString().getBytes());
writer.appendMetaBlock(HoodieHFileReader.KEY_BLOOM_FILTER_META_BLOCK, new Writable() {
@Override
public void write(DataOutput out) throws IOException {
out.write(bloomFilter.serializeToString().getBytes());
}
@Override
public void readFields(DataInput in) throws IOException { }
});
}
writer.close();
writer = null;
}
}
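
A hedged usage sketch of the new writer; conf, schema, bloomFilter, record, avroRecord, instantTime, and supplier are assumed to be in scope, and the file path is illustrative only:

import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.compress.Compression;

HoodieHFileConfig hfileConfig = new HoodieHFileConfig(conf,
    Compression.Algorithm.GZ,  // compression algorithm
    1024 * 1024,               // block size
    120 * 1024 * 1024,         // max file size
    bloomFilter);
HoodieFileWriter<IndexedRecord> writer = new HoodieHFileWriter<>(instantTime,
    new Path("/tmp/partition/f1_1-0-1_000.hfile"), hfileConfig, schema, supplier);
// HFile requires appends in ascending key order (hence the sorted handles above).
writer.writeAvro(record.getRecordKey(), (IndexedRecord) avroRecord);
writer.close();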

View File

@@ -37,6 +37,7 @@ import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.io.HoodieCreateHandle;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.HoodieSortedMergeHandle;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.bootstrap.BootstrapCommitActionExecutor;
import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
@@ -158,7 +159,6 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
MergeHelper.runMerge(this, upsertHandle);
}
// TODO(vc): This needs to be revisited
if (upsertHandle.getWriteStatus().getPartitionPath() == null) {
LOG.info("Upsert Handle has partition path as null " + upsertHandle.getOldFilePath() + ", "
@@ -169,14 +169,19 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
protected HoodieMergeHandle getUpdateHandle(String instantTime, String partitionPath, String fileId,
Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile dataFileToBeMerged) {
return new HoodieMergeHandle<>(config, instantTime, this, keyToNewRecords,
partitionPath, fileId, dataFileToBeMerged, sparkTaskContextSupplier);
if (requireSortedRecords()) {
return new HoodieSortedMergeHandle<>(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
dataFileToBeMerged, sparkTaskContextSupplier);
} else {
return new HoodieMergeHandle<>(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
dataFileToBeMerged, sparkTaskContextSupplier);
}
}
public Iterator<List<WriteStatus>> handleInsert(String instantTime, String partitionPath, String fileId,
Iterator<HoodieRecord<T>> recordItr) {
Map<String, HoodieRecord<? extends HoodieRecordPayload>> recordMap) {
HoodieCreateHandle createHandle =
new HoodieCreateHandle(config, instantTime, this, partitionPath, fileId, recordItr, sparkTaskContextSupplier);
new HoodieCreateHandle(config, instantTime, this, partitionPath, fileId, recordMap, sparkTaskContextSupplier);
createHandle.write();
return Collections.singletonList(Collections.singletonList(createHandle.close())).iterator();
}

View File

@@ -593,6 +593,8 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
switch (getBaseFileFormat()) {
case PARQUET:
return HoodieLogBlockType.AVRO_DATA_BLOCK;
case HFILE:
return HoodieLogBlockType.HFILE_DATA_BLOCK;
default:
throw new HoodieException("Base file format " + getBaseFileFormat()
+ " does not have associated log block format");
@@ -602,4 +604,8 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
public String getBaseFileExtension() {
return getBaseFileFormat().getFileExtension();
}
public boolean requireSortedRecords() {
return getBaseFileFormat() == HoodieFileFormat.HFILE;
}
}

View File

@@ -18,17 +18,11 @@
package org.apache.hudi.table.action.commit;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.SparkConfigUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
@@ -50,9 +44,21 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import scala.Tuple2;
public abstract class BaseCommitActionExecutor<T extends HoodieRecordPayload<T>, R>
@@ -153,9 +159,26 @@ public abstract class BaseCommitActionExecutor<T extends HoodieRecordPayload<T>,
}
private JavaRDD<HoodieRecord<T>> partition(JavaRDD<HoodieRecord<T>> dedupedRecords, Partitioner partitioner) {
return dedupedRecords.mapToPair(
record -> new Tuple2<>(new Tuple2<>(record.getKey(), Option.ofNullable(record.getCurrentLocation())), record))
.partitionBy(partitioner).map(Tuple2::_2);
JavaPairRDD<Tuple2, HoodieRecord<T>> mappedRDD = dedupedRecords.mapToPair(
record -> new Tuple2<>(new Tuple2<>(record.getKey(), Option.ofNullable(record.getCurrentLocation())), record));
JavaPairRDD<Tuple2, HoodieRecord<T>> partitionedRDD;
if (table.requireSortedRecords()) {
// Partition and sort within each partition as a single step. This is faster than partitioning first and then
// applying a sort.
Comparator<Tuple2> comparator = (Comparator<Tuple2> & Serializable)(t1, t2) -> {
HoodieKey key1 = (HoodieKey) t1._1;
HoodieKey key2 = (HoodieKey) t2._1;
return key1.getRecordKey().compareTo(key2.getRecordKey());
};
partitionedRDD = mappedRDD.repartitionAndSortWithinPartitions(partitioner, comparator);
} else {
// Partition only
partitionedRDD = mappedRDD.partitionBy(partitioner);
}
return partitionedRDD.map(Tuple2::_2);
}
protected void updateIndexAndCommitIfNeeded(JavaRDD<WriteStatus> writeStatusRDD, HoodieWriteMetadata result) {
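
A minimal standalone sketch of the one-shuffle partition-and-sort pattern used above, with String keys for simplicity and a local JavaSparkContext jsc assumed:

import java.util.Arrays;
import java.util.Comparator;
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
import scala.Tuple2;

JavaPairRDD<String, Integer> pairs = jsc.parallelizePairs(Arrays.asList(
    new Tuple2<>("b", 2), new Tuple2<>("a", 1), new Tuple2<>("c", 3)));
// A single shuffle produces both the partitioning and a per-partition key sort,
// so downstream HFile writers see records in key order without a second pass.
JavaPairRDD<String, Integer> sorted = pairs.repartitionAndSortWithinPartitions(
    new HashPartitioner(2), Comparator.<String>naturalOrder());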

View File

@@ -28,6 +28,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.execution.LazyInsertIterable;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.HoodieSortedMergeHandle;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.action.HoodieWriteMetadata;
@@ -98,7 +99,11 @@ public abstract class CommitActionExecutor<T extends HoodieRecordPayload<T>>
}
protected HoodieMergeHandle getUpdateHandle(String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
return new HoodieMergeHandle<>(config, instantTime, (HoodieTable<T>)table, recordItr, partitionPath, fileId, sparkTaskContextSupplier);
if (table.requireSortedRecords()) {
return new HoodieSortedMergeHandle<>(config, instantTime, (HoodieTable<T>)table, recordItr, partitionPath, fileId, sparkTaskContextSupplier);
} else {
return new HoodieMergeHandle<>(config, instantTime, (HoodieTable<T>)table, recordItr, partitionPath, fileId, sparkTaskContextSupplier);
}
}
protected HoodieMergeHandle getUpdateHandle(String partitionPath, String fileId,

View File

@@ -135,15 +135,14 @@ public class HoodieMergeOnReadTableCompactor implements HoodieCompactor {
// Compacting is very similar to applying updates to existing file
Iterator<List<WriteStatus>> result;
// If the dataFile is present, there is a base parquet file present, perform updates else perform inserts into a
// new base parquet file.
// If the dataFile is present, perform updates else perform inserts into a new base file.
if (oldDataFileOpt.isPresent()) {
result = hoodieCopyOnWriteTable.handleUpdate(instantTime, operation.getPartitionPath(),
operation.getFileId(), scanner.getRecords(),
oldDataFileOpt.get());
} else {
result = hoodieCopyOnWriteTable.handleInsert(instantTime, operation.getPartitionPath(), operation.getFileId(),
scanner.iterator());
scanner.getRecords());
}
Iterable<List<WriteStatus>> resultIterable = () -> result;
return StreamSupport.stream(resultIterable.spliterator(), false).flatMap(Collection::stream).peek(s -> {

View File

@@ -1261,7 +1261,8 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
.insertSplitSize(insertSplitSize).build())
.withStorageConfig(
HoodieStorageConfig.newBuilder()
.limitFileSize(dataGen.getEstimatedFileSizeInBytes(200)).build())
.hfileMaxFileSize(dataGen.getEstimatedFileSizeInBytes(200))
.parquetMaxFileSize(dataGen.getEstimatedFileSizeInBytes(200)).build())
.build();
}
}

View File

@@ -411,7 +411,7 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
private void checkReadRecords(String instantTime, int numExpectedRecords) throws IOException {
if (tableType == HoodieTableType.COPY_ON_WRITE) {
HoodieTimeline timeline = metaClient.reloadActiveTimeline().getCommitTimeline();
assertEquals(numExpectedRecords, HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, instantTime).count());
assertEquals(numExpectedRecords, HoodieClientTestUtils.countRecordsSince(jsc, basePath, sqlContext, timeline, instantTime));
} else {
// TODO: This code fails to read records under the following conditions:
// 1. No parquet files yet (i.e. no compaction done yet)

View File

@@ -45,6 +45,9 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -89,8 +92,10 @@ public class TestUpdateSchemaEvolution extends HoodieClientTestHarness {
insertRecords
.add(new HoodieRecord(new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()), rowChange3));
Map<String, HoodieRecord> insertRecordMap = insertRecords.stream()
.collect(Collectors.toMap(r -> r.getRecordKey(), Function.identity()));
HoodieCreateHandle createHandle =
new HoodieCreateHandle(config, "100", table, rowChange1.getPartitionPath(), "f1-0", insertRecords.iterator(), supplier);
new HoodieCreateHandle(config, "100", table, rowChange1.getPartitionPath(), "f1-0", insertRecordMap, supplier);
createHandle.write();
return createHandle.close();
}).collect();

View File

@@ -441,7 +441,7 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
.withWriteStatusClass(MetadataMergeWriteStatus.class)
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build())
.withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build())
.withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024).parquetMaxFileSize(1024 * 1024).build())
.forTable("test-trip-table")
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build())
.withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()

View File

@@ -465,7 +465,8 @@ public class TestHBaseIndex extends FunctionalTestHarness {
.withParallelism(1, 1)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024)
.withInlineCompaction(false).build())
.withAutoCommit(false).withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build())
.withAutoCommit(false).withStorageConfig(HoodieStorageConfig.newBuilder()
.hfileMaxFileSize(1024 * 1024).parquetMaxFileSize(1024 * 1024).build())
.forTable("test-trip-table")
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.HBASE)
.withHBaseIndexConfig(new HoodieHBaseIndexConfig.Builder()

View File

@@ -76,7 +76,8 @@ public class TestHBaseQPSResourceAllocator {
.withParallelism(1, 1)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024)
.withInlineCompaction(false).build())
.withAutoCommit(false).withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build())
.withAutoCommit(false).withStorageConfig(HoodieStorageConfig.newBuilder()
.hfileMaxFileSize(1000 * 1024).parquetMaxFileSize(1024 * 1024).build())
.forTable("test-trip-table").withIndexConfig(HoodieIndexConfig.newBuilder()
.withIndexType(HoodieIndex.IndexType.HBASE).withHBaseIndexConfig(hoodieHBaseIndexConfig).build());
}

View File

@@ -178,7 +178,7 @@ public class TestHoodieKeyLocationFetchHandle extends HoodieClientTestHarness {
.withWriteStatusClass(MetadataMergeWriteStatus.class)
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build())
.withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build())
.withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024).parquetMaxFileSize(1024 * 1024).build())
.forTable("test-trip-table")
.withIndexConfig(HoodieIndexConfig.newBuilder().build())
.withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()

View File

@@ -310,7 +310,7 @@ public class TestHoodieMergeHandle extends HoodieClientTestHarness {
return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
.withParallelism(2, 2)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build())
.withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build())
.withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024).parquetMaxFileSize(1024 * 1024).build())
.forTable("test-trip-table")
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
.withBulkInsertParallelism(2).withWriteStatusClass(TestWriteStatus.class);

View File

@@ -50,6 +50,11 @@ public class TestHoodieFileWriterFactory extends HoodieClientTestBase {
parquetPath, table, cfg, HoodieTestDataGenerator.AVRO_SCHEMA, supplier);
assertTrue(parquetWriter instanceof HoodieParquetWriter);
final Path hfilePath = new Path(basePath + "/partition/path/f1_1-0-1_000.hfile");
HoodieFileWriter<IndexedRecord> hfileWriter = HoodieFileWriterFactory.getFileWriter(instantTime,
hfilePath, table, cfg, HoodieTestDataGenerator.AVRO_SCHEMA, supplier);
assertTrue(hfileWriter instanceof HoodieHFileWriter);
// other file format exception.
final Path logPath = new Path(basePath + "/partition/path/f.b51192a8-574b-4a85-b246-bcfec03ac8bf_100.log.2_1-0-1");
final Throwable thrown = assertThrows(UnsupportedOperationException.class, () -> {

View File

@@ -49,7 +49,9 @@ import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.HoodieHFileInputFormat;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.realtime.HoodieHFileRealtimeInputFormat;
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
@@ -69,9 +71,9 @@ import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import java.io.File;
import java.io.IOException;
@@ -100,15 +102,11 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
@TempDir
public java.nio.file.Path tempFolder;
private HoodieFileFormat baseFileFormat;
static Stream<HoodieFileFormat> argumentsProvider() {
return Stream.of(HoodieFileFormat.PARQUET);
}
private HoodieFileFormat baseFileFormat;
public void init(HoodieFileFormat baseFileFormat) throws IOException {
this.baseFileFormat = baseFileFormat;
initDFS();
initSparkContexts("TestHoodieMergeOnReadTable");
hadoopConf.addResource(dfs.getConf());
@@ -122,15 +120,65 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
rtJobConf = new JobConf(hadoopConf);
}
@BeforeEach
public void init() throws IOException {
init(HoodieFileFormat.PARQUET);
}
@AfterEach
public void clean() throws IOException {
cleanupResources();
}
@ParameterizedTest
@MethodSource("argumentsProvider")
public void testSimpleInsertAndUpdate(HoodieFileFormat baseFileFormat) throws Exception {
init(baseFileFormat);
@Test
public void testSimpleInsertAndUpdate() throws Exception {
HoodieWriteConfig cfg = getConfig(true);
try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
/**
* Write 1 (only inserts)
*/
String newCommitTime = "001";
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
insertAndGetFilePaths(records, client, cfg, newCommitTime);
/**
* Write 2 (updates)
*/
newCommitTime = "004";
client.startCommitWithTime(newCommitTime);
records = dataGen.generateUpdates(newCommitTime, 100);
updateAndGetFilePaths(records, client, cfg, newCommitTime);
String compactionCommitTime = client.scheduleCompaction(Option.empty()).get().toString();
client.compact(compactionCommitTime);
HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);
FileStatus[] allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
HoodieTableFileSystemView roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
assertTrue(dataFilesToRead.findAny().isPresent());
// verify that there is a commit
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTimeline timeline = metaClient.getCommitTimeline().filterCompletedInstants();
assertEquals(1, timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants(),
"Expecting a single commit.");
String latestCompactionCommitTime = timeline.lastInstant().get().getTimestamp();
assertTrue(HoodieTimeline.compareTimestamps("000", HoodieTimeline.LESSER_THAN, latestCompactionCommitTime));
assertEquals(200, HoodieClientTestUtils.countRecordsSince(jsc, basePath, sqlContext, timeline, "000"),
"Must contain 200 records");
}
}
@Test
public void testSimpleInsertAndUpdateHFile() throws Exception {
clean();
init(HoodieFileFormat.HFILE);
HoodieWriteConfig cfg = getConfig(true);
try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
@@ -170,18 +218,15 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
String latestCompactionCommitTime = timeline.lastInstant().get().getTimestamp();
assertTrue(HoodieTimeline.compareTimestamps("000", HoodieTimeline.LESSER_THAN, latestCompactionCommitTime));
assertEquals(200, HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, "000").count(),
assertEquals(200, HoodieClientTestUtils.countRecordsSince(jsc, basePath, sqlContext, timeline, "000"),
"Must contain 200 records");
}
}
// test incremental read does not go past compaction instant for RO views
// For RT views, incremental read can go past compaction
@ParameterizedTest
@MethodSource("argumentsProvider")
public void testIncrementalReadsWithCompaction(HoodieFileFormat baseFileFormat) throws Exception {
init(baseFileFormat);
@Test
public void testIncrementalReadsWithCompaction() throws Exception {
String partitionPath = "2020/02/20"; // use only one partition for this test
dataGen = new HoodieTestDataGenerator(new String[] { partitionPath });
HoodieWriteConfig cfg = getConfig(true);
@@ -279,11 +324,8 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
}
// Check if record level metadata is aggregated properly at the end of write.
@ParameterizedTest
@MethodSource("argumentsProvider")
public void testMetadataAggregateFromWriteStatus(HoodieFileFormat baseFileFormat) throws Exception {
init(baseFileFormat);
@Test
public void testMetadataAggregateFromWriteStatus() throws Exception {
HoodieWriteConfig cfg = getConfigBuilder(false).withWriteStatusClass(MetadataMergeWriteStatus.class).build();
try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
@@ -305,11 +347,8 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
}
}
@ParameterizedTest
@MethodSource("argumentsProvider")
public void testSimpleInsertUpdateAndDelete(HoodieFileFormat baseFileFormat) throws Exception {
init(baseFileFormat);
@Test
public void testSimpleInsertUpdateAndDelete() throws Exception {
HoodieWriteConfig cfg = getConfig(true);
try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
@@ -388,10 +427,9 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
}
}
private void testCOWToMORConvertedTableRollback(HoodieFileFormat baseFileFormat, Boolean rollbackUsingMarkers) throws Exception {
init(baseFileFormat);
private void testCOWToMORConvertedTableRollback(Boolean rollbackUsingMarkers) throws Exception {
// Set TableType to COW
HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.COPY_ON_WRITE, baseFileFormat);
HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.COPY_ON_WRITE);
HoodieWriteConfig cfg = getConfig(false, rollbackUsingMarkers);
try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
@@ -428,7 +466,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
assertNoWriteErrors(statuses);
// Set TableType to MOR
HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ, baseFileFormat);
HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
// rollback a COW commit when TableType is MOR
client.rollback(newCommitTime);
@@ -443,22 +481,19 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
}
}
@ParameterizedTest
@MethodSource("argumentsProvider")
public void testCOWToMORConvertedTableRollbackUsingFileList(HoodieFileFormat baseFileFormat) throws Exception {
testCOWToMORConvertedTableRollback(baseFileFormat, false);
@Test
public void testCOWToMORConvertedTableRollbackUsingFileList() throws Exception {
testCOWToMORConvertedTableRollback(false);
}
@ParameterizedTest
@MethodSource("argumentsProvider")
public void testCOWToMORConvertedTableRollbackUsingMarkers(HoodieFileFormat baseFileFormat) throws Exception {
testCOWToMORConvertedTableRollback(baseFileFormat, true);
@Test
public void testCOWToMORConvertedTableRollbackUsingMarkers() throws Exception {
testCOWToMORConvertedTableRollback(true);
}
private void testRollbackWithDeltaAndCompactionCommit(HoodieFileFormat baseFileFormat, Boolean rollbackUsingMarkers) throws Exception {
init(baseFileFormat);
private void testRollbackWithDeltaAndCompactionCommit(Boolean rollbackUsingMarkers) throws Exception {
HoodieWriteConfig cfg = getConfig(false, rollbackUsingMarkers);
try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
// Test delta commit rollback
@@ -604,23 +639,18 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
}
}
@ParameterizedTest
@MethodSource("argumentsProvider")
public void testRollbackWithDeltaAndCompactionCommitUsingFileList(HoodieFileFormat baseFileFormat) throws Exception {
testRollbackWithDeltaAndCompactionCommit(baseFileFormat, false);
@Test
public void testRollbackWithDeltaAndCompactionCommitUsingFileList() throws Exception {
testRollbackWithDeltaAndCompactionCommit(false);
}
@ParameterizedTest
@MethodSource("argumentsProvider")
public void testRollbackWithDeltaAndCompactionCommitUsingMarkers(HoodieFileFormat baseFileFormat) throws Exception {
testRollbackWithDeltaAndCompactionCommit(baseFileFormat, true);
@Test
public void testRollbackWithDeltaAndCompactionCommitUsingMarkers() throws Exception {
testRollbackWithDeltaAndCompactionCommit(true);
}
@ParameterizedTest
@MethodSource("argumentsProvider")
public void testMultiRollbackWithDeltaAndCompactionCommit(HoodieFileFormat baseFileFormat) throws Exception {
init(baseFileFormat);
@Test
public void testMultiRollbackWithDeltaAndCompactionCommit() throws Exception {
HoodieWriteConfig cfg = getConfig(false);
try (final HoodieWriteClient client = getHoodieWriteClient(cfg);) {
/**
@@ -777,15 +807,12 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024)
.withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
.withEmbeddedTimelineServerEnabled(true)
.withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024).build()).forTable("test-trip-table")
.withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024).parquetMaxFileSize(1024).build()).forTable("test-trip-table")
.build();
}
@ParameterizedTest
@MethodSource("argumentsProvider")
public void testUpsertPartitioner(HoodieFileFormat baseFileFormat) throws Exception {
init(baseFileFormat);
@Test
public void testUpsertPartitioner() throws Exception {
HoodieWriteConfig cfg = getConfig(true);
try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
@@ -863,11 +890,8 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
}
}
@ParameterizedTest
@MethodSource("argumentsProvider")
public void testLogFileCountsAfterCompaction(HoodieFileFormat baseFileFormat) throws Exception {
init(baseFileFormat);
@Test
public void testLogFileCountsAfterCompaction() throws Exception {
// insert 100 records
HoodieWriteConfig config = getConfig(true);
try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
@@ -939,11 +963,8 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
}
}
@ParameterizedTest
@MethodSource("argumentsProvider")
public void testSimpleInsertsGeneratedIntoLogFiles(HoodieFileFormat baseFileFormat) throws Exception {
init(baseFileFormat);
@Test
public void testSimpleInsertsGeneratedIntoLogFiles() throws Exception {
// insert 100 records
// Setting IndexType to be InMemory to simulate Global Index nature
HoodieWriteConfig config = getConfigBuilder(false, IndexType.INMEMORY).build();
@@ -979,10 +1000,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
}
}
private void testInsertsGeneratedIntoLogFilesRollback(HoodieFileFormat baseFileFormat,
Boolean rollbackUsingMarkers) throws Exception {
init(baseFileFormat);
private void testInsertsGeneratedIntoLogFilesRollback(Boolean rollbackUsingMarkers) throws Exception {
// insert 100 records
// Setting IndexType to be InMemory to simulate Global Index nature
HoodieWriteConfig config = getConfigBuilder(false, rollbackUsingMarkers, IndexType.INMEMORY).build();
@@ -1069,22 +1087,17 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
}
}
@ParameterizedTest
@MethodSource("argumentsProvider")
public void testInsertsGeneratedIntoLogFilesRollbackUsingFileList(HoodieFileFormat baseFileFormat) throws Exception {
testInsertsGeneratedIntoLogFilesRollback(baseFileFormat, false);
@Test
public void testInsertsGeneratedIntoLogFilesRollbackUsingFileList() throws Exception {
testInsertsGeneratedIntoLogFilesRollback(false);
}
@ParameterizedTest
@MethodSource("argumentsProvider")
public void testInsertsGeneratedIntoLogFilesRollbackUsingMarkers(HoodieFileFormat baseFileFormat) throws Exception {
testInsertsGeneratedIntoLogFilesRollback(baseFileFormat, true);
@Test
public void testInsertsGeneratedIntoLogFilesRollbackUsingMarkers() throws Exception {
testInsertsGeneratedIntoLogFilesRollback(true);
}
private void testInsertsGeneratedIntoLogFilesRollbackAfterCompaction(HoodieFileFormat baseFileFormat,
Boolean rollbackUsingMarkers) throws Exception {
init(baseFileFormat);
private void testInsertsGeneratedIntoLogFilesRollbackAfterCompaction(Boolean rollbackUsingMarkers) throws Exception {
// insert 100 records
// Setting IndexType to be InMemory to simulate Global Index nature
HoodieWriteConfig config = getConfigBuilder(false, rollbackUsingMarkers, IndexType.INMEMORY).build();
@@ -1135,23 +1148,20 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
}
}
@ParameterizedTest
@MethodSource("argumentsProvider")
public void testInsertsGeneratedIntoLogFilesRollbackAfterCompactionUsingFileList(HoodieFileFormat baseFileFormat) throws Exception {
testInsertsGeneratedIntoLogFilesRollbackAfterCompaction(baseFileFormat, false);
@Test
public void testInsertsGeneratedIntoLogFilesRollbackAfterCompactionUsingFileList() throws Exception {
testInsertsGeneratedIntoLogFilesRollbackAfterCompaction(false);
}
@ParameterizedTest
@MethodSource("argumentsProvider")
public void testInsertsGeneratedIntoLogFilesRollbackAfterCompactionUsingMarkers(HoodieFileFormat baseFileFormat) throws Exception {
testInsertsGeneratedIntoLogFilesRollbackAfterCompaction(baseFileFormat, true);
@Test
public void testInsertsGeneratedIntoLogFilesRollbackAfterCompactionUsingMarkers() throws Exception {
testInsertsGeneratedIntoLogFilesRollbackAfterCompaction(true);
}
/**
* Test to ensure metadata stats are correctly written to metadata file.
*/
public void testMetadataStatsOnCommit(HoodieFileFormat baseFileFormat, Boolean rollbackUsingMarkers) throws Exception {
init(baseFileFormat);
public void testMetadataStatsOnCommit(Boolean rollbackUsingMarkers) throws Exception {
HoodieWriteConfig cfg = getConfigBuilder(false, rollbackUsingMarkers, IndexType.INMEMORY)
.withAutoCommit(false).build();
try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
@@ -1231,26 +1241,21 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
/**
* Test to ensure rolling stats are correctly written to the metadata file.
*/
@ParameterizedTest
@MethodSource("argumentsProvider")
public void testMetadataStatsOnCommitUsingFileList(HoodieFileFormat baseFileFormat) throws Exception {
testMetadataStatsOnCommit(baseFileFormat, false);
@Test
public void testMetadataStatsOnCommitUsingFileList() throws Exception {
testMetadataStatsOnCommit(false);
}
@ParameterizedTest
@MethodSource("argumentsProvider")
public void testMetadataStatsOnCommitUsingMarkers(HoodieFileFormat baseFileFormat) throws Exception {
testMetadataStatsOnCommit(baseFileFormat, true);
@Test
public void testMetadataStatsOnCommitUsingMarkers() throws Exception {
testMetadataStatsOnCommit(true);
}
/**
* Test to ensure rolling stats are correctly written to the metadata file, and that small files are identified and corrected.
*/
@ParameterizedTest
@MethodSource("argumentsProvider")
public void testMetadataStatsWithSmallFileHandling(HoodieFileFormat baseFileFormat) throws Exception {
init(baseFileFormat);
@Test
public void testRollingStatsWithSmallFileHandling() throws Exception {
HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build();
try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
Map<String, Long> fileIdToInsertsMap = new HashMap<>();
@@ -1364,11 +1369,8 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
/**
* Test to validate that invoking table.handleUpdate() with input records from multiple partitions will fail.
*/
@ParameterizedTest
@MethodSource("argumentsProvider")
public void testHandleUpdateWithMultiplePartitions(HoodieFileFormat baseFileFormat) throws Exception {
init(baseFileFormat);
@Test
public void testHandleUpdateWithMultiplePartitions() throws Exception {
HoodieWriteConfig cfg = getConfig(true);
try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
@@ -1467,7 +1469,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
.withAutoCommit(autoCommit).withAssumeDatePartitioning(true)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024)
.withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
.withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024 * 1024).build())
.withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024 * 1024).parquetMaxFileSize(1024 * 1024 * 1024).build())
.withEmbeddedTimelineServerEnabled(true).forTable("test-trip-table")
.withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder()
.withEnableBackupForRemoteFileSystemView(false).build())
@@ -1606,6 +1608,12 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
} else {
return ((HoodieParquetInputFormat)inputFormat).listStatus(jobConf);
}
case HFILE:
if (realtime) {
return ((HoodieHFileRealtimeInputFormat)inputFormat).listStatus(jobConf);
} else {
return ((HoodieHFileInputFormat)inputFormat).listStatus(jobConf);
}
default:
throw new HoodieIOException("Hoodie InputFormat not implemented for base file format " + baseFileFormat);
}
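For context, the helper above dispatches on the table's base file format and, for HFILE, on whether the query is snapshot or realtime. A usage sketch of the snapshot path follows; the JobConf setup and input path are illustrative only, and HoodieHFileInputFormat is assumed, like its parquet counterpart, to be Configurable and to expose a public listStatus():

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;

// Illustrative only: list an HFile-backed table's base files the way Hive would.
JobConf jobConf = new JobConf(hadoopConf);
FileInputFormat.setInputPaths(jobConf, basePath + "/2016/03/15");
HoodieHFileInputFormat inputFormat = new HoodieHFileInputFormat();
inputFormat.setConf(jobConf);
FileStatus[] statuses = inputFormat.listStatus(jobConf);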

View File

@@ -366,7 +366,8 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase {
@Test
public void testFileSizeUpsertRecords() throws Exception {
HoodieWriteConfig config = makeHoodieClientConfigBuilder().withStorageConfig(HoodieStorageConfig.newBuilder()
.limitFileSize(64 * 1024).parquetBlockSize(64 * 1024).parquetPageSize(64 * 1024).build()).build();
.parquetMaxFileSize(64 * 1024).hfileMaxFileSize(64 * 1024)
.parquetBlockSize(64 * 1024).parquetPageSize(64 * 1024).build()).build();
String instantTime = HoodieTestUtils.makeNewCommitTime();
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, hadoopConf);
@@ -401,7 +402,8 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase {
@Test
public void testInsertUpsertWithHoodieAvroPayload() throws Exception {
HoodieWriteConfig config = makeHoodieClientConfigBuilder()
.withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1000 * 1024).build()).build();
.withStorageConfig(HoodieStorageConfig.newBuilder()
.parquetMaxFileSize(1000 * 1024).hfileMaxFileSize(1000 * 1024).build()).build();
metaClient = HoodieTableMetaClient.reload(metaClient);
final HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, hadoopConf);
String instantTime = "000";
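These call sites follow the break-up of limitFileSize() into per-format setters: a test that used to cap every base file with one call now sets the parquet and HFile limits separately. A hypothetical helper (not part of this commit), assuming both setters take a byte limit, recovers the old one-call ergonomics:

// Apply one size cap to both base file formats, mirroring what the
// former limitFileSize() call did at these sites.
private static HoodieStorageConfig uniformFileSizeLimit(long maxBytes) {
  return HoodieStorageConfig.newBuilder()
      .parquetMaxFileSize(maxBytes)  // hoodie.parquet.max.file.size
      .hfileMaxFileSize(maxBytes)    // hoodie.hfile.max.file.size
      .build();
}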

View File

@@ -71,7 +71,8 @@ public class TestUpsertPartitioner extends HoodieClientTestBase {
HoodieWriteConfig config = makeHoodieClientConfigBuilder()
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(smallFileSize)
.insertSplitSize(100).autoTuneInsertSplits(autoSplitInserts).build())
.withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1000 * 1024).build()).build();
.withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1000 * 1024).parquetMaxFileSize(1000 * 1024).build())
.build();
FileCreateUtils.createCommit(basePath, "001");
FileCreateUtils.createDataFile(basePath, testPartitionPath, "001", "file1", fileSize);

View File

@@ -72,7 +72,8 @@ public class CompactionTestBase extends HoodieClientTestBase {
.withAutoCommit(autoCommit).withAssumeDatePartitioning(true)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024)
.withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
.withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024 * 1024).build())
.withStorageConfig(HoodieStorageConfig.newBuilder()
.hfileMaxFileSize(1024 * 1024 * 1024).parquetMaxFileSize(1024 * 1024 * 1024).build())
.forTable("test-trip-table")
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
.withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
@@ -194,7 +195,7 @@ public class CompactionTestBase extends HoodieClientTestBase {
assertEquals(latestCompactionCommitTime, compactionInstantTime,
"Expect compaction instant time to be the latest commit time");
assertEquals(expectedNumRecs,
HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, "000").count(),
HoodieClientTestUtils.countRecordsSince(jsc, basePath, sqlContext, timeline, "000"),
"Must contain expected records");
}

View File

@@ -95,7 +95,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
.withParallelism(2, 2)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024)
.withInlineCompaction(false).build())
.withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build())
.withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024).parquetMaxFileSize(1024 * 1024).build())
.withMemoryConfig(HoodieMemoryConfig.newBuilder().withMaxDFSStreamBufferSize(1 * 1024 * 1024).build())
.forTable("test-trip-table")
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build());

View File

@@ -130,7 +130,7 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
.withWriteStatusClass(MetadataMergeWriteStatus.class)
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build())
.withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build())
.withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024).parquetMaxFileSize(1024 * 1024).build())
.forTable("test-trip-table")
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build())
.withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
@@ -459,12 +459,12 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
// Check that the incremental consumption from prevCommitTime
assertEquals(HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(),
HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, prevCommitTime).count(),
HoodieClientTestUtils.countRecordsSince(jsc, basePath, sqlContext, timeline, prevCommitTime),
"Incremental consumption from " + prevCommitTime + " should give all records in latest commit");
if (commitTimesBetweenPrevAndNew.isPresent()) {
commitTimesBetweenPrevAndNew.get().forEach(ct -> {
assertEquals(HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(),
HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, ct).count(),
HoodieClientTestUtils.countRecordsSince(jsc, basePath, sqlContext, timeline, ct),
"Incremental consumption from " + ct + " should give all records in latest commit");
});
}
@@ -527,7 +527,7 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
// Check that the incremental consumption from prevCommitTime
assertEquals(HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(),
HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, prevCommitTime).count(),
HoodieClientTestUtils.countRecordsSince(jsc, basePath, sqlContext, timeline, prevCommitTime),
"Incremental consumption from " + prevCommitTime + " should give no records in latest commit,"
+ " since it is a delete operation");
}

View File

@@ -31,6 +31,7 @@ import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
@@ -45,6 +46,10 @@ import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.avro.AvroSchemaConverter;
@@ -57,12 +62,15 @@ import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Utility methods to aid testing inside the HoodieClient module.
@@ -129,7 +137,7 @@ public class HoodieClientTestUtils {
/**
* Count the new records written into the Hoodie table since the given commit timestamp.
*/
public static Dataset<Row> readSince(String basePath, SQLContext sqlContext,
public static long countRecordsSince(JavaSparkContext jsc, String basePath, SQLContext sqlContext,
HoodieTimeline commitTimeline, String lastCommitTime) {
List<HoodieInstant> commitsToReturn =
commitTimeline.findInstantsAfter(lastCommitTime, Integer.MAX_VALUE).getInstants().collect(Collectors.toList());
@@ -137,12 +145,17 @@ public class HoodieClientTestUtils {
// Go over the commit metadata, and obtain the new files that need to be read.
HashMap<String, String> fileIdToFullPath = getLatestFileIDsToFullPath(basePath, commitTimeline, commitsToReturn);
String[] paths = fileIdToFullPath.values().toArray(new String[fileIdToFullPath.size()]);
Dataset<Row> rows = null;
if (paths[0].endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
rows = sqlContext.read().parquet(paths);
return sqlContext.read().parquet(paths)
.filter(String.format("%s >'%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, lastCommitTime))
.count();
} else if (paths[0].endsWith(HoodieFileFormat.HFILE.getFileExtension())) {
return readHFile(jsc, paths)
.filter(gr -> HoodieTimeline.compareTimestamps(lastCommitTime, HoodieActiveTimeline.LESSER_THAN,
gr.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString()))
.count();
}
return rows.filter(String.format("%s >'%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, lastCommitTime));
throw new HoodieException("Unsupported base file format for file :" + paths[0]);
} catch (IOException e) {
throw new HoodieException("Error pulling data incrementally from commitTimestamp :" + lastCommitTime, e);
}
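Call sites migrate accordingly: tests that previously materialized a Dataset<Row> and counted it now get the count directly, as the CompactionTestBase and HoodieClientTestBase hunks above show. A sketch of the before/after at a typical assertion, with jsc being the test's JavaSparkContext:

// Before: long n = HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, "000").count();
// After: the helper counts internally and also understands HFile base files.
long n = HoodieClientTestUtils.countRecordsSince(jsc, basePath, sqlContext, timeline, "000");
assertEquals(expectedNumRecs, n, "Must contain expected records");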
@@ -170,6 +183,37 @@ public class HoodieClientTestUtils {
}
}
public static Stream<GenericRecord> readHFile(JavaSparkContext jsc, String[] paths) {
// TODO: this should be ported to use HoodieStorageReader
List<GenericRecord> valuesAsList = new LinkedList<>();
FileSystem fs = FSUtils.getFs(paths[0], jsc.hadoopConfiguration());
CacheConfig cacheConfig = new CacheConfig(fs.getConf());
Schema schema = null;
for (String path : paths) {
try {
HFile.Reader reader = HFile.createReader(fs, new Path(path), cacheConfig, fs.getConf());
if (schema == null) {
schema = new Schema.Parser().parse(new String(reader.loadFileInfo().get("schema".getBytes())));
}
HFileScanner scanner = reader.getScanner(false, false);
if (!scanner.seekTo()) {
// EOF reached
continue;
}
do {
Cell c = scanner.getKeyValue();
byte[] value = Arrays.copyOfRange(c.getValueArray(), c.getValueOffset(), c.getValueOffset() + c.getValueLength());
valuesAsList.add(HoodieAvroUtils.bytesToAvro(value, schema));
} while (scanner.next());
} catch (IOException e) {
throw new HoodieException("Error reading hfile " + path + " as a dataframe", e);
}
}
return valuesAsList.stream();
}
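Since HFile stores sorted binary key-value pairs (the record key as the key, the Avro-encoded record as the value), readHFile() yields records in record-key order within each file, recovering the Avro schema from the file-info block under the "schema" key. A usage sketch, where filePath is an assumed path to a .hfile base file:

import java.util.List;
import java.util.stream.Collectors;

// Collect every record from the given base file, in record-key order.
List<GenericRecord> records =
    HoodieClientTestUtils.readHFile(jsc, new String[] {filePath})
        .collect(Collectors.toList());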
/**
* TODO Incorporate into {@link org.apache.hudi.common.testutils.HoodieTestTable}.
*/

View File

@@ -166,7 +166,7 @@ public class SparkDatasetTestUtils {
return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
.withParallelism(2, 2)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build())
.withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build())
.withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024).parquetMaxFileSize(1024 * 1024).build())
.forTable("test-trip-table")
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
.withBulkInsertParallelism(2);