[HUDI-2526] Make spark.sql.parquet.writeLegacyFormat configurable (#3917)

commit 08c35a55b3
parent 844346c3ab
Author: Sagar Sumit
Date: 2021-11-05 22:33:41 +05:30 (committed by GitHub)
5 changed files with 51 additions and 16 deletions
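
For context, spark.sql.parquet.writeLegacyFormat controls whether Spark writes Parquet in the legacy Spark 1.x representation (decimals as fixed-length byte arrays, two-level list encoding) that readers such as Hive and older Impala versions expect; Hudi's row writer previously hard-coded it to "false". Below is a minimal sketch, assuming only the HoodieStorageConfig builder method this commit's test exercises, of how a caller could opt in; the class name and base path are illustrative.

import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;

public class LegacyFormatConfigSketch {
  public static HoodieWriteConfig build() {
    // Opt in to legacy Parquet output. The flag is carried as a String,
    // matching the parquetWriteLegacyFormat(..) builder method used in
    // the test change below.
    return HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie/sample-table") // illustrative base path
        .withStorageConfig(HoodieStorageConfig.newBuilder()
            .parquetWriteLegacyFormat("true")
            .build())
        .build();
  }
}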

HoodieInternalRowFileWriterFactory.java

@@ -66,7 +66,7 @@ public class HoodieInternalRowFileWriterFactory {
writeConfig.getDynamicBloomFilterMaxNumEntries(),
writeConfig.getBloomFilterType());
HoodieRowParquetWriteSupport writeSupport =
-new HoodieRowParquetWriteSupport(table.getHadoopConf(), structType, filter);
+new HoodieRowParquetWriteSupport(table.getHadoopConf(), structType, filter, writeConfig);
return new HoodieInternalRowParquetWriter(
path, new HoodieRowParquetConfig(
writeSupport,
@@ -91,7 +91,7 @@ public class HoodieInternalRowFileWriterFactory {
Path path, HoodieWriteConfig writeConfig, StructType structType, HoodieTable table)
throws IOException {
HoodieRowParquetWriteSupport writeSupport =
-new HoodieRowParquetWriteSupport(table.getHadoopConf(), structType, null);
+new HoodieRowParquetWriteSupport(table.getHadoopConf(), structType, null, writeConfig);
return new HoodieInternalRowParquetWriter(
path, new HoodieRowParquetConfig(
writeSupport,

HoodieRowParquetWriteSupport.java

@@ -21,6 +21,7 @@ package org.apache.hudi.io.storage.row;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;
import org.apache.spark.sql.types.StructType;
@@ -42,11 +43,11 @@ public class HoodieRowParquetWriteSupport extends ParquetWriteSupport {
private String minRecordKey;
private String maxRecordKey;
-public HoodieRowParquetWriteSupport(Configuration conf, StructType structType, BloomFilter bloomFilter) {
+public HoodieRowParquetWriteSupport(Configuration conf, StructType structType, BloomFilter bloomFilter, HoodieWriteConfig writeConfig) {
super();
Configuration hadoopConf = new Configuration(conf);
hadoopConf.set("spark.sql.parquet.writeLegacyFormat", "false");
hadoopConf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MILLIS");
hadoopConf.set("spark.sql.parquet.writeLegacyFormat", writeConfig.parquetWriteLegacyFormatEnabled());
hadoopConf.set("spark.sql.parquet.outputTimestampType", writeConfig.parquetOutputTimestampType());
this.hadoopConf = hadoopConf;
setSchema(structType, hadoopConf);
this.bloomFilter = bloomFilter;
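
With the extra constructor argument, both Spark SQL settings now come from the write config instead of hard-coded values before being copied into the Hadoop configuration that Spark's ParquetWriteSupport reads. Here is a sketch of constructing the write support directly under the new signature; the wrapper class and schema are illustrative, and the null bloom filter mirrors the second factory method above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.storage.row.HoodieRowParquetWriteSupport;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class WriteSupportSketch {
  public static HoodieRowParquetWriteSupport create(HoodieWriteConfig writeConfig) {
    // Illustrative row schema; any StructType works here.
    StructType schema = new StructType()
        .add("uuid", DataTypes.StringType)
        .add("price", DataTypes.createDecimalType(10, 2));
    // The fourth argument is new in this commit: the support copies
    // parquetWriteLegacyFormatEnabled() and parquetOutputTimestampType()
    // from the write config into the Hadoop configuration it hands to Parquet.
    return new HoodieRowParquetWriteSupport(new Configuration(), schema, null, writeConfig);
  }
}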

TestHoodieInternalRowParquetWriter.java

@@ -18,14 +18,14 @@
package org.apache.hudi.io.storage.row;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.BloomFilterFactory;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.testutils.HoodieClientTestHarness;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
import org.apache.hudi.testutils.SparkDatasetTestUtils;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.spark.sql.Dataset;
@@ -33,7 +33,8 @@ import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.util.List;
import java.util.Random;
@@ -62,12 +63,14 @@ public class TestHoodieInternalRowParquetWriter extends HoodieClientTestHarness
cleanupResources();
}
-@Test
-public void endToEndTest() throws Exception {
-HoodieWriteConfig cfg = SparkDatasetTestUtils.getConfigBuilder(basePath).build();
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+public void endToEndTest(boolean parquetWriteLegacyFormatEnabled) throws Exception {
+HoodieWriteConfig.Builder writeConfigBuilder = SparkDatasetTestUtils.getConfigBuilder(basePath);
for (int i = 0; i < 5; i++) {
// init write support and parquet config
-HoodieRowParquetWriteSupport writeSupport = getWriteSupport(cfg, hadoopConf);
+HoodieRowParquetWriteSupport writeSupport = getWriteSupport(writeConfigBuilder, hadoopConf, parquetWriteLegacyFormatEnabled);
+HoodieWriteConfig cfg = writeConfigBuilder.build();
HoodieRowParquetConfig parquetConfig = new HoodieRowParquetConfig(writeSupport,
CompressionCodecName.SNAPPY, cfg.getParquetBlockSize(), cfg.getParquetPageSize(), cfg.getParquetMaxFileSize(),
writeSupport.getHadoopConf(), cfg.getParquetCompressionRatio());
@@ -101,12 +104,14 @@ public class TestHoodieInternalRowParquetWriter extends HoodieClientTestHarness
}
}
-private HoodieRowParquetWriteSupport getWriteSupport(HoodieWriteConfig writeConfig, Configuration hadoopConf) {
+private HoodieRowParquetWriteSupport getWriteSupport(HoodieWriteConfig.Builder writeConfigBuilder, Configuration hadoopConf, boolean parquetWriteLegacyFormatEnabled) {
+writeConfigBuilder.withStorageConfig(HoodieStorageConfig.newBuilder().parquetWriteLegacyFormat(String.valueOf(parquetWriteLegacyFormatEnabled)).build());
+HoodieWriteConfig writeConfig = writeConfigBuilder.build();
BloomFilter filter = BloomFilterFactory.createBloomFilter(
writeConfig.getBloomFilterNumEntries(),
writeConfig.getBloomFilterFPP(),
writeConfig.getDynamicBloomFilterMaxNumEntries(),
writeConfig.getBloomFilterType());
-return new HoodieRowParquetWriteSupport(hadoopConf, SparkDatasetTestUtils.STRUCT_TYPE, filter);
+return new HoodieRowParquetWriteSupport(hadoopConf, SparkDatasetTestUtils.STRUCT_TYPE, filter, writeConfig);
}
}
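
For completeness, a sketch of the kind of assertion the new parameterized test enables; it is not part of this diff, and it reuses the test's own helper and fields to check that the configured value reaches the Hadoop configuration handed to Parquet.

import static org.junit.jupiter.api.Assertions.assertEquals;

// Inside endToEndTest, after obtaining the write support:
HoodieRowParquetWriteSupport writeSupport =
    getWriteSupport(writeConfigBuilder, hadoopConf, parquetWriteLegacyFormatEnabled);
// getHadoopConf() is the same accessor the test already passes to
// HoodieRowParquetConfig; the flag is stored as a String.
assertEquals(String.valueOf(parquetWriteLegacyFormatEnabled),
    writeSupport.getHadoopConf().get("spark.sql.parquet.writeLegacyFormat"));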