feat(sync): 字段类型不存在不抛出异常 默认使用String
This commit is contained in:
@@ -4,13 +4,13 @@ import cn.hutool.core.util.ObjectUtil;
|
|||||||
import cn.hutool.core.util.StrUtil;
|
import cn.hutool.core.util.StrUtil;
|
||||||
import com.lanyuanxiaoyao.service.common.Constants;
|
import com.lanyuanxiaoyao.service.common.Constants;
|
||||||
import com.lanyuanxiaoyao.service.common.entity.TableMeta;
|
import com.lanyuanxiaoyao.service.common.entity.TableMeta;
|
||||||
|
import com.lanyuanxiaoyao.service.common.utils.LogHelper;
|
||||||
import java.math.BigDecimal;
|
import java.math.BigDecimal;
|
||||||
import java.time.format.DateTimeFormatter;
|
import java.time.format.DateTimeFormatter;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.function.BiFunction;
|
import java.util.function.BiFunction;
|
||||||
import java.util.function.Function;
|
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
import org.apache.flink.table.data.DecimalData;
|
import org.apache.flink.table.data.DecimalData;
|
||||||
import org.apache.flink.table.data.GenericRowData;
|
import org.apache.flink.table.data.GenericRowData;
|
||||||
@@ -18,6 +18,8 @@ import org.apache.flink.table.data.StringData;
|
|||||||
import org.apache.hudi.org.apache.avro.JsonProperties;
|
import org.apache.hudi.org.apache.avro.JsonProperties;
|
||||||
import org.apache.hudi.org.apache.avro.LogicalTypes;
|
import org.apache.hudi.org.apache.avro.LogicalTypes;
|
||||||
import org.apache.hudi.org.apache.avro.Schema;
|
import org.apache.hudi.org.apache.avro.Schema;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import static org.apache.hudi.org.apache.avro.Schema.*;
|
import static org.apache.hudi.org.apache.avro.Schema.*;
|
||||||
|
|
||||||
@@ -35,7 +37,7 @@ public class TypeConverterV2 implements TypeConverter {
|
|||||||
public static final Schema FLOAT_SCHEMA = create(Type.FLOAT);
|
public static final Schema FLOAT_SCHEMA = create(Type.FLOAT);
|
||||||
public static final Schema DOUBLE_SCHEMA = create(Type.DOUBLE);
|
public static final Schema DOUBLE_SCHEMA = create(Type.DOUBLE);
|
||||||
public static final Schema STRING_SCHEMA = create(Type.STRING);
|
public static final Schema STRING_SCHEMA = create(Type.STRING);
|
||||||
public static final BiFunction<Integer, Integer, Schema> FIXED_SCHEMA = (length, scala) -> createFixed(StrUtil.format("decimal_{}_{}", length, scala), null, null, length);
|
public static final BiFunction<Integer, Integer, Schema> FIXED_SCHEMA = (length, scala) -> createFixed(StrUtil.format("decimal_{}_{}", length, scala), null, null, length);
|
||||||
public static final BiFunction<Integer, Integer, Schema> DECIMAL_SCHEMA = (length, scala) -> LogicalTypes.decimal(length, scala).addToSchema(FIXED_SCHEMA.apply(length, scala));
|
public static final BiFunction<Integer, Integer, Schema> DECIMAL_SCHEMA = (length, scala) -> LogicalTypes.decimal(length, scala).addToSchema(FIXED_SCHEMA.apply(length, scala));
|
||||||
public static final BiFunction<Integer, Integer, Schema> NULLABLE_DECIMAL_SCHEMA = (length, scala) -> createUnion(NULL_SCHEMA, DECIMAL_SCHEMA.apply(length, scala));
|
public static final BiFunction<Integer, Integer, Schema> NULLABLE_DECIMAL_SCHEMA = (length, scala) -> createUnion(NULL_SCHEMA, DECIMAL_SCHEMA.apply(length, scala));
|
||||||
public static final Schema NULLABLE_BOOLEAN_SCHEMA = createUnion(NULL_SCHEMA, BOOLEAN_SCHEMA);
|
public static final Schema NULLABLE_BOOLEAN_SCHEMA = createUnion(NULL_SCHEMA, BOOLEAN_SCHEMA);
|
||||||
@@ -44,6 +46,7 @@ public class TypeConverterV2 implements TypeConverter {
|
|||||||
public static final Schema NULLABLE_FLOAT_SCHEMA = createUnion(NULL_SCHEMA, FLOAT_SCHEMA);
|
public static final Schema NULLABLE_FLOAT_SCHEMA = createUnion(NULL_SCHEMA, FLOAT_SCHEMA);
|
||||||
public static final Schema NULLABLE_DOUBLE_SCHEMA = createUnion(NULL_SCHEMA, DOUBLE_SCHEMA);
|
public static final Schema NULLABLE_DOUBLE_SCHEMA = createUnion(NULL_SCHEMA, DOUBLE_SCHEMA);
|
||||||
public static final Schema NULLABLE_STRING_SCHEMA = createUnion(NULL_SCHEMA, STRING_SCHEMA);
|
public static final Schema NULLABLE_STRING_SCHEMA = createUnion(NULL_SCHEMA, STRING_SCHEMA);
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(TypeConverterV2.class);
|
||||||
private static final Pattern BOOLEAN_REGEX = Pattern.compile("^boolean|bool$");
|
private static final Pattern BOOLEAN_REGEX = Pattern.compile("^boolean|bool$");
|
||||||
private static final Pattern INT_REGEX = Pattern.compile("^(tinyint|smallint|int|smallserial|integer)(\\(\\d+\\))?$");
|
private static final Pattern INT_REGEX = Pattern.compile("^(tinyint|smallint|int|smallserial|integer)(\\(\\d+\\))?$");
|
||||||
private static final Pattern LONG_REGEX = Pattern.compile("^(bigint unsigned)|((bigint|serial|long)(\\(\\d+\\))?)$");
|
private static final Pattern LONG_REGEX = Pattern.compile("^(bigint unsigned)|((bigint|serial|long)(\\(\\d+\\))?)$");
|
||||||
@@ -64,7 +67,7 @@ public class TypeConverterV2 implements TypeConverter {
|
|||||||
fields.add(new Field(Constants.UNION_KEY_NAME, STRING_SCHEMA, null, ""));
|
fields.add(new Field(Constants.UNION_KEY_NAME, STRING_SCHEMA, null, ""));
|
||||||
|
|
||||||
for (TableMeta.FieldMeta field : meta.getFields()) {
|
for (TableMeta.FieldMeta field : meta.getFields()) {
|
||||||
fields.add(new Field(field.getName(), convertType(field.getType(), field.getLength(), field.getScala()), null, JsonProperties.NULL_VALUE));
|
fields.add(new Field(field.getName(), convertType(meta, field.getType(), field.getLength(), field.getScala()), null, JsonProperties.NULL_VALUE));
|
||||||
}
|
}
|
||||||
|
|
||||||
fields.add(new Field(Constants.UPDATE_TIMESTAMP_KEY_NAME, LONG_SCHEMA, null, -1));
|
fields.add(new Field(Constants.UPDATE_TIMESTAMP_KEY_NAME, LONG_SCHEMA, null, -1));
|
||||||
@@ -72,7 +75,7 @@ public class TypeConverterV2 implements TypeConverter {
|
|||||||
return Schema.createRecord(meta.getTable(), null, null, false, fields);
|
return Schema.createRecord(meta.getTable(), null, null, false, fields);
|
||||||
}
|
}
|
||||||
|
|
||||||
private Schema convertType(String type, Long length, Integer scala) {
|
private Schema convertType(TableMeta meta, String type, Long length, Integer scala) {
|
||||||
type = type.trim().toLowerCase();
|
type = type.trim().toLowerCase();
|
||||||
if (BOOLEAN_REGEX.matcher(type).matches()) {
|
if (BOOLEAN_REGEX.matcher(type).matches()) {
|
||||||
return NULLABLE_BOOLEAN_SCHEMA;
|
return NULLABLE_BOOLEAN_SCHEMA;
|
||||||
@@ -95,7 +98,8 @@ public class TypeConverterV2 implements TypeConverter {
|
|||||||
return NULLABLE_DECIMAL_SCHEMA.apply(length.intValue(), scala);
|
return NULLABLE_DECIMAL_SCHEMA.apply(length.intValue(), scala);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
throw new RuntimeException(Constants.LOG_POINT_FIELD_TYPE_NOT_FOUND + " Cannot find correct type for source type: " + type + " length: " + length);
|
LogHelper.warn(logger, LogHelper.LogPoint.FIELD_TYPE_NOT_FOUND, "{} Cannot find correct type for source type: {} length: {} scala: {}", meta.getAlias(), type, length, scala);
|
||||||
|
return NULLABLE_STRING_SCHEMA;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user