diff --git a/service-common/pom.xml b/service-common/pom.xml
index 032b1e8..5867723 100644
--- a/service-common/pom.xml
+++ b/service-common/pom.xml
@@ -23,6 +23,12 @@
io.github.dragons96
sql-builder
+
+ org.junit.jupiter
+ junit-jupiter
+ 5.7.2
+ test
+
diff --git a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/entity/Record.java b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/entity/Record.java
index c73ea5e..97627bb 100644
--- a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/entity/Record.java
+++ b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/entity/Record.java
@@ -12,6 +12,7 @@ import java.util.Map;
*/
public class Record implements Serializable {
private Source source;
+ private Info info;
private Statement statement;
public Record() {
@@ -25,6 +26,14 @@ public class Record implements Serializable {
this.source = source;
}
+ public Info getInfo() {
+ return info;
+ }
+
+ public void setInfo(Info info) {
+ this.info = info;
+ }
+
public Statement getStatement() {
return statement;
}
@@ -36,15 +45,16 @@ public class Record implements Serializable {
@Override
public String toString() {
return "Record{" +
- "source=" + source +
- ", statement=" + statement +
- '}';
+ "source=" + source +
+ ", info=" + info +
+ ", statement=" + statement +
+ '}';
}
public static class Source implements Serializable {
private String sourceId;
- private String sourceType;
private String sourcePos;
+ private String sourceType;
private String currentTs;
public Source() {
@@ -58,14 +68,6 @@ public class Record implements Serializable {
this.sourceId = sourceId;
}
- public String getSourceType() {
- return sourceType;
- }
-
- public void setSourceType(String sourceType) {
- this.sourceType = sourceType;
- }
-
public String getSourcePos() {
return sourcePos;
}
@@ -74,6 +76,14 @@ public class Record implements Serializable {
this.sourcePos = sourcePos;
}
+ public String getSourceType() {
+ return sourceType;
+ }
+
+ public void setSourceType(String sourceType) {
+ this.sourceType = sourceType;
+ }
+
public String getCurrentTs() {
return currentTs;
}
@@ -85,27 +95,33 @@ public class Record implements Serializable {
@Override
public String toString() {
return "Source{" +
- "sourceId='" + sourceId + '\'' +
- ", sourceType='" + sourceType + '\'' +
- ", sourcePos='" + sourcePos + '\'' +
- ", currentTs='" + currentTs + '\'' +
- '}';
+ "sourceId='" + sourceId + '\'' +
+ ", sourcePos='" + sourcePos + '\'' +
+ ", sourceType='" + sourceType + '\'' +
+ ", currentTs='" + currentTs + '\'' +
+ '}';
}
}
- public static class Statement implements Serializable {
+ public static class Info implements Serializable {
+ /**
+ * 无加密
+ */
+ public static final String ENCODE_ENCRYPT_NONE = "0";
+ /**
+ * 加密
+ */
+ public static final String ENCODE_ENCRYPT_STATEMENT = "1";
+
private String schema;
private String table;
- private String opStatement;
private String opType;
- private String op;
private String opTs;
- private String version;
- private Map before;
- private Map after;
-
- public Statement() {
- }
+ /**
+ * 0: 无加密 {@link Info#ENCODE_ENCRYPT_NONE}
+ * 1: 加密 {@link Info#ENCODE_ENCRYPT_STATEMENT}
+ */
+ private String encode;
public String getSchema() {
return schema;
@@ -123,14 +139,6 @@ public class Record implements Serializable {
this.table = table;
}
- public String getOpStatement() {
- return opStatement;
- }
-
- public void setOpStatement(String opStatement) {
- this.opStatement = opStatement;
- }
-
public String getOpType() {
return opType;
}
@@ -139,14 +147,6 @@ public class Record implements Serializable {
this.opType = opType;
}
- public String getOp() {
- return op;
- }
-
- public void setOp(String op) {
- this.op = op;
- }
-
public String getOpTs() {
return opTs;
}
@@ -155,6 +155,39 @@ public class Record implements Serializable {
this.opTs = opTs;
}
+ public String getEncode() {
+ return encode;
+ }
+
+ public void setEncode(String encode) {
+ this.encode = encode;
+ }
+
+ @Override
+ public String toString() {
+ return "Info{" +
+ "schema='" + schema + '\'' +
+ ", table='" + table + '\'' +
+ ", opType='" + opType + '\'' +
+ ", opTs='" + opTs + '\'' +
+ ", encode='" + encode + '\'' +
+ '}';
+ }
+ }
+
+ public static class Statement implements Serializable {
+ // Version
+
+ private String version;
+
+ // DML
+
+ private Map before;
+ private Map after;
+
+ public Statement() {
+ }
+
public String getVersion() {
return version;
}
@@ -182,16 +215,10 @@ public class Record implements Serializable {
@Override
public String toString() {
return "Statement{" +
- "schema='" + schema + '\'' +
- ", table='" + table + '\'' +
- ", opStatement='" + opStatement + '\'' +
- ", opType='" + opType + '\'' +
- ", op='" + op + '\'' +
- ", opTs='" + opTs + '\'' +
- ", version='" + version + '\'' +
- ", before=" + before +
- ", after=" + after +
- '}';
+ "version='" + version + '\'' +
+ ", before=" + before +
+ ", after=" + after +
+ '}';
}
}
}
diff --git a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/utils/RecordHelper.java b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/utils/RecordHelper.java
index 53b9be3..5a29dc8 100644
--- a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/utils/RecordHelper.java
+++ b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/utils/RecordHelper.java
@@ -1,9 +1,16 @@
package com.lanyuanxiaoyao.service.common.utils;
+import cn.hutool.core.util.StrUtil;
+import cn.hutool.json.JSONObject;
+import cn.hutool.json.JSONUtil;
import com.lanyuanxiaoyao.service.common.Constants;
import com.lanyuanxiaoyao.service.common.entity.Record;
import com.lanyuanxiaoyao.service.common.entity.TableMeta;
-import java.util.*;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -28,9 +35,8 @@ public class RecordHelper {
}
public static Boolean isVersionUpdateRecord(Record record) {
- // Record{source=Source{sourceId='versionUpdate', sourceType='null', sourcePos='null', currentTs='2022-11-15 22:17:44'}, statement=Statement{schema='crm_ivpn_cust', table='customer', opStatement='null', opType='version', op='null', opTs='2022-11-15 00:17:43', version='20220925', before=null, after=null}}
return Constants.VERSION_UPDATE_KEY.equals(record.getSource().getSourceId())
- && Constants.VERSION_KEY.equals(record.getStatement().getOpType());
+ && Constants.VERSION_KEY.equals(record.getInfo().getOpType());
}
/**
@@ -73,7 +79,7 @@ public class RecordHelper {
}
public static Map addExtraMetadata(Map current, TableMeta tableMeta, Record record) {
- String operationType = record.getStatement().getOpType();
+ String operationType = record.getInfo().getOpType();
return addExtraMetadata(current, tableMeta, record, Constants.DELETE.equals(operationType));
}
@@ -81,7 +87,7 @@ public class RecordHelper {
Map newMap = new HashMap<>(current);
newMap.put(Constants.UNION_KEY_NAME, RecordHelper.createUnionKey(tableMeta, current));
newMap.put(Constants.UPDATE_TIMESTAMP_KEY_NAME, SnowFlakeHelper.next());
- newMap.put(Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME, record.getStatement().getOpTs());
+ newMap.put(Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME, record.getInfo().getOpTs());
newMap.put(Constants.HUDI_DELETE_KEY_NAME, isDelete);
return newMap;
}
@@ -124,4 +130,22 @@ public class RecordHelper {
return primaryKey + "_" + partitionKey;
}
}
+
+ public static Record parse(String json) {
+ JSONObject root = JSONUtil.parseObj(json);
+ if (!root.containsKey("info")) {
+ throw new RuntimeException(StrUtil.format("Info statement not found ({})", json));
+ }
+ JSONObject info = root.getJSONObject("info");
+ if (!info.containsKey("encode")) {
+ throw new RuntimeException(StrUtil.format("encode not found ({})", info.toString()));
+ }
+ String encode = info.getStr("encode");
+ if (StrUtil.equals(encode, Record.Info.ENCODE_ENCRYPT_STATEMENT)) {
+ String statementText = SecureHelper.decryptStatement(root.getStr("statement"));
+ JSONObject statement = JSONUtil.parseObj(statementText);
+ root.putOpt("statement", statement);
+ }
+ return root.toBean(Record.class);
+ }
}
diff --git a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/utils/SecureHelper.java b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/utils/SecureHelper.java
new file mode 100644
index 0000000..da7a431
--- /dev/null
+++ b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/utils/SecureHelper.java
@@ -0,0 +1,17 @@
+package com.lanyuanxiaoyao.service.common.utils;
+
+import cn.hutool.core.util.StrUtil;
+import cn.hutool.crypto.Mode;
+import cn.hutool.crypto.Padding;
+import cn.hutool.crypto.symmetric.AES;
+
+/**
+ * @author lanyuanxiaoyao
+ */
+public class SecureHelper {
+ private static final AES aes = new AES(Mode.ECB, Padding.PKCS5Padding, StrUtil.bytes("6fa22c779ec14b98"));
+
+ public static String decryptStatement(String text) {
+ return aes.decryptStr(text);
+ }
+}
diff --git a/service-common/src/test/java/com/lanyuanxiaoyao/service/common/TestRecordParse.java b/service-common/src/test/java/com/lanyuanxiaoyao/service/common/TestRecordParse.java
new file mode 100644
index 0000000..2fa053e
--- /dev/null
+++ b/service-common/src/test/java/com/lanyuanxiaoyao/service/common/TestRecordParse.java
@@ -0,0 +1,301 @@
+package com.lanyuanxiaoyao.service.common;
+
+import cn.hutool.core.util.StrUtil;
+import cn.hutool.json.JSONObject;
+import cn.hutool.json.JSONUtil;
+import com.lanyuanxiaoyao.service.common.entity.Record;
+import com.lanyuanxiaoyao.service.common.utils.RecordHelper;
+import com.lanyuanxiaoyao.service.common.utils.SecureHelper;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Record解析
+ *
+ * @author lanyuanxiaoyao
+ */
+public class TestRecordParse {
+ // language=JSON
+ private static final String dmlJsonPlainText = "{\n" +
+ " \"source\": {\n" +
+ " \"sourceId\": \"set_7778798043_kafka\",\n" +
+ " \"sourcePos\": \"mysql-bin.014567:1015413544\",\n" +
+ " \"sourceType\": null,\n" +
+ " \"currentTs\": \"2024-06-24 00:00:29\"\n" +
+ " },\n" +
+ " \"info\": {\n" +
+ " \"opTs\": \"2024-06-24 00:00:29\",\n" +
+ " \"opType\": \"U\",\n" +
+ " \"schema\": \"acct_dg\",\n" +
+ " \"table\": \"acct_item\",\n" +
+ " \"encode\": \"0\"\n" +
+ " },\n" +
+ " \"statement\": {\n" +
+ " \"before\": {\n" +
+ " \"REGION_ID\": \"12123174\",\n" +
+ " \"CUST_ID\": \"0\",\n" +
+ " \"AMOUNT_FIX\": \"10456\",\n" +
+ " \"ONE_ACCT_ITEM_ID\": \"0\",\n" +
+ " \"BILL_ID\": \"0\",\n" +
+ " \"PARTY_ROLE_ID\": \"100\"\n" +
+ " },\n" +
+ " \"after\": {\n" +
+ " \"REGION_ID\": \"12123174\",\n" +
+ " \"CUST_ID\": \"0\",\n" +
+ " \"AMOUNT_FIX\": \"10456\",\n" +
+ " \"ONE_ACCT_ITEM_ID\": \"0\",\n" +
+ " \"BILL_ID\": \"0\",\n" +
+ " \"PARTY_ROLE_ID\": \"100\"\n" +
+ " }\n" +
+ " }\n" +
+ "}";
+
+ // language=JSON
+ private static final String dmlJsonEncodeText = "{\n" +
+ " \"source\": {\n" +
+ " \"sourceId\": \"set_7778798043_kafka\",\n" +
+ " \"sourcePos\": \"mysql-bin.014567:1015413544\",\n" +
+ " \"sourceType\": null,\n" +
+ " \"currentTs\": \"2024-06-24 00:00:29\"\n" +
+ " },\n" +
+ " \"info\": {\n" +
+ " \"opTs\": \"2024-06-24 00:00:29\",\n" +
+ " \"opType\": \"U\",\n" +
+ " \"schema\": \"acct_dg\",\n" +
+ " \"table\": \"acct_item\",\n" +
+ " \"encode\": \"1\"\n" +
+ " },\n" +
+ " \"statement\": \"I1bXXe4yjPK3fbfcMfYAdmcpLWGeewvGTxPf8MbVcQDFJXkEFpQrdG4gF/pslug/HMTFbG9Ks8SHfrnkZBqETjC5htDu+0aFXKhJllrYuPJUFY6gJBBrA2CuOAK7TMtfuQB7q58WLo4jkJ/8TM3EX47QPQfqHjLxSDIEXVkrcAHcjlO4fNIEzZe0iQSWIpqkuzLq99EEozK8Y0fDVUuuw2qxwcy1TZNyw2cfchP+KVscrNchGhfwc8hcQ+fiV5ml8ei0ADOsLYMOrbJxgkqlP63P6y40GWHmSCwkGXVnN/9JziMwI/izXzSFjfkLy8H2q3MwDEreclGmbEtibV7LocwzZLHnpHg1tLpS65A9v9E\\u003d\"\n" +
+ "}";
+
+ // language=JSON
+ private static final String versionJsonPlainText = "{\n" +
+ " \"source\": {\n" +
+ " \"currentTs\": \"2024-06-24 16:52:07\",\n" +
+ " \"sourceId\": \"versionUpdate\",\n" +
+ " \"sourcePos\": null,\n" +
+ " \"sourceType\": null\n" +
+ " },\n" +
+ " \"info\": {\n" +
+ " \"opTs\": \"2024-06-24 16:52:07\",\n" +
+ " \"opType\": \"version\",\n" +
+ " \"schema\": \"schema\",\n" +
+ " \"table\": \"table\",\n" +
+ " \"encode\": \"0\"\n" +
+ " },\n" +
+ " \"statement\": {\n" +
+ " \"version\": \"20240623\"\n" +
+ " }\n" +
+ "}";
+
+ // language=JSON
+ private static final String checkJsonPlainText = "{\n" +
+ " \"source\": {\n" +
+ " \"sourceId\": \"set_1351341338064\",\n" +
+ " \"sourceType\": null,\n" +
+ " \"sourcePos\": \"mysql-bin.008440:466807502\",\n" +
+ " \"currentTs\": \"2020-11-24 22:17:44\"\n" +
+ " },\n" +
+ " \"info\": {\n" +
+ " \"opType\": \"check\",\n" +
+ " \"schema\": \"schema1\",\n" +
+ " \"table\": \"table1\",\n" +
+ " \"opTs\": \"2024-06-24 16:52:07\",\n" +
+ " \"encode\": \"0\"\n" +
+ " },\n" +
+ " \"statement\": {\n" +
+ " \"checkNum\": \"\",\n" +
+ " \"topic\": \"topic1\",\n" +
+ " \"I\": \"2\",\n" +
+ " \"U\": \"3\",\n" +
+ " \"D\": \"4\",\n" +
+ " \"DDL\": \"0\"\n" +
+ " }\n" +
+ "}";
+
+ // language=JSON
+ private static final String ddlJsonPlainText = "{\n" +
+ " \"source\": {\n" +
+ " \"sourceId\": \"set_1021031048027_kafka\",\n" +
+ " \"sourceType\": null,\n" +
+ " \"sourcePos\": \"mysql-bin.017236:426272072\",\n" +
+ " \"currentTs\": \"2024-06-18 23:25:29\"\n" +
+ " },\n" +
+ " \"info\": {\n" +
+ " \"opTs\": \"2024-06-18 23:25:25\",\n" +
+ " \"opType\": \"ddl\",\n" +
+ " \"schema\": \"crm_cfguse\",\n" +
+ " \"table\": \"tar_grp\",\n" +
+ " \"encode\": \"0\"\n" +
+ " },\n" +
+ " \"statement\": {\n" +
+ " \"sql\": \"alter table crm_cfguse.tar_grp add column is_zx_spec varchar(10) NOT NULL DEFAULT '' COMMENT '是否包含直销客户特殊标签';\",\n" +
+ " \"op\": \"add_column\",\n" +
+ " \"tablePk\": null,\n" +
+ " \"tableComent\": null,\n" +
+ " \"column\": {\n" +
+ " \"before\": null,\n" +
+ " \"after\": {\n" +
+ " \"isNotNull\": \"true\",\n" +
+ " \"columnType\": \"string\",\n" +
+ " \"columnLength\": \"10\",\n" +
+ " \"comment\": \"'是否包含直销客户特殊标签'\",\n" +
+ " \"isPrimaryKey\": \"false\",\n" +
+ " \"columnName\": \"is_zx_spec\"\n" +
+ " }\n" +
+ " }\n" +
+ " }\n" +
+ "}";
+
+ private static void timeRunnable(String name, Runnable runnable) {
+ long start = System.currentTimeMillis();
+ runnable.run();
+ System.out.println(StrUtil.format("{} \tcost time: {}s", name, (System.currentTimeMillis() - start) / 1000.0));
+ }
+
+ public static void main(String[] args) {
+ int times = 10_000_000;
+ timeRunnable("Plain", () -> {
+ for (int index = 0; index < times; index++) {
+ RecordHelper.parse(dmlJsonPlainText);
+ }
+ });
+ timeRunnable("Encode", () -> {
+ for (int index = 0; index < times; index++) {
+ RecordHelper.parse(dmlJsonEncodeText);
+ }
+ });
+ }
+
+ private void testSource(JSONObject root) {
+ assertTrue(root.containsKey("sourceId"));
+ assertTrue(root.containsKey("sourcePos"));
+ assertTrue(root.containsKey("sourceType"));
+ assertTrue(root.containsKey("currentTs"));
+ }
+
+ private void testInfo(JSONObject root) {
+ assertTrue(root.containsKey("opTs"));
+ assertTrue(root.containsKey("opType"));
+ assertTrue(root.containsKey("schema"));
+ assertTrue(root.containsKey("table"));
+ assertTrue(root.containsKey("encode"));
+ }
+
+ private void testDmlStatement(JSONObject root) {
+ assertTrue(root.containsKey("before"));
+ assertTrue(root.containsKey("after"));
+ }
+
+ private void testVersionStatement(JSONObject root) {
+ assertTrue(root.containsKey("version"));
+ }
+
+ private void testCheckStatement(JSONObject root) {
+ }
+
+ private void testDdlStatement(JSONObject root) {
+ }
+
+ @Test
+ public void testRecordDMLParse() {
+ assertDoesNotThrow(() -> {JSONUtil.parseObj(dmlJsonPlainText);});
+ JSONObject root = JSONUtil.parseObj(dmlJsonPlainText);
+ assertTrue(root.containsKey("source"));
+ testSource(root.getJSONObject("source"));
+ assertTrue(root.containsKey("info"));
+ testInfo(root.getJSONObject("info"));
+ assertTrue(root.containsKey("statement"));
+ testDmlStatement(root.getJSONObject("statement"));
+
+ Record record = RecordHelper.parse(dmlJsonPlainText);
+ assertNotNull(record);
+ assertNotNull(record.getSource());
+ assertNotNull(record.getInfo());
+ assertNotNull(record.getStatement());
+ assertNotNull(record.getStatement().getBefore());
+ assertNotNull(record.getStatement().getAfter());
+ }
+
+ @Test
+ public void testRecordDMLEncodeParse() {
+ assertDoesNotThrow(() -> {JSONUtil.parseObj(dmlJsonEncodeText);});
+ JSONObject root = JSONUtil.parseObj(dmlJsonEncodeText);
+ assertTrue(root.containsKey("source"));
+ testSource(root.getJSONObject("source"));
+ assertTrue(root.containsKey("info"));
+ testInfo(root.getJSONObject("info"));
+ assertTrue(root.containsKey("statement"));
+ String statementText = root.getStr("statement");
+ assertTrue(StrUtil.isNotBlank(statementText));
+ assertEquals("{\"before\":{\"REGION_ID\":\"12123174\",\"CUST_ID\":\"0\",\"AMOUNT_FIX\":\"10456\",\"ONE_ACCT_ITEM_ID\":\"0\",\"BILL_ID\":\"0\",\"PARTY_ROLE_ID\":\"100\"},\"after\":{\"REGION_ID\":\"12123174\",\"CUST_ID\":\"0\",\"AMOUNT_FIX\":\"10456\",\"ONE_ACCT_ITEM_ID\":\"0\",\"BILL_ID\":\"0\",\"PARTY_ROLE_ID\":\"100\"}}", SecureHelper.decryptStatement(statementText));
+ testDmlStatement(JSONUtil.parseObj(SecureHelper.decryptStatement(statementText)));
+
+ Record record = RecordHelper.parse(dmlJsonEncodeText);
+ assertNotNull(record);
+ assertNotNull(record.getSource());
+ assertNotNull(record.getInfo());
+ assertNotNull(record.getStatement());
+ assertNotNull(record.getStatement().getBefore());
+ assertNotNull(record.getStatement().getAfter());
+ }
+
+ @Test
+ public void textRecordVersionParse() {
+ assertDoesNotThrow(() -> {JSONUtil.parseObj(versionJsonPlainText);});
+ JSONObject root = JSONUtil.parseObj(versionJsonPlainText);
+ assertTrue(root.containsKey("source"));
+ testSource(root.getJSONObject("source"));
+ assertTrue(root.containsKey("info"));
+ testInfo(root.getJSONObject("info"));
+ assertTrue(root.containsKey("statement"));
+ testVersionStatement(root.getJSONObject("statement"));
+
+ Record record = RecordHelper.parse(versionJsonPlainText);
+ assertNotNull(record);
+ assertNotNull(record.getSource());
+ assertNotNull(record.getInfo());
+ assertNotNull(record.getStatement());
+ assertNotNull(record.getStatement().getVersion());
+ }
+
+ @Test
+ public void textRecordCheckParse() {
+ assertDoesNotThrow(() -> {JSONUtil.parseObj(checkJsonPlainText);});
+ JSONObject root = JSONUtil.parseObj(checkJsonPlainText);
+ assertTrue(root.containsKey("source"));
+ testSource(root.getJSONObject("source"));
+ assertTrue(root.containsKey("info"));
+ testInfo(root.getJSONObject("info"));
+ assertTrue(root.containsKey("statement"));
+ testCheckStatement(root.getJSONObject("statement"));
+
+ Record record = RecordHelper.parse(checkJsonPlainText);
+ assertNotNull(record);
+ assertNotNull(record.getSource());
+ assertNotNull(record.getInfo());
+ assertNotNull(record.getStatement());
+ }
+
+ @Test
+ public void textRecordDDLParse() {
+ assertDoesNotThrow(() -> {JSONUtil.parseObj(ddlJsonPlainText);});
+ JSONObject root = JSONUtil.parseObj(ddlJsonPlainText);
+ assertTrue(root.containsKey("source"));
+ testSource(root.getJSONObject("source"));
+ assertTrue(root.containsKey("info"));
+ testInfo(root.getJSONObject("info"));
+ assertTrue(root.containsKey("statement"));
+ testDdlStatement(root.getJSONObject("statement"));
+
+ Record record = RecordHelper.parse(ddlJsonPlainText);
+ assertNotNull(record);
+ assertNotNull(record.getSource());
+ assertNotNull(record.getInfo());
+ assertNotNull(record.getStatement());
+ }
+}
diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/police/PulsarMessage2Prisoner.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/police/PulsarMessage2Prisoner.java
index e6437c7..ac74f29 100644
--- a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/police/PulsarMessage2Prisoner.java
+++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/police/PulsarMessage2Prisoner.java
@@ -4,17 +4,15 @@ import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.lanyuanxiaoyao.service.common.Constants;
import com.lanyuanxiaoyao.service.common.entity.Record;
-import com.lanyuanxiaoyao.service.executor.core.TaskContext;
+import com.lanyuanxiaoyao.service.common.utils.RecordHelper;
import com.lanyuanxiaoyao.service.executor.task.entity.Prisoner;
import com.lanyuanxiaoyao.service.executor.task.entity.RecordView;
-import com.lanyuanxiaoyao.service.executor.task.helper.JacksonHelper;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.eclipse.collections.api.factory.Lists;
/**
@@ -23,7 +21,6 @@ import org.eclipse.collections.api.factory.Lists;
public class PulsarMessage2Prisoner extends RichMapFunction {
private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
private static final Pattern OPTS_PATTERN = Pattern.compile("^\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}$");
- private final ObjectMapper mapper = JacksonHelper.getMapper();
private final long startTime;
private final long endTime;
@@ -41,18 +38,19 @@ public class PulsarMessage2Prisoner extends RichMapFunction fields;
- if (StrUtil.equalsAny(statement.getOpType(), Constants.INSERT, Constants.UPDATE)) {
+ if (StrUtil.equalsAny(info.getOpType(), Constants.INSERT, Constants.UPDATE)) {
fields = record.getStatement().getAfter();
- } else if (StrUtil.equals(statement.getOpType(), Constants.DELETE)) {
+ } else if (StrUtil.equals(info.getOpType(), Constants.DELETE)) {
fields = record.getStatement().getBefore();
} else {
return new Prisoner(value.getFile(), StrUtil.format("Invalid opType: {}", value.getData()));
@@ -78,6 +76,6 @@ public class PulsarMessage2Prisoner extends RichMapFunction {
@Override
public boolean filter(Record record) {
- String opType = record.getStatement().getOpType();
+ String opType = record.getInfo().getOpType();
switch (opType) {
case Constants.INSERT:
insertCounter.inc();
@@ -68,6 +68,6 @@ public class OperationTypeFilter extends RichFilterFunction {
default:
unknownCounter.inc();
}
- return !Constants.DDL.equals(record.getStatement().getOpType());
+ return !Constants.DDL.equals(record.getInfo().getOpType());
}
}
diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/PulsarMessage2RecordFunction.java b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/PulsarMessage2RecordFunction.java
index 8881261..7897503 100644
--- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/PulsarMessage2RecordFunction.java
+++ b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/PulsarMessage2RecordFunction.java
@@ -7,7 +7,6 @@ import com.lanyuanxiaoyao.service.common.entity.Record;
import com.lanyuanxiaoyao.service.common.entity.TableMeta;
import com.lanyuanxiaoyao.service.common.utils.RecordHelper;
import com.lanyuanxiaoyao.service.sync.configuration.GlobalConfiguration;
-import com.lanyuanxiaoyao.service.sync.utils.JacksonUtils;
import com.lanyuanxiaoyao.service.sync.utils.StatusUtils;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
@@ -18,7 +17,6 @@ import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,7 +38,6 @@ public class PulsarMessage2RecordFunction extends RichMapFunction> result = ListUtil.list(false);
if (RecordHelper.isVersionUpdateRecord(record)) {
+ Record.Info info = record.getInfo();
Record.Statement statement = record.getStatement();
- LogHelper.info(logger, VERSION_UPDATE, "{} {} version: {}", mapper.writeValueAsString(statement.getSchema()), statement.getVersion(), statement.getVersion());
+ LogHelper.info(logger, VERSION_UPDATE, "{} {} version: {}", mapper.writeValueAsString(info.getSchema()), statement.getVersion(), statement.getVersion());
LogHelper.info(logger, VERSION_UPDATE, "Raw: {}", mapper.writeValueAsString(record));
- StatusUtils.versionUpdate(globalConfiguration, flinkJob, tableMeta, record.getStatement().getVersion(), statement.getOpTs());
+ StatusUtils.versionUpdate(globalConfiguration, flinkJob, tableMeta, record.getStatement().getVersion(), info.getOpTs());
return ListUtil.empty();
}
diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/ValidateRecordFilter.java b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/ValidateRecordFilter.java
index 0e9e434..961709b 100644
--- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/ValidateRecordFilter.java
+++ b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/ValidateRecordFilter.java
@@ -31,6 +31,10 @@ public class ValidateRecordFilter extends RichFilterFunction {
logger.warn("Record Source is null");
return false;
}
+ if (ObjectUtil.isNull(record.getInfo())) {
+ logger.warn("Record Info is null");
+ return false;
+ }
if (ObjectUtil.isNull(record.getStatement())) {
logger.warn("Record Statement is null");
return false;
diff --git a/utils/sync/src/test/java/com/lanyuanxiaoyao/service/sync/MessageParseTest.java b/utils/sync/src/test/java/com/lanyuanxiaoyao/service/sync/MessageParseTest.java
index 78a7235..53b84bf 100644
--- a/utils/sync/src/test/java/com/lanyuanxiaoyao/service/sync/MessageParseTest.java
+++ b/utils/sync/src/test/java/com/lanyuanxiaoyao/service/sync/MessageParseTest.java
@@ -3,6 +3,7 @@ package com.lanyuanxiaoyao.service.sync;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.lanyuanxiaoyao.service.common.entity.Record;
+import com.lanyuanxiaoyao.service.common.utils.RecordHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -99,8 +100,7 @@ public class MessageParseTest {
" }\n" +
" }\n" +
"}";
- ObjectMapper mapper = new ObjectMapper();
- Record record = mapper.readValue(message, Record.class);
+ Record record = RecordHelper.parse(message);
logger.info("Record: {}", record);
}
}