diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java index d87a2af70..29c1b205b 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java @@ -365,7 +365,9 @@ public class ParquetSplitReaderUtil { "TIME_MICROS original type is not "); return new HeapTimestampVector(batchSize); case DECIMAL: - checkArgument(typeName == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY + checkArgument( + (typeName == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY + || typeName == PrimitiveType.PrimitiveTypeName.BINARY) && primitiveType.getOriginalType() == OriginalType.DECIMAL, "Unexpected type: %s", typeName); return new HeapBytesVector(batchSize); diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java index 7d54a9801..2844261a6 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java @@ -985,8 +985,9 @@ public class HoodieDataSourceITCase extends AbstractTestBase { + "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]"); } - @Test - void testWriteReadDecimals() { + @ParameterizedTest + @ValueSource(strings = {"bulk_insert", "upsert"}) + void testWriteReadDecimals(String operation) { TableEnvironment tableEnv = batchTableEnv; String createTable = sql("decimals") .field("f0 decimal(3, 2)") @@ -994,7 +995,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase { .field("f2 decimal(20, 2)") .field("f3 decimal(38, 18)") .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) - .option(FlinkOptions.OPERATION, "bulk_insert") + .option(FlinkOptions.OPERATION, operation) .option(FlinkOptions.PRECOMBINE_FIELD, "f1") .pkField("f0") .noPartition()