
[HUDI-4320] Make sure HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED could be specified by the writer (#5970)

Fixed the sequence determining whether Parquet's legacy-format writing property should be overridden, so that the override only kicks in when the property has not been explicitly specified by the caller
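The crux of the change is the guard condition: previously an explicit "false" from the writer was indistinguishable from an unset property, so it could be silently flipped to "true". Below is a minimal standalone sketch (illustrative only, not Hudi code; KEY and the helper method names are made up) contrasting the old and new checks, both lifted from the diff further down:

import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the guard-condition change in this commit.
// KEY stands in for HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED.key().
public class OverrideCheckSketch {
  static final String KEY = "hoodie.parquet.writelegacyformat.enabled";

  // Old check: an explicit "false" parses to false, so the negation passes
  // and the writer's explicit choice would be overwritten to "true".
  static boolean oldCheckAllowsOverride(Map<String, String> props) {
    return !Boolean.parseBoolean(props.getOrDefault(KEY, "false"));
  }

  // New check: only a missing key allows the override, so an explicit
  // "false" (or "true") from the writer is always respected.
  static boolean newCheckAllowsOverride(Map<String, String> props) {
    return props.get(KEY) == null;
  }

  public static void main(String[] args) {
    Map<String, String> explicitFalse = new HashMap<>();
    explicitFalse.put(KEY, "false");

    System.out.println(oldCheckAllowsOverride(explicitFalse));   // true  -> explicit "false" gets clobbered
    System.out.println(newCheckAllowsOverride(explicitFalse));   // false -> explicit "false" is respected
    System.out.println(newCheckAllowsOverride(new HashMap<>())); // true  -> unset, override may kick in
  }
}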
Alexey Kudinkin
2022-06-28 12:27:32 -07:00
committed by GitHub
parent efb9719018
commit ed823f1c6f
5 changed files with 89 additions and 54 deletions

DataSourceUtils.java

@@ -18,6 +18,9 @@
 package org.apache.hudi;
 
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
 import org.apache.hudi.client.HoodieReadClient;
 import org.apache.hudi.client.HoodieWriteResult;
 import org.apache.hudi.client.SparkRDDWriteClient;
@@ -45,10 +48,6 @@ import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
 import org.apache.hudi.sync.common.HoodieSyncConfig;
 import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.util.DataTypeUtils;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
@@ -327,14 +326,39 @@ public class DataSourceUtils {
     return hiveSyncConfig;
   }
 
-  // Now by default ParquetWriteSupport will write DecimalType to parquet as int32/int64 when the scale of decimalType < Decimal.MAX_LONG_DIGITS(),
-  // but AvroParquetReader which used by HoodieParquetReader cannot support read int32/int64 as DecimalType.
-  // try to find current schema whether contains that DecimalType, and auto set the value of "hoodie.parquet.writelegacyformat.enabled"
-  public static void mayBeOverwriteParquetWriteLegacyFormatProp(Map<String, String> properties, StructType schema) {
-    if (DataTypeUtils.foundSmallPrecisionDecimalType(schema)
-        && !Boolean.parseBoolean(properties.getOrDefault(HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED.key(), "false"))) {
+  /**
+   * Checks whether the default value (false) of "hoodie.parquet.writelegacyformat.enabled" should be
+   * overridden in case:
+   *
+   * <ul>
+   *   <li>The property has not been explicitly set by the writer</li>
+   *   <li>The data schema contains a {@code DecimalType} that would be affected by it</li>
+   * </ul>
+   *
+   * If both of the aforementioned conditions are true, this method overrides the default value of
+   * the config (by explicitly setting it) to make sure that the produced Parquet data files can be
+   * read by {@code AvroParquetReader}.
+   *
+   * @param properties properties specified by the writer
+   * @param schema schema of the dataset being written
+   */
+  public static void tryOverrideParquetWriteLegacyFormatProperty(Map<String, String> properties, StructType schema) {
+    if (DataTypeUtils.hasSmallPrecisionDecimalType(schema)
+        && properties.get(HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED.key()) == null) {
+      // ParquetWriteSupport writes DecimalType to parquet as INT32/INT64 when the precision of the
+      // DecimalType does not exceed {@code Decimal.MAX_LONG_DIGITS}, but {@code AvroParquetReader},
+      // which is used by {@code HoodieParquetReader}, does not support reading DecimalType encoded
+      // as INT32/INT64.
+      //
+      // To work around this problem we check whether
+      //   - The schema contains any decimals that could be encoded as INT32/INT64
+      //   - {@code HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED} has not been explicitly
+      //     set by the writer
+      //
+      // If both of these conditions are true, then we override the default value of {@code
+      // HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED} and set it to "true"
+      LOG.warn("Small Decimal Type found in the persisted schema, reverting default value of 'hoodie.parquet.writelegacyformat.enabled' to true");
       properties.put(HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED.key(), "true");
-      LOG.warn("Small Decimal Type found in current schema, auto set the value of hoodie.parquet.writelegacyformat.enabled to true");
     }
   }
 }
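For completeness, a hedged usage sketch of the renamed method from a writer's point of view. The schema and call site are illustrative; it assumes a DecimalType with precision at most 18 (Decimal.MAX_LONG_DIGITS) is what DataTypeUtils.hasSmallPrecisionDecimalType flags:

import java.util.HashMap;
import java.util.Map;

import org.apache.hudi.DataSourceUtils;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class LegacyFormatPropertyExample {
  static final String KEY = "hoodie.parquet.writelegacyformat.enabled";

  public static void main(String[] args) {
    // Schema containing a "small precision" decimal (precision 10 <= 18),
    // i.e. one that ParquetWriteSupport would encode as INT64 by default.
    StructType schema = new StructType(new StructField[] {
        DataTypes.createStructField("id", DataTypes.LongType, false),
        DataTypes.createStructField("amount", DataTypes.createDecimalType(10, 2), true)
    });

    // Case 1: property unset -> the override kicks in and sets it to "true".
    Map<String, String> unset = new HashMap<>();
    DataSourceUtils.tryOverrideParquetWriteLegacyFormatProperty(unset, schema);
    System.out.println(unset.get(KEY)); // "true"

    // Case 2: explicitly set to "false" -> left untouched after this fix.
    Map<String, String> explicit = new HashMap<>();
    explicit.put(KEY, "false");
    DataSourceUtils.tryOverrideParquetWriteLegacyFormatProperty(explicit, schema);
    System.out.println(explicit.get(KEY)); // still "false"
  }
}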