diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/type/TypeConverterV2.java b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/type/TypeConverterV2.java index 8d06309..1d54898 100644 --- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/type/TypeConverterV2.java +++ b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/type/TypeConverterV2.java @@ -49,7 +49,7 @@ public class TypeConverterV2 implements TypeConverter { private static final Logger logger = LoggerFactory.getLogger(TypeConverterV2.class); private static final Pattern BOOLEAN_REGEX = Pattern.compile("^boolean|bool$"); private static final Pattern INT_REGEX = Pattern.compile("^(tinyint|smallint|int|smallserial|integer)(\\(\\d+\\))?$"); - private static final Pattern LONG_REGEX = Pattern.compile("^(bigint unsigned)|((bigint|serial|long)(\\(\\d+\\))?)$"); + private static final Pattern LONG_REGEX = Pattern.compile("^((int|bigint) unsigned)|((bigint|serial|long)(\\(\\d+\\))?)$"); private static final Pattern DATE_REGEX = Pattern.compile("^date|timestamp|timestamp without time zone|datetime|time$"); private static final Pattern FLOAT_REGEX = Pattern.compile("^float(\\(\\d+\\))?$"); private static final Pattern DOUBLE_REGEX = Pattern.compile("^double(\\(\\d+\\))?$"); @@ -67,7 +67,7 @@ public class TypeConverterV2 implements TypeConverter { fields.add(new Field(Constants.UNION_KEY_NAME, STRING_SCHEMA, null, "")); for (TableMeta.FieldMeta field : meta.getFields()) { - fields.add(new Field(field.getName(), convertType(meta, field.getType(), field.getLength(), field.getScala()), null, JsonProperties.NULL_VALUE)); + fields.add(new Field(field.getName(), convertType(meta.getAlias(), field.getType(), field.getLength(), field.getScala()), null, JsonProperties.NULL_VALUE)); } fields.add(new Field(Constants.UPDATE_TIMESTAMP_KEY_NAME, LONG_SCHEMA, null, -1)); @@ -75,7 +75,7 @@ public class TypeConverterV2 implements TypeConverter { return Schema.createRecord(meta.getTable(), null, null, false, fields); } - private Schema convertType(TableMeta meta, String type, Long length, Integer scala) { + private Schema convertType(String table, String type, Long length, Integer scala) { type = type.trim().toLowerCase(); if (BOOLEAN_REGEX.matcher(type).matches()) { return NULLABLE_BOOLEAN_SCHEMA; @@ -98,7 +98,7 @@ public class TypeConverterV2 implements TypeConverter { return NULLABLE_DECIMAL_SCHEMA.apply(length.intValue(), scala); } } else { - LogHelper.warn(logger, LogHelper.LogPoint.FIELD_TYPE_NOT_FOUND, "{} Cannot find correct type for source type: {} length: {} scala: {}", meta.getAlias(), type, length, scala); + LogHelper.warn(logger, LogHelper.LogPoint.FIELD_TYPE_NOT_FOUND, "{} Cannot find correct type for source type: {} length: {} scala: {}", table, type, length, scala); return NULLABLE_STRING_SCHEMA; } } diff --git a/utils/sync/src/test/java/com/lanyuanxiaoyao/service/sync/TestType.java b/utils/sync/src/test/java/com/lanyuanxiaoyao/service/sync/TestType.java deleted file mode 100644 index fd2bc8c..0000000 --- a/utils/sync/src/test/java/com/lanyuanxiaoyao/service/sync/TestType.java +++ /dev/null @@ -1,28 +0,0 @@ -package com.lanyuanxiaoyao.service.sync; - -import cn.hutool.http.HttpUtil; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.lanyuanxiaoyao.service.common.entity.TableMeta; -import com.lanyuanxiaoyao.service.sync.functions.type.TypeConverterV2; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author lanyuanxiaoyao - * @date 2024-03-01 - */ -public class TestType { - private static final Logger logger = LoggerFactory.getLogger(TestType.class); - - public static void main(String[] args) throws JsonProcessingException { - ObjectMapper mapper = new ObjectMapper(); - String content = HttpUtil - .createGet("http://b12s7.hdp.dc:15123/info/table_meta/detail?flink_job_id=1749270121369489408&alias=irms_main_pr_type_type") - .basicAuth("AxhEbscwsJDbYMH2", "cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4") - .execute() - .body(); - TableMeta meta = mapper.readValue(content, TableMeta.class); - logger.info("{}", new TypeConverterV2().convertToSchema(meta).toString()); - } -} diff --git a/utils/sync/src/test/java/com/lanyuanxiaoyao/service/sync/TypeConvertTest.java b/utils/sync/src/test/java/com/lanyuanxiaoyao/service/sync/TypeConvertTest.java new file mode 100644 index 0000000..d6379e4 --- /dev/null +++ b/utils/sync/src/test/java/com/lanyuanxiaoyao/service/sync/TypeConvertTest.java @@ -0,0 +1,143 @@ +package com.lanyuanxiaoyao.service.sync; + +import com.lanyuanxiaoyao.service.sync.functions.type.TypeConverterV2; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.math.BigDecimal; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.StringData; +import org.apache.hudi.org.apache.avro.Schema; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import static com.lanyuanxiaoyao.service.sync.functions.type.TypeConverterV2.*; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +/** + * @author ZhangJiacheng + * @date 2023-07-19 + */ +public class TypeConvertTest { + @Test + public void testConvertType() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException, InstantiationException, NoSuchFieldException { + Executor convertType = new Executor("convertType", String.class, String.class, Long.class, Integer.class); + + // boolean + assertEquals(NULLABLE_BOOLEAN_SCHEMA, convertType.exec("test", "boolean", 0L, null)); + assertEquals(NULLABLE_BOOLEAN_SCHEMA, convertType.exec("test", "Boolean", 0L, null)); + assertEquals(NULLABLE_BOOLEAN_SCHEMA, convertType.exec("test", "BOOLEAN", 0L, null)); + assertEquals(NULLABLE_BOOLEAN_SCHEMA, convertType.exec("test", "bool", 0L, null)); + + // int + assertEquals(NULLABLE_INT_SCHEMA, convertType.exec("test", "tinyint(1)", 0L, null)); + assertEquals(NULLABLE_INT_SCHEMA, convertType.exec("test", "tinyint", 1L, null)); + assertEquals(NULLABLE_INT_SCHEMA, convertType.exec("test", "tinyint", 0L, null)); + assertEquals(NULLABLE_INT_SCHEMA, convertType.exec("test", "int", 0L, null)); + assertEquals(NULLABLE_INT_SCHEMA, convertType.exec("test", "integer", 0L, null)); + assertEquals(NULLABLE_INT_SCHEMA, convertType.exec("test", "smallint", 0L, null)); + assertEquals(NULLABLE_INT_SCHEMA, convertType.exec("test", "smallserial", 0L, null)); + assertEquals(NULLABLE_INT_SCHEMA, convertType.exec("test", "SMALLSERIAL", 0L, null)); + + // long + assertEquals(NULLABLE_LONG_SCHEMA, convertType.exec("test", "int unsigned", 0L, null)); + assertEquals(NULLABLE_LONG_SCHEMA, convertType.exec("test", "bigint", 0L, null)); + assertEquals(NULLABLE_LONG_SCHEMA, convertType.exec("test", "bigint unsigned", 0L, null)); + assertEquals(NULLABLE_LONG_SCHEMA, convertType.exec("test", "serial", 0L, null)); + assertEquals(NULLABLE_LONG_SCHEMA, convertType.exec("test", "long", 0L, null)); + assertEquals(NULLABLE_LONG_SCHEMA, convertType.exec("test", "Long", 0L, null)); + + // float + assertEquals(NULLABLE_FLOAT_SCHEMA, convertType.exec("test", "float", 0L, null)); + assertEquals(NULLABLE_FLOAT_SCHEMA, convertType.exec("test", "FLOAT", 0L, null)); + + // double + assertEquals(NULLABLE_DOUBLE_SCHEMA, convertType.exec("test", "double", 0L, null)); + assertEquals(NULLABLE_DOUBLE_SCHEMA, convertType.exec("test", "Double", 0L, null)); + + // date + assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "date", 0L, null)); + assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "datetime", 0L, null)); + assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "timestamp", 0L, null)); + assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "timestamp without time zone", 0L, null)); + assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "time", 0L, null)); + + // string + assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "varchar", 0L, null)); + assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "char", 0L, null)); + assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "text", 0L, null)); + assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "longtext", 0L, null)); + assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "mediumtext", 0L, null)); + assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "clob", 0L, null)); + assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "binary", 0L, null)); + assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "bit", 0L, null)); + assertEquals(NULLABLE_STRING_SCHEMA, convertType.exec("test", "character varying", 0L, null)); + + // decimal + assertEquals(NULLABLE_DECIMAL_SCHEMA.apply(2, 0), convertType.exec("test", "number", 2L, null)); + assertEquals(NULLABLE_DECIMAL_SCHEMA.apply(4, 0), convertType.exec("test", "money", 4L, null)); + assertEquals(NULLABLE_DECIMAL_SCHEMA.apply(6, 0), convertType.exec("test", "bigserial", 6L, null)); + assertEquals(NULLABLE_DECIMAL_SCHEMA.apply(6, 3), convertType.exec("test", "decimal(6,3)", 6L, 3)); + assertEquals(NULLABLE_DECIMAL_SCHEMA.apply(6, 6), convertType.exec("test", "decimal(6)", 6L, null)); + assertEquals(NULLABLE_DECIMAL_SCHEMA.apply(6, 6), convertType.exec("test", "decimal", 6L, null)); + assertEquals(NULLABLE_DECIMAL_SCHEMA.apply(6, 6), convertType.exec("test", "double precision", 6L, null)); + assertEquals(NULLABLE_DECIMAL_SCHEMA.apply(2, 1), convertType.exec("test", "numeric(2,1)", 2L, 1)); + assertEquals(NULLABLE_DECIMAL_SCHEMA.apply(2, 1), convertType.exec("test", "numeric", 2L, 1)); + } + + @Test + public void testConvertValue() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { + Executor convertValue = new Executor("covertValue", Schema.class, Object.class); + + // null + assertNull(convertValue.exec(NULLABLE_BOOLEAN_SCHEMA, null)); + + // boolean + assertEquals(true, convertValue.exec(NULLABLE_BOOLEAN_SCHEMA, "true")); + assertEquals(false, convertValue.exec(NULLABLE_BOOLEAN_SCHEMA, "false")); + assertEquals(false, convertValue.exec(NULLABLE_BOOLEAN_SCHEMA, "1")); + assertEquals(false, convertValue.exec(NULLABLE_BOOLEAN_SCHEMA, "hello")); + + // int + assertEquals(1, convertValue.exec(NULLABLE_INT_SCHEMA, "1")); + assertEquals(10, convertValue.exec(NULLABLE_INT_SCHEMA, "10")); + Assertions.assertThrows(InvocationTargetException.class, () -> convertValue.exec(NULLABLE_INT_SCHEMA, "10.0")); + + // long + assertEquals(1L, convertValue.exec(NULLABLE_LONG_SCHEMA, "1")); + assertEquals(10L, convertValue.exec(NULLABLE_LONG_SCHEMA, "10")); + Assertions.assertThrows(InvocationTargetException.class, () -> convertValue.exec(NULLABLE_LONG_SCHEMA, "10.0")); + + // float + assertEquals(1.0f, convertValue.exec(NULLABLE_FLOAT_SCHEMA, "1.0")); + assertEquals(10.0f, convertValue.exec(NULLABLE_FLOAT_SCHEMA, "10.0")); + assertEquals(100f, convertValue.exec(NULLABLE_FLOAT_SCHEMA, "100")); + + // double + assertEquals(1.0, convertValue.exec(NULLABLE_DOUBLE_SCHEMA, "1.0")); + assertEquals(100d, convertValue.exec(NULLABLE_DOUBLE_SCHEMA, "100")); + + // string + assertEquals(StringData.fromString("2020-10-10 01:01:01"), convertValue.exec(NULLABLE_STRING_SCHEMA, "2020-10-10 01:01:01")); + assertEquals(StringData.fromString("123"), convertValue.exec(NULLABLE_STRING_SCHEMA, "123")); + assertEquals(StringData.fromString("123.456"), convertValue.exec(NULLABLE_STRING_SCHEMA, "123.456")); + + // decimal + assertEquals(DecimalData.fromBigDecimal(new BigDecimal("123.0"), 10, 0), convertValue.exec(NULLABLE_DECIMAL_SCHEMA.apply(10, 0), "123.0")); + assertEquals(DecimalData.fromBigDecimal(new BigDecimal("123456789987654321.123456789987654321"), 37, 18), convertValue.exec(NULLABLE_DECIMAL_SCHEMA.apply(37, 18), "123456789987654321.123456789987654321")); + } + + private static final class Executor { + private final TypeConverterV2 converter = new TypeConverterV2(); + private final Method method; + + Executor(String method, Class... params) throws NoSuchMethodException { + this.method = converter.getClass().getDeclaredMethod(method, params); + this.method.setAccessible(true); + } + + public Object exec(Object... params) throws InvocationTargetException, IllegalAccessException { + return method.invoke(this.converter, params); + } + } +}