diff --git a/hoodie-cli/src/main/scala/com/uber/hoodie/cli/SparkHelpers.scala b/hoodie-cli/src/main/scala/com/uber/hoodie/cli/SparkHelpers.scala index 8e1eccfee..f383e5fba 100644 --- a/hoodie-cli/src/main/scala/com/uber/hoodie/cli/SparkHelpers.scala +++ b/hoodie-cli/src/main/scala/com/uber/hoodie/cli/SparkHelpers.scala @@ -42,7 +42,7 @@ object SparkHelpers { val schema: Schema = sourceRecords.get(0).getSchema val filter: BloomFilter = new BloomFilter(HoodieIndexConfig.DEFAULT_BLOOM_FILTER_NUM_ENTRIES.toInt, HoodieIndexConfig.DEFAULT_BLOOM_FILTER_FPP.toDouble) val writeSupport: HoodieAvroWriteSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), schema, filter) - val parquetConfig: HoodieParquetConfig = new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP, HoodieStorageConfig.DEFAULT_PARQUET_BLOCK_SIZE_BYTES.toInt, HoodieStorageConfig.DEFAULT_PARQUET_PAGE_SIZE_BYTES.toInt, HoodieStorageConfig.DEFAULT_PARQUET_FILE_MAX_BYTES.toInt, fs.getConf) + val parquetConfig: HoodieParquetConfig = new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP, HoodieStorageConfig.DEFAULT_PARQUET_BLOCK_SIZE_BYTES.toInt, HoodieStorageConfig.DEFAULT_PARQUET_PAGE_SIZE_BYTES.toInt, HoodieStorageConfig.DEFAULT_PARQUET_FILE_MAX_BYTES.toInt, fs.getConf, HoodieStorageConfig.DEFAULT_STREAM_COMPRESSION_RATIO.toDouble) val writer = new HoodieParquetWriter[HoodieJsonPayload, IndexedRecord](commitTime, destinationFile, parquetConfig, schema) for (rec <- sourceRecords) { val key: String = rec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieStorageConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieStorageConfig.java index 168174c63..0eb0a5c0e 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieStorageConfig.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieStorageConfig.java @@ -40,6 +40,9 @@ public class HoodieStorageConfig extends DefaultHoodieConfig { // used to size data blocks in log file public static final String LOGFILE_DATA_BLOCK_SIZE_MAX_BYTES = "hoodie.logfile.data.block.max.size"; public static final String DEFAULT_LOGFILE_DATA_BLOCK_SIZE_MAX_BYTES = String.valueOf(256*1024*1024); // 256 MB + public static final String PARQUET_COMPRESSION_RATIO = "hoodie.parquet.compression.ratio"; + // Default compression ratio for parquet + public static final String DEFAULT_STREAM_COMPRESSION_RATIO = String.valueOf(0.1); private HoodieStorageConfig(Properties props) { super(props); @@ -93,6 +96,11 @@ public class HoodieStorageConfig extends DefaultHoodieConfig { return this; } + public Builder parquetCompressionRatio(double parquetCompressionRatio) { + props.setProperty(PARQUET_COMPRESSION_RATIO, String.valueOf(parquetCompressionRatio)); + return this; + } + public HoodieStorageConfig build() { HoodieStorageConfig config = new HoodieStorageConfig(props); setDefaultOnCondition(props, !props.containsKey(PARQUET_FILE_MAX_BYTES), @@ -105,6 +113,8 @@ public class HoodieStorageConfig extends DefaultHoodieConfig { LOGFILE_DATA_BLOCK_SIZE_MAX_BYTES, DEFAULT_LOGFILE_DATA_BLOCK_SIZE_MAX_BYTES); setDefaultOnCondition(props, !props.containsKey(LOGFILE_SIZE_MAX_BYTES), LOGFILE_SIZE_MAX_BYTES, DEFAULT_LOGFILE_SIZE_MAX_BYTES); + setDefaultOnCondition(props, !props.containsKey(PARQUET_COMPRESSION_RATIO), + PARQUET_COMPRESSION_RATIO, DEFAULT_STREAM_COMPRESSION_RATIO); return config; } } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java index aa0dc101d..d0911dca1 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java @@ -309,6 +309,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return Integer.parseInt(props.getProperty(HoodieStorageConfig.LOGFILE_SIZE_MAX_BYTES)); } + public double getParquetCompressionRatio() { + return Double.valueOf(props.getProperty(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO)); + } + /** * metrics properties diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/storage/HoodieParquetConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/io/storage/HoodieParquetConfig.java index f0aa544da..985e2cc28 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/storage/HoodieParquetConfig.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/storage/HoodieParquetConfig.java @@ -28,16 +28,18 @@ public class HoodieParquetConfig { private int pageSize; private long maxFileSize; private Configuration hadoopConf; + private double compressionRatio; public HoodieParquetConfig(HoodieAvroWriteSupport writeSupport, CompressionCodecName compressionCodecName, int blockSize, int pageSize, long maxFileSize, - Configuration hadoopConf) { + Configuration hadoopConf, double compressionRatio) { this.writeSupport = writeSupport; this.compressionCodecName = compressionCodecName; this.blockSize = blockSize; this.pageSize = pageSize; this.maxFileSize = maxFileSize; this.hadoopConf = hadoopConf; + this.compressionRatio = compressionRatio; } public HoodieAvroWriteSupport getWriteSupport() { @@ -63,4 +65,8 @@ public class HoodieParquetConfig { public Configuration getHadoopConf() { return hadoopConf; } + + public double getCompressionRatio() { + return compressionRatio; + } } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/storage/HoodieParquetWriter.java b/hoodie-client/src/main/java/com/uber/hoodie/io/storage/HoodieParquetWriter.java index 48bdfda59..9978c0747 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/storage/HoodieParquetWriter.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/storage/HoodieParquetWriter.java @@ -39,10 +39,8 @@ import org.apache.spark.TaskContext; public class HoodieParquetWriter extends ParquetWriter implements HoodieStorageWriter { - private static double STREAM_COMPRESSION_RATIO = 0.1; private static AtomicLong recordIndex = new AtomicLong(1); - private final Path file; private final HoodieWrapperFileSystem fs; private final long maxFileSize; @@ -75,7 +73,7 @@ public class HoodieParquetWriter(commitTime, path, parquetConfig, schema); } diff --git a/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java b/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java index c0b7f4a27..4bf91001f 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java @@ -37,6 +37,7 @@ import com.uber.hoodie.common.model.HoodieTestUtils; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.common.util.HoodieAvroUtils; +import com.uber.hoodie.config.HoodieStorageConfig; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.io.storage.HoodieParquetConfig; import com.uber.hoodie.io.storage.HoodieParquetWriter; @@ -513,7 +514,8 @@ public class TestHoodieBloomIndex { String commitTime = FSUtils.getCommitTime(filename); HoodieParquetConfig config = new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP, ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, 120 * 1024 * 1024, - HoodieTestUtils.getDefaultHadoopConf()); + HoodieTestUtils.getDefaultHadoopConf(), + Double.valueOf(HoodieStorageConfig.DEFAULT_STREAM_COMPRESSION_RATIO)); HoodieParquetWriter writer = new HoodieParquetWriter( commitTime, new Path(basePath + "/" + partitionPath + "/" + filename),