diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java
index 6325c2bcc..6e87ff1fb 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java
@@ -245,10 +245,13 @@ public class AvroSchemaConverter {
         return nullable ? nullableSchema(time) : time;
       case DECIMAL:
         DecimalType decimalType = (DecimalType) logicalType;
-        // store BigDecimal as byte[]
+        // store BigDecimal as Fixed
+        // for Spark compatibility.
         Schema decimal =
             LogicalTypes.decimal(decimalType.getPrecision(), decimalType.getScale())
-                .addToSchema(SchemaBuilder.builder().bytesType());
+                .addToSchema(SchemaBuilder
+                    .fixed(String.format("%s.fixed", rowName))
+                    .size(computeMinBytesForDecimalPrecision(decimalType.getPrecision())));
         return nullable ? nullableSchema(decimal) : decimal;
       case ROW:
         RowType rowType = (RowType) logicalType;
@@ -324,5 +327,18 @@ public class AvroSchemaConverter {
         ? schema
         : Schema.createUnion(SchemaBuilder.builder().nullType(), schema);
   }
+
+  /**
+   * Returns the minimum number of bytes needed to hold the unscaled value of a
+   * decimal with the given precision in two's-complement form, matching how
+   * Spark sizes Avro {@code fixed} decimal fields.
+   */
+  private static int computeMinBytesForDecimalPrecision(int precision) {
+    int numBytes = 1;
+    while (Math.pow(2.0, 8 * numBytes - 1) < Math.pow(10.0, precision)) {
+      numBytes += 1;
+    }
+    return numBytes;
+  }
 }
 
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java
index d90670ff4..446a6d041 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.util;
 
+import org.apache.avro.Conversions;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
@@ -34,6 +35,7 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.TimestampType;
 
 import java.io.Serializable;
+import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
@@ -50,6 +52,10 @@ import java.util.Map;
 @Internal
 public class RowDataToAvroConverters {
 
+  // Shared Avro decimal converter; DecimalConversion holds no mutable state,
+  // so a single final instance is safe to reuse across converters.
+  private static final Conversions.DecimalConversion DECIMAL_CONVERSION = new Conversions.DecimalConversion();
+
   // --------------------------------------------------------------------------------
   // Runtime Converters
   // --------------------------------------------------------------------------------
@@ -186,7 +192,8 @@ public class RowDataToAvroConverters {
 
         @Override
         public Object convert(Schema schema, Object object) {
-          return ByteBuffer.wrap(((DecimalData) object).toUnscaledBytes());
+          BigDecimal javaDecimal = ((DecimalData) object).toBigDecimal();
+          return DECIMAL_CONVERSION.toFixed(javaDecimal, schema, schema.getLogicalType());
         }
       };
       break;