Adding config for parquet compression ratio
This commit is contained in:
committed by
vinoth chandar
parent
48643795b8
commit
1b756db221
@@ -42,7 +42,7 @@ object SparkHelpers {
|
|||||||
val schema: Schema = sourceRecords.get(0).getSchema
|
val schema: Schema = sourceRecords.get(0).getSchema
|
||||||
val filter: BloomFilter = new BloomFilter(HoodieIndexConfig.DEFAULT_BLOOM_FILTER_NUM_ENTRIES.toInt, HoodieIndexConfig.DEFAULT_BLOOM_FILTER_FPP.toDouble)
|
val filter: BloomFilter = new BloomFilter(HoodieIndexConfig.DEFAULT_BLOOM_FILTER_NUM_ENTRIES.toInt, HoodieIndexConfig.DEFAULT_BLOOM_FILTER_FPP.toDouble)
|
||||||
val writeSupport: HoodieAvroWriteSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), schema, filter)
|
val writeSupport: HoodieAvroWriteSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), schema, filter)
|
||||||
val parquetConfig: HoodieParquetConfig = new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP, HoodieStorageConfig.DEFAULT_PARQUET_BLOCK_SIZE_BYTES.toInt, HoodieStorageConfig.DEFAULT_PARQUET_PAGE_SIZE_BYTES.toInt, HoodieStorageConfig.DEFAULT_PARQUET_FILE_MAX_BYTES.toInt, fs.getConf)
|
val parquetConfig: HoodieParquetConfig = new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP, HoodieStorageConfig.DEFAULT_PARQUET_BLOCK_SIZE_BYTES.toInt, HoodieStorageConfig.DEFAULT_PARQUET_PAGE_SIZE_BYTES.toInt, HoodieStorageConfig.DEFAULT_PARQUET_FILE_MAX_BYTES.toInt, fs.getConf, HoodieStorageConfig.DEFAULT_STREAM_COMPRESSION_RATIO.toDouble)
|
||||||
val writer = new HoodieParquetWriter[HoodieJsonPayload, IndexedRecord](commitTime, destinationFile, parquetConfig, schema)
|
val writer = new HoodieParquetWriter[HoodieJsonPayload, IndexedRecord](commitTime, destinationFile, parquetConfig, schema)
|
||||||
for (rec <- sourceRecords) {
|
for (rec <- sourceRecords) {
|
||||||
val key: String = rec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString
|
val key: String = rec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString
|
||||||
|
|||||||
@@ -40,6 +40,9 @@ public class HoodieStorageConfig extends DefaultHoodieConfig {
|
|||||||
// used to size data blocks in log file
|
// used to size data blocks in log file
|
||||||
public static final String LOGFILE_DATA_BLOCK_SIZE_MAX_BYTES = "hoodie.logfile.data.block.max.size";
|
public static final String LOGFILE_DATA_BLOCK_SIZE_MAX_BYTES = "hoodie.logfile.data.block.max.size";
|
||||||
public static final String DEFAULT_LOGFILE_DATA_BLOCK_SIZE_MAX_BYTES = String.valueOf(256*1024*1024); // 256 MB
|
public static final String DEFAULT_LOGFILE_DATA_BLOCK_SIZE_MAX_BYTES = String.valueOf(256*1024*1024); // 256 MB
|
||||||
|
public static final String PARQUET_COMPRESSION_RATIO = "hoodie.parquet.compression.ratio";
|
||||||
|
// Default compression ratio for parquet
|
||||||
|
public static final String DEFAULT_STREAM_COMPRESSION_RATIO = String.valueOf(0.1);
|
||||||
|
|
||||||
private HoodieStorageConfig(Properties props) {
|
private HoodieStorageConfig(Properties props) {
|
||||||
super(props);
|
super(props);
|
||||||
@@ -93,6 +96,11 @@ public class HoodieStorageConfig extends DefaultHoodieConfig {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Builder parquetCompressionRatio(double parquetCompressionRatio) {
|
||||||
|
props.setProperty(PARQUET_COMPRESSION_RATIO, String.valueOf(parquetCompressionRatio));
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
public HoodieStorageConfig build() {
|
public HoodieStorageConfig build() {
|
||||||
HoodieStorageConfig config = new HoodieStorageConfig(props);
|
HoodieStorageConfig config = new HoodieStorageConfig(props);
|
||||||
setDefaultOnCondition(props, !props.containsKey(PARQUET_FILE_MAX_BYTES),
|
setDefaultOnCondition(props, !props.containsKey(PARQUET_FILE_MAX_BYTES),
|
||||||
@@ -105,6 +113,8 @@ public class HoodieStorageConfig extends DefaultHoodieConfig {
|
|||||||
LOGFILE_DATA_BLOCK_SIZE_MAX_BYTES, DEFAULT_LOGFILE_DATA_BLOCK_SIZE_MAX_BYTES);
|
LOGFILE_DATA_BLOCK_SIZE_MAX_BYTES, DEFAULT_LOGFILE_DATA_BLOCK_SIZE_MAX_BYTES);
|
||||||
setDefaultOnCondition(props, !props.containsKey(LOGFILE_SIZE_MAX_BYTES),
|
setDefaultOnCondition(props, !props.containsKey(LOGFILE_SIZE_MAX_BYTES),
|
||||||
LOGFILE_SIZE_MAX_BYTES, DEFAULT_LOGFILE_SIZE_MAX_BYTES);
|
LOGFILE_SIZE_MAX_BYTES, DEFAULT_LOGFILE_SIZE_MAX_BYTES);
|
||||||
|
setDefaultOnCondition(props, !props.containsKey(PARQUET_COMPRESSION_RATIO),
|
||||||
|
PARQUET_COMPRESSION_RATIO, DEFAULT_STREAM_COMPRESSION_RATIO);
|
||||||
return config;
|
return config;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -309,6 +309,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
|
|||||||
return Integer.parseInt(props.getProperty(HoodieStorageConfig.LOGFILE_SIZE_MAX_BYTES));
|
return Integer.parseInt(props.getProperty(HoodieStorageConfig.LOGFILE_SIZE_MAX_BYTES));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public double getParquetCompressionRatio() {
|
||||||
|
return Double.valueOf(props.getProperty(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* metrics properties
|
* metrics properties
|
||||||
|
|||||||
@@ -28,16 +28,18 @@ public class HoodieParquetConfig {
|
|||||||
private int pageSize;
|
private int pageSize;
|
||||||
private long maxFileSize;
|
private long maxFileSize;
|
||||||
private Configuration hadoopConf;
|
private Configuration hadoopConf;
|
||||||
|
private double compressionRatio;
|
||||||
|
|
||||||
public HoodieParquetConfig(HoodieAvroWriteSupport writeSupport,
|
public HoodieParquetConfig(HoodieAvroWriteSupport writeSupport,
|
||||||
CompressionCodecName compressionCodecName, int blockSize, int pageSize, long maxFileSize,
|
CompressionCodecName compressionCodecName, int blockSize, int pageSize, long maxFileSize,
|
||||||
Configuration hadoopConf) {
|
Configuration hadoopConf, double compressionRatio) {
|
||||||
this.writeSupport = writeSupport;
|
this.writeSupport = writeSupport;
|
||||||
this.compressionCodecName = compressionCodecName;
|
this.compressionCodecName = compressionCodecName;
|
||||||
this.blockSize = blockSize;
|
this.blockSize = blockSize;
|
||||||
this.pageSize = pageSize;
|
this.pageSize = pageSize;
|
||||||
this.maxFileSize = maxFileSize;
|
this.maxFileSize = maxFileSize;
|
||||||
this.hadoopConf = hadoopConf;
|
this.hadoopConf = hadoopConf;
|
||||||
|
this.compressionRatio = compressionRatio;
|
||||||
}
|
}
|
||||||
|
|
||||||
public HoodieAvroWriteSupport getWriteSupport() {
|
public HoodieAvroWriteSupport getWriteSupport() {
|
||||||
@@ -63,4 +65,8 @@ public class HoodieParquetConfig {
|
|||||||
public Configuration getHadoopConf() {
|
public Configuration getHadoopConf() {
|
||||||
return hadoopConf;
|
return hadoopConf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public double getCompressionRatio() {
|
||||||
|
return compressionRatio;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -39,10 +39,8 @@ import org.apache.spark.TaskContext;
|
|||||||
public class HoodieParquetWriter<T extends HoodieRecordPayload, R extends IndexedRecord>
|
public class HoodieParquetWriter<T extends HoodieRecordPayload, R extends IndexedRecord>
|
||||||
extends ParquetWriter<IndexedRecord> implements HoodieStorageWriter<R> {
|
extends ParquetWriter<IndexedRecord> implements HoodieStorageWriter<R> {
|
||||||
|
|
||||||
private static double STREAM_COMPRESSION_RATIO = 0.1;
|
|
||||||
private static AtomicLong recordIndex = new AtomicLong(1);
|
private static AtomicLong recordIndex = new AtomicLong(1);
|
||||||
|
|
||||||
|
|
||||||
private final Path file;
|
private final Path file;
|
||||||
private final HoodieWrapperFileSystem fs;
|
private final HoodieWrapperFileSystem fs;
|
||||||
private final long maxFileSize;
|
private final long maxFileSize;
|
||||||
@@ -75,7 +73,7 @@ public class HoodieParquetWriter<T extends HoodieRecordPayload, R extends Indexe
|
|||||||
// We cannot accurately measure the compressed output file size. We are choosing a conservative 10%
|
// We cannot accurately measure the compressed output file size. We are choosing a conservative 10%
|
||||||
// TODO - compute this compression ratio dynamically by looking at the bytes written to the stream and the actual file size reported by HDFS
|
// TODO - compute this compression ratio dynamically by looking at the bytes written to the stream and the actual file size reported by HDFS
|
||||||
this.maxFileSize = parquetConfig.getMaxFileSize() + Math
|
this.maxFileSize = parquetConfig.getMaxFileSize() + Math
|
||||||
.round(parquetConfig.getMaxFileSize() * STREAM_COMPRESSION_RATIO);
|
.round(parquetConfig.getMaxFileSize() * parquetConfig.getCompressionRatio());
|
||||||
this.writeSupport = parquetConfig.getWriteSupport();
|
this.writeSupport = parquetConfig.getWriteSupport();
|
||||||
this.commitTime = commitTime;
|
this.commitTime = commitTime;
|
||||||
this.schema = schema;
|
this.schema = schema;
|
||||||
|
|||||||
@@ -50,7 +50,8 @@ public class HoodieStorageWriterFactory {
|
|||||||
HoodieParquetConfig parquetConfig =
|
HoodieParquetConfig parquetConfig =
|
||||||
new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP,
|
new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP,
|
||||||
config.getParquetBlockSize(), config.getParquetPageSize(),
|
config.getParquetBlockSize(), config.getParquetPageSize(),
|
||||||
config.getParquetMaxFileSize(), hoodieTable.getHadoopConf());
|
config.getParquetMaxFileSize(), hoodieTable.getHadoopConf(),
|
||||||
|
config.getParquetCompressionRatio());
|
||||||
|
|
||||||
return new HoodieParquetWriter<>(commitTime, path, parquetConfig, schema);
|
return new HoodieParquetWriter<>(commitTime, path, parquetConfig, schema);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -37,6 +37,7 @@ import com.uber.hoodie.common.model.HoodieTestUtils;
|
|||||||
import com.uber.hoodie.common.table.HoodieTableMetaClient;
|
import com.uber.hoodie.common.table.HoodieTableMetaClient;
|
||||||
import com.uber.hoodie.common.util.FSUtils;
|
import com.uber.hoodie.common.util.FSUtils;
|
||||||
import com.uber.hoodie.common.util.HoodieAvroUtils;
|
import com.uber.hoodie.common.util.HoodieAvroUtils;
|
||||||
|
import com.uber.hoodie.config.HoodieStorageConfig;
|
||||||
import com.uber.hoodie.config.HoodieWriteConfig;
|
import com.uber.hoodie.config.HoodieWriteConfig;
|
||||||
import com.uber.hoodie.io.storage.HoodieParquetConfig;
|
import com.uber.hoodie.io.storage.HoodieParquetConfig;
|
||||||
import com.uber.hoodie.io.storage.HoodieParquetWriter;
|
import com.uber.hoodie.io.storage.HoodieParquetWriter;
|
||||||
@@ -513,7 +514,8 @@ public class TestHoodieBloomIndex {
|
|||||||
String commitTime = FSUtils.getCommitTime(filename);
|
String commitTime = FSUtils.getCommitTime(filename);
|
||||||
HoodieParquetConfig config = new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP,
|
HoodieParquetConfig config = new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP,
|
||||||
ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, 120 * 1024 * 1024,
|
ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, 120 * 1024 * 1024,
|
||||||
HoodieTestUtils.getDefaultHadoopConf());
|
HoodieTestUtils.getDefaultHadoopConf(),
|
||||||
|
Double.valueOf(HoodieStorageConfig.DEFAULT_STREAM_COMPRESSION_RATIO));
|
||||||
HoodieParquetWriter writer = new HoodieParquetWriter(
|
HoodieParquetWriter writer = new HoodieParquetWriter(
|
||||||
commitTime,
|
commitTime,
|
||||||
new Path(basePath + "/" + partitionPath + "/" + filename),
|
new Path(basePath + "/" + partitionPath + "/" + filename),
|
||||||
|
|||||||
Reference in New Issue
Block a user