diff --git a/service-common/pom.xml b/service-common/pom.xml
index 5867723..032b1e8 100644
--- a/service-common/pom.xml
+++ b/service-common/pom.xml
@@ -23,12 +23,6 @@
io.github.dragons96
sql-builder
-
- org.junit.jupiter
- junit-jupiter
- 5.7.2
- test
-
diff --git a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/entity/Record.java b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/entity/Record.java
index 97627bb..c73ea5e 100644
--- a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/entity/Record.java
+++ b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/entity/Record.java
@@ -12,7 +12,6 @@ import java.util.Map;
*/
public class Record implements Serializable {
private Source source;
- private Info info;
private Statement statement;
public Record() {
@@ -26,14 +25,6 @@ public class Record implements Serializable {
this.source = source;
}
- public Info getInfo() {
- return info;
- }
-
- public void setInfo(Info info) {
- this.info = info;
- }
-
public Statement getStatement() {
return statement;
}
@@ -45,16 +36,15 @@ public class Record implements Serializable {
@Override
public String toString() {
return "Record{" +
- "source=" + source +
- ", info=" + info +
- ", statement=" + statement +
- '}';
+ "source=" + source +
+ ", statement=" + statement +
+ '}';
}
public static class Source implements Serializable {
private String sourceId;
- private String sourcePos;
private String sourceType;
+ private String sourcePos;
private String currentTs;
public Source() {
@@ -68,14 +58,6 @@ public class Record implements Serializable {
this.sourceId = sourceId;
}
- public String getSourcePos() {
- return sourcePos;
- }
-
- public void setSourcePos(String sourcePos) {
- this.sourcePos = sourcePos;
- }
-
public String getSourceType() {
return sourceType;
}
@@ -84,6 +66,14 @@ public class Record implements Serializable {
this.sourceType = sourceType;
}
+ public String getSourcePos() {
+ return sourcePos;
+ }
+
+ public void setSourcePos(String sourcePos) {
+ this.sourcePos = sourcePos;
+ }
+
public String getCurrentTs() {
return currentTs;
}
@@ -95,33 +85,27 @@ public class Record implements Serializable {
@Override
public String toString() {
return "Source{" +
- "sourceId='" + sourceId + '\'' +
- ", sourcePos='" + sourcePos + '\'' +
- ", sourceType='" + sourceType + '\'' +
- ", currentTs='" + currentTs + '\'' +
- '}';
+ "sourceId='" + sourceId + '\'' +
+ ", sourceType='" + sourceType + '\'' +
+ ", sourcePos='" + sourcePos + '\'' +
+ ", currentTs='" + currentTs + '\'' +
+ '}';
}
}
- public static class Info implements Serializable {
- /**
- * 无加密
- */
- public static final String ENCODE_ENCRYPT_NONE = "0";
- /**
- * 加密
- */
- public static final String ENCODE_ENCRYPT_STATEMENT = "1";
-
+ public static class Statement implements Serializable {
private String schema;
private String table;
+ private String opStatement;
private String opType;
+ private String op;
private String opTs;
- /**
- * 0: 无加密 {@link Info#ENCODE_ENCRYPT_NONE}
- * 1: 加密 {@link Info#ENCODE_ENCRYPT_STATEMENT}
- */
- private String encode;
+ private String version;
+ private Map before;
+ private Map after;
+
+ public Statement() {
+ }
public String getSchema() {
return schema;
@@ -139,6 +123,14 @@ public class Record implements Serializable {
this.table = table;
}
+ public String getOpStatement() {
+ return opStatement;
+ }
+
+ public void setOpStatement(String opStatement) {
+ this.opStatement = opStatement;
+ }
+
public String getOpType() {
return opType;
}
@@ -147,6 +139,14 @@ public class Record implements Serializable {
this.opType = opType;
}
+ public String getOp() {
+ return op;
+ }
+
+ public void setOp(String op) {
+ this.op = op;
+ }
+
public String getOpTs() {
return opTs;
}
@@ -155,39 +155,6 @@ public class Record implements Serializable {
this.opTs = opTs;
}
- public String getEncode() {
- return encode;
- }
-
- public void setEncode(String encode) {
- this.encode = encode;
- }
-
- @Override
- public String toString() {
- return "Info{" +
- "schema='" + schema + '\'' +
- ", table='" + table + '\'' +
- ", opType='" + opType + '\'' +
- ", opTs='" + opTs + '\'' +
- ", encode='" + encode + '\'' +
- '}';
- }
- }
-
- public static class Statement implements Serializable {
- // Version
-
- private String version;
-
- // DML
-
- private Map before;
- private Map after;
-
- public Statement() {
- }
-
public String getVersion() {
return version;
}
@@ -215,10 +182,16 @@ public class Record implements Serializable {
@Override
public String toString() {
return "Statement{" +
- "version='" + version + '\'' +
- ", before=" + before +
- ", after=" + after +
- '}';
+ "schema='" + schema + '\'' +
+ ", table='" + table + '\'' +
+ ", opStatement='" + opStatement + '\'' +
+ ", opType='" + opType + '\'' +
+ ", op='" + op + '\'' +
+ ", opTs='" + opTs + '\'' +
+ ", version='" + version + '\'' +
+ ", before=" + before +
+ ", after=" + after +
+ '}';
}
}
}
diff --git a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/utils/RecordHelper.java b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/utils/RecordHelper.java
index 5a29dc8..53b9be3 100644
--- a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/utils/RecordHelper.java
+++ b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/utils/RecordHelper.java
@@ -1,16 +1,9 @@
package com.lanyuanxiaoyao.service.common.utils;
-import cn.hutool.core.util.StrUtil;
-import cn.hutool.json.JSONObject;
-import cn.hutool.json.JSONUtil;
import com.lanyuanxiaoyao.service.common.Constants;
import com.lanyuanxiaoyao.service.common.entity.Record;
import com.lanyuanxiaoyao.service.common.entity.TableMeta;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
+import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -35,8 +28,9 @@ public class RecordHelper {
}
public static Boolean isVersionUpdateRecord(Record record) {
+ // Record{source=Source{sourceId='versionUpdate', sourceType='null', sourcePos='null', currentTs='2022-11-15 22:17:44'}, statement=Statement{schema='crm_ivpn_cust', table='customer', opStatement='null', opType='version', op='null', opTs='2022-11-15 00:17:43', version='20220925', before=null, after=null}}
return Constants.VERSION_UPDATE_KEY.equals(record.getSource().getSourceId())
- && Constants.VERSION_KEY.equals(record.getInfo().getOpType());
+ && Constants.VERSION_KEY.equals(record.getStatement().getOpType());
}
/**
@@ -79,7 +73,7 @@ public class RecordHelper {
}
public static Map addExtraMetadata(Map current, TableMeta tableMeta, Record record) {
- String operationType = record.getInfo().getOpType();
+ String operationType = record.getStatement().getOpType();
return addExtraMetadata(current, tableMeta, record, Constants.DELETE.equals(operationType));
}
@@ -87,7 +81,7 @@ public class RecordHelper {
Map newMap = new HashMap<>(current);
newMap.put(Constants.UNION_KEY_NAME, RecordHelper.createUnionKey(tableMeta, current));
newMap.put(Constants.UPDATE_TIMESTAMP_KEY_NAME, SnowFlakeHelper.next());
- newMap.put(Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME, record.getInfo().getOpTs());
+ newMap.put(Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME, record.getStatement().getOpTs());
newMap.put(Constants.HUDI_DELETE_KEY_NAME, isDelete);
return newMap;
}
@@ -130,22 +124,4 @@ public class RecordHelper {
return primaryKey + "_" + partitionKey;
}
}
-
- public static Record parse(String json) {
- JSONObject root = JSONUtil.parseObj(json);
- if (!root.containsKey("info")) {
- throw new RuntimeException(StrUtil.format("Info statement not found ({})", json));
- }
- JSONObject info = root.getJSONObject("info");
- if (!info.containsKey("encode")) {
- throw new RuntimeException(StrUtil.format("encode not found ({})", info.toString()));
- }
- String encode = info.getStr("encode");
- if (StrUtil.equals(encode, Record.Info.ENCODE_ENCRYPT_STATEMENT)) {
- String statementText = SecureHelper.decryptStatement(root.getStr("statement"));
- JSONObject statement = JSONUtil.parseObj(statementText);
- root.putOpt("statement", statement);
- }
- return root.toBean(Record.class);
- }
}
diff --git a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/utils/SecureHelper.java b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/utils/SecureHelper.java
deleted file mode 100644
index da7a431..0000000
--- a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/utils/SecureHelper.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package com.lanyuanxiaoyao.service.common.utils;
-
-import cn.hutool.core.util.StrUtil;
-import cn.hutool.crypto.Mode;
-import cn.hutool.crypto.Padding;
-import cn.hutool.crypto.symmetric.AES;
-
-/**
- * @author lanyuanxiaoyao
- */
-public class SecureHelper {
- private static final AES aes = new AES(Mode.ECB, Padding.PKCS5Padding, StrUtil.bytes("6fa22c779ec14b98"));
-
- public static String decryptStatement(String text) {
- return aes.decryptStr(text);
- }
-}
diff --git a/service-common/src/test/java/com/lanyuanxiaoyao/service/common/TestRecordParse.java b/service-common/src/test/java/com/lanyuanxiaoyao/service/common/TestRecordParse.java
deleted file mode 100644
index 2fa053e..0000000
--- a/service-common/src/test/java/com/lanyuanxiaoyao/service/common/TestRecordParse.java
+++ /dev/null
@@ -1,301 +0,0 @@
-package com.lanyuanxiaoyao.service.common;
-
-import cn.hutool.core.util.StrUtil;
-import cn.hutool.json.JSONObject;
-import cn.hutool.json.JSONUtil;
-import com.lanyuanxiaoyao.service.common.entity.Record;
-import com.lanyuanxiaoyao.service.common.utils.RecordHelper;
-import com.lanyuanxiaoyao.service.common.utils.SecureHelper;
-import org.junit.jupiter.api.Test;
-
-import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-/**
- * Record解析
- *
- * @author lanyuanxiaoyao
- */
-public class TestRecordParse {
- // language=JSON
- private static final String dmlJsonPlainText = "{\n" +
- " \"source\": {\n" +
- " \"sourceId\": \"set_7778798043_kafka\",\n" +
- " \"sourcePos\": \"mysql-bin.014567:1015413544\",\n" +
- " \"sourceType\": null,\n" +
- " \"currentTs\": \"2024-06-24 00:00:29\"\n" +
- " },\n" +
- " \"info\": {\n" +
- " \"opTs\": \"2024-06-24 00:00:29\",\n" +
- " \"opType\": \"U\",\n" +
- " \"schema\": \"acct_dg\",\n" +
- " \"table\": \"acct_item\",\n" +
- " \"encode\": \"0\"\n" +
- " },\n" +
- " \"statement\": {\n" +
- " \"before\": {\n" +
- " \"REGION_ID\": \"12123174\",\n" +
- " \"CUST_ID\": \"0\",\n" +
- " \"AMOUNT_FIX\": \"10456\",\n" +
- " \"ONE_ACCT_ITEM_ID\": \"0\",\n" +
- " \"BILL_ID\": \"0\",\n" +
- " \"PARTY_ROLE_ID\": \"100\"\n" +
- " },\n" +
- " \"after\": {\n" +
- " \"REGION_ID\": \"12123174\",\n" +
- " \"CUST_ID\": \"0\",\n" +
- " \"AMOUNT_FIX\": \"10456\",\n" +
- " \"ONE_ACCT_ITEM_ID\": \"0\",\n" +
- " \"BILL_ID\": \"0\",\n" +
- " \"PARTY_ROLE_ID\": \"100\"\n" +
- " }\n" +
- " }\n" +
- "}";
-
- // language=JSON
- private static final String dmlJsonEncodeText = "{\n" +
- " \"source\": {\n" +
- " \"sourceId\": \"set_7778798043_kafka\",\n" +
- " \"sourcePos\": \"mysql-bin.014567:1015413544\",\n" +
- " \"sourceType\": null,\n" +
- " \"currentTs\": \"2024-06-24 00:00:29\"\n" +
- " },\n" +
- " \"info\": {\n" +
- " \"opTs\": \"2024-06-24 00:00:29\",\n" +
- " \"opType\": \"U\",\n" +
- " \"schema\": \"acct_dg\",\n" +
- " \"table\": \"acct_item\",\n" +
- " \"encode\": \"1\"\n" +
- " },\n" +
- " \"statement\": \"I1bXXe4yjPK3fbfcMfYAdmcpLWGeewvGTxPf8MbVcQDFJXkEFpQrdG4gF/pslug/HMTFbG9Ks8SHfrnkZBqETjC5htDu+0aFXKhJllrYuPJUFY6gJBBrA2CuOAK7TMtfuQB7q58WLo4jkJ/8TM3EX47QPQfqHjLxSDIEXVkrcAHcjlO4fNIEzZe0iQSWIpqkuzLq99EEozK8Y0fDVUuuw2qxwcy1TZNyw2cfchP+KVscrNchGhfwc8hcQ+fiV5ml8ei0ADOsLYMOrbJxgkqlP63P6y40GWHmSCwkGXVnN/9JziMwI/izXzSFjfkLy8H2q3MwDEreclGmbEtibV7LocwzZLHnpHg1tLpS65A9v9E\\u003d\"\n" +
- "}";
-
- // language=JSON
- private static final String versionJsonPlainText = "{\n" +
- " \"source\": {\n" +
- " \"currentTs\": \"2024-06-24 16:52:07\",\n" +
- " \"sourceId\": \"versionUpdate\",\n" +
- " \"sourcePos\": null,\n" +
- " \"sourceType\": null\n" +
- " },\n" +
- " \"info\": {\n" +
- " \"opTs\": \"2024-06-24 16:52:07\",\n" +
- " \"opType\": \"version\",\n" +
- " \"schema\": \"schema\",\n" +
- " \"table\": \"table\",\n" +
- " \"encode\": \"0\"\n" +
- " },\n" +
- " \"statement\": {\n" +
- " \"version\": \"20240623\"\n" +
- " }\n" +
- "}";
-
- // language=JSON
- private static final String checkJsonPlainText = "{\n" +
- " \"source\": {\n" +
- " \"sourceId\": \"set_1351341338064\",\n" +
- " \"sourceType\": null,\n" +
- " \"sourcePos\": \"mysql-bin.008440:466807502\",\n" +
- " \"currentTs\": \"2020-11-24 22:17:44\"\n" +
- " },\n" +
- " \"info\": {\n" +
- " \"opType\": \"check\",\n" +
- " \"schema\": \"schema1\",\n" +
- " \"table\": \"table1\",\n" +
- " \"opTs\": \"2024-06-24 16:52:07\",\n" +
- " \"encode\": \"0\"\n" +
- " },\n" +
- " \"statement\": {\n" +
- " \"checkNum\": \"\",\n" +
- " \"topic\": \"topic1\",\n" +
- " \"I\": \"2\",\n" +
- " \"U\": \"3\",\n" +
- " \"D\": \"4\",\n" +
- " \"DDL\": \"0\"\n" +
- " }\n" +
- "}";
-
- // language=JSON
- private static final String ddlJsonPlainText = "{\n" +
- " \"source\": {\n" +
- " \"sourceId\": \"set_1021031048027_kafka\",\n" +
- " \"sourceType\": null,\n" +
- " \"sourcePos\": \"mysql-bin.017236:426272072\",\n" +
- " \"currentTs\": \"2024-06-18 23:25:29\"\n" +
- " },\n" +
- " \"info\": {\n" +
- " \"opTs\": \"2024-06-18 23:25:25\",\n" +
- " \"opType\": \"ddl\",\n" +
- " \"schema\": \"crm_cfguse\",\n" +
- " \"table\": \"tar_grp\",\n" +
- " \"encode\": \"0\"\n" +
- " },\n" +
- " \"statement\": {\n" +
- " \"sql\": \"alter table crm_cfguse.tar_grp add column is_zx_spec varchar(10) NOT NULL DEFAULT '' COMMENT '是否包含直销客户特殊标签';\",\n" +
- " \"op\": \"add_column\",\n" +
- " \"tablePk\": null,\n" +
- " \"tableComent\": null,\n" +
- " \"column\": {\n" +
- " \"before\": null,\n" +
- " \"after\": {\n" +
- " \"isNotNull\": \"true\",\n" +
- " \"columnType\": \"string\",\n" +
- " \"columnLength\": \"10\",\n" +
- " \"comment\": \"'是否包含直销客户特殊标签'\",\n" +
- " \"isPrimaryKey\": \"false\",\n" +
- " \"columnName\": \"is_zx_spec\"\n" +
- " }\n" +
- " }\n" +
- " }\n" +
- "}";
-
- private static void timeRunnable(String name, Runnable runnable) {
- long start = System.currentTimeMillis();
- runnable.run();
- System.out.println(StrUtil.format("{} \tcost time: {}s", name, (System.currentTimeMillis() - start) / 1000.0));
- }
-
- public static void main(String[] args) {
- int times = 10_000_000;
- timeRunnable("Plain", () -> {
- for (int index = 0; index < times; index++) {
- RecordHelper.parse(dmlJsonPlainText);
- }
- });
- timeRunnable("Encode", () -> {
- for (int index = 0; index < times; index++) {
- RecordHelper.parse(dmlJsonEncodeText);
- }
- });
- }
-
- private void testSource(JSONObject root) {
- assertTrue(root.containsKey("sourceId"));
- assertTrue(root.containsKey("sourcePos"));
- assertTrue(root.containsKey("sourceType"));
- assertTrue(root.containsKey("currentTs"));
- }
-
- private void testInfo(JSONObject root) {
- assertTrue(root.containsKey("opTs"));
- assertTrue(root.containsKey("opType"));
- assertTrue(root.containsKey("schema"));
- assertTrue(root.containsKey("table"));
- assertTrue(root.containsKey("encode"));
- }
-
- private void testDmlStatement(JSONObject root) {
- assertTrue(root.containsKey("before"));
- assertTrue(root.containsKey("after"));
- }
-
- private void testVersionStatement(JSONObject root) {
- assertTrue(root.containsKey("version"));
- }
-
- private void testCheckStatement(JSONObject root) {
- }
-
- private void testDdlStatement(JSONObject root) {
- }
-
- @Test
- public void testRecordDMLParse() {
- assertDoesNotThrow(() -> {JSONUtil.parseObj(dmlJsonPlainText);});
- JSONObject root = JSONUtil.parseObj(dmlJsonPlainText);
- assertTrue(root.containsKey("source"));
- testSource(root.getJSONObject("source"));
- assertTrue(root.containsKey("info"));
- testInfo(root.getJSONObject("info"));
- assertTrue(root.containsKey("statement"));
- testDmlStatement(root.getJSONObject("statement"));
-
- Record record = RecordHelper.parse(dmlJsonPlainText);
- assertNotNull(record);
- assertNotNull(record.getSource());
- assertNotNull(record.getInfo());
- assertNotNull(record.getStatement());
- assertNotNull(record.getStatement().getBefore());
- assertNotNull(record.getStatement().getAfter());
- }
-
- @Test
- public void testRecordDMLEncodeParse() {
- assertDoesNotThrow(() -> {JSONUtil.parseObj(dmlJsonEncodeText);});
- JSONObject root = JSONUtil.parseObj(dmlJsonEncodeText);
- assertTrue(root.containsKey("source"));
- testSource(root.getJSONObject("source"));
- assertTrue(root.containsKey("info"));
- testInfo(root.getJSONObject("info"));
- assertTrue(root.containsKey("statement"));
- String statementText = root.getStr("statement");
- assertTrue(StrUtil.isNotBlank(statementText));
- assertEquals("{\"before\":{\"REGION_ID\":\"12123174\",\"CUST_ID\":\"0\",\"AMOUNT_FIX\":\"10456\",\"ONE_ACCT_ITEM_ID\":\"0\",\"BILL_ID\":\"0\",\"PARTY_ROLE_ID\":\"100\"},\"after\":{\"REGION_ID\":\"12123174\",\"CUST_ID\":\"0\",\"AMOUNT_FIX\":\"10456\",\"ONE_ACCT_ITEM_ID\":\"0\",\"BILL_ID\":\"0\",\"PARTY_ROLE_ID\":\"100\"}}", SecureHelper.decryptStatement(statementText));
- testDmlStatement(JSONUtil.parseObj(SecureHelper.decryptStatement(statementText)));
-
- Record record = RecordHelper.parse(dmlJsonEncodeText);
- assertNotNull(record);
- assertNotNull(record.getSource());
- assertNotNull(record.getInfo());
- assertNotNull(record.getStatement());
- assertNotNull(record.getStatement().getBefore());
- assertNotNull(record.getStatement().getAfter());
- }
-
- @Test
- public void textRecordVersionParse() {
- assertDoesNotThrow(() -> {JSONUtil.parseObj(versionJsonPlainText);});
- JSONObject root = JSONUtil.parseObj(versionJsonPlainText);
- assertTrue(root.containsKey("source"));
- testSource(root.getJSONObject("source"));
- assertTrue(root.containsKey("info"));
- testInfo(root.getJSONObject("info"));
- assertTrue(root.containsKey("statement"));
- testVersionStatement(root.getJSONObject("statement"));
-
- Record record = RecordHelper.parse(versionJsonPlainText);
- assertNotNull(record);
- assertNotNull(record.getSource());
- assertNotNull(record.getInfo());
- assertNotNull(record.getStatement());
- assertNotNull(record.getStatement().getVersion());
- }
-
- @Test
- public void textRecordCheckParse() {
- assertDoesNotThrow(() -> {JSONUtil.parseObj(checkJsonPlainText);});
- JSONObject root = JSONUtil.parseObj(checkJsonPlainText);
- assertTrue(root.containsKey("source"));
- testSource(root.getJSONObject("source"));
- assertTrue(root.containsKey("info"));
- testInfo(root.getJSONObject("info"));
- assertTrue(root.containsKey("statement"));
- testCheckStatement(root.getJSONObject("statement"));
-
- Record record = RecordHelper.parse(checkJsonPlainText);
- assertNotNull(record);
- assertNotNull(record.getSource());
- assertNotNull(record.getInfo());
- assertNotNull(record.getStatement());
- }
-
- @Test
- public void textRecordDDLParse() {
- assertDoesNotThrow(() -> {JSONUtil.parseObj(ddlJsonPlainText);});
- JSONObject root = JSONUtil.parseObj(ddlJsonPlainText);
- assertTrue(root.containsKey("source"));
- testSource(root.getJSONObject("source"));
- assertTrue(root.containsKey("info"));
- testInfo(root.getJSONObject("info"));
- assertTrue(root.containsKey("statement"));
- testDdlStatement(root.getJSONObject("statement"));
-
- Record record = RecordHelper.parse(ddlJsonPlainText);
- assertNotNull(record);
- assertNotNull(record.getSource());
- assertNotNull(record.getInfo());
- assertNotNull(record.getStatement());
- }
-}
diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/police/PulsarMessage2Prisoner.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/police/PulsarMessage2Prisoner.java
index ac74f29..e6437c7 100644
--- a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/police/PulsarMessage2Prisoner.java
+++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/police/PulsarMessage2Prisoner.java
@@ -4,15 +4,17 @@ import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.lanyuanxiaoyao.service.common.Constants;
import com.lanyuanxiaoyao.service.common.entity.Record;
-import com.lanyuanxiaoyao.service.common.utils.RecordHelper;
+import com.lanyuanxiaoyao.service.executor.core.TaskContext;
import com.lanyuanxiaoyao.service.executor.task.entity.Prisoner;
import com.lanyuanxiaoyao.service.executor.task.entity.RecordView;
+import com.lanyuanxiaoyao.service.executor.task.helper.JacksonHelper;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.eclipse.collections.api.factory.Lists;
/**
@@ -21,6 +23,7 @@ import org.eclipse.collections.api.factory.Lists;
public class PulsarMessage2Prisoner extends RichMapFunction {
private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
private static final Pattern OPTS_PATTERN = Pattern.compile("^\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}$");
+ private final ObjectMapper mapper = JacksonHelper.getMapper();
private final long startTime;
private final long endTime;
@@ -38,19 +41,18 @@ public class PulsarMessage2Prisoner extends RichMapFunction fields;
- if (StrUtil.equalsAny(info.getOpType(), Constants.INSERT, Constants.UPDATE)) {
+ if (StrUtil.equalsAny(statement.getOpType(), Constants.INSERT, Constants.UPDATE)) {
fields = record.getStatement().getAfter();
- } else if (StrUtil.equals(info.getOpType(), Constants.DELETE)) {
+ } else if (StrUtil.equals(statement.getOpType(), Constants.DELETE)) {
fields = record.getStatement().getBefore();
} else {
return new Prisoner(value.getFile(), StrUtil.format("Invalid opType: {}", value.getData()));
@@ -76,6 +78,6 @@ public class PulsarMessage2Prisoner extends RichMapFunction {
@Override
public boolean filter(Record record) {
- String opType = record.getInfo().getOpType();
+ String opType = record.getStatement().getOpType();
switch (opType) {
case Constants.INSERT:
insertCounter.inc();
@@ -68,6 +68,6 @@ public class OperationTypeFilter extends RichFilterFunction {
default:
unknownCounter.inc();
}
- return !Constants.DDL.equals(record.getInfo().getOpType());
+ return !Constants.DDL.equals(record.getStatement().getOpType());
}
}
diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/PulsarMessage2RecordFunction.java b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/PulsarMessage2RecordFunction.java
index 7897503..8881261 100644
--- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/PulsarMessage2RecordFunction.java
+++ b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/PulsarMessage2RecordFunction.java
@@ -7,6 +7,7 @@ import com.lanyuanxiaoyao.service.common.entity.Record;
import com.lanyuanxiaoyao.service.common.entity.TableMeta;
import com.lanyuanxiaoyao.service.common.utils.RecordHelper;
import com.lanyuanxiaoyao.service.sync.configuration.GlobalConfiguration;
+import com.lanyuanxiaoyao.service.sync.utils.JacksonUtils;
import com.lanyuanxiaoyao.service.sync.utils.StatusUtils;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
@@ -17,6 +18,7 @@ import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,6 +40,7 @@ public class PulsarMessage2RecordFunction extends RichMapFunction> result = ListUtil.list(false);
if (RecordHelper.isVersionUpdateRecord(record)) {
- Record.Info info = record.getInfo();
Record.Statement statement = record.getStatement();
- LogHelper.info(logger, VERSION_UPDATE, "{} {} version: {}", mapper.writeValueAsString(info.getSchema()), statement.getVersion(), statement.getVersion());
+ LogHelper.info(logger, VERSION_UPDATE, "{} {} version: {}", mapper.writeValueAsString(statement.getSchema()), statement.getVersion(), statement.getVersion());
LogHelper.info(logger, VERSION_UPDATE, "Raw: {}", mapper.writeValueAsString(record));
- StatusUtils.versionUpdate(globalConfiguration, flinkJob, tableMeta, record.getStatement().getVersion(), info.getOpTs());
+ StatusUtils.versionUpdate(globalConfiguration, flinkJob, tableMeta, record.getStatement().getVersion(), statement.getOpTs());
return ListUtil.empty();
}
diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/ValidateRecordFilter.java b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/ValidateRecordFilter.java
index 961709b..0e9e434 100644
--- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/ValidateRecordFilter.java
+++ b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/ValidateRecordFilter.java
@@ -31,10 +31,6 @@ public class ValidateRecordFilter extends RichFilterFunction {
logger.warn("Record Source is null");
return false;
}
- if (ObjectUtil.isNull(record.getInfo())) {
- logger.warn("Record Info is null");
- return false;
- }
if (ObjectUtil.isNull(record.getStatement())) {
logger.warn("Record Statement is null");
return false;
diff --git a/utils/sync/src/test/java/com/lanyuanxiaoyao/service/sync/MessageParseTest.java b/utils/sync/src/test/java/com/lanyuanxiaoyao/service/sync/MessageParseTest.java
index 53b84bf..78a7235 100644
--- a/utils/sync/src/test/java/com/lanyuanxiaoyao/service/sync/MessageParseTest.java
+++ b/utils/sync/src/test/java/com/lanyuanxiaoyao/service/sync/MessageParseTest.java
@@ -3,7 +3,6 @@ package com.lanyuanxiaoyao.service.sync;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.lanyuanxiaoyao.service.common.entity.Record;
-import com.lanyuanxiaoyao.service.common.utils.RecordHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -100,7 +99,8 @@ public class MessageParseTest {
" }\n" +
" }\n" +
"}";
- Record record = RecordHelper.parse(message);
+ ObjectMapper mapper = new ObjectMapper();
+ Record record = mapper.readValue(message, Record.class);
logger.info("Record: {}", record);
}
}