fix(sync): 修复字段判断空值时类型转换错误

把字段值统一当作string处理,但hudi特有字段不是字符串,类型强转失败
This commit is contained in:
v-zhangjc9
2024-03-20 12:10:10 +08:00
parent 40e387239b
commit 0608fa8344

View File

@@ -84,6 +84,7 @@ public class TypeConverterV2 implements TypeConverter {
} }
private Schema convertType(String table, String field, String type, Long length, Integer scala) { private Schema convertType(String table, String field, String type, Long length, Integer scala) {
try {
type = type.trim().toLowerCase(); type = type.trim().toLowerCase();
if (BOOLEAN_REGEX.matcher(type).matches()) { if (BOOLEAN_REGEX.matcher(type).matches()) {
return NULLABLE_BOOLEAN_SCHEMA; return NULLABLE_BOOLEAN_SCHEMA;
@@ -109,6 +110,10 @@ public class TypeConverterV2 implements TypeConverter {
LogHelper.warn(logger, LogHelper.LogPoint.FIELD_TYPE_NOT_FOUND, "{} Cannot find correct type for source type: {} length: {} scala: {}", table, type, length, scala); LogHelper.warn(logger, LogHelper.LogPoint.FIELD_TYPE_NOT_FOUND, "{} Cannot find correct type for source type: {} length: {} scala: {}", table, type, length, scala);
return NULLABLE_STRING_SCHEMA; return NULLABLE_STRING_SCHEMA;
} }
} catch (Throwable throwable) {
logger.error(StrUtil.format("Convert type failure {} {} {} length: {} scala: {}", table, field, type, length, scala), throwable);
throw throwable;
}
} }
@Override @Override
@@ -118,36 +123,37 @@ public class TypeConverterV2 implements TypeConverter {
for (int index = 0; index < fields.size(); index++) { for (int index = 0; index < fields.size(); index++) {
Field field = fields.get(index); Field field = fields.get(index);
Object value = data.getOrDefault(Constants.FIELD_COVERT.apply(meta, field.name()), null); Object value = data.getOrDefault(Constants.FIELD_COVERT.apply(meta, field.name()), null);
row.setField(index, covertValue(field.schema(), value)); row.setField(index, covertValue(field.schema(), field.name(), value));
} }
return row; return row;
} }
private Object covertValue(Schema schema, Object value) { private Object covertValue(Schema schema, String name, Object value) {
try {
if (ObjectUtil.isNull(value)) { if (ObjectUtil.isNull(value)) {
return value; return value;
} else if (NULLABLE_BOOLEAN_SCHEMA.equals(schema) || BOOLEAN_SCHEMA.equals(schema)) { } else if (NULLABLE_BOOLEAN_SCHEMA.equals(schema) || BOOLEAN_SCHEMA.equals(schema)) {
if (StrUtil.isBlankIfStr(value)) { if (isNullValue(value)) {
return null; return null;
} }
return value instanceof String ? Boolean.valueOf((String) value) : value; return value instanceof String ? Boolean.valueOf((String) value) : value;
} else if (NULLABLE_INT_SCHEMA.equals(schema) || INT_SCHEMA.equals(schema)) { } else if (NULLABLE_INT_SCHEMA.equals(schema) || INT_SCHEMA.equals(schema)) {
if (StrUtil.isBlankIfStr(value)) { if (isNullValue(value)) {
return null; return null;
} }
return value instanceof String ? Integer.valueOf((String) value) : value; return value instanceof String ? Integer.valueOf((String) value) : value;
} else if (NULLABLE_LONG_SCHEMA.equals(schema) || LONG_SCHEMA.equals(schema)) { } else if (NULLABLE_LONG_SCHEMA.equals(schema) || LONG_SCHEMA.equals(schema)) {
if (StrUtil.isBlankIfStr(value)) { if (isNullValue(value)) {
return null; return null;
} }
return value instanceof String ? Long.valueOf((String) value) : value; return value instanceof String ? Long.valueOf((String) value) : value;
} else if (NULLABLE_FLOAT_SCHEMA.equals(schema) || FLOAT_SCHEMA.equals(schema)) { } else if (NULLABLE_FLOAT_SCHEMA.equals(schema) || FLOAT_SCHEMA.equals(schema)) {
if (StrUtil.isBlankIfStr(value)) { if (isNullValue(value)) {
return null; return null;
} }
return value instanceof String ? Float.valueOf((String) value) : value; return value instanceof String ? Float.valueOf((String) value) : value;
} else if (NULLABLE_DOUBLE_SCHEMA.equals(schema) || DOUBLE_SCHEMA.equals(schema)) { } else if (NULLABLE_DOUBLE_SCHEMA.equals(schema) || DOUBLE_SCHEMA.equals(schema)) {
if (StrUtil.isBlankIfStr(value)) { if (isNullValue(value)) {
return null; return null;
} }
return value instanceof String ? Double.valueOf((String) value) : value; return value instanceof String ? Double.valueOf((String) value) : value;
@@ -156,7 +162,7 @@ public class TypeConverterV2 implements TypeConverter {
} else { } else {
for (Schema type : schema.getTypes()) { for (Schema type : schema.getTypes()) {
if (type.getLogicalType() instanceof LogicalTypes.Decimal) { if (type.getLogicalType() instanceof LogicalTypes.Decimal) {
if (StrUtil.isBlankIfStr(value)) { if (isNullValue(value)) {
return null; return null;
} }
LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) type.getLogicalType(); LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) type.getLogicalType();
@@ -167,5 +173,13 @@ public class TypeConverterV2 implements TypeConverter {
} }
return value; return value;
} }
} catch (Throwable throwable) {
logger.error(StrUtil.format("Convert value failure {} {} {}", schema.toString(), name, value), throwable);
throw throwable;
}
}
private boolean isNullValue(Object value) {
return StrUtil.isBlankIfStr(value) || ObjectUtil.equals("null", value);
} }
} }