From 396bc01f94c8c607ed42f9ac84687e202f9d8fc4 Mon Sep 17 00:00:00 2001 From: lanyuanxiaoyao Date: Mon, 4 Mar 2024 10:16:17 +0800 Subject: [PATCH] =?UTF-8?q?feat(sync):=20=E5=AD=97=E6=AE=B5=E7=B1=BB?= =?UTF-8?q?=E5=9E=8B=E4=B8=8D=E5=AD=98=E5=9C=A8=E4=B8=8D=E6=8A=9B=E5=87=BA?= =?UTF-8?q?=E5=BC=82=E5=B8=B8=20=E9=BB=98=E8=AE=A4=E4=BD=BF=E7=94=A8String?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../sync/functions/type/TypeConverterV2.java | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/type/TypeConverterV2.java b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/type/TypeConverterV2.java index 6955ebb..bf1d4ad 100644 --- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/type/TypeConverterV2.java +++ b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/type/TypeConverterV2.java @@ -4,13 +4,13 @@ import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import com.lanyuanxiaoyao.service.common.Constants; import com.lanyuanxiaoyao.service.common.entity.TableMeta; +import com.lanyuanxiaoyao.service.common.utils.LogHelper; import java.math.BigDecimal; import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.function.BiFunction; -import java.util.function.Function; import java.util.regex.Pattern; import org.apache.flink.table.data.DecimalData; import org.apache.flink.table.data.GenericRowData; @@ -18,6 +18,8 @@ import org.apache.flink.table.data.StringData; import org.apache.hudi.org.apache.avro.JsonProperties; import org.apache.hudi.org.apache.avro.LogicalTypes; import org.apache.hudi.org.apache.avro.Schema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static org.apache.hudi.org.apache.avro.Schema.*; @@ -35,7 +37,7 @@ public class TypeConverterV2 implements TypeConverter { public static final Schema FLOAT_SCHEMA = create(Type.FLOAT); public static final Schema DOUBLE_SCHEMA = create(Type.DOUBLE); public static final Schema STRING_SCHEMA = create(Type.STRING); - public static final BiFunction FIXED_SCHEMA = (length, scala) -> createFixed(StrUtil.format("decimal_{}_{}", length, scala), null, null, length); + public static final BiFunction FIXED_SCHEMA = (length, scala) -> createFixed(StrUtil.format("decimal_{}_{}", length, scala), null, null, length); public static final BiFunction DECIMAL_SCHEMA = (length, scala) -> LogicalTypes.decimal(length, scala).addToSchema(FIXED_SCHEMA.apply(length, scala)); public static final BiFunction NULLABLE_DECIMAL_SCHEMA = (length, scala) -> createUnion(NULL_SCHEMA, DECIMAL_SCHEMA.apply(length, scala)); public static final Schema NULLABLE_BOOLEAN_SCHEMA = createUnion(NULL_SCHEMA, BOOLEAN_SCHEMA); @@ -44,6 +46,7 @@ public class TypeConverterV2 implements TypeConverter { public static final Schema NULLABLE_FLOAT_SCHEMA = createUnion(NULL_SCHEMA, FLOAT_SCHEMA); public static final Schema NULLABLE_DOUBLE_SCHEMA = createUnion(NULL_SCHEMA, DOUBLE_SCHEMA); public static final Schema NULLABLE_STRING_SCHEMA = createUnion(NULL_SCHEMA, STRING_SCHEMA); + private static final Logger logger = LoggerFactory.getLogger(TypeConverterV2.class); private static final Pattern BOOLEAN_REGEX = Pattern.compile("^boolean|bool$"); private static final Pattern INT_REGEX = Pattern.compile("^(tinyint|smallint|int|smallserial|integer)(\\(\\d+\\))?$"); private static final Pattern LONG_REGEX = Pattern.compile("^(bigint unsigned)|((bigint|serial|long)(\\(\\d+\\))?)$"); @@ -64,7 +67,7 @@ public class TypeConverterV2 implements TypeConverter { fields.add(new Field(Constants.UNION_KEY_NAME, STRING_SCHEMA, null, "")); for (TableMeta.FieldMeta field : meta.getFields()) { - fields.add(new Field(field.getName(), convertType(field.getType(), field.getLength(), field.getScala()), null, JsonProperties.NULL_VALUE)); + fields.add(new Field(field.getName(), convertType(meta, field.getType(), field.getLength(), field.getScala()), null, JsonProperties.NULL_VALUE)); } fields.add(new Field(Constants.UPDATE_TIMESTAMP_KEY_NAME, LONG_SCHEMA, null, -1)); @@ -72,7 +75,7 @@ public class TypeConverterV2 implements TypeConverter { return Schema.createRecord(meta.getTable(), null, null, false, fields); } - private Schema convertType(String type, Long length, Integer scala) { + private Schema convertType(TableMeta meta, String type, Long length, Integer scala) { type = type.trim().toLowerCase(); if (BOOLEAN_REGEX.matcher(type).matches()) { return NULLABLE_BOOLEAN_SCHEMA; @@ -95,7 +98,8 @@ public class TypeConverterV2 implements TypeConverter { return NULLABLE_DECIMAL_SCHEMA.apply(length.intValue(), scala); } } else { - throw new RuntimeException(Constants.LOG_POINT_FIELD_TYPE_NOT_FOUND + " Cannot find correct type for source type: " + type + " length: " + length); + LogHelper.warn(logger, LogHelper.LogPoint.FIELD_TYPE_NOT_FOUND, "{} Cannot find correct type for source type: {} length: {} scala: {}", meta.getAlias(), type, length, scala); + return NULLABLE_STRING_SCHEMA; } }