Add compression codec configurations for HoodieParquetWriter.
This commit is contained in:
committed by
vinoth chandar
parent
621f2b878d
commit
48797b1ae1
@@ -44,6 +44,9 @@ public class HoodieStorageConfig extends DefaultHoodieConfig {
|
|||||||
public static final String PARQUET_COMPRESSION_RATIO = "hoodie.parquet.compression.ratio";
|
public static final String PARQUET_COMPRESSION_RATIO = "hoodie.parquet.compression.ratio";
|
||||||
// Default compression ratio for parquet
|
// Default compression ratio for parquet
|
||||||
public static final String DEFAULT_STREAM_COMPRESSION_RATIO = String.valueOf(0.1);
|
public static final String DEFAULT_STREAM_COMPRESSION_RATIO = String.valueOf(0.1);
|
||||||
|
public static final String PARQUET_COMPRESSION_CODEC = "hoodie.parquet.compression.codec";
|
||||||
|
// Default compression codec for parquet
|
||||||
|
public static final String DEFAULT_PARQUET_COMPRESSION_CODEC = "gzip";
|
||||||
public static final String LOGFILE_TO_PARQUET_COMPRESSION_RATIO = "hoodie.logfile.to.parquet.compression.ratio";
|
public static final String LOGFILE_TO_PARQUET_COMPRESSION_RATIO = "hoodie.logfile.to.parquet.compression.ratio";
|
||||||
// Default compression ratio for log file to parquet, general 3x
|
// Default compression ratio for log file to parquet, general 3x
|
||||||
public static final String DEFAULT_LOGFILE_TO_PARQUET_COMPRESSION_RATIO = String.valueOf(0.35);
|
public static final String DEFAULT_LOGFILE_TO_PARQUET_COMPRESSION_RATIO = String.valueOf(0.35);
|
||||||
@@ -105,6 +108,11 @@ public class HoodieStorageConfig extends DefaultHoodieConfig {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Builder parquetCompressionCodec(String parquetCompressionCodec) {
|
||||||
|
props.setProperty(PARQUET_COMPRESSION_CODEC, parquetCompressionCodec);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
public Builder logFileToParquetCompressionRatio(double logFileToParquetCompressionRatio) {
|
public Builder logFileToParquetCompressionRatio(double logFileToParquetCompressionRatio) {
|
||||||
props.setProperty(LOGFILE_TO_PARQUET_COMPRESSION_RATIO, String.valueOf(logFileToParquetCompressionRatio));
|
props.setProperty(LOGFILE_TO_PARQUET_COMPRESSION_RATIO, String.valueOf(logFileToParquetCompressionRatio));
|
||||||
return this;
|
return this;
|
||||||
@@ -124,6 +132,8 @@ public class HoodieStorageConfig extends DefaultHoodieConfig {
|
|||||||
LOGFILE_SIZE_MAX_BYTES, DEFAULT_LOGFILE_SIZE_MAX_BYTES);
|
LOGFILE_SIZE_MAX_BYTES, DEFAULT_LOGFILE_SIZE_MAX_BYTES);
|
||||||
setDefaultOnCondition(props, !props.containsKey(PARQUET_COMPRESSION_RATIO),
|
setDefaultOnCondition(props, !props.containsKey(PARQUET_COMPRESSION_RATIO),
|
||||||
PARQUET_COMPRESSION_RATIO, DEFAULT_STREAM_COMPRESSION_RATIO);
|
PARQUET_COMPRESSION_RATIO, DEFAULT_STREAM_COMPRESSION_RATIO);
|
||||||
|
setDefaultOnCondition(props, !props.containsKey(PARQUET_COMPRESSION_CODEC),
|
||||||
|
PARQUET_COMPRESSION_CODEC, DEFAULT_PARQUET_COMPRESSION_CODEC);
|
||||||
setDefaultOnCondition(props, !props.containsKey(LOGFILE_TO_PARQUET_COMPRESSION_RATIO),
|
setDefaultOnCondition(props, !props.containsKey(LOGFILE_TO_PARQUET_COMPRESSION_RATIO),
|
||||||
LOGFILE_TO_PARQUET_COMPRESSION_RATIO, DEFAULT_LOGFILE_TO_PARQUET_COMPRESSION_RATIO);
|
LOGFILE_TO_PARQUET_COMPRESSION_RATIO, DEFAULT_LOGFILE_TO_PARQUET_COMPRESSION_RATIO);
|
||||||
return config;
|
return config;
|
||||||
|
|||||||
@@ -30,6 +30,7 @@ import java.io.InputStream;
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
import javax.annotation.concurrent.Immutable;
|
import javax.annotation.concurrent.Immutable;
|
||||||
|
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
|
||||||
import org.apache.spark.storage.StorageLevel;
|
import org.apache.spark.storage.StorageLevel;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -355,6 +356,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
|
|||||||
return Double.valueOf(props.getProperty(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO));
|
return Double.valueOf(props.getProperty(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public CompressionCodecName getParquetCompressionCodec() {
|
||||||
|
return CompressionCodecName.fromConf(props.getProperty(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC));
|
||||||
|
}
|
||||||
|
|
||||||
public double getLogFileToParquetCompressionRatio() {
|
public double getLogFileToParquetCompressionRatio() {
|
||||||
return Double.valueOf(props.getProperty(HoodieStorageConfig.LOGFILE_TO_PARQUET_COMPRESSION_RATIO));
|
return Double.valueOf(props.getProperty(HoodieStorageConfig.LOGFILE_TO_PARQUET_COMPRESSION_RATIO));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -26,7 +26,6 @@ import org.apache.avro.Schema;
|
|||||||
import org.apache.avro.generic.IndexedRecord;
|
import org.apache.avro.generic.IndexedRecord;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.parquet.avro.AvroSchemaConverter;
|
import org.apache.parquet.avro.AvroSchemaConverter;
|
||||||
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
|
|
||||||
|
|
||||||
public class HoodieStorageWriterFactory {
|
public class HoodieStorageWriterFactory {
|
||||||
|
|
||||||
@@ -47,7 +46,7 @@ public class HoodieStorageWriterFactory {
|
|||||||
new AvroSchemaConverter().convert(schema), schema, filter);
|
new AvroSchemaConverter().convert(schema), schema, filter);
|
||||||
|
|
||||||
HoodieParquetConfig parquetConfig =
|
HoodieParquetConfig parquetConfig =
|
||||||
new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP,
|
new HoodieParquetConfig(writeSupport, config.getParquetCompressionCodec(),
|
||||||
config.getParquetBlockSize(), config.getParquetPageSize(),
|
config.getParquetBlockSize(), config.getParquetPageSize(),
|
||||||
config.getParquetMaxFileSize(), hoodieTable.getHadoopConf(),
|
config.getParquetMaxFileSize(), hoodieTable.getHadoopConf(),
|
||||||
config.getParquetCompressionRatio());
|
config.getParquetCompressionRatio());
|
||||||
|
|||||||
Reference in New Issue
Block a user