fix(sync): 修复decimal字段类型重名

This commit is contained in:
v-zhangjc9
2024-03-15 15:10:07 +08:00
parent 9f92f600b1
commit a6845675d1
2 changed files with 65 additions and 57 deletions

View File

@@ -10,7 +10,6 @@ import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.regex.Pattern;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericRowData;
@@ -37,9 +36,6 @@ public class TypeConverterV2 implements TypeConverter {
public static final Schema FLOAT_SCHEMA = create(Type.FLOAT);
public static final Schema DOUBLE_SCHEMA = create(Type.DOUBLE);
public static final Schema STRING_SCHEMA = create(Type.STRING);
public static final BiFunction<Integer, Integer, Schema> FIXED_SCHEMA = (length, scala) -> createFixed(StrUtil.format("decimal_{}_{}", length, scala), null, null, length);
public static final BiFunction<Integer, Integer, Schema> DECIMAL_SCHEMA = (length, scala) -> LogicalTypes.decimal(length, scala).addToSchema(FIXED_SCHEMA.apply(length, scala));
public static final BiFunction<Integer, Integer, Schema> NULLABLE_DECIMAL_SCHEMA = (length, scala) -> createUnion(NULL_SCHEMA, DECIMAL_SCHEMA.apply(length, scala));
public static final Schema NULLABLE_BOOLEAN_SCHEMA = createUnion(NULL_SCHEMA, BOOLEAN_SCHEMA);
public static final Schema NULLABLE_INT_SCHEMA = createUnion(NULL_SCHEMA, INT_SCHEMA);
public static final Schema NULLABLE_LONG_SCHEMA = createUnion(NULL_SCHEMA, LONG_SCHEMA);
@@ -60,6 +56,18 @@ public class TypeConverterV2 implements TypeConverter {
private static final Pattern YYYYMMDD = Pattern.compile("\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}");
private static final DateTimeFormatter YYYYMMDD_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
public static Schema FIXED_SCHEMA(String name, int length, int scala) {
return createFixed(StrUtil.format("{}_decimal_{}_{}", name.toLowerCase(), length, scala), null, null, length);
}
public static Schema DECIMAL_SCHEMA(String name, int length, int scala) {
return LogicalTypes.decimal(length, scala).addToSchema(FIXED_SCHEMA(name, length, scala));
}
public static Schema NULLABLE_DECIMAL_SCHEMA(String name, int length, int scala) {
return createUnion(NULL_SCHEMA, DECIMAL_SCHEMA(name, length, scala));
}
@Override
public Schema convertToSchema(TableMeta meta) {
List<Field> fields = new ArrayList<>(meta.getFields().size() + 4);
@@ -67,7 +75,7 @@ public class TypeConverterV2 implements TypeConverter {
fields.add(new Field(Constants.UNION_KEY_NAME, STRING_SCHEMA, null, ""));
for (TableMeta.FieldMeta field : meta.getFields()) {
fields.add(new Field(field.getName(), convertType(meta.getAlias(), field.getType(), field.getLength(), field.getScala()), null, JsonProperties.NULL_VALUE));
fields.add(new Field(field.getName(), convertType(meta.getAlias(), field.getName(), field.getType(), field.getLength(), field.getScala()), null, JsonProperties.NULL_VALUE));
}
fields.add(new Field(Constants.UPDATE_TIMESTAMP_KEY_NAME, LONG_SCHEMA, null, -1));
@@ -75,7 +83,7 @@ public class TypeConverterV2 implements TypeConverter {
return Schema.createRecord(meta.getTable(), null, null, false, fields);
}
private Schema convertType(String table, String type, Long length, Integer scala) {
private Schema convertType(String table, String field, String type, Long length, Integer scala) {
type = type.trim().toLowerCase();
if (BOOLEAN_REGEX.matcher(type).matches()) {
return NULLABLE_BOOLEAN_SCHEMA;
@@ -90,12 +98,12 @@ public class TypeConverterV2 implements TypeConverter {
} else if (DOUBLE_REGEX.matcher(type).matches()) {
return NULLABLE_DOUBLE_SCHEMA;
} else if (FIXED_REGEX.matcher(type).matches()) {
return NULLABLE_DECIMAL_SCHEMA.apply(length.intValue(), 0);
return NULLABLE_DECIMAL_SCHEMA(field, length.intValue(), 0);
} else if (DECIMAL_REGEX.matcher(type).matches() || NUMERIC_REGEX.matcher(type).matches()) {
if (ObjectUtil.isNull(scala)) {
return NULLABLE_DECIMAL_SCHEMA.apply(length.intValue(), 6);
return NULLABLE_DECIMAL_SCHEMA(field, length.intValue(), 6);
} else {
return NULLABLE_DECIMAL_SCHEMA.apply(length.intValue(), scala);
return NULLABLE_DECIMAL_SCHEMA(field, length.intValue(), scala);
}
} else {
LogHelper.warn(logger, LogHelper.LogPoint.FIELD_TYPE_NOT_FOUND, "{} Cannot find correct type for source type: {} length: {} scala: {}", table, type, length, scala);

View File

@@ -21,68 +21,68 @@ import static org.junit.jupiter.api.Assertions.assertNull;
public class TypeConvertTest {
@Test
public void testConvertType() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException, InstantiationException, NoSuchFieldException {
Executor convertType = new Executor("convertType", String.class, String.class, Long.class, Integer.class);
Executor convertType = new Executor("convertType", String.class, String.class, String.class, Long.class, Integer.class);
// boolean
assertEquals(NULLABLE_BOOLEAN_SCHEMA, convertType.exec("test", "boolean", 0L, null));
assertEquals(NULLABLE_BOOLEAN_SCHEMA, convertType.exec("test", "Boolean", 0L, null));
assertEquals(NULLABLE_BOOLEAN_SCHEMA, convertType.exec("test", "BOOLEAN", 0L, null));
assertEquals(NULLABLE_BOOLEAN_SCHEMA, convertType.exec("test", "bool", 0L, null));
assertEquals(NULLABLE_BOOLEAN_SCHEMA, convertType.exec("test", "test", "boolean", 0L, null));
assertEquals(NULLABLE_BOOLEAN_SCHEMA, convertType.exec("test", "test", "Boolean", 0L, null));
assertEquals(NULLABLE_BOOLEAN_SCHEMA, convertType.exec("test", "test", "BOOLEAN", 0L, null));
assertEquals(NULLABLE_BOOLEAN_SCHEMA, convertType.exec("test", "test", "bool", 0L, null));
// int
assertEquals(NULLABLE_INT_SCHEMA, convertType.exec("test", "tinyint(1)", 0L, null));
assertEquals(NULLABLE_INT_SCHEMA, convertType.exec("test", "tinyint", 1L, null));
assertEquals(NULLABLE_INT_SCHEMA, convertType.exec("test", "tinyint", 0L, null));
assertEquals(NULLABLE_INT_SCHEMA, convertType.exec("test", "int", 0L, null));
assertEquals(NULLABLE_INT_SCHEMA, convertType.exec("test", "integer", 0L, null));
assertEquals(NULLABLE_INT_SCHEMA, convertType.exec("test", "smallint", 0L, null));
assertEquals(NULLABLE_INT_SCHEMA, convertType.exec("test", "smallserial", 0L, null));
assertEquals(NULLABLE_INT_SCHEMA, convertType.exec("test", "SMALLSERIAL", 0L, null));
assertEquals(NULLABLE_INT_SCHEMA, convertType.exec("test", "test", "tinyint(1)", 0L, null));
assertEquals(NULLABLE_INT_SCHEMA, convertType.exec("test", "test", "tinyint", 1L, null));
assertEquals(NULLABLE_INT_SCHEMA, convertType.exec("test", "test", "tinyint", 0L, null));
assertEquals(NULLABLE_INT_SCHEMA, convertType.exec("test", "test", "int", 0L, null));
assertEquals(NULLABLE_INT_SCHEMA, convertType.exec("test", "test", "integer", 0L, null));
assertEquals(NULLABLE_INT_SCHEMA, convertType.exec("test", "test", "smallint", 0L, null));
assertEquals(NULLABLE_INT_SCHEMA, convertType.exec("test", "test", "smallserial", 0L, null));
assertEquals(NULLABLE_INT_SCHEMA, convertType.exec("test", "test", "SMALLSERIAL", 0L, null));
// long
assertEquals(NULLABLE_LONG_SCHEMA, convertType.exec("test", "int unsigned", 0L, null));
assertEquals(NULLABLE_LONG_SCHEMA, convertType.exec("test", "bigint", 0L, null));
assertEquals(NULLABLE_LONG_SCHEMA, convertType.exec("test", "bigint unsigned", 0L, null));
assertEquals(NULLABLE_LONG_SCHEMA, convertType.exec("test", "serial", 0L, null));
assertEquals(NULLABLE_LONG_SCHEMA, convertType.exec("test", "long", 0L, null));
assertEquals(NULLABLE_LONG_SCHEMA, convertType.exec("test", "Long", 0L, null));
assertEquals(NULLABLE_LONG_SCHEMA, convertType.exec("test", "test", "int unsigned", 0L, null));
assertEquals(NULLABLE_LONG_SCHEMA, convertType.exec("test", "test", "bigint", 0L, null));
assertEquals(NULLABLE_LONG_SCHEMA, convertType.exec("test", "test", "bigint unsigned", 0L, null));
assertEquals(NULLABLE_LONG_SCHEMA, convertType.exec("test", "test", "serial", 0L, null));
assertEquals(NULLABLE_LONG_SCHEMA, convertType.exec("test", "test", "long", 0L, null));
assertEquals(NULLABLE_LONG_SCHEMA, convertType.exec("test", "test", "Long", 0L, null));
// float
assertEquals(NULLABLE_FLOAT_SCHEMA, convertType.exec("test", "float", 0L, null));
assertEquals(NULLABLE_FLOAT_SCHEMA, convertType.exec("test", "FLOAT", 0L, null));
assertEquals(NULLABLE_FLOAT_SCHEMA, convertType.exec("test", "test", "float", 0L, null));
assertEquals(NULLABLE_FLOAT_SCHEMA, convertType.exec("test", "test", "FLOAT", 0L, null));
// double
assertEquals(NULLABLE_DOUBLE_SCHEMA, convertType.exec("test", "double", 0L, null));
assertEquals(NULLABLE_DOUBLE_SCHEMA, convertType.exec("test", "Double", 0L, null));
assertEquals(NULLABLE_DOUBLE_SCHEMA, convertType.exec("test", "test", "double", 0L, null));
assertEquals(NULLABLE_DOUBLE_SCHEMA, convertType.exec("test", "test", "Double", 0L, null));
// date
assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "date", 0L, null));
assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "datetime", 0L, null));
assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "timestamp", 0L, null));
assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "timestamp without time zone", 0L, null));
assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "time", 0L, null));
assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "test", "date", 0L, null));
assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "test", "datetime", 0L, null));
assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "test", "timestamp", 0L, null));
assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "test", "timestamp without time zone", 0L, null));
assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "test", "time", 0L, null));
// string
assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "varchar", 0L, null));
assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "char", 0L, null));
assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "text", 0L, null));
assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "longtext", 0L, null));
assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "mediumtext", 0L, null));
assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "clob", 0L, null));
assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "binary", 0L, null));
assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "bit", 0L, null));
assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "character varying", 0L, null));
assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "test", "varchar", 0L, null));
assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "test", "char", 0L, null));
assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "test", "text", 0L, null));
assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "test", "longtext", 0L, null));
assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "test", "mediumtext", 0L, null));
assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "test", "clob", 0L, null));
assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "test", "binary", 0L, null));
assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "test", "bit", 0L, null));
assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "test", "character varying", 0L, null));
// decimal
assertEquals(NULLABLE_DECIMAL_SCHEMA.apply(2, 0), convertType.exec("test", "number", 2L, null));
assertEquals(NULLABLE_DECIMAL_SCHEMA.apply(4, 0), convertType.exec("test", "money", 4L, null));
assertEquals(NULLABLE_DECIMAL_SCHEMA.apply(6, 0), convertType.exec("test", "bigserial", 6L, null));
assertEquals(NULLABLE_DECIMAL_SCHEMA.apply(6, 3), convertType.exec("test", "decimal(6,3)", 6L, 3));
assertEquals(NULLABLE_DECIMAL_SCHEMA.apply(6, 6), convertType.exec("test", "decimal(6)", 6L, null));
assertEquals(NULLABLE_DECIMAL_SCHEMA.apply(6, 6), convertType.exec("test", "decimal", 6L, null));
assertEquals(NULLABLE_DECIMAL_SCHEMA.apply(6, 6), convertType.exec("test", "double precision", 6L, null));
assertEquals(NULLABLE_DECIMAL_SCHEMA.apply(2, 1), convertType.exec("test", "numeric(2,1)", 2L, 1));
assertEquals(NULLABLE_DECIMAL_SCHEMA.apply(2, 1), convertType.exec("test", "numeric", 2L, 1));
assertEquals(NULLABLE_DECIMAL_SCHEMA("test", 2, 0), convertType.exec("test", "test", "number", 2L, null));
assertEquals(NULLABLE_DECIMAL_SCHEMA("test", 4, 0), convertType.exec("test", "test", "money", 4L, null));
assertEquals(NULLABLE_DECIMAL_SCHEMA("test", 6, 0), convertType.exec("test", "test", "bigserial", 6L, null));
assertEquals(NULLABLE_DECIMAL_SCHEMA("test", 6, 3), convertType.exec("test", "test", "decimal(6,3)", 6L, 3));
assertEquals(NULLABLE_DECIMAL_SCHEMA("test", 6, 6), convertType.exec("test", "test", "decimal(6)", 6L, null));
assertEquals(NULLABLE_DECIMAL_SCHEMA("test", 6, 6), convertType.exec("test", "test", "decimal", 6L, null));
assertEquals(NULLABLE_DECIMAL_SCHEMA("test", 6, 6), convertType.exec("test", "test", "double precision", 6L, null));
assertEquals(NULLABLE_DECIMAL_SCHEMA("test", 2, 1), convertType.exec("test", "test", "numeric(2,1)", 2L, 1));
assertEquals(NULLABLE_DECIMAL_SCHEMA("test", 2, 1), convertType.exec("test", "test", "numeric", 2L, 1));
}
@Test
@@ -123,8 +123,8 @@ public class TypeConvertTest {
assertEquals(StringData.fromString("123.456"), convertValue.exec(NULLABLE_STRING_SCHEMA, "123.456"));
// decimal
assertEquals(DecimalData.fromBigDecimal(new BigDecimal("123.0"), 10, 0), convertValue.exec(NULLABLE_DECIMAL_SCHEMA.apply(10, 0), "123.0"));
assertEquals(DecimalData.fromBigDecimal(new BigDecimal("123456789987654321.123456789987654321"), 37, 18), convertValue.exec(NULLABLE_DECIMAL_SCHEMA.apply(37, 18), "123456789987654321.123456789987654321"));
assertEquals(DecimalData.fromBigDecimal(new BigDecimal("123.0"), 10, 0), convertValue.exec(NULLABLE_DECIMAL_SCHEMA("test", 10, 0), "123.0"));
assertEquals(DecimalData.fromBigDecimal(new BigDecimal("123456789987654321.123456789987654321"), 37, 18), convertValue.exec(NULLABLE_DECIMAL_SCHEMA("test", 37, 18), "123456789987654321.123456789987654321"));
}
private static final class Executor {