1
0

[HUDI-2430] Make decimal compatible with hudi for flink writer (#3658)

This commit is contained in:
Danny Chan
2021-09-15 12:04:46 +08:00
committed by GitHub
parent d90fd1f68c
commit 627f20f9c5
5 changed files with 84 additions and 78 deletions

View File

@@ -564,7 +564,7 @@ public class ParquetSchemaConverter {
int scale = ((DecimalType) type).getScale();
int numBytes = computeMinBytesForDecimalPrecision(precision);
return Types.primitive(
PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, repetition)
PrimitiveType.PrimitiveTypeName.BINARY, repetition)
.precision(precision)
.scale(scale)
.length(numBytes)

View File

@@ -19,12 +19,9 @@
package org.apache.hudi.table.format.cow;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.DecimalDataUtils;
import org.apache.flink.table.data.vector.BytesColumnVector;
import org.apache.flink.table.data.vector.ColumnVector;
import org.apache.flink.table.data.vector.DecimalColumnVector;
import org.apache.flink.table.data.vector.IntColumnVector;
import org.apache.flink.table.data.vector.LongColumnVector;
/**
* Parquet write decimal as int32 and int64 and binary, this class wrap the real vector to
@@ -43,22 +40,10 @@ public class ParquetDecimalVector implements DecimalColumnVector {
@Override
public DecimalData getDecimal(int i, int precision, int scale) {
if (DecimalDataUtils.is32BitDecimal(precision)) {
return DecimalData.fromUnscaledLong(
((IntColumnVector) vector).getInt(i),
precision,
scale);
} else if (DecimalDataUtils.is64BitDecimal(precision)) {
return DecimalData.fromUnscaledLong(
((LongColumnVector) vector).getLong(i),
precision,
scale);
} else {
return DecimalData.fromUnscaledBytes(
((BytesColumnVector) vector).getBytes(i).getBytes(),
precision,
scale);
}
return DecimalData.fromUnscaledBytes(
((BytesColumnVector) vector).getBytes(i).getBytes(),
precision,
scale);
}
@Override

View File

@@ -31,7 +31,6 @@ import org.apache.flink.formats.parquet.vector.reader.LongColumnReader;
import org.apache.flink.formats.parquet.vector.reader.ShortColumnReader;
import org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.DecimalDataUtils;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.data.vector.ColumnVector;
import org.apache.flink.table.data.vector.VectorizedColumnBatch;
@@ -46,7 +45,6 @@ import org.apache.flink.table.data.vector.heap.HeapShortVector;
import org.apache.flink.table.data.vector.heap.HeapTimestampVector;
import org.apache.flink.table.data.vector.writable.WritableColumnVector;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
@@ -197,23 +195,10 @@ public class ParquetSplitReaderUtil {
DecimalData decimal = value == null
? null
: Preconditions.checkNotNull(DecimalData.fromBigDecimal((BigDecimal) value, precision, scale));
ColumnVector internalVector;
if (DecimalDataUtils.is32BitDecimal(precision)) {
internalVector = createVectorFromConstant(
new IntType(),
decimal == null ? null : (int) decimal.toUnscaledLong(),
batchSize);
} else if (DecimalDataUtils.is64BitDecimal(precision)) {
internalVector = createVectorFromConstant(
new BigIntType(),
decimal == null ? null : decimal.toUnscaledLong(),
batchSize);
} else {
internalVector = createVectorFromConstant(
new VarBinaryType(),
decimal == null ? null : decimal.toUnscaledBytes(),
batchSize);
}
ColumnVector internalVector = createVectorFromConstant(
new VarBinaryType(),
decimal == null ? null : decimal.toUnscaledBytes(),
batchSize);
return new ParquetDecimalVector(internalVector);
case FLOAT:
HeapFloatVector fv = new HeapFloatVector(batchSize);
@@ -365,29 +350,10 @@ public class ParquetSplitReaderUtil {
"TIME_MICROS original type is not ");
return new HeapTimestampVector(batchSize);
case DECIMAL:
DecimalType decimalType = (DecimalType) fieldType;
if (DecimalDataUtils.is32BitDecimal(decimalType.getPrecision())) {
checkArgument(
(typeName == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
|| typeName == PrimitiveType.PrimitiveTypeName.INT32)
&& primitiveType.getOriginalType() == OriginalType.DECIMAL,
"Unexpected type: %s", typeName);
return new HeapIntVector(batchSize);
} else if (DecimalDataUtils.is64BitDecimal(decimalType.getPrecision())) {
checkArgument(
(typeName == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
|| typeName == PrimitiveType.PrimitiveTypeName.INT64)
&& primitiveType.getOriginalType() == OriginalType.DECIMAL,
"Unexpected type: %s", typeName);
return new HeapLongVector(batchSize);
} else {
checkArgument(
(typeName == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
|| typeName == PrimitiveType.PrimitiveTypeName.BINARY)
&& primitiveType.getOriginalType() == OriginalType.DECIMAL,
"Unexpected type: %s", typeName);
return new HeapBytesVector(batchSize);
}
checkArgument(typeName == PrimitiveType.PrimitiveTypeName.BINARY
&& primitiveType.getOriginalType() == OriginalType.DECIMAL,
"Unexpected type: %s", typeName);
return new HeapBytesVector(batchSize);
default:
throw new UnsupportedOperationException(fieldType + " is not supported now.");
}

View File

@@ -349,7 +349,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
.option(FlinkOptions.COMPACTION_ASYNC_ENABLED, false)
// generate compaction plan for each commit
.option(FlinkOptions.COMPACTION_DELTA_COMMITS, 1)
.withPartition(false)
.noPartition()
.end();
streamTableEnv.executeSql(hoodieTableDDL);
@@ -399,7 +399,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.option(FlinkOptions.TABLE_NAME, tableType.name())
.withPartition(false)
.noPartition()
.end();
tableEnv.executeSql(hoodieTableDDL);
@@ -563,7 +563,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.option(FlinkOptions.TABLE_TYPE, tableType)
.withPartition(false)
.noPartition()
.end();
tableEnv.executeSql(hoodieTableDDL);
@@ -770,7 +770,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.option(FlinkOptions.OPERATION, "bulk_insert")
.withPartition(false)
.noPartition()
.end();
tableEnv.executeSql(hoodieTableDDL);
@@ -854,6 +854,31 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
+ "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
}
@Test
void testWriteReadDecimals() {
TableEnvironment tableEnv = batchTableEnv;
String createTable = sql("decimals")
.field("f0 decimal(3, 2)")
.field("f1 decimal(10, 2)")
.field("f2 decimal(20, 2)")
.field("f3 decimal(38, 18)")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.option(FlinkOptions.OPERATION, "bulk_insert")
.option(FlinkOptions.PRECOMBINE_FIELD, "f1")
.pkField("f0")
.noPartition()
.end();
tableEnv.executeSql(createTable);
String insertInto = "insert into decimals values\n"
+ "(1.23, 12345678.12, 12345.12, 123456789.12345)";
execInsertSql(tableEnv, insertInto);
List<Row> result1 = CollectionUtil.iterableToList(
() -> tableEnv.sqlQuery("select * from decimals").execute().collect());
assertRowsEquals(result1, "[+I[1.23, 12345678.12, 12345.12, 123456789.123450000000000000]]");
}
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------

View File

@@ -32,9 +32,12 @@ import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
/**
* Configurations for the test.
@@ -57,6 +60,9 @@ public class TestConfigurations {
.fields(ROW_TYPE.getFieldNames(), ROW_DATA_TYPE.getChildren())
.build();
private static final List<String> FIELDS = ROW_TYPE.getFields().stream()
.map(RowType.RowField::asSummaryString).collect(Collectors.toList());
public static String getCreateHoodieTableDDL(String tableName, Map<String, String> options) {
return getCreateHoodieTableDDL(tableName, options, true, "partition");
}
@@ -66,15 +72,23 @@ public class TestConfigurations {
Map<String, String> options,
boolean havePartition,
String partitionField) {
return getCreateHoodieTableDDL(tableName, FIELDS, options, havePartition, "uuid", partitionField);
}
public static String getCreateHoodieTableDDL(
String tableName,
List<String> fields,
Map<String, String> options,
boolean havePartition,
String pkField,
String partitionField) {
StringBuilder builder = new StringBuilder();
builder.append("create table " + tableName + "(\n"
+ " uuid varchar(20),\n"
+ " name varchar(10),\n"
+ " age int,\n"
+ " ts timestamp(3),\n"
+ " `partition` varchar(20),\n"
+ " PRIMARY KEY(uuid) NOT ENFORCED\n"
+ ")\n");
builder.append("create table ").append(tableName).append("(\n");
for (String field : fields) {
builder.append(" ").append(field).append(",\n");
}
builder.append(" PRIMARY KEY(").append(pkField).append(") NOT ENFORCED\n")
.append(")\n");
if (havePartition) {
builder.append("PARTITIONED BY (`").append(partitionField).append("`)\n");
}
@@ -205,8 +219,10 @@ public class TestConfigurations {
*/
public static class Sql {
private final Map<String, String> options;
private String tableName;
private final String tableName;
private List<String> fields = new ArrayList<>();
private boolean withPartition = true;
private String pkField = "uuid";
private String partitionField = "partition";
public Sql(String tableName) {
@@ -219,8 +235,13 @@ public class TestConfigurations {
return this;
}
public Sql withPartition(boolean withPartition) {
this.withPartition = withPartition;
public Sql noPartition() {
this.withPartition = false;
return this;
}
public Sql pkField(String pkField) {
this.pkField = pkField;
return this;
}
@@ -229,8 +250,17 @@ public class TestConfigurations {
return this;
}
public Sql field(String fieldSchema) {
fields.add(fieldSchema);
return this;
}
public String end() {
return TestConfigurations.getCreateHoodieTableDDL(this.tableName, options, this.withPartition, this.partitionField);
if (this.fields.size() == 0) {
this.fields = FIELDS;
}
return TestConfigurations.getCreateHoodieTableDDL(this.tableName, this.fields, options,
this.withPartition, this.pkField, this.partitionField);
}
}
}