
[HUDI-4038] Avoid calling getDataSize after every record written (#5497)

- getDataSize has non-trivial overhead in the current ParquetWriter impl, since it requires traversing the column groups already composed in memory. Instead, we can sample the calls to getDataSize to amortize their cost.
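
A minimal sketch of the sampling pattern described above, kept outside the Hudi class hierarchy for illustration; the names SizeSampledWriter, estimateDataSize and CHECK_INTERVAL are hypothetical and not part of this commit:

public abstract class SizeSampledWriter<R> {

  // Re-evaluate the expensive size estimate only once per CHECK_INTERVAL records.
  private static final long CHECK_INTERVAL = 1000;

  private final long maxFileSize;
  private long writtenRecords = 0L;
  private long lastCachedDataSize = -1L;

  protected SizeSampledWriter(long maxFileSize) {
    this.maxFileSize = maxFileSize;
  }

  // The costly call, e.g. ParquetWriter#getDataSize traversing the column
  // groups already composed in memory.
  protected abstract long estimateDataSize();

  protected abstract void doWrite(R record) throws java.io.IOException;

  public boolean canWrite() {
    // Pay for the estimate on the first call and at interval boundaries only,
    // reusing the cached value for every record in between.
    if (lastCachedDataSize == -1L || writtenRecords % CHECK_INTERVAL == 0) {
      lastCachedDataSize = estimateDataSize();
    }
    return lastCachedDataSize < maxFileSize;
  }

  public void write(R record) throws java.io.IOException {
    doWrite(record);
    writtenRecords++;
  }
}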

Co-authored-by: sivabalan <n.siva.b@gmail.com>
Alexey Kudinkin
2022-05-11 05:08:31 -07:00
committed by GitHub
parent 4258a71517
commit 4a8589f222
12 changed files with 124 additions and 97 deletions

HoodieAvroParquetWriter.java (renamed from HoodieParquetWriter.java)

@@ -18,22 +18,15 @@
package org.apache.hudi.io.storage;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroWriteSupport;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import javax.annotation.concurrent.NotThreadSafe;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
/**
* HoodieParquetWriter extends the ParquetWriter to help limit the size of underlying file. Provides a way to check if
@@ -42,45 +35,24 @@ import java.util.concurrent.atomic.AtomicLong;
* ATTENTION: HoodieParquetWriter is not thread safe and developer should take care of the order of write and close
*/
@NotThreadSafe
public class HoodieParquetWriter<T extends HoodieRecordPayload, R extends IndexedRecord>
extends ParquetWriter<IndexedRecord> implements HoodieFileWriter<R> {
public class HoodieAvroParquetWriter<R extends IndexedRecord>
extends HoodieBaseParquetWriter<IndexedRecord>
implements HoodieFileWriter<R> {
private static AtomicLong recordIndex = new AtomicLong(1);
private final Path file;
private final HoodieWrapperFileSystem fs;
private final long maxFileSize;
private final HoodieAvroWriteSupport writeSupport;
private final String fileName;
private final String instantTime;
private final TaskContextSupplier taskContextSupplier;
private final boolean populateMetaFields;
private final HoodieAvroWriteSupport writeSupport;
public HoodieParquetWriter(String instantTime,
Path file,
HoodieAvroParquetConfig parquetConfig,
Schema schema,
TaskContextSupplier taskContextSupplier,
boolean populateMetaFields) throws IOException {
super(HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf()),
ParquetFileWriter.Mode.CREATE,
parquetConfig.getWriteSupport(),
parquetConfig.getCompressionCodecName(),
parquetConfig.getBlockSize(),
parquetConfig.getPageSize(),
parquetConfig.getPageSize(),
parquetConfig.dictionaryEnabled(),
DEFAULT_IS_VALIDATING_ENABLED,
DEFAULT_WRITER_VERSION,
FSUtils.registerFileSystem(file, parquetConfig.getHadoopConf()));
this.file = HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf());
this.fs =
(HoodieWrapperFileSystem) this.file.getFileSystem(FSUtils.registerFileSystem(file, parquetConfig.getHadoopConf()));
// We cannot accurately measure the snappy compressed output file size. We are choosing a
// conservative 10%
// TODO - compute this compression ratio dynamically by looking at the bytes written to the
// stream and the actual file size reported by HDFS
this.maxFileSize = parquetConfig.getMaxFileSize()
+ Math.round(parquetConfig.getMaxFileSize() * parquetConfig.getCompressionRatio());
@SuppressWarnings({"unchecked", "rawtypes"})
public HoodieAvroParquetWriter(Path file,
HoodieAvroParquetConfig parquetConfig,
String instantTime,
TaskContextSupplier taskContextSupplier,
boolean populateMetaFields) throws IOException {
super(file, (HoodieBaseParquetConfig) parquetConfig);
this.fileName = file.getName();
this.writeSupport = parquetConfig.getWriteSupport();
this.instantTime = instantTime;
this.taskContextSupplier = taskContextSupplier;
@@ -91,7 +63,7 @@ public class HoodieParquetWriter<T extends HoodieRecordPayload, R extends Indexe
public void writeAvroWithMetadata(HoodieKey key, R avroRecord) throws IOException {
if (populateMetaFields) {
prepRecordWithMetadata(key, avroRecord, instantTime,
taskContextSupplier.getPartitionIdSupplier().get(), recordIndex, file.getName());
taskContextSupplier.getPartitionIdSupplier().get(), getWrittenRecordCount(), fileName);
super.write(avroRecord);
writeSupport.add(key.getRecordKey());
} else {
@@ -99,11 +71,6 @@ public class HoodieParquetWriter<T extends HoodieRecordPayload, R extends Indexe
}
}
@Override
public boolean canWrite() {
return getDataSize() < maxFileSize;
}
@Override
public void writeAvro(String key, IndexedRecord object) throws IOException {
super.write(object);
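
The max-file-size headroom computation removed above (and re-introduced in the new base class below) pads the configured target by the compression ratio, because the in-memory size estimate from getDataSize overshoots the compressed on-disk size. A worked example under assumed values (a 120 MB target and the conservative 0.1 ratio from the comment):

long targetFileSize = 120L * 1024 * 1024;  // assumed 120 MB target
double compressionRatio = 0.1;             // conservative 10% from the comment above
long maxFileSize = targetFileSize + Math.round(targetFileSize * compressionRatio);
// maxFileSize == 132 MB: writes continue until the size estimate crosses this bound.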

HoodieBaseParquetWriter.java (new file)

@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.io.storage;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.api.WriteSupport;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
/**
* Base class of Hudi's custom {@link ParquetWriter} implementations
*
* @param <R> target type of the object being written into Parquet files (for ex,
* {@code IndexedRecord}, {@code InternalRow})
*/
public abstract class HoodieBaseParquetWriter<R> extends ParquetWriter<R> {
private static final int WRITTEN_RECORDS_THRESHOLD_FOR_FILE_SIZE_CHECK = 1000;
private final AtomicLong writtenRecordCount = new AtomicLong(0);
private final long maxFileSize;
private long lastCachedDataSize = -1;
public HoodieBaseParquetWriter(Path file,
HoodieBaseParquetConfig<? extends WriteSupport<R>> parquetConfig) throws IOException {
super(HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf()),
ParquetFileWriter.Mode.CREATE,
parquetConfig.getWriteSupport(),
parquetConfig.getCompressionCodecName(),
parquetConfig.getBlockSize(),
parquetConfig.getPageSize(),
parquetConfig.getPageSize(),
parquetConfig.dictionaryEnabled(),
DEFAULT_IS_VALIDATING_ENABLED,
DEFAULT_WRITER_VERSION,
FSUtils.registerFileSystem(file, parquetConfig.getHadoopConf()));
// We cannot accurately measure the snappy compressed output file size. We are choosing a
// conservative 10%
// TODO - compute this compression ratio dynamically by looking at the bytes written to the
// stream and the actual file size reported by HDFS
this.maxFileSize = parquetConfig.getMaxFileSize()
+ Math.round(parquetConfig.getMaxFileSize() * parquetConfig.getCompressionRatio());
}
public boolean canWrite() {
// TODO we can actually do evaluation more accurately:
// if we cache last data size check, since we account for how many records
// were written we can accurately project avg record size, and therefore
// estimate how many more records we can write before cut off
if (lastCachedDataSize == -1 || getWrittenRecordCount() % WRITTEN_RECORDS_THRESHOLD_FOR_FILE_SIZE_CHECK == 0) {
lastCachedDataSize = getDataSize();
}
return lastCachedDataSize < maxFileSize;
}
@Override
public void write(R object) throws IOException {
super.write(object);
writtenRecordCount.incrementAndGet();
}
protected long getWrittenRecordCount() {
return writtenRecordCount.get();
}
}
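
The TODO inside canWrite() above points at a sharper cutoff: since the writer already counts written records, it could project the average record size from the last sampled data size and schedule the next check near the estimated capacity, instead of on a fixed 1000-record interval. A hedged sketch of that refinement, not part of this commit, assuming a new field nextSizeCheckAt inside HoodieBaseParquetWriter:

// Hypothetical refinement of canWrite() sketched from the TODO above.
// Assumes a new field: private long nextSizeCheckAt = 0L;
public boolean canWrite() {
  long written = getWrittenRecordCount();
  if (lastCachedDataSize == -1 || written >= nextSizeCheckAt) {
    lastCachedDataSize = getDataSize();
    if (written > 0 && lastCachedDataSize > 0) {
      // Project the average record size and re-check shortly before the
      // estimated cutoff rather than at every fixed interval.
      long avgRecordSize = Math.max(1, lastCachedDataSize / written);
      long recordsLeft = Math.max(0, (maxFileSize - lastCachedDataSize) / avgRecordSize);
      nextSizeCheckAt = written + Math.max(1, recordsLeft * 9 / 10);
    } else {
      nextSizeCheckAt = written + WRITTEN_RECORDS_THRESHOLD_FOR_FILE_SIZE_CHECK;
    }
  }
  return lastCachedDataSize < maxFileSize;
}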

HoodieFileWriter.java

@@ -18,14 +18,12 @@
package org.apache.hudi.io.storage;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.avro.generic.IndexedRecord;
import java.io.IOException;
public interface HoodieFileWriter<R extends IndexedRecord> {
@@ -38,8 +36,8 @@ public interface HoodieFileWriter<R extends IndexedRecord> {
void writeAvro(String key, R oldRecord) throws IOException;
default void prepRecordWithMetadata(HoodieKey key, R avroRecord, String instantTime, Integer partitionId, AtomicLong recordIndex, String fileName) {
String seqId = HoodieRecord.generateSequenceId(instantTime, partitionId, recordIndex.getAndIncrement());
default void prepRecordWithMetadata(HoodieKey key, R avroRecord, String instantTime, Integer partitionId, long recordIndex, String fileName) {
String seqId = HoodieRecord.generateSequenceId(instantTime, partitionId, recordIndex);
HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord, key.getRecordKey(), key.getPartitionPath(), fileName);
HoodieAvroUtils.addCommitMetadataToRecord((GenericRecord) avroRecord, instantTime, seqId);
return;

HoodieFileWriterFactory.java

@@ -81,7 +81,7 @@ public class HoodieFileWriterFactory {
config.getParquetBlockSize(), config.getParquetPageSize(), config.getParquetMaxFileSize(),
conf, config.getParquetCompressionRatio(), config.parquetDictionaryEnabled());
return new HoodieParquetWriter<>(instantTime, path, parquetConfig, schema, taskContextSupplier, populateMetaFields);
return new HoodieAvroParquetWriter<>(path, parquetConfig, instantTime, taskContextSupplier, populateMetaFields);
}
static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFileWriter<R> newHFileFileWriter(

HoodieHFileWriter.java

@@ -113,7 +113,7 @@ public class HoodieHFileWriter<T extends HoodieRecordPayload, R extends IndexedR
public void writeAvroWithMetadata(HoodieKey key, R avroRecord) throws IOException {
if (populateMetaFields) {
prepRecordWithMetadata(key, avroRecord, instantTime,
taskContextSupplier.getPartitionIdSupplier().get(), recordIndex, file.getName());
taskContextSupplier.getPartitionIdSupplier().get(), recordIndex.getAndIncrement(), file.getName());
writeAvro(key.getRecordKey(), avroRecord);
} else {
writeAvro(key.getRecordKey(), avroRecord);

HoodieOrcWriter.java

@@ -97,7 +97,7 @@ public class HoodieOrcWriter<T extends HoodieRecordPayload, R extends IndexedRec
@Override
public void writeAvroWithMetadata(HoodieKey key, R avroRecord) throws IOException {
prepRecordWithMetadata(key, avroRecord, instantTime,
taskContextSupplier.getPartitionIdSupplier().get(), RECORD_INDEX, file.getName());
taskContextSupplier.getPartitionIdSupplier().get(), RECORD_INDEX.getAndIncrement(), file.getName());
writeAvro(key.getRecordKey(), avroRecord);
}

HoodieWriteableTestTable.java

@@ -41,7 +41,7 @@ import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.io.storage.HoodieAvroParquetConfig;
import org.apache.hudi.io.storage.HoodieOrcConfig;
import org.apache.hudi.io.storage.HoodieOrcWriter;
import org.apache.hudi.io.storage.HoodieParquetWriter;
import org.apache.hudi.io.storage.HoodieAvroParquetWriter;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.avro.Schema;
@@ -113,10 +113,9 @@ public class HoodieWriteableTestTable extends HoodieMetadataTestTable {
HoodieAvroParquetConfig config = new HoodieAvroParquetConfig(writeSupport, CompressionCodecName.GZIP,
ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, 120 * 1024 * 1024,
new Configuration(), Double.parseDouble(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.defaultValue()));
try (HoodieParquetWriter writer = new HoodieParquetWriter(
currentInstantTime,
new Path(Paths.get(basePath, partition, fileName).toString()),
config, schema, contextSupplier, populateMetaFields)) {
try (HoodieAvroParquetWriter writer = new HoodieAvroParquetWriter<>(
new Path(Paths.get(basePath, partition, fileName).toString()), config, currentInstantTime,
contextSupplier, populateMetaFields)) {
int seqId = 1;
for (HoodieRecord record : records) {
GenericRecord avroRecord = (GenericRecord) ((HoodieRecordPayload) record.getData()).getInsertValue(schema).get();

TestJavaCopyOnWriteActionExecutor.java

@@ -379,7 +379,7 @@ public class TestJavaCopyOnWriteActionExecutor extends HoodieJavaClientTestBase
List<HoodieRecord> records = new ArrayList<>();
// Approx 1150 records are written for block size of 64KB
for (int i = 0; i < 2000; i++) {
for (int i = 0; i < 2050; i++) {
String recordStr = "{\"_row_key\":\"" + UUID.randomUUID().toString()
+ "\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":" + i + "}";
RawTripTestPayload rowChange = new RawTripTestPayload(recordStr);
@@ -402,7 +402,8 @@ public class TestJavaCopyOnWriteActionExecutor extends HoodieJavaClientTestBase
counts++;
}
}
assertEquals(5, counts, "If the number of records are more than 1150, then there should be a new file");
// we check canWrite only once every 1000 records, so we end up with 2 files of 1000 records each and a 3rd file with 50 records.
assertEquals(3, counts, "If the number of records are more than 1150, then there should be a new file");
}
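
A quick sanity check of the updated expectation, using only the numbers stated in the test:

// 2050 records written, canWrite re-evaluated only at 1000-record boundaries:
// file 1: 1000 records, file 2: 1000 records, file 3: 50 records (remainder)
// 1000 + 1000 + 50 = 2050 records across the expected 3 files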
@Test

HoodieInternalRowParquetWriter.java

@@ -19,11 +19,7 @@
package org.apache.hudi.io.storage.row;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.hudi.io.storage.HoodieBaseParquetWriter;
import org.apache.spark.sql.catalyst.InternalRow;
import java.io.IOException;
@@ -31,32 +27,16 @@ import java.io.IOException;
/**
* Parquet's impl of {@link HoodieInternalRowFileWriter} to write {@link InternalRow}s.
*/
public class HoodieInternalRowParquetWriter extends ParquetWriter<InternalRow>
public class HoodieInternalRowParquetWriter extends HoodieBaseParquetWriter<InternalRow>
implements HoodieInternalRowFileWriter {
private final Path file;
private final HoodieWrapperFileSystem fs;
private final long maxFileSize;
private final HoodieRowParquetWriteSupport writeSupport;
public HoodieInternalRowParquetWriter(Path file, HoodieRowParquetConfig parquetConfig)
throws IOException {
super(HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf()),
ParquetFileWriter.Mode.CREATE, parquetConfig.getWriteSupport(), parquetConfig.getCompressionCodecName(),
parquetConfig.getBlockSize(), parquetConfig.getPageSize(), parquetConfig.getPageSize(),
DEFAULT_IS_DICTIONARY_ENABLED, DEFAULT_IS_VALIDATING_ENABLED,
DEFAULT_WRITER_VERSION, FSUtils.registerFileSystem(file, parquetConfig.getHadoopConf()));
this.file = HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf());
this.fs = (HoodieWrapperFileSystem) this.file.getFileSystem(FSUtils.registerFileSystem(file,
parquetConfig.getHadoopConf()));
this.maxFileSize = parquetConfig.getMaxFileSize()
+ Math.round(parquetConfig.getMaxFileSize() * parquetConfig.getCompressionRatio());
this.writeSupport = parquetConfig.getWriteSupport();
}
@Override
public boolean canWrite() {
return getDataSize() < maxFileSize;
}
super(file, parquetConfig);
this.writeSupport = parquetConfig.getWriteSupport();
}
@Override
@@ -69,9 +49,4 @@ public class HoodieInternalRowParquetWriter extends ParquetWriter<InternalRow>
public void writeRow(InternalRow row) throws IOException {
super.write(row);
}
@Override
public void close() throws IOException {
super.close();
}
}

TestHoodieFileWriterFactory.java

@@ -49,7 +49,7 @@ public class TestHoodieFileWriterFactory extends HoodieClientTestBase {
SparkTaskContextSupplier supplier = new SparkTaskContextSupplier();
HoodieFileWriter<IndexedRecord> parquetWriter = HoodieFileWriterFactory.getFileWriter(instantTime,
parquetPath, table, cfg, HoodieTestDataGenerator.AVRO_SCHEMA, supplier);
assertTrue(parquetWriter instanceof HoodieParquetWriter);
assertTrue(parquetWriter instanceof HoodieAvroParquetWriter);
// hfile format.
final Path hfilePath = new Path(basePath + "/partition/path/f1_1-0-1_000.hfile");

TestCopyOnWriteActionExecutor.java

@@ -419,7 +419,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase {
List<HoodieRecord> records = new ArrayList<>();
// Approx 1150 records are written for block size of 64KB
for (int i = 0; i < 2000; i++) {
for (int i = 0; i < 2050; i++) {
String recordStr = "{\"_row_key\":\"" + UUID.randomUUID().toString()
+ "\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":" + i + "}";
RawTripTestPayload rowChange = new RawTripTestPayload(recordStr);
@@ -441,7 +441,8 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase {
counts++;
}
}
assertEquals(5, counts, "If the number of records are more than 1150, then there should be a new file");
// we check canWrite only once every 1000 records, so we end up with 2 files of 1000 records each and a 3rd file with 50 records.
assertEquals(3, counts, "If the number of records are more than 1150, then there should be a new file");
}
@Test