diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/type/TypeConverterV2.java b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/type/TypeConverterV2.java index 6955ebb..bf1d4ad 100644 --- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/type/TypeConverterV2.java +++ b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/type/TypeConverterV2.java @@ -4,13 +4,13 @@ import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import com.lanyuanxiaoyao.service.common.Constants; import com.lanyuanxiaoyao.service.common.entity.TableMeta; +import com.lanyuanxiaoyao.service.common.utils.LogHelper; import java.math.BigDecimal; import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.function.BiFunction; -import java.util.function.Function; import java.util.regex.Pattern; import org.apache.flink.table.data.DecimalData; import org.apache.flink.table.data.GenericRowData; @@ -18,6 +18,8 @@ import org.apache.flink.table.data.StringData; import org.apache.hudi.org.apache.avro.JsonProperties; import org.apache.hudi.org.apache.avro.LogicalTypes; import org.apache.hudi.org.apache.avro.Schema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static org.apache.hudi.org.apache.avro.Schema.*; @@ -35,7 +37,7 @@ public class TypeConverterV2 implements TypeConverter { public static final Schema FLOAT_SCHEMA = create(Type.FLOAT); public static final Schema DOUBLE_SCHEMA = create(Type.DOUBLE); public static final Schema STRING_SCHEMA = create(Type.STRING); - public static final BiFunction FIXED_SCHEMA = (length, scala) -> createFixed(StrUtil.format("decimal_{}_{}", length, scala), null, null, length); + public static final BiFunction FIXED_SCHEMA = (length, scala) -> createFixed(StrUtil.format("decimal_{}_{}", length, scala), null, null, length); public static final BiFunction DECIMAL_SCHEMA = (length, scala) -> LogicalTypes.decimal(length, scala).addToSchema(FIXED_SCHEMA.apply(length, scala)); public static final BiFunction NULLABLE_DECIMAL_SCHEMA = (length, scala) -> createUnion(NULL_SCHEMA, DECIMAL_SCHEMA.apply(length, scala)); public static final Schema NULLABLE_BOOLEAN_SCHEMA = createUnion(NULL_SCHEMA, BOOLEAN_SCHEMA); @@ -44,6 +46,7 @@ public class TypeConverterV2 implements TypeConverter { public static final Schema NULLABLE_FLOAT_SCHEMA = createUnion(NULL_SCHEMA, FLOAT_SCHEMA); public static final Schema NULLABLE_DOUBLE_SCHEMA = createUnion(NULL_SCHEMA, DOUBLE_SCHEMA); public static final Schema NULLABLE_STRING_SCHEMA = createUnion(NULL_SCHEMA, STRING_SCHEMA); + private static final Logger logger = LoggerFactory.getLogger(TypeConverterV2.class); private static final Pattern BOOLEAN_REGEX = Pattern.compile("^boolean|bool$"); private static final Pattern INT_REGEX = Pattern.compile("^(tinyint|smallint|int|smallserial|integer)(\\(\\d+\\))?$"); private static final Pattern LONG_REGEX = Pattern.compile("^(bigint unsigned)|((bigint|serial|long)(\\(\\d+\\))?)$"); @@ -64,7 +67,7 @@ public class TypeConverterV2 implements TypeConverter { fields.add(new Field(Constants.UNION_KEY_NAME, STRING_SCHEMA, null, "")); for (TableMeta.FieldMeta field : meta.getFields()) { - fields.add(new Field(field.getName(), convertType(field.getType(), field.getLength(), field.getScala()), null, JsonProperties.NULL_VALUE)); + fields.add(new Field(field.getName(), convertType(meta, field.getType(), field.getLength(), field.getScala()), null, JsonProperties.NULL_VALUE)); } fields.add(new Field(Constants.UPDATE_TIMESTAMP_KEY_NAME, LONG_SCHEMA, null, -1)); @@ -72,7 +75,7 @@ public class TypeConverterV2 implements TypeConverter { return Schema.createRecord(meta.getTable(), null, null, false, fields); } - private Schema convertType(String type, Long length, Integer scala) { + private Schema convertType(TableMeta meta, String type, Long length, Integer scala) { type = type.trim().toLowerCase(); if (BOOLEAN_REGEX.matcher(type).matches()) { return NULLABLE_BOOLEAN_SCHEMA; @@ -95,7 +98,8 @@ public class TypeConverterV2 implements TypeConverter { return NULLABLE_DECIMAL_SCHEMA.apply(length.intValue(), scala); } } else { - throw new RuntimeException(Constants.LOG_POINT_FIELD_TYPE_NOT_FOUND + " Cannot find correct type for source type: " + type + " length: " + length); + LogHelper.warn(logger, LogHelper.LogPoint.FIELD_TYPE_NOT_FOUND, "{} Cannot find correct type for source type: {} length: {} scala: {}", meta.getAlias(), type, length, scala); + return NULLABLE_STRING_SCHEMA; } }