diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/type/TypeConverterV2.java b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/type/TypeConverterV2.java index 1d54898..80334a5 100644 --- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/type/TypeConverterV2.java +++ b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/type/TypeConverterV2.java @@ -10,7 +10,6 @@ import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.function.BiFunction; import java.util.regex.Pattern; import org.apache.flink.table.data.DecimalData; import org.apache.flink.table.data.GenericRowData; @@ -37,9 +36,6 @@ public class TypeConverterV2 implements TypeConverter { public static final Schema FLOAT_SCHEMA = create(Type.FLOAT); public static final Schema DOUBLE_SCHEMA = create(Type.DOUBLE); public static final Schema STRING_SCHEMA = create(Type.STRING); - public static final BiFunction FIXED_SCHEMA = (length, scala) -> createFixed(StrUtil.format("decimal_{}_{}", length, scala), null, null, length); - public static final BiFunction DECIMAL_SCHEMA = (length, scala) -> LogicalTypes.decimal(length, scala).addToSchema(FIXED_SCHEMA.apply(length, scala)); - public static final BiFunction NULLABLE_DECIMAL_SCHEMA = (length, scala) -> createUnion(NULL_SCHEMA, DECIMAL_SCHEMA.apply(length, scala)); public static final Schema NULLABLE_BOOLEAN_SCHEMA = createUnion(NULL_SCHEMA, BOOLEAN_SCHEMA); public static final Schema NULLABLE_INT_SCHEMA = createUnion(NULL_SCHEMA, INT_SCHEMA); public static final Schema NULLABLE_LONG_SCHEMA = createUnion(NULL_SCHEMA, LONG_SCHEMA); @@ -60,6 +56,18 @@ public class TypeConverterV2 implements TypeConverter { private static final Pattern YYYYMMDD = Pattern.compile("\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}"); private static final DateTimeFormatter YYYYMMDD_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + public static Schema FIXED_SCHEMA(String name, int length, int scala) { + return createFixed(StrUtil.format("{}_decimal_{}_{}", name.toLowerCase(), length, scala), null, null, length); + } + + public static Schema DECIMAL_SCHEMA(String name, int length, int scala) { + return LogicalTypes.decimal(length, scala).addToSchema(FIXED_SCHEMA(name, length, scala)); + } + + public static Schema NULLABLE_DECIMAL_SCHEMA(String name, int length, int scala) { + return createUnion(NULL_SCHEMA, DECIMAL_SCHEMA(name, length, scala)); + } + @Override public Schema convertToSchema(TableMeta meta) { List fields = new ArrayList<>(meta.getFields().size() + 4); @@ -67,7 +75,7 @@ public class TypeConverterV2 implements TypeConverter { fields.add(new Field(Constants.UNION_KEY_NAME, STRING_SCHEMA, null, "")); for (TableMeta.FieldMeta field : meta.getFields()) { - fields.add(new Field(field.getName(), convertType(meta.getAlias(), field.getType(), field.getLength(), field.getScala()), null, JsonProperties.NULL_VALUE)); + fields.add(new Field(field.getName(), convertType(meta.getAlias(), field.getName(), field.getType(), field.getLength(), field.getScala()), null, JsonProperties.NULL_VALUE)); } fields.add(new Field(Constants.UPDATE_TIMESTAMP_KEY_NAME, LONG_SCHEMA, null, -1)); @@ -75,7 +83,7 @@ public class TypeConverterV2 implements TypeConverter { return Schema.createRecord(meta.getTable(), null, null, false, fields); } - private Schema convertType(String table, String type, Long length, Integer scala) { + private Schema convertType(String table, String field, String type, Long length, Integer scala) { type = type.trim().toLowerCase(); if (BOOLEAN_REGEX.matcher(type).matches()) { return NULLABLE_BOOLEAN_SCHEMA; @@ -90,12 +98,12 @@ public class TypeConverterV2 implements TypeConverter { } else if (DOUBLE_REGEX.matcher(type).matches()) { return NULLABLE_DOUBLE_SCHEMA; } else if (FIXED_REGEX.matcher(type).matches()) { - return NULLABLE_DECIMAL_SCHEMA.apply(length.intValue(), 0); + return NULLABLE_DECIMAL_SCHEMA(field, length.intValue(), 0); } else if (DECIMAL_REGEX.matcher(type).matches() || NUMERIC_REGEX.matcher(type).matches()) { if (ObjectUtil.isNull(scala)) { - return NULLABLE_DECIMAL_SCHEMA.apply(length.intValue(), 6); + return NULLABLE_DECIMAL_SCHEMA(field, length.intValue(), 6); } else { - return NULLABLE_DECIMAL_SCHEMA.apply(length.intValue(), scala); + return NULLABLE_DECIMAL_SCHEMA(field, length.intValue(), scala); } } else { LogHelper.warn(logger, LogHelper.LogPoint.FIELD_TYPE_NOT_FOUND, "{} Cannot find correct type for source type: {} length: {} scala: {}", table, type, length, scala); diff --git a/utils/sync/src/test/java/com/lanyuanxiaoyao/service/sync/TypeConvertTest.java b/utils/sync/src/test/java/com/lanyuanxiaoyao/service/sync/TypeConvertTest.java index d6379e4..492364f 100644 --- a/utils/sync/src/test/java/com/lanyuanxiaoyao/service/sync/TypeConvertTest.java +++ b/utils/sync/src/test/java/com/lanyuanxiaoyao/service/sync/TypeConvertTest.java @@ -21,68 +21,68 @@ import static org.junit.jupiter.api.Assertions.assertNull; public class TypeConvertTest { @Test public void testConvertType() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException, InstantiationException, NoSuchFieldException { - Executor convertType = new Executor("convertType", String.class, String.class, Long.class, Integer.class); + Executor convertType = new Executor("convertType", String.class, String.class, String.class, Long.class, Integer.class); // boolean - assertEquals(NULLABLE_BOOLEAN_SCHEMA, convertType.exec("test", "boolean", 0L, null)); - assertEquals(NULLABLE_BOOLEAN_SCHEMA, convertType.exec("test", "Boolean", 0L, null)); - assertEquals(NULLABLE_BOOLEAN_SCHEMA, convertType.exec("test", "BOOLEAN", 0L, null)); - assertEquals(NULLABLE_BOOLEAN_SCHEMA, convertType.exec("test", "bool", 0L, null)); + assertEquals(NULLABLE_BOOLEAN_SCHEMA, convertType.exec("test", "test", "boolean", 0L, null)); + assertEquals(NULLABLE_BOOLEAN_SCHEMA, convertType.exec("test", "test", "Boolean", 0L, null)); + assertEquals(NULLABLE_BOOLEAN_SCHEMA, convertType.exec("test", "test", "BOOLEAN", 0L, null)); + assertEquals(NULLABLE_BOOLEAN_SCHEMA, convertType.exec("test", "test", "bool", 0L, null)); // int - assertEquals(NULLABLE_INT_SCHEMA, convertType.exec("test", "tinyint(1)", 0L, null)); - assertEquals(NULLABLE_INT_SCHEMA, convertType.exec("test", "tinyint", 1L, null)); - assertEquals(NULLABLE_INT_SCHEMA, convertType.exec("test", "tinyint", 0L, null)); - assertEquals(NULLABLE_INT_SCHEMA, convertType.exec("test", "int", 0L, null)); - assertEquals(NULLABLE_INT_SCHEMA, convertType.exec("test", "integer", 0L, null)); - assertEquals(NULLABLE_INT_SCHEMA, convertType.exec("test", "smallint", 0L, null)); - assertEquals(NULLABLE_INT_SCHEMA, convertType.exec("test", "smallserial", 0L, null)); - assertEquals(NULLABLE_INT_SCHEMA, convertType.exec("test", "SMALLSERIAL", 0L, null)); + assertEquals(NULLABLE_INT_SCHEMA, convertType.exec("test", "test", "tinyint(1)", 0L, null)); + assertEquals(NULLABLE_INT_SCHEMA, convertType.exec("test", "test", "tinyint", 1L, null)); + assertEquals(NULLABLE_INT_SCHEMA, convertType.exec("test", "test", "tinyint", 0L, null)); + assertEquals(NULLABLE_INT_SCHEMA, convertType.exec("test", "test", "int", 0L, null)); + assertEquals(NULLABLE_INT_SCHEMA, convertType.exec("test", "test", "integer", 0L, null)); + assertEquals(NULLABLE_INT_SCHEMA, convertType.exec("test", "test", "smallint", 0L, null)); + assertEquals(NULLABLE_INT_SCHEMA, convertType.exec("test", "test", "smallserial", 0L, null)); + assertEquals(NULLABLE_INT_SCHEMA, convertType.exec("test", "test", "SMALLSERIAL", 0L, null)); // long - assertEquals(NULLABLE_LONG_SCHEMA, convertType.exec("test", "int unsigned", 0L, null)); - assertEquals(NULLABLE_LONG_SCHEMA, convertType.exec("test", "bigint", 0L, null)); - assertEquals(NULLABLE_LONG_SCHEMA, convertType.exec("test", "bigint unsigned", 0L, null)); - assertEquals(NULLABLE_LONG_SCHEMA, convertType.exec("test", "serial", 0L, null)); - assertEquals(NULLABLE_LONG_SCHEMA, convertType.exec("test", "long", 0L, null)); - assertEquals(NULLABLE_LONG_SCHEMA, convertType.exec("test", "Long", 0L, null)); + assertEquals(NULLABLE_LONG_SCHEMA, convertType.exec("test", "test", "int unsigned", 0L, null)); + assertEquals(NULLABLE_LONG_SCHEMA, convertType.exec("test", "test", "bigint", 0L, null)); + assertEquals(NULLABLE_LONG_SCHEMA, convertType.exec("test", "test", "bigint unsigned", 0L, null)); + assertEquals(NULLABLE_LONG_SCHEMA, convertType.exec("test", "test", "serial", 0L, null)); + assertEquals(NULLABLE_LONG_SCHEMA, convertType.exec("test", "test", "long", 0L, null)); + assertEquals(NULLABLE_LONG_SCHEMA, convertType.exec("test", "test", "Long", 0L, null)); // float - assertEquals(NULLABLE_FLOAT_SCHEMA, convertType.exec("test", "float", 0L, null)); - assertEquals(NULLABLE_FLOAT_SCHEMA, convertType.exec("test", "FLOAT", 0L, null)); + assertEquals(NULLABLE_FLOAT_SCHEMA, convertType.exec("test", "test", "float", 0L, null)); + assertEquals(NULLABLE_FLOAT_SCHEMA, convertType.exec("test", "test", "FLOAT", 0L, null)); // double - assertEquals(NULLABLE_DOUBLE_SCHEMA, convertType.exec("test", "double", 0L, null)); - assertEquals(NULLABLE_DOUBLE_SCHEMA, convertType.exec("test", "Double", 0L, null)); + assertEquals(NULLABLE_DOUBLE_SCHEMA, convertType.exec("test", "test", "double", 0L, null)); + assertEquals(NULLABLE_DOUBLE_SCHEMA, convertType.exec("test", "test", "Double", 0L, null)); // date - assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "date", 0L, null)); - assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "datetime", 0L, null)); - assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "timestamp", 0L, null)); - assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "timestamp without time zone", 0L, null)); - assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "time", 0L, null)); + assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "test", "date", 0L, null)); + assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "test", "datetime", 0L, null)); + assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "test", "timestamp", 0L, null)); + assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "test", "timestamp without time zone", 0L, null)); + assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "test", "time", 0L, null)); // string - assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "varchar", 0L, null)); - assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "char", 0L, null)); - assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "text", 0L, null)); - assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "longtext", 0L, null)); - assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "mediumtext", 0L, null)); - assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "clob", 0L, null)); - assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "binary", 0L, null)); - assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "bit", 0L, null)); - assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "character varying", 0L, null)); + assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "test", "varchar", 0L, null)); + assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "test", "char", 0L, null)); + assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "test", "text", 0L, null)); + assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "test", "longtext", 0L, null)); + assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "test", "mediumtext", 0L, null)); + assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "test", "clob", 0L, null)); + assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "test", "binary", 0L, null)); + assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "test", "bit", 0L, null)); + assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "test", "character varying", 0L, null)); // decimal - assertEquals(NULLABLE_DECIMAL_SCHEMA.apply(2, 0), convertType.exec("test", "number", 2L, null)); - assertEquals(NULLABLE_DECIMAL_SCHEMA.apply(4, 0), convertType.exec("test", "money", 4L, null)); - assertEquals(NULLABLE_DECIMAL_SCHEMA.apply(6, 0), convertType.exec("test", "bigserial", 6L, null)); - assertEquals(NULLABLE_DECIMAL_SCHEMA.apply(6, 3), convertType.exec("test", "decimal(6,3)", 6L, 3)); - assertEquals(NULLABLE_DECIMAL_SCHEMA.apply(6, 6), convertType.exec("test", "decimal(6)", 6L, null)); - assertEquals(NULLABLE_DECIMAL_SCHEMA.apply(6, 6), convertType.exec("test", "decimal", 6L, null)); - assertEquals(NULLABLE_DECIMAL_SCHEMA.apply(6, 6), convertType.exec("test", "double precision", 6L, null)); - assertEquals(NULLABLE_DECIMAL_SCHEMA.apply(2, 1), convertType.exec("test", "numeric(2,1)", 2L, 1)); - assertEquals(NULLABLE_DECIMAL_SCHEMA.apply(2, 1), convertType.exec("test", "numeric", 2L, 1)); + assertEquals(NULLABLE_DECIMAL_SCHEMA("test", 2, 0), convertType.exec("test", "test", "number", 2L, null)); + assertEquals(NULLABLE_DECIMAL_SCHEMA("test", 4, 0), convertType.exec("test", "test", "money", 4L, null)); + assertEquals(NULLABLE_DECIMAL_SCHEMA("test", 6, 0), convertType.exec("test", "test", "bigserial", 6L, null)); + assertEquals(NULLABLE_DECIMAL_SCHEMA("test", 6, 3), convertType.exec("test", "test", "decimal(6,3)", 6L, 3)); + assertEquals(NULLABLE_DECIMAL_SCHEMA("test", 6, 6), convertType.exec("test", "test", "decimal(6)", 6L, null)); + assertEquals(NULLABLE_DECIMAL_SCHEMA("test", 6, 6), convertType.exec("test", "test", "decimal", 6L, null)); + assertEquals(NULLABLE_DECIMAL_SCHEMA("test", 6, 6), convertType.exec("test", "test", "double precision", 6L, null)); + assertEquals(NULLABLE_DECIMAL_SCHEMA("test", 2, 1), convertType.exec("test", "test", "numeric(2,1)", 2L, 1)); + assertEquals(NULLABLE_DECIMAL_SCHEMA("test", 2, 1), convertType.exec("test", "test", "numeric", 2L, 1)); } @Test @@ -123,8 +123,8 @@ public class TypeConvertTest { assertEquals(StringData.fromString("123.456"), convertValue.exec(NULLABLE_STRING_SCHEMA, "123.456")); // decimal - assertEquals(DecimalData.fromBigDecimal(new BigDecimal("123.0"), 10, 0), convertValue.exec(NULLABLE_DECIMAL_SCHEMA.apply(10, 0), "123.0")); - assertEquals(DecimalData.fromBigDecimal(new BigDecimal("123456789987654321.123456789987654321"), 37, 18), convertValue.exec(NULLABLE_DECIMAL_SCHEMA.apply(37, 18), "123456789987654321.123456789987654321")); + assertEquals(DecimalData.fromBigDecimal(new BigDecimal("123.0"), 10, 0), convertValue.exec(NULLABLE_DECIMAL_SCHEMA("test", 10, 0), "123.0")); + assertEquals(DecimalData.fromBigDecimal(new BigDecimal("123456789987654321.123456789987654321"), 37, 18), convertValue.exec(NULLABLE_DECIMAL_SCHEMA("test", 37, 18), "123456789987654321.123456789987654321")); } private static final class Executor {