From c86edfc28e811c86d82f98ea07d44dae4f3c5cb1 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 24 Jun 2022 20:52:28 -0700 Subject: [PATCH] [HUDI-4319] Fixed Parquet's `PLAIN_DICTIONARY` encoding not being applied when bulk-inserting (#5966) * Fixed Dictionary encoding config not being properly propagated to Parquet writer (making it unable to apply it, substantially bloating the storage footprint) --- .../org/apache/hudi/cli/SparkHelpers.scala | 5 +-- .../io/storage/HoodieAvroParquetWriter.java | 4 +- .../io/storage/HoodieBaseParquetWriter.java | 2 +- .../io/storage/HoodieFileWriterFactory.java | 2 +- .../testutils/HoodieWriteableTestTable.java | 19 ++++----- .../row/HoodieRowDataFileWriterFactory.java | 3 +- .../row/HoodieRowDataParquetConfig.java | 36 ---------------- .../row/HoodieRowDataParquetWriter.java | 3 +- .../HoodieInternalRowFileWriterFactory.java | 14 +++++-- .../row/HoodieInternalRowParquetWriter.java | 3 +- .../storage/row/HoodieRowParquetConfig.java | 36 ---------------- .../TestHoodieInternalRowParquetWriter.java | 5 ++- .../log/block/HoodieParquetDataBlock.java | 22 +++++----- .../io/storage/HoodieAvroParquetConfig.java | 42 ------------------- ...etConfig.java => HoodieParquetConfig.java} | 10 ++--- .../io/storage/HoodieParquetStreamWriter.java | 2 +- 16 files changed, 50 insertions(+), 158 deletions(-) delete mode 100644 hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetConfig.java delete mode 100644 hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetConfig.java delete mode 100644 hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetConfig.java rename hudi-common/src/main/java/org/apache/hudi/io/storage/{HoodieBaseParquetConfig.java => HoodieParquetConfig.java} (82%) diff --git a/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala b/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala index fbfc1d8ec..b9f8df5fc 100644 --- a/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala +++ b/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala @@ -23,12 +23,11 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hudi.avro.HoodieAvroWriteSupport import org.apache.hudi.client.SparkTaskContextSupplier -import org.apache.hudi.common.HoodieJsonPayload import org.apache.hudi.common.bloom.{BloomFilter, BloomFilterFactory} import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord} import org.apache.hudi.common.util.BaseFileUtils import org.apache.hudi.config.{HoodieIndexConfig, HoodieStorageConfig} -import org.apache.hudi.io.storage.{HoodieAvroParquetConfig, HoodieAvroParquetWriter} +import org.apache.hudi.io.storage.{HoodieAvroParquetWriter, HoodieParquetConfig} import org.apache.parquet.avro.AvroSchemaConverter import org.apache.parquet.hadoop.metadata.CompressionCodecName import org.apache.spark.sql.{DataFrame, SQLContext} @@ -45,7 +44,7 @@ object SparkHelpers { val filter: BloomFilter = BloomFilterFactory.createBloomFilter(HoodieIndexConfig.BLOOM_FILTER_NUM_ENTRIES_VALUE.defaultValue.toInt, HoodieIndexConfig.BLOOM_FILTER_FPP_VALUE.defaultValue.toDouble, HoodieIndexConfig.BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES.defaultValue.toInt, HoodieIndexConfig.BLOOM_FILTER_TYPE.defaultValue); val writeSupport: HoodieAvroWriteSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter(fs.getConf).convert(schema), schema, org.apache.hudi.common.util.Option.of(filter)) - val parquetConfig: HoodieAvroParquetConfig = new HoodieAvroParquetConfig(writeSupport, CompressionCodecName.GZIP, HoodieStorageConfig.PARQUET_BLOCK_SIZE.defaultValue.toInt, HoodieStorageConfig.PARQUET_PAGE_SIZE.defaultValue.toInt, HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.defaultValue.toInt, fs.getConf, HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.defaultValue.toDouble) + val parquetConfig: HoodieParquetConfig[HoodieAvroWriteSupport] = new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP, HoodieStorageConfig.PARQUET_BLOCK_SIZE.defaultValue.toInt, HoodieStorageConfig.PARQUET_PAGE_SIZE.defaultValue.toInt, HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.defaultValue.toInt, fs.getConf, HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.defaultValue.toDouble) // Add current classLoad for config, if not will throw classNotFound of 'HoodieWrapperFileSystem'. parquetConfig.getHadoopConf().setClassLoader(Thread.currentThread.getContextClassLoader) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetWriter.java index 6f7940d04..06631dc53 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetWriter.java @@ -47,11 +47,11 @@ public class HoodieAvroParquetWriter @SuppressWarnings({"unchecked", "rawtypes"}) public HoodieAvroParquetWriter(Path file, - HoodieAvroParquetConfig parquetConfig, + HoodieParquetConfig parquetConfig, String instantTime, TaskContextSupplier taskContextSupplier, boolean populateMetaFields) throws IOException { - super(file, (HoodieBaseParquetConfig) parquetConfig); + super(file, (HoodieParquetConfig) parquetConfig); this.fileName = file.getName(); this.writeSupport = parquetConfig.getWriteSupport(); this.instantTime = instantTime; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java index b4aa6de1b..e38b41d42 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java @@ -43,7 +43,7 @@ public abstract class HoodieBaseParquetWriter extends ParquetWriter { private long lastCachedDataSize = -1; public HoodieBaseParquetWriter(Path file, - HoodieBaseParquetConfig> parquetConfig) throws IOException { + HoodieParquetConfig> parquetConfig) throws IOException { super(HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf()), ParquetFileWriter.Mode.CREATE, parquetConfig.getWriteSupport(), diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java index ffdff2573..9ee8571eb 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java @@ -77,7 +77,7 @@ public class HoodieFileWriterFactory { Option filter = enableBloomFilter ? Option.of(createBloomFilter(config)) : Option.empty(); HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter(conf).convert(schema), schema, filter); - HoodieAvroParquetConfig parquetConfig = new HoodieAvroParquetConfig(writeSupport, config.getParquetCompressionCodec(), + HoodieParquetConfig parquetConfig = new HoodieParquetConfig<>(writeSupport, config.getParquetCompressionCodec(), config.getParquetBlockSize(), config.getParquetPageSize(), config.getParquetMaxFileSize(), conf, config.getParquetCompressionRatio(), config.parquetDictionaryEnabled()); diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java index 6b847d496..2f00b8277 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java @@ -19,6 +19,12 @@ package org.apache.hudi.testutils; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.HoodieAvroWriteSupport; import org.apache.hudi.common.bloom.BloomFilter; @@ -38,18 +44,11 @@ import org.apache.hudi.common.testutils.HoodieMetadataTestTable; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieStorageConfig; -import org.apache.hudi.io.storage.HoodieAvroParquetConfig; +import org.apache.hudi.io.storage.HoodieAvroParquetWriter; +import org.apache.hudi.io.storage.HoodieParquetConfig; import org.apache.hudi.io.storage.HoodieOrcConfig; import org.apache.hudi.io.storage.HoodieOrcWriter; -import org.apache.hudi.io.storage.HoodieAvroParquetWriter; import org.apache.hudi.metadata.HoodieTableMetadataWriter; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.IndexedRecord; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.orc.CompressionKind; @@ -110,7 +109,7 @@ public class HoodieWriteableTestTable extends HoodieMetadataTestTable { if (HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().equals(HoodieFileFormat.PARQUET)) { HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport( new AvroSchemaConverter().convert(schema), schema, Option.of(filter)); - HoodieAvroParquetConfig config = new HoodieAvroParquetConfig(writeSupport, CompressionCodecName.GZIP, + HoodieParquetConfig config = new HoodieParquetConfig<>(writeSupport, CompressionCodecName.GZIP, ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, 120 * 1024 * 1024, new Configuration(), Double.parseDouble(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.defaultValue())); try (HoodieAvroParquetWriter writer = new HoodieAvroParquetWriter<>( diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java index 8cd8bc89a..98d4a866e 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java @@ -22,6 +22,7 @@ import org.apache.hudi.common.bloom.BloomFilter; import org.apache.hudi.common.bloom.BloomFilterFactory; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.io.storage.HoodieParquetConfig; import org.apache.hudi.table.HoodieTable; import org.apache.flink.table.types.logical.RowType; @@ -67,7 +68,7 @@ public class HoodieRowDataFileWriterFactory { HoodieRowDataParquetWriteSupport writeSupport = new HoodieRowDataParquetWriteSupport(table.getHadoopConf(), rowType, filter); return new HoodieRowDataParquetWriter( - path, new HoodieRowDataParquetConfig( + path, new HoodieParquetConfig<>( writeSupport, writeConfig.getParquetCompressionCodec(), writeConfig.getParquetBlockSize(), diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetConfig.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetConfig.java deleted file mode 100644 index 99b72da22..000000000 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetConfig.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.io.storage.row; - -import org.apache.hudi.io.storage.HoodieBaseParquetConfig; - -import org.apache.hadoop.conf.Configuration; -import org.apache.parquet.hadoop.metadata.CompressionCodecName; - -/** - * ParquetConfig for datasource implementation with {@link org.apache.flink.table.data.RowData}. - */ -public class HoodieRowDataParquetConfig extends HoodieBaseParquetConfig { - - public HoodieRowDataParquetConfig(HoodieRowDataParquetWriteSupport writeSupport, CompressionCodecName compressionCodecName, - int blockSize, int pageSize, long maxFileSize, Configuration hadoopConf, - double compressionRatio) { - super(writeSupport, compressionCodecName, blockSize, pageSize, maxFileSize, hadoopConf, compressionRatio); - } -} \ No newline at end of file diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriter.java index 373e6b1f5..7b2a87512 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriter.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriter.java @@ -23,6 +23,7 @@ import org.apache.hudi.common.fs.HoodieWrapperFileSystem; import org.apache.flink.table.data.RowData; import org.apache.hadoop.fs.Path; +import org.apache.hudi.io.storage.HoodieParquetConfig; import org.apache.parquet.hadoop.ParquetFileWriter; import org.apache.parquet.hadoop.ParquetWriter; @@ -39,7 +40,7 @@ public class HoodieRowDataParquetWriter extends ParquetWriter private final long maxFileSize; private final HoodieRowDataParquetWriteSupport writeSupport; - public HoodieRowDataParquetWriter(Path file, HoodieRowDataParquetConfig parquetConfig) + public HoodieRowDataParquetWriter(Path file, HoodieParquetConfig parquetConfig) throws IOException { super(HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf()), ParquetFileWriter.Mode.CREATE, parquetConfig.getWriteSupport(), parquetConfig.getCompressionCodecName(), diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java index 8dd19d888..eb408f81c 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java @@ -23,6 +23,7 @@ import org.apache.hudi.common.bloom.BloomFilterFactory; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.io.storage.HoodieParquetConfig; import org.apache.hudi.table.HoodieTable; import org.apache.hadoop.fs.Path; @@ -68,14 +69,17 @@ public class HoodieInternalRowFileWriterFactory { HoodieRowParquetWriteSupport writeSupport = new HoodieRowParquetWriteSupport(table.getHadoopConf(), structType, filter, writeConfig); return new HoodieInternalRowParquetWriter( - path, new HoodieRowParquetConfig( + path, + new HoodieParquetConfig<>( writeSupport, writeConfig.getParquetCompressionCodec(), writeConfig.getParquetBlockSize(), writeConfig.getParquetPageSize(), writeConfig.getParquetMaxFileSize(), writeSupport.getHadoopConf(), - writeConfig.getParquetCompressionRatio())); + writeConfig.getParquetCompressionRatio(), + writeConfig.parquetDictionaryEnabled() + )); } public static HoodieInternalRowFileWriter getInternalRowFileWriterWithoutMetaFields( @@ -93,13 +97,15 @@ public class HoodieInternalRowFileWriterFactory { HoodieRowParquetWriteSupport writeSupport = new HoodieRowParquetWriteSupport(table.getHadoopConf(), structType, null, writeConfig); return new HoodieInternalRowParquetWriter( - path, new HoodieRowParquetConfig( + path, new HoodieParquetConfig<>( writeSupport, writeConfig.getParquetCompressionCodec(), writeConfig.getParquetBlockSize(), writeConfig.getParquetPageSize(), writeConfig.getParquetMaxFileSize(), writeSupport.getHadoopConf(), - writeConfig.getParquetCompressionRatio())); + writeConfig.getParquetCompressionRatio(), + writeConfig.parquetDictionaryEnabled()) + ); } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowParquetWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowParquetWriter.java index 5a0a60ea0..1d1152935 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowParquetWriter.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowParquetWriter.java @@ -19,6 +19,7 @@ package org.apache.hudi.io.storage.row; import org.apache.hadoop.fs.Path; +import org.apache.hudi.io.storage.HoodieParquetConfig; import org.apache.hudi.io.storage.HoodieBaseParquetWriter; import org.apache.spark.sql.catalyst.InternalRow; @@ -32,7 +33,7 @@ public class HoodieInternalRowParquetWriter extends HoodieBaseParquetWriter parquetConfig) throws IOException { super(file, parquetConfig); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetConfig.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetConfig.java deleted file mode 100644 index ac187dcdd..000000000 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetConfig.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.io.storage.row; - -import org.apache.hudi.io.storage.HoodieBaseParquetConfig; - -import org.apache.hadoop.conf.Configuration; -import org.apache.parquet.hadoop.metadata.CompressionCodecName; - -/** - * ParquetConfig for datasource implementation with {@link org.apache.hudi.client.model.HoodieInternalRow}. - */ -public class HoodieRowParquetConfig extends HoodieBaseParquetConfig { - - public HoodieRowParquetConfig(HoodieRowParquetWriteSupport writeSupport, CompressionCodecName compressionCodecName, - int blockSize, int pageSize, long maxFileSize, Configuration hadoopConf, - double compressionRatio) { - super(writeSupport, compressionCodecName, blockSize, pageSize, maxFileSize, hadoopConf, compressionRatio); - } -} diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java index dd509a86b..d6c060c6b 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java @@ -23,6 +23,7 @@ import org.apache.hudi.common.bloom.BloomFilterFactory; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.config.HoodieStorageConfig; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.io.storage.HoodieParquetConfig; import org.apache.hudi.testutils.HoodieClientTestHarness; import org.apache.hudi.testutils.SparkDatasetTestUtils; @@ -73,9 +74,9 @@ public class TestHoodieInternalRowParquetWriter extends HoodieClientTestHarness // init write support and parquet config HoodieRowParquetWriteSupport writeSupport = getWriteSupport(writeConfigBuilder, hadoopConf, parquetWriteLegacyFormatEnabled); HoodieWriteConfig cfg = writeConfigBuilder.build(); - HoodieRowParquetConfig parquetConfig = new HoodieRowParquetConfig(writeSupport, + HoodieParquetConfig parquetConfig = new HoodieParquetConfig<>(writeSupport, CompressionCodecName.SNAPPY, cfg.getParquetBlockSize(), cfg.getParquetPageSize(), cfg.getParquetMaxFileSize(), - writeSupport.getHadoopConf(), cfg.getParquetCompressionRatio()); + writeSupport.getHadoopConf(), cfg.getParquetCompressionRatio(), cfg.parquetDictionaryEnabled()); // prepare path String fileId = UUID.randomUUID().toString(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java index 5e7bef90a..afb448f84 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java @@ -18,21 +18,20 @@ package org.apache.hudi.common.table.log.block; -import org.apache.hudi.avro.HoodieAvroWriteSupport; -import org.apache.hudi.common.fs.inline.InLineFSUtils; -import org.apache.hudi.common.fs.inline.InLineFileSystem; -import org.apache.hudi.common.util.ClosableIterator; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.ParquetReaderIterator; -import org.apache.hudi.io.storage.HoodieAvroParquetConfig; -import org.apache.hudi.io.storage.HoodieParquetStreamWriter; - import org.apache.avro.Schema; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; +import org.apache.hudi.avro.HoodieAvroWriteSupport; +import org.apache.hudi.common.fs.inline.InLineFSUtils; +import org.apache.hudi.common.fs.inline.InLineFileSystem; +import org.apache.hudi.common.util.ClosableIterator; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ParquetReaderIterator; +import org.apache.hudi.io.storage.HoodieParquetConfig; +import org.apache.hudi.io.storage.HoodieParquetStreamWriter; import org.apache.parquet.avro.AvroParquetReader; import org.apache.parquet.avro.AvroReadSupport; import org.apache.parquet.avro.AvroSchemaConverter; @@ -43,7 +42,6 @@ import org.apache.parquet.hadoop.util.HadoopInputFile; import org.apache.parquet.io.InputFile; import javax.annotation.Nonnull; - import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.HashMap; @@ -97,8 +95,8 @@ public class HoodieParquetDataBlock extends HoodieDataBlock { HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport( new AvroSchemaConverter().convert(writerSchema), writerSchema, Option.empty()); - HoodieAvroParquetConfig avroParquetConfig = - new HoodieAvroParquetConfig( + HoodieParquetConfig avroParquetConfig = + new HoodieParquetConfig<>( writeSupport, compressionCodecName.get(), ParquetWriter.DEFAULT_BLOCK_SIZE, diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetConfig.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetConfig.java deleted file mode 100644 index 1a10e6a71..000000000 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetConfig.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.io.storage; - -import org.apache.hudi.avro.HoodieAvroWriteSupport; - -import org.apache.hadoop.conf.Configuration; -import org.apache.parquet.hadoop.metadata.CompressionCodecName; - -/** - * ParquetConfig for writing avro records in Parquet files. - */ -public class HoodieAvroParquetConfig extends HoodieBaseParquetConfig { - - public HoodieAvroParquetConfig(HoodieAvroWriteSupport writeSupport, CompressionCodecName compressionCodecName, - int blockSize, int pageSize, long maxFileSize, Configuration hadoopConf, - double compressionRatio) { - super(writeSupport, compressionCodecName, blockSize, pageSize, maxFileSize, hadoopConf, compressionRatio); - } - - public HoodieAvroParquetConfig(HoodieAvroWriteSupport writeSupport, CompressionCodecName compressionCodecName, - int blockSize, int pageSize, long maxFileSize, Configuration hadoopConf, - double compressionRatio, boolean directoryEnabled) { - super(writeSupport, compressionCodecName, blockSize, pageSize, maxFileSize, hadoopConf, compressionRatio, directoryEnabled); - } -} diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetConfig.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetConfig.java similarity index 82% rename from hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetConfig.java rename to hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetConfig.java index 6db1de012..77fea6bee 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetConfig.java @@ -25,7 +25,7 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName; * Base ParquetConfig to hold config params for writing to Parquet. * @param */ -public class HoodieBaseParquetConfig { +public class HoodieParquetConfig { private final T writeSupport; private final CompressionCodecName compressionCodecName; private final int blockSize; @@ -35,13 +35,13 @@ public class HoodieBaseParquetConfig { private final double compressionRatio; private final boolean dictionaryEnabled; - public HoodieBaseParquetConfig(T writeSupport, CompressionCodecName compressionCodecName, int blockSize, - int pageSize, long maxFileSize, Configuration hadoopConf, double compressionRatio) { + public HoodieParquetConfig(T writeSupport, CompressionCodecName compressionCodecName, int blockSize, + int pageSize, long maxFileSize, Configuration hadoopConf, double compressionRatio) { this(writeSupport, compressionCodecName, blockSize, pageSize, maxFileSize, hadoopConf, compressionRatio, false); } - public HoodieBaseParquetConfig(T writeSupport, CompressionCodecName compressionCodecName, int blockSize, - int pageSize, long maxFileSize, Configuration hadoopConf, double compressionRatio, boolean dictionaryEnabled) { + public HoodieParquetConfig(T writeSupport, CompressionCodecName compressionCodecName, int blockSize, + int pageSize, long maxFileSize, Configuration hadoopConf, double compressionRatio, boolean dictionaryEnabled) { this.writeSupport = writeSupport; this.compressionCodecName = compressionCodecName; this.blockSize = blockSize; diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetStreamWriter.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetStreamWriter.java index a27360182..c8f78c350 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetStreamWriter.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetStreamWriter.java @@ -38,7 +38,7 @@ public class HoodieParquetStreamWriter implements AutoC private final HoodieAvroWriteSupport writeSupport; public HoodieParquetStreamWriter(FSDataOutputStream outputStream, - HoodieAvroParquetConfig parquetConfig) throws IOException { + HoodieParquetConfig parquetConfig) throws IOException { this.writeSupport = parquetConfig.getWriteSupport(); this.writer = new Builder(new OutputStreamBackedOutputFile(outputStream), writeSupport) .withWriteMode(ParquetFileWriter.Mode.CREATE)