[HUDI-764] [HUDI-765] ORC reader writer Implementation (#2999)
Co-authored-by: Qingyun (Teresa) Kang <kteresa@uber.com>
This commit is contained in:
@@ -39,10 +39,21 @@ public class HoodieStorageConfig extends DefaultHoodieConfig {
|
||||
  // Parquet block size defaults to the max file size so a file holds a single row group.
  public static final String DEFAULT_PARQUET_BLOCK_SIZE_BYTES = DEFAULT_PARQUET_FILE_MAX_BYTES;
  // Parquet page size (unit of compression/encoding inside a column chunk).
  public static final String PARQUET_PAGE_SIZE_BYTES = "hoodie.parquet.page.size";
  public static final String DEFAULT_PARQUET_PAGE_SIZE_BYTES = String.valueOf(1 * 1024 * 1024);

  // HFile base file sizing: max file size and HFile block size.
  public static final String HFILE_FILE_MAX_BYTES = "hoodie.hfile.max.file.size";
  public static final String HFILE_BLOCK_SIZE_BYTES = "hoodie.hfile.block.size";
  public static final String DEFAULT_HFILE_BLOCK_SIZE_BYTES = String.valueOf(1 * 1024 * 1024);
  public static final String DEFAULT_HFILE_FILE_MAX_BYTES = String.valueOf(120 * 1024 * 1024);

  // ORC base file sizing: target max size of a single ORC base file.
  public static final String ORC_FILE_MAX_BYTES = "hoodie.orc.max.file.size";
  public static final String DEFAULT_ORC_FILE_MAX_BYTES = String.valueOf(120 * 1024 * 1024);
  // size of the memory buffer in bytes for writing
  public static final String ORC_STRIPE_SIZE = "hoodie.orc.stripe.size";
  public static final String DEFAULT_ORC_STRIPE_SIZE = String.valueOf(64 * 1024 * 1024);
  // file system block size
  public static final String ORC_BLOCK_SIZE = "hoodie.orc.block.size";
  // ORC block size defaults to the max file size, mirroring the parquet convention above.
  public static final String DEFAULT_ORC_BLOCK_SIZE = DEFAULT_ORC_FILE_MAX_BYTES;

  // used to size log files
  public static final String LOGFILE_SIZE_MAX_BYTES = "hoodie.logfile.max.size";
  public static final String DEFAULT_LOGFILE_SIZE_MAX_BYTES = String.valueOf(1024 * 1024 * 1024); // 1 GB
|
||||
@@ -54,9 +65,11 @@ public class HoodieStorageConfig extends DefaultHoodieConfig {
|
||||
  public static final String DEFAULT_STREAM_COMPRESSION_RATIO = String.valueOf(0.1);
  // Per-format compression settings; each format names its codec/algorithm differently.
  public static final String PARQUET_COMPRESSION_CODEC = "hoodie.parquet.compression.codec";
  public static final String HFILE_COMPRESSION_ALGORITHM = "hoodie.hfile.compression.algorithm";
  public static final String ORC_COMPRESSION_CODEC = "hoodie.orc.compression.codec";
  // Default compression codec for parquet
  public static final String DEFAULT_PARQUET_COMPRESSION_CODEC = "gzip";
  // HFile uses HBase Compression.Algorithm names ("GZ" == gzip).
  public static final String DEFAULT_HFILE_COMPRESSION_ALGORITHM = "GZ";
  // ORC uses org.apache.orc.CompressionKind names.
  public static final String DEFAULT_ORC_COMPRESSION_CODEC = "ZLIB";
  public static final String LOGFILE_TO_PARQUET_COMPRESSION_RATIO = "hoodie.logfile.to.parquet.compression.ratio";
  // Default compression ratio for log file to parquet, general 3x
  public static final String DEFAULT_LOGFILE_TO_PARQUET_COMPRESSION_RATIO = String.valueOf(0.35);
|
||||
@@ -140,6 +153,26 @@ public class HoodieStorageConfig extends DefaultHoodieConfig {
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder orcMaxFileSize(long maxFileSize) {
|
||||
props.setProperty(ORC_FILE_MAX_BYTES, String.valueOf(maxFileSize));
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder orcStripeSize(int orcStripeSize) {
|
||||
props.setProperty(ORC_STRIPE_SIZE, String.valueOf(orcStripeSize));
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder orcBlockSize(int orcBlockSize) {
|
||||
props.setProperty(ORC_BLOCK_SIZE, String.valueOf(orcBlockSize));
|
||||
return this;
|
||||
}
|
||||
|
||||
    /**
     * Sets the ORC compression codec name (must match an {@code org.apache.orc.CompressionKind}
     * constant, e.g. "ZLIB", as it is resolved via {@code CompressionKind.valueOf} on read).
     *
     * @param orcCompressionCodec codec name
     * @return this builder for call chaining
     */
    public Builder orcCompressionCodec(String orcCompressionCodec) {
      props.setProperty(ORC_COMPRESSION_CODEC, orcCompressionCodec);
      return this;
    }
|
||||
|
||||
public HoodieStorageConfig build() {
|
||||
HoodieStorageConfig config = new HoodieStorageConfig(props);
|
||||
setDefaultOnCondition(props, !props.containsKey(PARQUET_FILE_MAX_BYTES), PARQUET_FILE_MAX_BYTES,
|
||||
@@ -166,6 +199,15 @@ public class HoodieStorageConfig extends DefaultHoodieConfig {
|
||||
setDefaultOnCondition(props, !props.containsKey(HFILE_FILE_MAX_BYTES), HFILE_FILE_MAX_BYTES,
|
||||
DEFAULT_HFILE_FILE_MAX_BYTES);
|
||||
|
||||
setDefaultOnCondition(props, !props.containsKey(ORC_FILE_MAX_BYTES), ORC_FILE_MAX_BYTES,
|
||||
DEFAULT_ORC_FILE_MAX_BYTES);
|
||||
setDefaultOnCondition(props, !props.containsKey(ORC_STRIPE_SIZE), ORC_STRIPE_SIZE,
|
||||
DEFAULT_ORC_STRIPE_SIZE);
|
||||
setDefaultOnCondition(props, !props.containsKey(ORC_BLOCK_SIZE), ORC_BLOCK_SIZE,
|
||||
DEFAULT_ORC_BLOCK_SIZE);
|
||||
setDefaultOnCondition(props, !props.containsKey(ORC_COMPRESSION_CODEC), ORC_COMPRESSION_CODEC,
|
||||
DEFAULT_ORC_COMPRESSION_CODEC);
|
||||
|
||||
return config;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -42,6 +42,7 @@ import org.apache.hudi.metrics.MetricsReporterType;
|
||||
import org.apache.hudi.metrics.datadog.DatadogHttpClient.ApiSite;
|
||||
import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
|
||||
import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
|
||||
import org.apache.orc.CompressionKind;
|
||||
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
|
||||
|
||||
import javax.annotation.concurrent.Immutable;
|
||||
@@ -784,6 +785,22 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
|
||||
return Compression.Algorithm.valueOf(props.getProperty(HoodieStorageConfig.HFILE_COMPRESSION_ALGORITHM));
|
||||
}
|
||||
|
||||
public long getOrcMaxFileSize() {
|
||||
return Long.parseLong(props.getProperty(HoodieStorageConfig.ORC_FILE_MAX_BYTES));
|
||||
}
|
||||
|
||||
public int getOrcStripeSize() {
|
||||
return Integer.parseInt(props.getProperty(HoodieStorageConfig.ORC_STRIPE_SIZE));
|
||||
}
|
||||
|
||||
public int getOrcBlockSize() {
|
||||
return Integer.parseInt(props.getProperty(HoodieStorageConfig.ORC_BLOCK_SIZE));
|
||||
}
|
||||
|
||||
public CompressionKind getOrcCompressionCodec() {
|
||||
return CompressionKind.valueOf(props.getProperty(HoodieStorageConfig.ORC_COMPRESSION_CODEC));
|
||||
}
|
||||
|
||||
/**
|
||||
* metrics properties.
|
||||
*/
|
||||
|
||||
@@ -18,6 +18,9 @@
|
||||
|
||||
package org.apache.hudi.io.storage;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.hudi.avro.HoodieAvroUtils;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
|
||||
import org.apache.avro.generic.IndexedRecord;
|
||||
@@ -35,4 +38,11 @@ public interface HoodieFileWriter<R extends IndexedRecord> {
|
||||
void writeAvro(String key, R oldRecord) throws IOException;
|
||||
|
||||
long getBytesWritten();
|
||||
|
||||
default void prepRecordWithMetadata(R avroRecord, HoodieRecord record, String instantTime, Integer partitionId, AtomicLong recordIndex, String fileName) {
|
||||
String seqId = HoodieRecord.generateSequenceId(instantTime, partitionId, recordIndex.getAndIncrement());
|
||||
HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord, record.getRecordKey(), record.getPartitionPath(), fileName);
|
||||
HoodieAvroUtils.addCommitMetadataToRecord((GenericRecord) avroRecord, instantTime, seqId);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -34,6 +34,7 @@ import org.apache.parquet.avro.AvroSchemaConverter;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.apache.hudi.common.model.HoodieFileFormat.ORC;
|
||||
import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
|
||||
import static org.apache.hudi.common.model.HoodieFileFormat.HFILE;
|
||||
|
||||
@@ -49,6 +50,9 @@ public class HoodieFileWriterFactory {
|
||||
if (HFILE.getFileExtension().equals(extension)) {
|
||||
return newHFileFileWriter(instantTime, path, config, schema, hoodieTable, taskContextSupplier);
|
||||
}
|
||||
if (ORC.getFileExtension().equals(extension)) {
|
||||
return newOrcFileWriter(instantTime, path, config, schema, hoodieTable, taskContextSupplier);
|
||||
}
|
||||
throw new UnsupportedOperationException(extension + " format not supported yet.");
|
||||
}
|
||||
|
||||
@@ -77,6 +81,15 @@ public class HoodieFileWriterFactory {
|
||||
return new HoodieHFileWriter<>(instantTime, path, hfileConfig, schema, taskContextSupplier);
|
||||
}
|
||||
|
||||
private static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFileWriter<R> newOrcFileWriter(
|
||||
String instantTime, Path path, HoodieWriteConfig config, Schema schema, HoodieTable hoodieTable,
|
||||
TaskContextSupplier taskContextSupplier) throws IOException {
|
||||
BloomFilter filter = createBloomFilter(config);
|
||||
HoodieOrcConfig orcConfig = new HoodieOrcConfig(hoodieTable.getHadoopConf(), config.getOrcCompressionCodec(),
|
||||
config.getOrcStripeSize(), config.getOrcBlockSize(), config.getOrcMaxFileSize(), filter);
|
||||
return new HoodieOrcWriter<>(instantTime, path, orcConfig, schema, taskContextSupplier);
|
||||
}
|
||||
|
||||
private static BloomFilter createBloomFilter(HoodieWriteConfig config) {
|
||||
return BloomFilterFactory.createBloomFilter(config.getBloomFilterNumEntries(), config.getBloomFilterFPP(),
|
||||
config.getDynamicBloomFilterMaxNumEntries(),
|
||||
|
||||
@@ -99,13 +99,9 @@ public class HoodieHFileWriter<T extends HoodieRecordPayload, R extends IndexedR
|
||||
|
||||
@Override
|
||||
public void writeAvroWithMetadata(R avroRecord, HoodieRecord record) throws IOException {
|
||||
String seqId =
|
||||
HoodieRecord.generateSequenceId(instantTime, taskContextSupplier.getPartitionIdSupplier().get(), recordIndex.getAndIncrement());
|
||||
HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord, record.getRecordKey(), record.getPartitionPath(),
|
||||
file.getName());
|
||||
HoodieAvroUtils.addCommitMetadataToRecord((GenericRecord) avroRecord, instantTime, seqId);
|
||||
|
||||
writeAvro(record.getRecordKey(), (IndexedRecord)avroRecord);
|
||||
prepRecordWithMetadata(avroRecord, record, instantTime,
|
||||
taskContextSupplier.getPartitionIdSupplier().get(), recordIndex, file.getName());
|
||||
writeAvro(record.getRecordKey(), (IndexedRecord) avroRecord);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -0,0 +1,72 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.io.storage;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hudi.common.bloom.BloomFilter;
|
||||
import org.apache.orc.CompressionKind;
|
||||
|
||||
public class HoodieOrcConfig {
|
||||
static final String AVRO_SCHEMA_METADATA_KEY = "orc.avro.schema";
|
||||
|
||||
private final CompressionKind compressionKind;
|
||||
private final int stripeSize;
|
||||
private final int blockSize;
|
||||
private final long maxFileSize;
|
||||
private final Configuration hadoopConf;
|
||||
private final BloomFilter bloomFilter;
|
||||
|
||||
public HoodieOrcConfig(Configuration hadoopConf, CompressionKind compressionKind, int stripeSize,
|
||||
int blockSize, long maxFileSize, BloomFilter bloomFilter) {
|
||||
this.hadoopConf = hadoopConf;
|
||||
this.compressionKind = compressionKind;
|
||||
this.stripeSize = stripeSize;
|
||||
this.blockSize = blockSize;
|
||||
this.maxFileSize = maxFileSize;
|
||||
this.bloomFilter = bloomFilter;
|
||||
}
|
||||
|
||||
public Configuration getHadoopConf() {
|
||||
return hadoopConf;
|
||||
}
|
||||
|
||||
public CompressionKind getCompressionKind() {
|
||||
return compressionKind;
|
||||
}
|
||||
|
||||
public int getStripeSize() {
|
||||
return stripeSize;
|
||||
}
|
||||
|
||||
public int getBlockSize() {
|
||||
return blockSize;
|
||||
}
|
||||
|
||||
public long getMaxFileSize() {
|
||||
return maxFileSize;
|
||||
}
|
||||
|
||||
public boolean useBloomFilter() {
|
||||
return bloomFilter != null;
|
||||
}
|
||||
|
||||
public BloomFilter getBloomFilter() {
|
||||
return bloomFilter;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,172 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.io.storage;
|
||||
|
||||
import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY;
|
||||
import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE;
|
||||
import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER;
|
||||
import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.avro.generic.IndexedRecord;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.orc.storage.ql.exec.vector.ColumnVector;
|
||||
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
|
||||
import org.apache.hudi.common.bloom.BloomFilter;
|
||||
import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
|
||||
import org.apache.orc.OrcFile;
|
||||
import org.apache.orc.TypeDescription;
|
||||
import org.apache.orc.Writer;
|
||||
import org.apache.hudi.common.engine.TaskContextSupplier;
|
||||
import org.apache.hudi.common.fs.FSUtils;
|
||||
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||
import org.apache.hudi.common.util.AvroOrcUtils;
|
||||
|
||||
public class HoodieOrcWriter<T extends HoodieRecordPayload, R extends IndexedRecord>
|
||||
implements HoodieFileWriter<R> {
|
||||
private static final AtomicLong RECORD_INDEX = new AtomicLong(1);
|
||||
|
||||
private final long maxFileSize;
|
||||
private final Schema avroSchema;
|
||||
private final List<TypeDescription> fieldTypes;
|
||||
private final List<String> fieldNames;
|
||||
private final VectorizedRowBatch batch;
|
||||
private final Writer writer;
|
||||
|
||||
private final Path file;
|
||||
private final HoodieWrapperFileSystem fs;
|
||||
private final String instantTime;
|
||||
private final TaskContextSupplier taskContextSupplier;
|
||||
|
||||
private HoodieOrcConfig orcConfig;
|
||||
private String minRecordKey;
|
||||
private String maxRecordKey;
|
||||
|
||||
public HoodieOrcWriter(String instantTime, Path file, HoodieOrcConfig config, Schema schema,
|
||||
TaskContextSupplier taskContextSupplier) throws IOException {
|
||||
|
||||
Configuration conf = FSUtils.registerFileSystem(file, config.getHadoopConf());
|
||||
this.file = HoodieWrapperFileSystem.convertToHoodiePath(file, conf);
|
||||
this.fs = (HoodieWrapperFileSystem) this.file.getFileSystem(conf);
|
||||
this.instantTime = instantTime;
|
||||
this.taskContextSupplier = taskContextSupplier;
|
||||
|
||||
this.avroSchema = schema;
|
||||
final TypeDescription orcSchema = AvroOrcUtils.createOrcSchema(avroSchema);
|
||||
this.fieldTypes = orcSchema.getChildren();
|
||||
this.fieldNames = orcSchema.getFieldNames();
|
||||
this.maxFileSize = config.getMaxFileSize();
|
||||
this.batch = orcSchema.createRowBatch();
|
||||
OrcFile.WriterOptions writerOptions = OrcFile.writerOptions(conf)
|
||||
.blockSize(config.getBlockSize())
|
||||
.stripeSize(config.getStripeSize())
|
||||
.compress(config.getCompressionKind())
|
||||
.bufferSize(config.getBlockSize())
|
||||
.fileSystem(fs)
|
||||
.setSchema(orcSchema);
|
||||
this.writer = OrcFile.createWriter(this.file, writerOptions);
|
||||
this.orcConfig = config;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeAvroWithMetadata(R avroRecord, HoodieRecord record) throws IOException {
|
||||
prepRecordWithMetadata(avroRecord, record, instantTime,
|
||||
taskContextSupplier.getPartitionIdSupplier().get(), RECORD_INDEX, file.getName());
|
||||
writeAvro(record.getRecordKey(), avroRecord);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean canWrite() {
|
||||
return fs.getBytesWritten(file) < maxFileSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeAvro(String recordKey, IndexedRecord object) throws IOException {
|
||||
for (int col = 0; col < batch.numCols; col++) {
|
||||
ColumnVector colVector = batch.cols[col];
|
||||
final String thisField = fieldNames.get(col);
|
||||
final TypeDescription type = fieldTypes.get(col);
|
||||
|
||||
Object fieldValue = ((GenericRecord) object).get(thisField);
|
||||
Schema.Field avroField = avroSchema.getField(thisField);
|
||||
AvroOrcUtils.addToVector(type, colVector, avroField.schema(), fieldValue, batch.size);
|
||||
}
|
||||
|
||||
batch.size++;
|
||||
|
||||
// Batch size corresponds to the number of written rows out of 1024 total rows (by default)
|
||||
// in the row batch, add the batch to file once all rows are filled and reset.
|
||||
if (batch.size == batch.getMaxSize()) {
|
||||
writer.addRowBatch(batch);
|
||||
batch.reset();
|
||||
batch.size = 0;
|
||||
}
|
||||
|
||||
if (orcConfig.useBloomFilter()) {
|
||||
orcConfig.getBloomFilter().add(recordKey);
|
||||
if (minRecordKey != null) {
|
||||
minRecordKey = minRecordKey.compareTo(recordKey) <= 0 ? minRecordKey : recordKey;
|
||||
} else {
|
||||
minRecordKey = recordKey;
|
||||
}
|
||||
|
||||
if (maxRecordKey != null) {
|
||||
maxRecordKey = maxRecordKey.compareTo(recordKey) >= 0 ? maxRecordKey : recordKey;
|
||||
} else {
|
||||
maxRecordKey = recordKey;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
if (batch.size != 0) {
|
||||
writer.addRowBatch(batch);
|
||||
batch.reset();
|
||||
}
|
||||
|
||||
if (orcConfig.useBloomFilter()) {
|
||||
final BloomFilter bloomFilter = orcConfig.getBloomFilter();
|
||||
writer.addUserMetadata(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY, ByteBuffer.wrap(bloomFilter.serializeToString().getBytes()));
|
||||
if (minRecordKey != null && maxRecordKey != null) {
|
||||
writer.addUserMetadata(HOODIE_MIN_RECORD_KEY_FOOTER, ByteBuffer.wrap(minRecordKey.getBytes()));
|
||||
writer.addUserMetadata(HOODIE_MAX_RECORD_KEY_FOOTER, ByteBuffer.wrap(maxRecordKey.getBytes()));
|
||||
}
|
||||
if (bloomFilter.getBloomFilterTypeCode().name().contains(HoodieDynamicBoundedBloomFilter.TYPE_CODE_PREFIX)) {
|
||||
writer.addUserMetadata(HOODIE_BLOOM_FILTER_TYPE_CODE, ByteBuffer.wrap(bloomFilter.getBloomFilterTypeCode().name().getBytes()));
|
||||
}
|
||||
}
|
||||
writer.addUserMetadata(HoodieOrcConfig.AVRO_SCHEMA_METADATA_KEY, ByteBuffer.wrap(avroSchema.toString().getBytes()));
|
||||
|
||||
writer.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getBytesWritten() {
|
||||
return fs.getBytesWritten(file);
|
||||
}
|
||||
}
|
||||
@@ -18,7 +18,6 @@
|
||||
|
||||
package org.apache.hudi.io.storage;
|
||||
|
||||
import org.apache.hudi.avro.HoodieAvroUtils;
|
||||
import org.apache.hudi.avro.HoodieAvroWriteSupport;
|
||||
import org.apache.hudi.common.engine.TaskContextSupplier;
|
||||
import org.apache.hudi.common.fs.FSUtils;
|
||||
@@ -27,7 +26,6 @@ import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.avro.generic.IndexedRecord;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.parquet.hadoop.ParquetFileWriter;
|
||||
@@ -75,11 +73,8 @@ public class HoodieParquetWriter<T extends HoodieRecordPayload, R extends Indexe
|
||||
|
||||
@Override
|
||||
public void writeAvroWithMetadata(R avroRecord, HoodieRecord record) throws IOException {
|
||||
String seqId =
|
||||
HoodieRecord.generateSequenceId(instantTime, taskContextSupplier.getPartitionIdSupplier().get(), recordIndex.getAndIncrement());
|
||||
HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord, record.getRecordKey(), record.getPartitionPath(),
|
||||
file.getName());
|
||||
HoodieAvroUtils.addCommitMetadataToRecord((GenericRecord) avroRecord, instantTime, seqId);
|
||||
prepRecordWithMetadata(avroRecord, record, instantTime,
|
||||
taskContextSupplier.getPartitionIdSupplier().get(), recordIndex, file.getName());
|
||||
super.write(avroRecord);
|
||||
writeSupport.add(record.getRecordKey());
|
||||
}
|
||||
|
||||
@@ -656,6 +656,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
|
||||
public HoodieLogBlockType getLogDataBlockFormat() {
|
||||
switch (getBaseFileFormat()) {
|
||||
case PARQUET:
|
||||
case ORC:
|
||||
return HoodieLogBlockType.AVRO_DATA_BLOCK;
|
||||
case HFILE:
|
||||
return HoodieLogBlockType.HFILE_DATA_BLOCK;
|
||||
|
||||
@@ -0,0 +1,261 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.io.storage;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.generic.GenericData;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hudi.common.bloom.BloomFilter;
|
||||
import org.apache.hudi.common.bloom.BloomFilterFactory;
|
||||
import org.apache.hudi.common.bloom.BloomFilterTypeCode;
|
||||
import org.apache.hudi.common.engine.TaskContextSupplier;
|
||||
import org.apache.hudi.config.HoodieStorageConfig;
|
||||
import org.apache.orc.CompressionKind;
|
||||
import org.apache.orc.OrcFile;
|
||||
import org.apache.orc.Reader;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY;
|
||||
import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER;
|
||||
import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER;
|
||||
import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
|
||||
import static org.apache.hudi.io.storage.HoodieOrcConfig.AVRO_SCHEMA_METADATA_KEY;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
|
||||
public class TestHoodieOrcReaderWriter {
|
||||
private final Path filePath = new Path(System.getProperty("java.io.tmpdir") + "/f1_1-0-1_000.orc");
|
||||
|
||||
@BeforeEach
|
||||
@AfterEach
|
||||
public void clearTempFile() {
|
||||
File file = new File(filePath.toString());
|
||||
if (file.exists()) {
|
||||
file.delete();
|
||||
}
|
||||
}
|
||||
|
||||
private HoodieOrcWriter createOrcWriter(Schema avroSchema) throws Exception {
|
||||
BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.00001, -1, BloomFilterTypeCode.SIMPLE.name());
|
||||
Configuration conf = new Configuration();
|
||||
int orcStripSize = Integer.parseInt(HoodieStorageConfig.DEFAULT_ORC_STRIPE_SIZE);
|
||||
int orcBlockSize = Integer.parseInt(HoodieStorageConfig.DEFAULT_ORC_BLOCK_SIZE);
|
||||
int maxFileSize = Integer.parseInt(HoodieStorageConfig.DEFAULT_ORC_FILE_MAX_BYTES);
|
||||
HoodieOrcConfig config = new HoodieOrcConfig(conf, CompressionKind.ZLIB, orcStripSize, orcBlockSize, maxFileSize, filter);
|
||||
TaskContextSupplier mockTaskContextSupplier = Mockito.mock(TaskContextSupplier.class);
|
||||
String instantTime = "000";
|
||||
return new HoodieOrcWriter(instantTime, filePath, config, avroSchema, mockTaskContextSupplier);
|
||||
}
|
||||
|
||||
  /**
   * Verifies that closing the writer persists the expected footer user-metadata
   * (bloom filter, min/max record keys, avro schema) and that the Hoodie reader
   * can read the bloom filter and key range back.
   */
  @Test
  public void testWriteReadMetadata() throws Exception {
    // Write three small records so the footer metadata gets populated.
    Schema avroSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleSchema.avsc");
    HoodieOrcWriter writer = createOrcWriter(avroSchema);
    for (int i = 0; i < 3; i++) {
      GenericRecord record = new GenericData.Record(avroSchema);
      record.put("_row_key", "key" + i);
      record.put("time", Integer.toString(i));
      record.put("number", i);
      writer.writeAvro("key" + i, record);
    }
    writer.close();

    // Raw ORC footer must hold exactly the four Hoodie metadata keys.
    Configuration conf = new Configuration();
    Reader orcReader = OrcFile.createReader(filePath, OrcFile.readerOptions(conf));
    assertEquals(4, orcReader.getMetadataKeys().size());
    assertTrue(orcReader.getMetadataKeys().contains(HOODIE_MIN_RECORD_KEY_FOOTER));
    assertTrue(orcReader.getMetadataKeys().contains(HOODIE_MAX_RECORD_KEY_FOOTER));
    assertTrue(orcReader.getMetadataKeys().contains(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY));
    assertTrue(orcReader.getMetadataKeys().contains(AVRO_SCHEMA_METADATA_KEY));
    assertEquals(CompressionKind.ZLIB.name(), orcReader.getCompressionKind().toString());

    // The round-tripped bloom filter must contain all written keys and no others,
    // and the min/max footer keys must bound the written key range.
    HoodieFileReader<GenericRecord> hoodieReader = HoodieFileReaderFactory.getFileReader(conf, filePath);
    BloomFilter filter = hoodieReader.readBloomFilter();
    for (int i = 0; i < 3; i++) {
      assertTrue(filter.mightContain("key" + i));
    }
    assertFalse(filter.mightContain("non-existent-key"));
    assertEquals(3, hoodieReader.getTotalRecords());
    String[] minMaxRecordKeys = hoodieReader.readMinMaxRecordKeys();
    assertEquals(2, minMaxRecordKeys.length);
    assertEquals("key0", minMaxRecordKeys[0]);
    assertEquals("key2", minMaxRecordKeys[1]);
  }
|
||||
|
||||
  /**
   * Round-trips records with only primitive fields and verifies the derived ORC
   * schema, the row count, and every field value read back through the Hoodie reader.
   */
  @Test
  public void testWriteReadPrimitiveRecord() throws Exception {
    Schema avroSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleSchema.avsc");
    HoodieOrcWriter writer = createOrcWriter(avroSchema);
    for (int i = 0; i < 3; i++) {
      GenericRecord record = new GenericData.Record(avroSchema);
      record.put("_row_key", "key" + i);
      record.put("time", Integer.toString(i));
      record.put("number", i);
      writer.writeAvro("key" + i, record);
    }
    writer.close();

    // The avro schema must map onto the expected ORC struct type.
    Configuration conf = new Configuration();
    Reader orcReader = OrcFile.createReader(filePath, OrcFile.readerOptions(conf));
    assertEquals("struct<_row_key:string,time:string,number:int>", orcReader.getSchema().toString());
    assertEquals(3, orcReader.getNumberOfRows());

    // Records come back in write order with all field values intact.
    HoodieFileReader<GenericRecord> hoodieReader = HoodieFileReaderFactory.getFileReader(conf, filePath);
    Iterator<GenericRecord> iter = hoodieReader.getRecordIterator();
    int index = 0;
    while (iter.hasNext()) {
      GenericRecord record = iter.next();
      assertEquals("key" + index, record.get("_row_key").toString());
      assertEquals(Integer.toString(index), record.get("time").toString());
      assertEquals(index, record.get("number"));
      index++;
    }
  }
|
||||
|
||||
  /**
   * Round-trips records containing a nested struct with list and map members and
   * verifies the derived ORC schema plus every nested value read back.
   */
  @Test
  public void testWriteReadComplexRecord() throws Exception {
    Schema avroSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleSchemaWithUDT.avsc");
    // "driver" is a union; index 1 is the non-null record branch.
    Schema udtSchema = avroSchema.getField("driver").schema().getTypes().get(1);
    HoodieOrcWriter writer = createOrcWriter(avroSchema);
    for (int i = 0; i < 3; i++) {
      GenericRecord record = new GenericData.Record(avroSchema);
      record.put("_row_key", "key" + i);
      record.put("time", Integer.toString(i));
      record.put("number", i);
      GenericRecord innerRecord = new GenericData.Record(udtSchema);
      innerRecord.put("driver_name", "driver" + i);
      innerRecord.put("list", Collections.singletonList(i));
      innerRecord.put("map", Collections.singletonMap("key" + i, "value" + i));
      record.put("driver", innerRecord);
      writer.writeAvro("key" + i, record);
    }
    writer.close();

    // The nested struct/list/map must map onto the corresponding ORC compound types.
    Configuration conf = new Configuration();
    Reader reader = OrcFile.createReader(filePath, OrcFile.readerOptions(conf));
    assertEquals("struct<_row_key:string,time:string,number:int,driver:struct<driver_name:string,list:array<int>,map:map<string,string>>>",
        reader.getSchema().toString());
    assertEquals(3, reader.getNumberOfRows());

    // Nested values survive the round trip in write order.
    HoodieFileReader<GenericRecord> hoodieReader = HoodieFileReaderFactory.getFileReader(conf, filePath);
    Iterator<GenericRecord> iter = hoodieReader.getRecordIterator();
    int index = 0;
    while (iter.hasNext()) {
      GenericRecord record = iter.next();
      assertEquals("key" + index, record.get("_row_key").toString());
      assertEquals(Integer.toString(index), record.get("time").toString());
      assertEquals(index, record.get("number"));
      GenericRecord innerRecord = (GenericRecord) record.get("driver");
      assertEquals("driver" + index, innerRecord.get("driver_name").toString());
      assertEquals(1, ((List<?>)innerRecord.get("list")).size());
      assertEquals(index, ((List<?>)innerRecord.get("list")).get(0));
      assertEquals("value" + index, ((Map<?,?>)innerRecord.get("map")).get("key" + index).toString());
      index++;
    }
  }
|
||||
|
||||
@Test
|
||||
public void testWriteReadWithEvolvedSchema() throws Exception {
|
||||
Schema avroSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleSchema.avsc");
|
||||
HoodieOrcWriter writer = createOrcWriter(avroSchema);
|
||||
for (int i = 0; i < 3; i++) {
|
||||
GenericRecord record = new GenericData.Record(avroSchema);
|
||||
record.put("_row_key", "key" + i);
|
||||
record.put("time", Integer.toString(i));
|
||||
record.put("number", i);
|
||||
writer.writeAvro("key" + i, record);
|
||||
}
|
||||
writer.close();
|
||||
|
||||
Configuration conf = new Configuration();
|
||||
HoodieFileReader<GenericRecord> hoodieReader = HoodieFileReaderFactory.getFileReader(conf, filePath);
|
||||
Schema evolvedSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleEvolvedSchema.avsc");
|
||||
Iterator<GenericRecord> iter = hoodieReader.getRecordIterator(evolvedSchema);
|
||||
int index = 0;
|
||||
while (iter.hasNext()) {
|
||||
GenericRecord record = iter.next();
|
||||
assertEquals("key" + index, record.get("_row_key").toString());
|
||||
assertEquals(Integer.toString(index), record.get("time").toString());
|
||||
assertEquals(index, record.get("number"));
|
||||
assertNull(record.get("added_field"));
|
||||
index++;
|
||||
}
|
||||
|
||||
evolvedSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleEvolvedSchemaChangeOrder.avsc");
|
||||
iter = hoodieReader.getRecordIterator(evolvedSchema);
|
||||
index = 0;
|
||||
while (iter.hasNext()) {
|
||||
GenericRecord record = iter.next();
|
||||
assertEquals("key" + index, record.get("_row_key").toString());
|
||||
assertEquals(Integer.toString(index), record.get("time").toString());
|
||||
assertEquals(index, record.get("number"));
|
||||
assertNull(record.get("added_field"));
|
||||
index++;
|
||||
}
|
||||
|
||||
evolvedSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleEvolvedSchemaColumnRequire.avsc");
|
||||
iter = hoodieReader.getRecordIterator(evolvedSchema);
|
||||
index = 0;
|
||||
while (iter.hasNext()) {
|
||||
GenericRecord record = iter.next();
|
||||
assertEquals("key" + index, record.get("_row_key").toString());
|
||||
assertEquals(Integer.toString(index), record.get("time").toString());
|
||||
assertEquals(index, record.get("number"));
|
||||
assertNull(record.get("added_field"));
|
||||
index++;
|
||||
}
|
||||
|
||||
evolvedSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleEvolvedSchemaColumnType.avsc");
|
||||
iter = hoodieReader.getRecordIterator(evolvedSchema);
|
||||
index = 0;
|
||||
while (iter.hasNext()) {
|
||||
GenericRecord record = iter.next();
|
||||
assertEquals("key" + index, record.get("_row_key").toString());
|
||||
assertEquals(Integer.toString(index), record.get("time").toString());
|
||||
assertEquals(Integer.toString(index), record.get("number").toString());
|
||||
assertNull(record.get("added_field"));
|
||||
index++;
|
||||
}
|
||||
|
||||
evolvedSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleEvolvedSchemaDeleteColumn.avsc");
|
||||
iter = hoodieReader.getRecordIterator(evolvedSchema);
|
||||
index = 0;
|
||||
while (iter.hasNext()) {
|
||||
GenericRecord record = iter.next();
|
||||
assertEquals("key" + index, record.get("_row_key").toString());
|
||||
assertEquals(Integer.toString(index), record.get("time").toString());
|
||||
assertNull(record.get("number"));
|
||||
assertNull(record.get("added_field"));
|
||||
index++;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,67 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
{
|
||||
"namespace": "example.schema",
|
||||
"type": "record",
|
||||
"name": "trip",
|
||||
"fields": [
|
||||
{
|
||||
"name": "_row_key",
|
||||
"type": "string"
|
||||
},
|
||||
{
|
||||
"name": "time",
|
||||
"type": "string"
|
||||
},
|
||||
{
|
||||
"name": "number",
|
||||
"type": ["null", "int"]
|
||||
},
|
||||
{
|
||||
"name": "driver",
|
||||
"type": [
|
||||
"null",
|
||||
{
|
||||
"name": "person",
|
||||
"type": "record",
|
||||
"fields": [
|
||||
{
|
||||
"default": null,
|
||||
"name": "driver_name",
|
||||
"type": ["null", "string"]
|
||||
},
|
||||
{
|
||||
"name": "list",
|
||||
"type": {
|
||||
"type": "array",
|
||||
"items": "int"
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "map",
|
||||
"type": {
|
||||
"type": "map",
|
||||
"values": "string"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
Reference in New Issue
Block a user