1
0

[HUDI-4319] Fixed Parquet's PLAIN_DICTIONARY encoding not being applied when bulk-inserting (#5966)

* Fixed Dictionary encoding config not being properly propagated to Parquet writer (making it unable to apply it, substantially bloating the storage footprint)
This commit is contained in:
Alexey Kudinkin
2022-06-24 20:52:28 -07:00
committed by GitHub
parent 360df576a9
commit c86edfc28e
16 changed files with 50 additions and 158 deletions

View File

@@ -47,11 +47,11 @@ public class HoodieAvroParquetWriter<R extends IndexedRecord>
@SuppressWarnings({"unchecked", "rawtypes"})
public HoodieAvroParquetWriter(Path file,
HoodieAvroParquetConfig parquetConfig,
HoodieParquetConfig<HoodieAvroWriteSupport> parquetConfig,
String instantTime,
TaskContextSupplier taskContextSupplier,
boolean populateMetaFields) throws IOException {
super(file, (HoodieBaseParquetConfig) parquetConfig);
super(file, (HoodieParquetConfig) parquetConfig);
this.fileName = file.getName();
this.writeSupport = parquetConfig.getWriteSupport();
this.instantTime = instantTime;

View File

@@ -43,7 +43,7 @@ public abstract class HoodieBaseParquetWriter<R> extends ParquetWriter<R> {
private long lastCachedDataSize = -1;
public HoodieBaseParquetWriter(Path file,
HoodieBaseParquetConfig<? extends WriteSupport<R>> parquetConfig) throws IOException {
HoodieParquetConfig<? extends WriteSupport<R>> parquetConfig) throws IOException {
super(HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf()),
ParquetFileWriter.Mode.CREATE,
parquetConfig.getWriteSupport(),

View File

@@ -77,7 +77,7 @@ public class HoodieFileWriterFactory {
Option<BloomFilter> filter = enableBloomFilter ? Option.of(createBloomFilter(config)) : Option.empty();
HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter(conf).convert(schema), schema, filter);
HoodieAvroParquetConfig parquetConfig = new HoodieAvroParquetConfig(writeSupport, config.getParquetCompressionCodec(),
HoodieParquetConfig<HoodieAvroWriteSupport> parquetConfig = new HoodieParquetConfig<>(writeSupport, config.getParquetCompressionCodec(),
config.getParquetBlockSize(), config.getParquetPageSize(), config.getParquetMaxFileSize(),
conf, config.getParquetCompressionRatio(), config.parquetDictionaryEnabled());

View File

@@ -19,6 +19,12 @@
package org.apache.hudi.testutils;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.HoodieAvroWriteSupport;
import org.apache.hudi.common.bloom.BloomFilter;
@@ -38,18 +44,11 @@ import org.apache.hudi.common.testutils.HoodieMetadataTestTable;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.io.storage.HoodieAvroParquetConfig;
import org.apache.hudi.io.storage.HoodieAvroParquetWriter;
import org.apache.hudi.io.storage.HoodieParquetConfig;
import org.apache.hudi.io.storage.HoodieOrcConfig;
import org.apache.hudi.io.storage.HoodieOrcWriter;
import org.apache.hudi.io.storage.HoodieAvroParquetWriter;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.orc.CompressionKind;
@@ -110,7 +109,7 @@ public class HoodieWriteableTestTable extends HoodieMetadataTestTable {
if (HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().equals(HoodieFileFormat.PARQUET)) {
HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(
new AvroSchemaConverter().convert(schema), schema, Option.of(filter));
HoodieAvroParquetConfig config = new HoodieAvroParquetConfig(writeSupport, CompressionCodecName.GZIP,
HoodieParquetConfig<HoodieAvroWriteSupport> config = new HoodieParquetConfig<>(writeSupport, CompressionCodecName.GZIP,
ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, 120 * 1024 * 1024,
new Configuration(), Double.parseDouble(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.defaultValue()));
try (HoodieAvroParquetWriter writer = new HoodieAvroParquetWriter<>(

View File

@@ -22,6 +22,7 @@ import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.BloomFilterFactory;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.storage.HoodieParquetConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.flink.table.types.logical.RowType;
@@ -67,7 +68,7 @@ public class HoodieRowDataFileWriterFactory {
HoodieRowDataParquetWriteSupport writeSupport =
new HoodieRowDataParquetWriteSupport(table.getHadoopConf(), rowType, filter);
return new HoodieRowDataParquetWriter(
path, new HoodieRowDataParquetConfig(
path, new HoodieParquetConfig<>(
writeSupport,
writeConfig.getParquetCompressionCodec(),
writeConfig.getParquetBlockSize(),

View File

@@ -1,36 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.io.storage.row;
import org.apache.hudi.io.storage.HoodieBaseParquetConfig;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
/**
* ParquetConfig for datasource implementation with {@link org.apache.flink.table.data.RowData}.
*/
public class HoodieRowDataParquetConfig extends HoodieBaseParquetConfig<HoodieRowDataParquetWriteSupport> {
public HoodieRowDataParquetConfig(HoodieRowDataParquetWriteSupport writeSupport, CompressionCodecName compressionCodecName,
int blockSize, int pageSize, long maxFileSize, Configuration hadoopConf,
double compressionRatio) {
super(writeSupport, compressionCodecName, blockSize, pageSize, maxFileSize, hadoopConf, compressionRatio);
}
}

View File

@@ -23,6 +23,7 @@ import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.io.storage.HoodieParquetConfig;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
@@ -39,7 +40,7 @@ public class HoodieRowDataParquetWriter extends ParquetWriter<RowData>
private final long maxFileSize;
private final HoodieRowDataParquetWriteSupport writeSupport;
public HoodieRowDataParquetWriter(Path file, HoodieRowDataParquetConfig parquetConfig)
public HoodieRowDataParquetWriter(Path file, HoodieParquetConfig<HoodieRowDataParquetWriteSupport> parquetConfig)
throws IOException {
super(HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf()),
ParquetFileWriter.Mode.CREATE, parquetConfig.getWriteSupport(), parquetConfig.getCompressionCodecName(),

View File

@@ -23,6 +23,7 @@ import org.apache.hudi.common.bloom.BloomFilterFactory;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.storage.HoodieParquetConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hadoop.fs.Path;
@@ -68,14 +69,17 @@ public class HoodieInternalRowFileWriterFactory {
HoodieRowParquetWriteSupport writeSupport =
new HoodieRowParquetWriteSupport(table.getHadoopConf(), structType, filter, writeConfig);
return new HoodieInternalRowParquetWriter(
path, new HoodieRowParquetConfig(
path,
new HoodieParquetConfig<>(
writeSupport,
writeConfig.getParquetCompressionCodec(),
writeConfig.getParquetBlockSize(),
writeConfig.getParquetPageSize(),
writeConfig.getParquetMaxFileSize(),
writeSupport.getHadoopConf(),
writeConfig.getParquetCompressionRatio()));
writeConfig.getParquetCompressionRatio(),
writeConfig.parquetDictionaryEnabled()
));
}
public static HoodieInternalRowFileWriter getInternalRowFileWriterWithoutMetaFields(
@@ -93,13 +97,15 @@ public class HoodieInternalRowFileWriterFactory {
HoodieRowParquetWriteSupport writeSupport =
new HoodieRowParquetWriteSupport(table.getHadoopConf(), structType, null, writeConfig);
return new HoodieInternalRowParquetWriter(
path, new HoodieRowParquetConfig(
path, new HoodieParquetConfig<>(
writeSupport,
writeConfig.getParquetCompressionCodec(),
writeConfig.getParquetBlockSize(),
writeConfig.getParquetPageSize(),
writeConfig.getParquetMaxFileSize(),
writeSupport.getHadoopConf(),
writeConfig.getParquetCompressionRatio()));
writeConfig.getParquetCompressionRatio(),
writeConfig.parquetDictionaryEnabled())
);
}
}

View File

@@ -19,6 +19,7 @@
package org.apache.hudi.io.storage.row;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.io.storage.HoodieParquetConfig;
import org.apache.hudi.io.storage.HoodieBaseParquetWriter;
import org.apache.spark.sql.catalyst.InternalRow;
@@ -32,7 +33,7 @@ public class HoodieInternalRowParquetWriter extends HoodieBaseParquetWriter<Inte
private final HoodieRowParquetWriteSupport writeSupport;
public HoodieInternalRowParquetWriter(Path file, HoodieRowParquetConfig parquetConfig)
public HoodieInternalRowParquetWriter(Path file, HoodieParquetConfig<HoodieRowParquetWriteSupport> parquetConfig)
throws IOException {
super(file, parquetConfig);

View File

@@ -1,36 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.io.storage.row;
import org.apache.hudi.io.storage.HoodieBaseParquetConfig;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
/**
* ParquetConfig for datasource implementation with {@link org.apache.hudi.client.model.HoodieInternalRow}.
*/
public class HoodieRowParquetConfig extends HoodieBaseParquetConfig<HoodieRowParquetWriteSupport> {
public HoodieRowParquetConfig(HoodieRowParquetWriteSupport writeSupport, CompressionCodecName compressionCodecName,
int blockSize, int pageSize, long maxFileSize, Configuration hadoopConf,
double compressionRatio) {
super(writeSupport, compressionCodecName, blockSize, pageSize, maxFileSize, hadoopConf, compressionRatio);
}
}

View File

@@ -23,6 +23,7 @@ import org.apache.hudi.common.bloom.BloomFilterFactory;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.storage.HoodieParquetConfig;
import org.apache.hudi.testutils.HoodieClientTestHarness;
import org.apache.hudi.testutils.SparkDatasetTestUtils;
@@ -73,9 +74,9 @@ public class TestHoodieInternalRowParquetWriter extends HoodieClientTestHarness
// init write support and parquet config
HoodieRowParquetWriteSupport writeSupport = getWriteSupport(writeConfigBuilder, hadoopConf, parquetWriteLegacyFormatEnabled);
HoodieWriteConfig cfg = writeConfigBuilder.build();
HoodieRowParquetConfig parquetConfig = new HoodieRowParquetConfig(writeSupport,
HoodieParquetConfig<HoodieRowParquetWriteSupport> parquetConfig = new HoodieParquetConfig<>(writeSupport,
CompressionCodecName.SNAPPY, cfg.getParquetBlockSize(), cfg.getParquetPageSize(), cfg.getParquetMaxFileSize(),
writeSupport.getHadoopConf(), cfg.getParquetCompressionRatio());
writeSupport.getHadoopConf(), cfg.getParquetCompressionRatio(), cfg.parquetDictionaryEnabled());
// prepare path
String fileId = UUID.randomUUID().toString();