[HUDI-3096] fixed the bug that the cow table(contains decimalType) write by flink cannot be read by spark. (#4421)
This commit is contained in:
@@ -245,10 +245,13 @@ public class AvroSchemaConverter {
|
|||||||
return nullable ? nullableSchema(time) : time;
|
return nullable ? nullableSchema(time) : time;
|
||||||
case DECIMAL:
|
case DECIMAL:
|
||||||
DecimalType decimalType = (DecimalType) logicalType;
|
DecimalType decimalType = (DecimalType) logicalType;
|
||||||
// store BigDecimal as byte[]
|
// store BigDecimal as Fixed
|
||||||
|
// for spark compatibility.
|
||||||
Schema decimal =
|
Schema decimal =
|
||||||
LogicalTypes.decimal(decimalType.getPrecision(), decimalType.getScale())
|
LogicalTypes.decimal(decimalType.getPrecision(), decimalType.getScale())
|
||||||
.addToSchema(SchemaBuilder.builder().bytesType());
|
.addToSchema(SchemaBuilder
|
||||||
|
.fixed(String.format("%s.fixed", rowName))
|
||||||
|
.size(computeMinBytesForDecimlPrecision(decimalType.getPrecision())));
|
||||||
return nullable ? nullableSchema(decimal) : decimal;
|
return nullable ? nullableSchema(decimal) : decimal;
|
||||||
case ROW:
|
case ROW:
|
||||||
RowType rowType = (RowType) logicalType;
|
RowType rowType = (RowType) logicalType;
|
||||||
@@ -324,5 +327,13 @@ public class AvroSchemaConverter {
|
|||||||
? schema
|
? schema
|
||||||
: Schema.createUnion(SchemaBuilder.builder().nullType(), schema);
|
: Schema.createUnion(SchemaBuilder.builder().nullType(), schema);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static int computeMinBytesForDecimlPrecision(int precision) {
|
||||||
|
int numBytes = 1;
|
||||||
|
while (Math.pow(2.0, 8 * numBytes - 1) < Math.pow(10.0, precision)) {
|
||||||
|
numBytes += 1;
|
||||||
|
}
|
||||||
|
return numBytes;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -18,6 +18,7 @@
|
|||||||
|
|
||||||
package org.apache.hudi.util;
|
package org.apache.hudi.util;
|
||||||
|
|
||||||
|
import org.apache.avro.Conversions;
|
||||||
import org.apache.avro.Schema;
|
import org.apache.avro.Schema;
|
||||||
import org.apache.avro.generic.GenericData;
|
import org.apache.avro.generic.GenericData;
|
||||||
import org.apache.avro.generic.GenericRecord;
|
import org.apache.avro.generic.GenericRecord;
|
||||||
@@ -34,6 +35,7 @@ import org.apache.flink.table.types.logical.RowType;
|
|||||||
import org.apache.flink.table.types.logical.TimestampType;
|
import org.apache.flink.table.types.logical.TimestampType;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
|
import java.math.BigDecimal;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.time.Instant;
|
import java.time.Instant;
|
||||||
import java.time.temporal.ChronoUnit;
|
import java.time.temporal.ChronoUnit;
|
||||||
@@ -50,6 +52,8 @@ import java.util.Map;
|
|||||||
@Internal
|
@Internal
|
||||||
public class RowDataToAvroConverters {
|
public class RowDataToAvroConverters {
|
||||||
|
|
||||||
|
private static Conversions.DecimalConversion decimalConversion = new Conversions.DecimalConversion();
|
||||||
|
|
||||||
// --------------------------------------------------------------------------------
|
// --------------------------------------------------------------------------------
|
||||||
// Runtime Converters
|
// Runtime Converters
|
||||||
// --------------------------------------------------------------------------------
|
// --------------------------------------------------------------------------------
|
||||||
@@ -186,7 +190,8 @@ public class RowDataToAvroConverters {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Object convert(Schema schema, Object object) {
|
public Object convert(Schema schema, Object object) {
|
||||||
return ByteBuffer.wrap(((DecimalData) object).toUnscaledBytes());
|
BigDecimal javaDecimal = ((DecimalData) object).toBigDecimal();
|
||||||
|
return decimalConversion.toFixed(javaDecimal, schema, schema.getLogicalType());
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
break;
|
break;
|
||||||
|
|||||||
Reference in New Issue
Block a user