[HUDI-2392] Make flink parquet reader compatible with decimal BINARY encoding (#4057)
This commit is contained in:
@@ -365,7 +365,9 @@ public class ParquetSplitReaderUtil {
|
|||||||
"TIME_MICROS original type is not ");
|
"TIME_MICROS original type is not ");
|
||||||
return new HeapTimestampVector(batchSize);
|
return new HeapTimestampVector(batchSize);
|
||||||
case DECIMAL:
|
case DECIMAL:
|
||||||
checkArgument(typeName == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
|
checkArgument(
|
||||||
|
(typeName == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
|
||||||
|
|| typeName == PrimitiveType.PrimitiveTypeName.BINARY)
|
||||||
&& primitiveType.getOriginalType() == OriginalType.DECIMAL,
|
&& primitiveType.getOriginalType() == OriginalType.DECIMAL,
|
||||||
"Unexpected type: %s", typeName);
|
"Unexpected type: %s", typeName);
|
||||||
return new HeapBytesVector(batchSize);
|
return new HeapBytesVector(batchSize);
|
||||||
|
|||||||
@@ -985,8 +985,9 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
|
|||||||
+ "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
|
+ "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@ParameterizedTest
|
||||||
void testWriteReadDecimals() {
|
@ValueSource(strings = {"bulk_insert", "upsert"})
|
||||||
|
void testWriteReadDecimals(String operation) {
|
||||||
TableEnvironment tableEnv = batchTableEnv;
|
TableEnvironment tableEnv = batchTableEnv;
|
||||||
String createTable = sql("decimals")
|
String createTable = sql("decimals")
|
||||||
.field("f0 decimal(3, 2)")
|
.field("f0 decimal(3, 2)")
|
||||||
@@ -994,7 +995,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
|
|||||||
.field("f2 decimal(20, 2)")
|
.field("f2 decimal(20, 2)")
|
||||||
.field("f3 decimal(38, 18)")
|
.field("f3 decimal(38, 18)")
|
||||||
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
|
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
|
||||||
.option(FlinkOptions.OPERATION, "bulk_insert")
|
.option(FlinkOptions.OPERATION, operation)
|
||||||
.option(FlinkOptions.PRECOMBINE_FIELD, "f1")
|
.option(FlinkOptions.PRECOMBINE_FIELD, "f1")
|
||||||
.pkField("f0")
|
.pkField("f0")
|
||||||
.noPartition()
|
.noPartition()
|
||||||
|
|||||||
Reference in New Issue
Block a user