diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieStorageConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieStorageConfig.java
index d7c2a20a6..22118da47 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieStorageConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieStorageConfig.java
@@ -115,6 +115,18 @@ public class HoodieStorageConfig extends HoodieConfig {
       .defaultValue(true)
       .withDocumentation("Whether to use dictionary encoding");
 
+  public static final ConfigProperty PARQUET_WRITE_LEGACY_FORMAT_ENABLED = ConfigProperty
+      .key("hoodie.parquet.writeLegacyFormat.enabled")
+      .defaultValue("false")
+      .withDocumentation("Sets spark.sql.parquet.writeLegacyFormat. If true, data will be written in a way of Spark 1.4 and earlier. "
+          + "For example, decimal values will be written in Parquet's fixed-length byte array format which other systems such as Apache Hive and Apache Impala use. "
+          + "If false, the newer format in Parquet will be used. For example, decimals will be written in int-based format.");
+
+  public static final ConfigProperty PARQUET_OUTPUT_TIMESTAMP_TYPE = ConfigProperty
+      .key("hoodie.parquet.outputTimestampType")
+      .defaultValue("TIMESTAMP_MILLIS")
+      .withDocumentation("Sets spark.sql.parquet.outputTimestampType. Parquet timestamp type to use when Spark writes data to Parquet files.");
+
   public static final ConfigProperty HFILE_COMPRESSION_ALGORITHM_NAME = ConfigProperty
       .key("hoodie.hfile.compression.algorithm")
       .defaultValue("GZ")
@@ -312,6 +324,16 @@ public class HoodieStorageConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder parquetWriteLegacyFormat(String parquetWriteLegacyFormat) {
+      storageConfig.setValue(PARQUET_WRITE_LEGACY_FORMAT_ENABLED, parquetWriteLegacyFormat);
+      return this;
+    }
+
+    public Builder parquetOutputTimestampType(String parquetOutputTimestampType) {
+      storageConfig.setValue(PARQUET_OUTPUT_TIMESTAMP_TYPE, parquetOutputTimestampType);
+      return this;
+    }
+
     public Builder hfileCompressionAlgorithm(String hfileCompressionAlgorithm) {
       storageConfig.setValue(HFILE_COMPRESSION_ALGORITHM_NAME, hfileCompressionAlgorithm);
       return this;
@@ -347,5 +369,4 @@ public class HoodieStorageConfig extends HoodieConfig {
       return storageConfig;
     }
   }
-
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index aeb77db18..736fe3b47 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1439,6 +1439,14 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getBoolean(HoodieStorageConfig.PARQUET_DICTIONARY_ENABLED);
   }
 
+  public String parquetWriteLegacyFormatEnabled() {
+    return getString(HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED);
+  }
+
+  public String parquetOutputTimestampType() {
+    return getString(HoodieStorageConfig.PARQUET_OUTPUT_TIMESTAMP_TYPE);
+  }
+
   public long getLogFileMaxSize() {
     return getLong(HoodieStorageConfig.LOGFILE_MAX_SIZE);
   }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java
index 774d7c0b7..8dd19d888 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java
@@ -66,7 +66,7 @@ public class HoodieInternalRowFileWriterFactory {
         writeConfig.getDynamicBloomFilterMaxNumEntries(), writeConfig.getBloomFilterType());
     HoodieRowParquetWriteSupport writeSupport =
-        new HoodieRowParquetWriteSupport(table.getHadoopConf(), structType, filter);
+        new HoodieRowParquetWriteSupport(table.getHadoopConf(), structType, filter, writeConfig);
     return new HoodieInternalRowParquetWriter(
         path, new HoodieRowParquetConfig(
             writeSupport,
@@ -91,7 +91,7 @@ public class HoodieInternalRowFileWriterFactory {
       Path path, HoodieWriteConfig writeConfig, StructType structType, HoodieTable table) throws IOException {
     HoodieRowParquetWriteSupport writeSupport =
-        new HoodieRowParquetWriteSupport(table.getHadoopConf(), structType, null);
+        new HoodieRowParquetWriteSupport(table.getHadoopConf(), structType, null, writeConfig);
     return new HoodieInternalRowParquetWriter(
         path, new HoodieRowParquetConfig(
             writeSupport,
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java
index 83ec192e5..f7fe50776 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java
@@ -21,6 +21,7 @@ package org.apache.hudi.io.storage.row;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;
 import org.apache.spark.sql.types.StructType;
@@ -42,11 +43,11 @@ public class HoodieRowParquetWriteSupport extends ParquetWriteSupport {
   private String minRecordKey;
   private String maxRecordKey;
 
-  public HoodieRowParquetWriteSupport(Configuration conf, StructType structType, BloomFilter bloomFilter) {
+  public HoodieRowParquetWriteSupport(Configuration conf, StructType structType, BloomFilter bloomFilter, HoodieWriteConfig writeConfig) {
     super();
     Configuration hadoopConf = new Configuration(conf);
-    hadoopConf.set("spark.sql.parquet.writeLegacyFormat", "false");
-    hadoopConf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MILLIS");
+    hadoopConf.set("spark.sql.parquet.writeLegacyFormat", writeConfig.parquetWriteLegacyFormatEnabled());
+    hadoopConf.set("spark.sql.parquet.outputTimestampType", writeConfig.parquetOutputTimestampType());
     this.hadoopConf = hadoopConf;
     setSchema(structType, hadoopConf);
     this.bloomFilter = bloomFilter;
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java
index fafa3fad1..de5555543 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java
@@ -18,14 +18,14 @@
 
 package org.apache.hudi.io.storage.row;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.bloom.BloomFilterFactory;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.testutils.HoodieClientTestHarness;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.testutils.SparkDatasetTestUtils;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.spark.sql.Dataset;
@@ -33,7 +33,8 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.List;
 import java.util.Random;
@@ -62,12 +63,14 @@ public class TestHoodieInternalRowParquetWriter extends HoodieClientTestHarness
     cleanupResources();
   }
 
-  @Test
-  public void endToEndTest() throws Exception {
-    HoodieWriteConfig cfg = SparkDatasetTestUtils.getConfigBuilder(basePath).build();
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void endToEndTest(boolean parquetWriteLegacyFormatEnabled) throws Exception {
+    HoodieWriteConfig.Builder writeConfigBuilder = SparkDatasetTestUtils.getConfigBuilder(basePath);
     for (int i = 0; i < 5; i++) {
       // init write support and parquet config
-      HoodieRowParquetWriteSupport writeSupport = getWriteSupport(cfg, hadoopConf);
+      HoodieRowParquetWriteSupport writeSupport = getWriteSupport(writeConfigBuilder, hadoopConf, parquetWriteLegacyFormatEnabled);
+      HoodieWriteConfig cfg = writeConfigBuilder.build();
       HoodieRowParquetConfig parquetConfig = new HoodieRowParquetConfig(writeSupport, CompressionCodecName.SNAPPY,
           cfg.getParquetBlockSize(), cfg.getParquetPageSize(), cfg.getParquetMaxFileSize(),
           writeSupport.getHadoopConf(), cfg.getParquetCompressionRatio());
@@ -101,12 +104,14 @@ public class TestHoodieInternalRowParquetWriter extends HoodieClientTestHarness
     }
   }
 
-  private HoodieRowParquetWriteSupport getWriteSupport(HoodieWriteConfig writeConfig, Configuration hadoopConf) {
+  private HoodieRowParquetWriteSupport getWriteSupport(HoodieWriteConfig.Builder writeConfigBuilder, Configuration hadoopConf, boolean parquetWriteLegacyFormatEnabled) {
+    writeConfigBuilder.withStorageConfig(HoodieStorageConfig.newBuilder().parquetWriteLegacyFormat(String.valueOf(parquetWriteLegacyFormatEnabled)).build());
+    HoodieWriteConfig writeConfig = writeConfigBuilder.build();
     BloomFilter filter = BloomFilterFactory.createBloomFilter(
             writeConfig.getBloomFilterNumEntries(), writeConfig.getBloomFilterFPP(),
             writeConfig.getDynamicBloomFilterMaxNumEntries(), writeConfig.getBloomFilterType());
-    return new HoodieRowParquetWriteSupport(hadoopConf, SparkDatasetTestUtils.STRUCT_TYPE, filter);
+    return new HoodieRowParquetWriteSupport(hadoopConf, SparkDatasetTestUtils.STRUCT_TYPE, filter, writeConfig);
   }
 }
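Usage sketch (illustrative, not part of the patch): how a writer could opt into the legacy Parquet decimal layout and a different timestamp type through the new storage config options. Only parquetWriteLegacyFormat(), parquetOutputTimestampType(), and the two HoodieWriteConfig getters come from this change; HoodieWriteConfig.newBuilder() and withPath() are assumed from the existing builder API, and the class name, base path, and option values below are placeholders.

import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;

public class ParquetFormatConfigExample {
  public static void main(String[] args) {
    // Build a storage config that enables the legacy Parquet format (decimals as
    // fixed-length byte arrays, readable by Hive/Impala) and overrides the
    // TIMESTAMP_MILLIS default for spark.sql.parquet.outputTimestampType.
    HoodieStorageConfig storageConfig = HoodieStorageConfig.newBuilder()
        .parquetWriteLegacyFormat("true")                // hoodie.parquet.writeLegacyFormat.enabled
        .parquetOutputTimestampType("TIMESTAMP_MICROS")  // hoodie.parquet.outputTimestampType
        .build();

    // Attach it to a write config; the row writer path reads these values through
    // parquetWriteLegacyFormatEnabled() / parquetOutputTimestampType().
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie/sample-table")            // placeholder base path
        .withStorageConfig(storageConfig)
        .build();

    System.out.println(writeConfig.parquetWriteLegacyFormatEnabled()); // "true"
    System.out.println(writeConfig.parquetOutputTimestampType());      // "TIMESTAMP_MICROS"
  }
}

Because both options are regular ConfigProperty entries, they can presumably also be supplied as plain properties (hoodie.parquet.writeLegacyFormat.enabled=true) instead of going through the builder.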