revert(common): 回滚Record消息结构更新

This reverts commit 6c9f43d310.
This commit is contained in:
v-zhangjc9
2024-07-29 15:42:06 +08:00
parent 738af7a85f
commit b0c5d04476
11 changed files with 80 additions and 455 deletions

View File

@@ -23,12 +23,6 @@
<groupId>io.github.dragons96</groupId> <groupId>io.github.dragons96</groupId>
<artifactId>sql-builder</artifactId> <artifactId>sql-builder</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>5.7.2</version>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@@ -12,7 +12,6 @@ import java.util.Map;
*/ */
public class Record implements Serializable { public class Record implements Serializable {
private Source source; private Source source;
private Info info;
private Statement statement; private Statement statement;
public Record() { public Record() {
@@ -26,14 +25,6 @@ public class Record implements Serializable {
this.source = source; this.source = source;
} }
public Info getInfo() {
return info;
}
public void setInfo(Info info) {
this.info = info;
}
public Statement getStatement() { public Statement getStatement() {
return statement; return statement;
} }
@@ -46,15 +37,14 @@ public class Record implements Serializable {
public String toString() { public String toString() {
return "Record{" + return "Record{" +
"source=" + source + "source=" + source +
", info=" + info +
", statement=" + statement + ", statement=" + statement +
'}'; '}';
} }
public static class Source implements Serializable { public static class Source implements Serializable {
private String sourceId; private String sourceId;
private String sourcePos;
private String sourceType; private String sourceType;
private String sourcePos;
private String currentTs; private String currentTs;
public Source() { public Source() {
@@ -68,14 +58,6 @@ public class Record implements Serializable {
this.sourceId = sourceId; this.sourceId = sourceId;
} }
public String getSourcePos() {
return sourcePos;
}
public void setSourcePos(String sourcePos) {
this.sourcePos = sourcePos;
}
public String getSourceType() { public String getSourceType() {
return sourceType; return sourceType;
} }
@@ -84,6 +66,14 @@ public class Record implements Serializable {
this.sourceType = sourceType; this.sourceType = sourceType;
} }
public String getSourcePos() {
return sourcePos;
}
public void setSourcePos(String sourcePos) {
this.sourcePos = sourcePos;
}
public String getCurrentTs() { public String getCurrentTs() {
return currentTs; return currentTs;
} }
@@ -96,32 +86,26 @@ public class Record implements Serializable {
public String toString() { public String toString() {
return "Source{" + return "Source{" +
"sourceId='" + sourceId + '\'' + "sourceId='" + sourceId + '\'' +
", sourcePos='" + sourcePos + '\'' +
", sourceType='" + sourceType + '\'' + ", sourceType='" + sourceType + '\'' +
", sourcePos='" + sourcePos + '\'' +
", currentTs='" + currentTs + '\'' + ", currentTs='" + currentTs + '\'' +
'}'; '}';
} }
} }
public static class Info implements Serializable { public static class Statement implements Serializable {
/**
* 无加密
*/
public static final String ENCODE_ENCRYPT_NONE = "0";
/**
* 加密
*/
public static final String ENCODE_ENCRYPT_STATEMENT = "1";
private String schema; private String schema;
private String table; private String table;
private String opStatement;
private String opType; private String opType;
private String op;
private String opTs; private String opTs;
/** private String version;
* 0: 无加密 {@link Info#ENCODE_ENCRYPT_NONE} private Map<String, Object> before;
* 1: 加密 {@link Info#ENCODE_ENCRYPT_STATEMENT} private Map<String, Object> after;
*/
private String encode; public Statement() {
}
public String getSchema() { public String getSchema() {
return schema; return schema;
@@ -139,6 +123,14 @@ public class Record implements Serializable {
this.table = table; this.table = table;
} }
public String getOpStatement() {
return opStatement;
}
public void setOpStatement(String opStatement) {
this.opStatement = opStatement;
}
public String getOpType() { public String getOpType() {
return opType; return opType;
} }
@@ -147,6 +139,14 @@ public class Record implements Serializable {
this.opType = opType; this.opType = opType;
} }
public String getOp() {
return op;
}
public void setOp(String op) {
this.op = op;
}
public String getOpTs() { public String getOpTs() {
return opTs; return opTs;
} }
@@ -155,39 +155,6 @@ public class Record implements Serializable {
this.opTs = opTs; this.opTs = opTs;
} }
public String getEncode() {
return encode;
}
public void setEncode(String encode) {
this.encode = encode;
}
@Override
public String toString() {
return "Info{" +
"schema='" + schema + '\'' +
", table='" + table + '\'' +
", opType='" + opType + '\'' +
", opTs='" + opTs + '\'' +
", encode='" + encode + '\'' +
'}';
}
}
public static class Statement implements Serializable {
// Version
private String version;
// DML
private Map<String, Object> before;
private Map<String, Object> after;
public Statement() {
}
public String getVersion() { public String getVersion() {
return version; return version;
} }
@@ -215,7 +182,13 @@ public class Record implements Serializable {
@Override @Override
public String toString() { public String toString() {
return "Statement{" + return "Statement{" +
"version='" + version + '\'' + "schema='" + schema + '\'' +
", table='" + table + '\'' +
", opStatement='" + opStatement + '\'' +
", opType='" + opType + '\'' +
", op='" + op + '\'' +
", opTs='" + opTs + '\'' +
", version='" + version + '\'' +
", before=" + before + ", before=" + before +
", after=" + after + ", after=" + after +
'}'; '}';

View File

@@ -1,16 +1,9 @@
package com.lanyuanxiaoyao.service.common.utils; package com.lanyuanxiaoyao.service.common.utils;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.lanyuanxiaoyao.service.common.Constants; import com.lanyuanxiaoyao.service.common.Constants;
import com.lanyuanxiaoyao.service.common.entity.Record; import com.lanyuanxiaoyao.service.common.entity.Record;
import com.lanyuanxiaoyao.service.common.entity.TableMeta; import com.lanyuanxiaoyao.service.common.entity.TableMeta;
import java.util.HashMap; import java.util.*;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@@ -35,8 +28,9 @@ public class RecordHelper {
} }
public static Boolean isVersionUpdateRecord(Record record) { public static Boolean isVersionUpdateRecord(Record record) {
// Record{source=Source{sourceId='versionUpdate', sourceType='null', sourcePos='null', currentTs='2022-11-15 22:17:44'}, statement=Statement{schema='crm_ivpn_cust', table='customer', opStatement='null', opType='version', op='null', opTs='2022-11-15 00:17:43', version='20220925', before=null, after=null}}
return Constants.VERSION_UPDATE_KEY.equals(record.getSource().getSourceId()) return Constants.VERSION_UPDATE_KEY.equals(record.getSource().getSourceId())
&& Constants.VERSION_KEY.equals(record.getInfo().getOpType()); && Constants.VERSION_KEY.equals(record.getStatement().getOpType());
} }
/** /**
@@ -79,7 +73,7 @@ public class RecordHelper {
} }
public static Map<String, Object> addExtraMetadata(Map<String, Object> current, TableMeta tableMeta, Record record) { public static Map<String, Object> addExtraMetadata(Map<String, Object> current, TableMeta tableMeta, Record record) {
String operationType = record.getInfo().getOpType(); String operationType = record.getStatement().getOpType();
return addExtraMetadata(current, tableMeta, record, Constants.DELETE.equals(operationType)); return addExtraMetadata(current, tableMeta, record, Constants.DELETE.equals(operationType));
} }
@@ -87,7 +81,7 @@ public class RecordHelper {
Map<String, Object> newMap = new HashMap<>(current); Map<String, Object> newMap = new HashMap<>(current);
newMap.put(Constants.UNION_KEY_NAME, RecordHelper.createUnionKey(tableMeta, current)); newMap.put(Constants.UNION_KEY_NAME, RecordHelper.createUnionKey(tableMeta, current));
newMap.put(Constants.UPDATE_TIMESTAMP_KEY_NAME, SnowFlakeHelper.next()); newMap.put(Constants.UPDATE_TIMESTAMP_KEY_NAME, SnowFlakeHelper.next());
newMap.put(Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME, record.getInfo().getOpTs()); newMap.put(Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME, record.getStatement().getOpTs());
newMap.put(Constants.HUDI_DELETE_KEY_NAME, isDelete); newMap.put(Constants.HUDI_DELETE_KEY_NAME, isDelete);
return newMap; return newMap;
} }
@@ -130,22 +124,4 @@ public class RecordHelper {
return primaryKey + "_" + partitionKey; return primaryKey + "_" + partitionKey;
} }
} }
public static Record parse(String json) {
JSONObject root = JSONUtil.parseObj(json);
if (!root.containsKey("info")) {
throw new RuntimeException(StrUtil.format("Info statement not found ({})", json));
}
JSONObject info = root.getJSONObject("info");
if (!info.containsKey("encode")) {
throw new RuntimeException(StrUtil.format("encode not found ({})", info.toString()));
}
String encode = info.getStr("encode");
if (StrUtil.equals(encode, Record.Info.ENCODE_ENCRYPT_STATEMENT)) {
String statementText = SecureHelper.decryptStatement(root.getStr("statement"));
JSONObject statement = JSONUtil.parseObj(statementText);
root.putOpt("statement", statement);
}
return root.toBean(Record.class);
}
} }

View File

@@ -1,17 +0,0 @@
package com.lanyuanxiaoyao.service.common.utils;
import cn.hutool.core.util.StrUtil;
import cn.hutool.crypto.Mode;
import cn.hutool.crypto.Padding;
import cn.hutool.crypto.symmetric.AES;
/**
* @author lanyuanxiaoyao
*/
public class SecureHelper {
private static final AES aes = new AES(Mode.ECB, Padding.PKCS5Padding, StrUtil.bytes("6fa22c779ec14b98"));
public static String decryptStatement(String text) {
return aes.decryptStr(text);
}
}

View File

@@ -1,301 +0,0 @@
package com.lanyuanxiaoyao.service.common;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.lanyuanxiaoyao.service.common.entity.Record;
import com.lanyuanxiaoyao.service.common.utils.RecordHelper;
import com.lanyuanxiaoyao.service.common.utils.SecureHelper;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Record解析
*
* @author lanyuanxiaoyao
*/
public class TestRecordParse {
// language=JSON
private static final String dmlJsonPlainText = "{\n" +
" \"source\": {\n" +
" \"sourceId\": \"set_7778798043_kafka\",\n" +
" \"sourcePos\": \"mysql-bin.014567:1015413544\",\n" +
" \"sourceType\": null,\n" +
" \"currentTs\": \"2024-06-24 00:00:29\"\n" +
" },\n" +
" \"info\": {\n" +
" \"opTs\": \"2024-06-24 00:00:29\",\n" +
" \"opType\": \"U\",\n" +
" \"schema\": \"acct_dg\",\n" +
" \"table\": \"acct_item\",\n" +
" \"encode\": \"0\"\n" +
" },\n" +
" \"statement\": {\n" +
" \"before\": {\n" +
" \"REGION_ID\": \"12123174\",\n" +
" \"CUST_ID\": \"0\",\n" +
" \"AMOUNT_FIX\": \"10456\",\n" +
" \"ONE_ACCT_ITEM_ID\": \"0\",\n" +
" \"BILL_ID\": \"0\",\n" +
" \"PARTY_ROLE_ID\": \"100\"\n" +
" },\n" +
" \"after\": {\n" +
" \"REGION_ID\": \"12123174\",\n" +
" \"CUST_ID\": \"0\",\n" +
" \"AMOUNT_FIX\": \"10456\",\n" +
" \"ONE_ACCT_ITEM_ID\": \"0\",\n" +
" \"BILL_ID\": \"0\",\n" +
" \"PARTY_ROLE_ID\": \"100\"\n" +
" }\n" +
" }\n" +
"}";
// language=JSON
private static final String dmlJsonEncodeText = "{\n" +
" \"source\": {\n" +
" \"sourceId\": \"set_7778798043_kafka\",\n" +
" \"sourcePos\": \"mysql-bin.014567:1015413544\",\n" +
" \"sourceType\": null,\n" +
" \"currentTs\": \"2024-06-24 00:00:29\"\n" +
" },\n" +
" \"info\": {\n" +
" \"opTs\": \"2024-06-24 00:00:29\",\n" +
" \"opType\": \"U\",\n" +
" \"schema\": \"acct_dg\",\n" +
" \"table\": \"acct_item\",\n" +
" \"encode\": \"1\"\n" +
" },\n" +
" \"statement\": \"I1bXXe4yjPK3fbfcMfYAdmcpLWGeewvGTxPf8MbVcQDFJXkEFpQrdG4gF/pslug/HMTFbG9Ks8SHfrnkZBqETjC5htDu+0aFXKhJllrYuPJUFY6gJBBrA2CuOAK7TMtfuQB7q58WLo4jkJ/8TM3EX47QPQfqHjLxSDIEXVkrcAHcjlO4fNIEzZe0iQSWIpqkuzLq99EEozK8Y0fDVUuuw2qxwcy1TZNyw2cfchP+KVscrNchGhfwc8hcQ+fiV5ml8ei0ADOsLYMOrbJxgkqlP63P6y40GWHmSCwkGXVnN/9JziMwI/izXzSFjfkLy8H2q3MwDEreclGmbEtibV7LocwzZLHnpHg1tLpS65A9v9E\\u003d\"\n" +
"}";
// language=JSON
private static final String versionJsonPlainText = "{\n" +
" \"source\": {\n" +
" \"currentTs\": \"2024-06-24 16:52:07\",\n" +
" \"sourceId\": \"versionUpdate\",\n" +
" \"sourcePos\": null,\n" +
" \"sourceType\": null\n" +
" },\n" +
" \"info\": {\n" +
" \"opTs\": \"2024-06-24 16:52:07\",\n" +
" \"opType\": \"version\",\n" +
" \"schema\": \"schema\",\n" +
" \"table\": \"table\",\n" +
" \"encode\": \"0\"\n" +
" },\n" +
" \"statement\": {\n" +
" \"version\": \"20240623\"\n" +
" }\n" +
"}";
// language=JSON
private static final String checkJsonPlainText = "{\n" +
" \"source\": {\n" +
" \"sourceId\": \"set_1351341338064\",\n" +
" \"sourceType\": null,\n" +
" \"sourcePos\": \"mysql-bin.008440:466807502\",\n" +
" \"currentTs\": \"2020-11-24 22:17:44\"\n" +
" },\n" +
" \"info\": {\n" +
" \"opType\": \"check\",\n" +
" \"schema\": \"schema1\",\n" +
" \"table\": \"table1\",\n" +
" \"opTs\": \"2024-06-24 16:52:07\",\n" +
" \"encode\": \"0\"\n" +
" },\n" +
" \"statement\": {\n" +
" \"checkNum\": \"\",\n" +
" \"topic\": \"topic1\",\n" +
" \"I\": \"2\",\n" +
" \"U\": \"3\",\n" +
" \"D\": \"4\",\n" +
" \"DDL\": \"0\"\n" +
" }\n" +
"}";
// language=JSON
private static final String ddlJsonPlainText = "{\n" +
" \"source\": {\n" +
" \"sourceId\": \"set_1021031048027_kafka\",\n" +
" \"sourceType\": null,\n" +
" \"sourcePos\": \"mysql-bin.017236:426272072\",\n" +
" \"currentTs\": \"2024-06-18 23:25:29\"\n" +
" },\n" +
" \"info\": {\n" +
" \"opTs\": \"2024-06-18 23:25:25\",\n" +
" \"opType\": \"ddl\",\n" +
" \"schema\": \"crm_cfguse\",\n" +
" \"table\": \"tar_grp\",\n" +
" \"encode\": \"0\"\n" +
" },\n" +
" \"statement\": {\n" +
" \"sql\": \"alter table crm_cfguse.tar_grp add column is_zx_spec varchar(10) NOT NULL DEFAULT '' COMMENT '是否包含直销客户特殊标签';\",\n" +
" \"op\": \"add_column\",\n" +
" \"tablePk\": null,\n" +
" \"tableComent\": null,\n" +
" \"column\": {\n" +
" \"before\": null,\n" +
" \"after\": {\n" +
" \"isNotNull\": \"true\",\n" +
" \"columnType\": \"string\",\n" +
" \"columnLength\": \"10\",\n" +
" \"comment\": \"'是否包含直销客户特殊标签'\",\n" +
" \"isPrimaryKey\": \"false\",\n" +
" \"columnName\": \"is_zx_spec\"\n" +
" }\n" +
" }\n" +
" }\n" +
"}";
private static void timeRunnable(String name, Runnable runnable) {
long start = System.currentTimeMillis();
runnable.run();
System.out.println(StrUtil.format("{} \tcost time: {}s", name, (System.currentTimeMillis() - start) / 1000.0));
}
public static void main(String[] args) {
int times = 10_000_000;
timeRunnable("Plain", () -> {
for (int index = 0; index < times; index++) {
RecordHelper.parse(dmlJsonPlainText);
}
});
timeRunnable("Encode", () -> {
for (int index = 0; index < times; index++) {
RecordHelper.parse(dmlJsonEncodeText);
}
});
}
private void testSource(JSONObject root) {
assertTrue(root.containsKey("sourceId"));
assertTrue(root.containsKey("sourcePos"));
assertTrue(root.containsKey("sourceType"));
assertTrue(root.containsKey("currentTs"));
}
private void testInfo(JSONObject root) {
assertTrue(root.containsKey("opTs"));
assertTrue(root.containsKey("opType"));
assertTrue(root.containsKey("schema"));
assertTrue(root.containsKey("table"));
assertTrue(root.containsKey("encode"));
}
private void testDmlStatement(JSONObject root) {
assertTrue(root.containsKey("before"));
assertTrue(root.containsKey("after"));
}
private void testVersionStatement(JSONObject root) {
assertTrue(root.containsKey("version"));
}
private void testCheckStatement(JSONObject root) {
}
private void testDdlStatement(JSONObject root) {
}
@Test
public void testRecordDMLParse() {
assertDoesNotThrow(() -> {JSONUtil.parseObj(dmlJsonPlainText);});
JSONObject root = JSONUtil.parseObj(dmlJsonPlainText);
assertTrue(root.containsKey("source"));
testSource(root.getJSONObject("source"));
assertTrue(root.containsKey("info"));
testInfo(root.getJSONObject("info"));
assertTrue(root.containsKey("statement"));
testDmlStatement(root.getJSONObject("statement"));
Record record = RecordHelper.parse(dmlJsonPlainText);
assertNotNull(record);
assertNotNull(record.getSource());
assertNotNull(record.getInfo());
assertNotNull(record.getStatement());
assertNotNull(record.getStatement().getBefore());
assertNotNull(record.getStatement().getAfter());
}
@Test
public void testRecordDMLEncodeParse() {
assertDoesNotThrow(() -> {JSONUtil.parseObj(dmlJsonEncodeText);});
JSONObject root = JSONUtil.parseObj(dmlJsonEncodeText);
assertTrue(root.containsKey("source"));
testSource(root.getJSONObject("source"));
assertTrue(root.containsKey("info"));
testInfo(root.getJSONObject("info"));
assertTrue(root.containsKey("statement"));
String statementText = root.getStr("statement");
assertTrue(StrUtil.isNotBlank(statementText));
assertEquals("{\"before\":{\"REGION_ID\":\"12123174\",\"CUST_ID\":\"0\",\"AMOUNT_FIX\":\"10456\",\"ONE_ACCT_ITEM_ID\":\"0\",\"BILL_ID\":\"0\",\"PARTY_ROLE_ID\":\"100\"},\"after\":{\"REGION_ID\":\"12123174\",\"CUST_ID\":\"0\",\"AMOUNT_FIX\":\"10456\",\"ONE_ACCT_ITEM_ID\":\"0\",\"BILL_ID\":\"0\",\"PARTY_ROLE_ID\":\"100\"}}", SecureHelper.decryptStatement(statementText));
testDmlStatement(JSONUtil.parseObj(SecureHelper.decryptStatement(statementText)));
Record record = RecordHelper.parse(dmlJsonEncodeText);
assertNotNull(record);
assertNotNull(record.getSource());
assertNotNull(record.getInfo());
assertNotNull(record.getStatement());
assertNotNull(record.getStatement().getBefore());
assertNotNull(record.getStatement().getAfter());
}
@Test
public void textRecordVersionParse() {
assertDoesNotThrow(() -> {JSONUtil.parseObj(versionJsonPlainText);});
JSONObject root = JSONUtil.parseObj(versionJsonPlainText);
assertTrue(root.containsKey("source"));
testSource(root.getJSONObject("source"));
assertTrue(root.containsKey("info"));
testInfo(root.getJSONObject("info"));
assertTrue(root.containsKey("statement"));
testVersionStatement(root.getJSONObject("statement"));
Record record = RecordHelper.parse(versionJsonPlainText);
assertNotNull(record);
assertNotNull(record.getSource());
assertNotNull(record.getInfo());
assertNotNull(record.getStatement());
assertNotNull(record.getStatement().getVersion());
}
@Test
public void textRecordCheckParse() {
assertDoesNotThrow(() -> {JSONUtil.parseObj(checkJsonPlainText);});
JSONObject root = JSONUtil.parseObj(checkJsonPlainText);
assertTrue(root.containsKey("source"));
testSource(root.getJSONObject("source"));
assertTrue(root.containsKey("info"));
testInfo(root.getJSONObject("info"));
assertTrue(root.containsKey("statement"));
testCheckStatement(root.getJSONObject("statement"));
Record record = RecordHelper.parse(checkJsonPlainText);
assertNotNull(record);
assertNotNull(record.getSource());
assertNotNull(record.getInfo());
assertNotNull(record.getStatement());
}
@Test
public void textRecordDDLParse() {
assertDoesNotThrow(() -> {JSONUtil.parseObj(ddlJsonPlainText);});
JSONObject root = JSONUtil.parseObj(ddlJsonPlainText);
assertTrue(root.containsKey("source"));
testSource(root.getJSONObject("source"));
assertTrue(root.containsKey("info"));
testInfo(root.getJSONObject("info"));
assertTrue(root.containsKey("statement"));
testDdlStatement(root.getJSONObject("statement"));
Record record = RecordHelper.parse(ddlJsonPlainText);
assertNotNull(record);
assertNotNull(record.getSource());
assertNotNull(record.getInfo());
assertNotNull(record.getStatement());
}
}

View File

@@ -4,15 +4,17 @@ import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import com.lanyuanxiaoyao.service.common.Constants; import com.lanyuanxiaoyao.service.common.Constants;
import com.lanyuanxiaoyao.service.common.entity.Record; import com.lanyuanxiaoyao.service.common.entity.Record;
import com.lanyuanxiaoyao.service.common.utils.RecordHelper; import com.lanyuanxiaoyao.service.executor.core.TaskContext;
import com.lanyuanxiaoyao.service.executor.task.entity.Prisoner; import com.lanyuanxiaoyao.service.executor.task.entity.Prisoner;
import com.lanyuanxiaoyao.service.executor.task.entity.RecordView; import com.lanyuanxiaoyao.service.executor.task.entity.RecordView;
import com.lanyuanxiaoyao.service.executor.task.helper.JacksonHelper;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.ZoneOffset; import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.util.Map; import java.util.Map;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.eclipse.collections.api.factory.Lists; import org.eclipse.collections.api.factory.Lists;
/** /**
@@ -21,6 +23,7 @@ import org.eclipse.collections.api.factory.Lists;
public class PulsarMessage2Prisoner extends RichMapFunction<RecordView, Prisoner> { public class PulsarMessage2Prisoner extends RichMapFunction<RecordView, Prisoner> {
private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
private static final Pattern OPTS_PATTERN = Pattern.compile("^\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}$"); private static final Pattern OPTS_PATTERN = Pattern.compile("^\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}$");
private final ObjectMapper mapper = JacksonHelper.getMapper();
private final long startTime; private final long startTime;
private final long endTime; private final long endTime;
@@ -38,19 +41,18 @@ public class PulsarMessage2Prisoner extends RichMapFunction<RecordView, Prisoner
public Prisoner map(RecordView value) { public Prisoner map(RecordView value) {
Record record; Record record;
try { try {
record = RecordHelper.parse(value.getData()); record = mapper.readValue(value.getData(), Record.class);
} catch (Throwable e) { } catch (Throwable e) {
return new Prisoner(value.getFile(), StrUtil.format("{}: {}", e.getMessage(), value.getData())); return new Prisoner(value.getFile(), StrUtil.format("{}: {}", e.getMessage(), value.getData()));
} }
Record.Info info = record.getInfo();
Record.Statement statement = record.getStatement(); Record.Statement statement = record.getStatement();
if (ObjectUtil.isNull(statement) || StrUtil.isBlank(info.getOpTs())) { if (ObjectUtil.isNull(statement) || StrUtil.isBlank(statement.getOpTs())) {
return new Prisoner(value.getFile(), StrUtil.format("Invalid statement: {}", value.getData())); return new Prisoner(value.getFile(), StrUtil.format("Invalid statement: {}", value.getData()));
} }
long timestamp; long timestamp;
try { try {
if (OPTS_PATTERN.matcher(info.getOpTs()).matches()) { if (OPTS_PATTERN.matcher(statement.getOpTs()).matches()) {
timestamp = LocalDateTime.parse(info.getOpTs(), FORMATTER).toInstant(ZoneOffset.ofHours(8)).toEpochMilli(); timestamp = LocalDateTime.parse(statement.getOpTs(), FORMATTER).toInstant(ZoneOffset.ofHours(8)).toEpochMilli();
} else { } else {
throw new Exception("opts not match " + OPTS_PATTERN.pattern()); throw new Exception("opts not match " + OPTS_PATTERN.pattern());
} }
@@ -61,9 +63,9 @@ public class PulsarMessage2Prisoner extends RichMapFunction<RecordView, Prisoner
return null; return null;
} }
Map<String, Object> fields; Map<String, Object> fields;
if (StrUtil.equalsAny(info.getOpType(), Constants.INSERT, Constants.UPDATE)) { if (StrUtil.equalsAny(statement.getOpType(), Constants.INSERT, Constants.UPDATE)) {
fields = record.getStatement().getAfter(); fields = record.getStatement().getAfter();
} else if (StrUtil.equals(info.getOpType(), Constants.DELETE)) { } else if (StrUtil.equals(statement.getOpType(), Constants.DELETE)) {
fields = record.getStatement().getBefore(); fields = record.getStatement().getBefore();
} else { } else {
return new Prisoner(value.getFile(), StrUtil.format("Invalid opType: {}", value.getData())); return new Prisoner(value.getFile(), StrUtil.format("Invalid opType: {}", value.getData()));
@@ -76,6 +78,6 @@ public class PulsarMessage2Prisoner extends RichMapFunction<RecordView, Prisoner
if (StrUtil.isBlank(partitionKey)) { if (StrUtil.isBlank(partitionKey)) {
return new Prisoner(value.getFile(), StrUtil.format("Invalid partitionKey: {}", value.getData())); return new Prisoner(value.getFile(), StrUtil.format("Invalid partitionKey: {}", value.getData()));
} }
return new Prisoner(value.getFile(), record.getInfo().getOpType(), primaryKey, partitionKey, timestamp); return new Prisoner(value.getFile(), record.getStatement().getOpType(), primaryKey, partitionKey, timestamp);
} }
} }

View File

@@ -51,7 +51,7 @@ public class OperationTypeFilter extends RichFilterFunction<Record> {
@Override @Override
public boolean filter(Record record) { public boolean filter(Record record) {
String opType = record.getInfo().getOpType(); String opType = record.getStatement().getOpType();
switch (opType) { switch (opType) {
case Constants.INSERT: case Constants.INSERT:
insertCounter.inc(); insertCounter.inc();
@@ -68,6 +68,6 @@ public class OperationTypeFilter extends RichFilterFunction<Record> {
default: default:
unknownCounter.inc(); unknownCounter.inc();
} }
return !Constants.DDL.equals(record.getInfo().getOpType()); return !Constants.DDL.equals(record.getStatement().getOpType());
} }
} }

View File

@@ -7,6 +7,7 @@ import com.lanyuanxiaoyao.service.common.entity.Record;
import com.lanyuanxiaoyao.service.common.entity.TableMeta; import com.lanyuanxiaoyao.service.common.entity.TableMeta;
import com.lanyuanxiaoyao.service.common.utils.RecordHelper; import com.lanyuanxiaoyao.service.common.utils.RecordHelper;
import com.lanyuanxiaoyao.service.sync.configuration.GlobalConfiguration; import com.lanyuanxiaoyao.service.sync.configuration.GlobalConfiguration;
import com.lanyuanxiaoyao.service.sync.utils.JacksonUtils;
import com.lanyuanxiaoyao.service.sync.utils.StatusUtils; import com.lanyuanxiaoyao.service.sync.utils.StatusUtils;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.ZoneOffset; import java.time.ZoneOffset;
@@ -17,6 +18,7 @@ import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@@ -38,6 +40,7 @@ public class PulsarMessage2RecordFunction extends RichMapFunction<String, Record
private final GlobalConfiguration globalConfiguration; private final GlobalConfiguration globalConfiguration;
private final FlinkJob flinkJob; private final FlinkJob flinkJob;
private final TableMeta tableMeta; private final TableMeta tableMeta;
private final ObjectMapper mapper = JacksonUtils.getMapper();
public PulsarMessage2RecordFunction(GlobalConfiguration globalConfiguration, FlinkJob flinkJob, TableMeta tableMeta) { public PulsarMessage2RecordFunction(GlobalConfiguration globalConfiguration, FlinkJob flinkJob, TableMeta tableMeta) {
this.globalConfiguration = globalConfiguration; this.globalConfiguration = globalConfiguration;
@@ -49,12 +52,12 @@ public class PulsarMessage2RecordFunction extends RichMapFunction<String, Record
public Record map(String message) throws JsonProcessingException { public Record map(String message) throws JsonProcessingException {
Record record = null; Record record = null;
try { try {
record = RecordHelper.parse(message); record = mapper.readValue(message, Record.class);
if (RecordHelper.isNotVersionUpdateRecord(record)) { if (RecordHelper.isNotVersionUpdateRecord(record)) {
latestOperationTime.set(record.getInfo().getOpTs()); latestOperationTime.set(record.getStatement().getOpTs());
} }
} catch (Exception exception) { } catch (Exception exception) {
logger.error(StrUtil.format("Message json parse failure: {}", message), exception); logger.error("Message json parse failure", exception);
} }
return record; return record;
} }

View File

@@ -84,11 +84,10 @@ public class Record2RowDataFunction extends RichMapFunction<Record, List<RowData
List<Map<String, Object>> result = ListUtil.list(false); List<Map<String, Object>> result = ListUtil.list(false);
if (RecordHelper.isVersionUpdateRecord(record)) { if (RecordHelper.isVersionUpdateRecord(record)) {
Record.Info info = record.getInfo();
Record.Statement statement = record.getStatement(); Record.Statement statement = record.getStatement();
LogHelper.info(logger, VERSION_UPDATE, "{} {} version: {}", mapper.writeValueAsString(info.getSchema()), statement.getVersion(), statement.getVersion()); LogHelper.info(logger, VERSION_UPDATE, "{} {} version: {}", mapper.writeValueAsString(statement.getSchema()), statement.getVersion(), statement.getVersion());
LogHelper.info(logger, VERSION_UPDATE, "Raw: {}", mapper.writeValueAsString(record)); LogHelper.info(logger, VERSION_UPDATE, "Raw: {}", mapper.writeValueAsString(record));
StatusUtils.versionUpdate(globalConfiguration, flinkJob, tableMeta, record.getStatement().getVersion(), info.getOpTs()); StatusUtils.versionUpdate(globalConfiguration, flinkJob, tableMeta, record.getStatement().getVersion(), statement.getOpTs());
return ListUtil.empty(); return ListUtil.empty();
} }

View File

@@ -31,10 +31,6 @@ public class ValidateRecordFilter extends RichFilterFunction<Record> {
logger.warn("Record Source is null"); logger.warn("Record Source is null");
return false; return false;
} }
if (ObjectUtil.isNull(record.getInfo())) {
logger.warn("Record Info is null");
return false;
}
if (ObjectUtil.isNull(record.getStatement())) { if (ObjectUtil.isNull(record.getStatement())) {
logger.warn("Record Statement is null"); logger.warn("Record Statement is null");
return false; return false;

View File

@@ -3,7 +3,6 @@ package com.lanyuanxiaoyao.service.sync;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.lanyuanxiaoyao.service.common.entity.Record; import com.lanyuanxiaoyao.service.common.entity.Record;
import com.lanyuanxiaoyao.service.common.utils.RecordHelper;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@@ -100,7 +99,8 @@ public class MessageParseTest {
" }\n" + " }\n" +
" }\n" + " }\n" +
"}"; "}";
Record record = RecordHelper.parse(message); ObjectMapper mapper = new ObjectMapper();
Record record = mapper.readValue(message, Record.class);
logger.info("Record: {}", record); logger.info("Record: {}", record);
} }
} }