From 0608fa83449e3510a1c73691abbbc41f9fce0cc0 Mon Sep 17 00:00:00 2001 From: v-zhangjc9 Date: Wed, 20 Mar 2024 12:10:10 +0800 Subject: [PATCH] =?UTF-8?q?fix(sync):=20=E4=BF=AE=E5=A4=8D=E5=AD=97?= =?UTF-8?q?=E6=AE=B5=E5=88=A4=E6=96=AD=E7=A9=BA=E5=80=BC=E6=97=B6=E7=B1=BB?= =?UTF-8?q?=E5=9E=8B=E8=BD=AC=E6=8D=A2=E9=94=99=E8=AF=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 把字段值统一当作string处理,但hudi特有字段不是字符串,类型强转失败 --- .../sync/functions/type/TypeConverterV2.java | 142 ++++++++++-------- 1 file changed, 78 insertions(+), 64 deletions(-) diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/type/TypeConverterV2.java b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/type/TypeConverterV2.java index 80334a5..aa38bba 100644 --- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/type/TypeConverterV2.java +++ b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/type/TypeConverterV2.java @@ -84,30 +84,35 @@ public class TypeConverterV2 implements TypeConverter { } private Schema convertType(String table, String field, String type, Long length, Integer scala) { - type = type.trim().toLowerCase(); - if (BOOLEAN_REGEX.matcher(type).matches()) { - return NULLABLE_BOOLEAN_SCHEMA; - } else if (STRING_REGEX.matcher(type).matches() || DATE_REGEX.matcher(type).matches()) { - return NULLABLE_STRING_SCHEMA; - } else if (INT_REGEX.matcher(type).matches()) { - return NULLABLE_INT_SCHEMA; - } else if (LONG_REGEX.matcher(type).matches()) { - return NULLABLE_LONG_SCHEMA; - } else if (FLOAT_REGEX.matcher(type).matches()) { - return NULLABLE_FLOAT_SCHEMA; - } else if (DOUBLE_REGEX.matcher(type).matches()) { - return NULLABLE_DOUBLE_SCHEMA; - } else if (FIXED_REGEX.matcher(type).matches()) { - return NULLABLE_DECIMAL_SCHEMA(field, length.intValue(), 0); - } else if (DECIMAL_REGEX.matcher(type).matches() || NUMERIC_REGEX.matcher(type).matches()) { - if (ObjectUtil.isNull(scala)) { - return NULLABLE_DECIMAL_SCHEMA(field, length.intValue(), 6); + try { + type = type.trim().toLowerCase(); + if (BOOLEAN_REGEX.matcher(type).matches()) { + return NULLABLE_BOOLEAN_SCHEMA; + } else if (STRING_REGEX.matcher(type).matches() || DATE_REGEX.matcher(type).matches()) { + return NULLABLE_STRING_SCHEMA; + } else if (INT_REGEX.matcher(type).matches()) { + return NULLABLE_INT_SCHEMA; + } else if (LONG_REGEX.matcher(type).matches()) { + return NULLABLE_LONG_SCHEMA; + } else if (FLOAT_REGEX.matcher(type).matches()) { + return NULLABLE_FLOAT_SCHEMA; + } else if (DOUBLE_REGEX.matcher(type).matches()) { + return NULLABLE_DOUBLE_SCHEMA; + } else if (FIXED_REGEX.matcher(type).matches()) { + return NULLABLE_DECIMAL_SCHEMA(field, length.intValue(), 0); + } else if (DECIMAL_REGEX.matcher(type).matches() || NUMERIC_REGEX.matcher(type).matches()) { + if (ObjectUtil.isNull(scala)) { + return NULLABLE_DECIMAL_SCHEMA(field, length.intValue(), 6); + } else { + return NULLABLE_DECIMAL_SCHEMA(field, length.intValue(), scala); + } } else { - return NULLABLE_DECIMAL_SCHEMA(field, length.intValue(), scala); + LogHelper.warn(logger, LogHelper.LogPoint.FIELD_TYPE_NOT_FOUND, "{} Cannot find correct type for source type: {} length: {} scala: {}", table, type, length, scala); + return NULLABLE_STRING_SCHEMA; } - } else { - LogHelper.warn(logger, LogHelper.LogPoint.FIELD_TYPE_NOT_FOUND, "{} Cannot find correct type for source type: {} length: {} scala: {}", table, type, length, scala); - return NULLABLE_STRING_SCHEMA; + } catch (Throwable throwable) { + logger.error(StrUtil.format("Convert type failure {} {} {} length: {} scala: {}", table, field, type, length, scala), throwable); + throw throwable; } } @@ -118,54 +123,63 @@ public class TypeConverterV2 implements TypeConverter { for (int index = 0; index < fields.size(); index++) { Field field = fields.get(index); Object value = data.getOrDefault(Constants.FIELD_COVERT.apply(meta, field.name()), null); - row.setField(index, covertValue(field.schema(), value)); + row.setField(index, covertValue(field.schema(), field.name(), value)); } return row; } - private Object covertValue(Schema schema, Object value) { - if (ObjectUtil.isNull(value)) { - return value; - } else if (NULLABLE_BOOLEAN_SCHEMA.equals(schema) || BOOLEAN_SCHEMA.equals(schema)) { - if (StrUtil.isBlankIfStr(value)) { - return null; - } - return value instanceof String ? Boolean.valueOf((String) value) : value; - } else if (NULLABLE_INT_SCHEMA.equals(schema) || INT_SCHEMA.equals(schema)) { - if (StrUtil.isBlankIfStr(value)) { - return null; - } - return value instanceof String ? Integer.valueOf((String) value) : value; - } else if (NULLABLE_LONG_SCHEMA.equals(schema) || LONG_SCHEMA.equals(schema)) { - if (StrUtil.isBlankIfStr(value)) { - return null; - } - return value instanceof String ? Long.valueOf((String) value) : value; - } else if (NULLABLE_FLOAT_SCHEMA.equals(schema) || FLOAT_SCHEMA.equals(schema)) { - if (StrUtil.isBlankIfStr(value)) { - return null; - } - return value instanceof String ? Float.valueOf((String) value) : value; - } else if (NULLABLE_DOUBLE_SCHEMA.equals(schema) || DOUBLE_SCHEMA.equals(schema)) { - if (StrUtil.isBlankIfStr(value)) { - return null; - } - return value instanceof String ? Double.valueOf((String) value) : value; - } else if (NULLABLE_STRING_SCHEMA.equals(schema) || STRING_SCHEMA.equals(schema)) { - return StringData.fromString((String) value); - } else { - for (Schema type : schema.getTypes()) { - if (type.getLogicalType() instanceof LogicalTypes.Decimal) { - if (StrUtil.isBlankIfStr(value)) { - return null; - } - LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) type.getLogicalType(); - int precision = decimalType.getPrecision(); - int scala = decimalType.getScale(); - return DecimalData.fromBigDecimal(new BigDecimal((String) value), precision, scala); + private Object covertValue(Schema schema, String name, Object value) { + try { + if (ObjectUtil.isNull(value)) { + return value; + } else if (NULLABLE_BOOLEAN_SCHEMA.equals(schema) || BOOLEAN_SCHEMA.equals(schema)) { + if (isNullValue(value)) { + return null; } + return value instanceof String ? Boolean.valueOf((String) value) : value; + } else if (NULLABLE_INT_SCHEMA.equals(schema) || INT_SCHEMA.equals(schema)) { + if (isNullValue(value)) { + return null; + } + return value instanceof String ? Integer.valueOf((String) value) : value; + } else if (NULLABLE_LONG_SCHEMA.equals(schema) || LONG_SCHEMA.equals(schema)) { + if (isNullValue(value)) { + return null; + } + return value instanceof String ? Long.valueOf((String) value) : value; + } else if (NULLABLE_FLOAT_SCHEMA.equals(schema) || FLOAT_SCHEMA.equals(schema)) { + if (isNullValue(value)) { + return null; + } + return value instanceof String ? Float.valueOf((String) value) : value; + } else if (NULLABLE_DOUBLE_SCHEMA.equals(schema) || DOUBLE_SCHEMA.equals(schema)) { + if (isNullValue(value)) { + return null; + } + return value instanceof String ? Double.valueOf((String) value) : value; + } else if (NULLABLE_STRING_SCHEMA.equals(schema) || STRING_SCHEMA.equals(schema)) { + return StringData.fromString((String) value); + } else { + for (Schema type : schema.getTypes()) { + if (type.getLogicalType() instanceof LogicalTypes.Decimal) { + if (isNullValue(value)) { + return null; + } + LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) type.getLogicalType(); + int precision = decimalType.getPrecision(); + int scala = decimalType.getScale(); + return DecimalData.fromBigDecimal(new BigDecimal((String) value), precision, scala); + } + } + return value; } - return value; + } catch (Throwable throwable) { + logger.error(StrUtil.format("Convert value failure {} {} {}", schema.toString(), name, value), throwable); + throw throwable; } } + + private boolean isNullValue(Object value) { + return StrUtil.isBlankIfStr(value) || ObjectUtil.equals("null", value); + } }