From 6c9f43d3109d8fed65af03dad7998f70b82fbc81 Mon Sep 17 00:00:00 2001 From: v-zhangjc9 Date: Tue, 9 Jul 2024 15:36:10 +0800 Subject: [PATCH] =?UTF-8?q?feat(common):=20Record=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E7=BB=93=E6=9E=84=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 新增info字段用于存一些公共字段,statement加入消息加密能力 --- service-common/pom.xml | 6 + .../service/common/entity/Record.java | 131 +++++--- .../service/common/utils/RecordHelper.java | 34 +- .../service/common/utils/SecureHelper.java | 17 + .../service/common/TestRecordParse.java | 301 ++++++++++++++++++ .../police/PulsarMessage2Prisoner.java | 20 +- .../sync/functions/OperationTypeFilter.java | 4 +- .../PulsarMessage2RecordFunction.java | 9 +- .../functions/Record2RowDataFunction.java | 5 +- .../sync/functions/ValidateRecordFilter.java | 4 + .../service/sync/MessageParseTest.java | 4 +- 11 files changed, 455 insertions(+), 80 deletions(-) create mode 100644 service-common/src/main/java/com/lanyuanxiaoyao/service/common/utils/SecureHelper.java create mode 100644 service-common/src/test/java/com/lanyuanxiaoyao/service/common/TestRecordParse.java diff --git a/service-common/pom.xml b/service-common/pom.xml index 032b1e8..5867723 100644 --- a/service-common/pom.xml +++ b/service-common/pom.xml @@ -23,6 +23,12 @@ io.github.dragons96 sql-builder + + org.junit.jupiter + junit-jupiter + 5.7.2 + test + diff --git a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/entity/Record.java b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/entity/Record.java index c73ea5e..97627bb 100644 --- a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/entity/Record.java +++ b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/entity/Record.java @@ -12,6 +12,7 @@ import java.util.Map; */ public class Record implements Serializable { private Source source; + private Info info; private Statement statement; public Record() { @@ -25,6 +26,14 @@ public class Record implements Serializable { this.source = source; } + public Info getInfo() { + return info; + } + + public void setInfo(Info info) { + this.info = info; + } + public Statement getStatement() { return statement; } @@ -36,15 +45,16 @@ public class Record implements Serializable { @Override public String toString() { return "Record{" + - "source=" + source + - ", statement=" + statement + - '}'; + "source=" + source + + ", info=" + info + + ", statement=" + statement + + '}'; } public static class Source implements Serializable { private String sourceId; - private String sourceType; private String sourcePos; + private String sourceType; private String currentTs; public Source() { @@ -58,14 +68,6 @@ public class Record implements Serializable { this.sourceId = sourceId; } - public String getSourceType() { - return sourceType; - } - - public void setSourceType(String sourceType) { - this.sourceType = sourceType; - } - public String getSourcePos() { return sourcePos; } @@ -74,6 +76,14 @@ public class Record implements Serializable { this.sourcePos = sourcePos; } + public String getSourceType() { + return sourceType; + } + + public void setSourceType(String sourceType) { + this.sourceType = sourceType; + } + public String getCurrentTs() { return currentTs; } @@ -85,27 +95,33 @@ public class Record implements Serializable { @Override public String toString() { return "Source{" + - "sourceId='" + sourceId + '\'' + - ", sourceType='" + sourceType + '\'' + - ", sourcePos='" + sourcePos + '\'' + - ", currentTs='" + currentTs + '\'' + - '}'; + "sourceId='" + sourceId + '\'' + + ", sourcePos='" + sourcePos + '\'' + + ", sourceType='" + sourceType + '\'' + + ", currentTs='" + currentTs + '\'' + + '}'; } } - public static class Statement implements Serializable { + public static class Info implements Serializable { + /** + * 无加密 + */ + public static final String ENCODE_ENCRYPT_NONE = "0"; + /** + * 加密 + */ + public static final String ENCODE_ENCRYPT_STATEMENT = "1"; + private String schema; private String table; - private String opStatement; private String opType; - private String op; private String opTs; - private String version; - private Map before; - private Map after; - - public Statement() { - } + /** + * 0: 无加密 {@link Info#ENCODE_ENCRYPT_NONE} + * 1: 加密 {@link Info#ENCODE_ENCRYPT_STATEMENT} + */ + private String encode; public String getSchema() { return schema; @@ -123,14 +139,6 @@ public class Record implements Serializable { this.table = table; } - public String getOpStatement() { - return opStatement; - } - - public void setOpStatement(String opStatement) { - this.opStatement = opStatement; - } - public String getOpType() { return opType; } @@ -139,14 +147,6 @@ public class Record implements Serializable { this.opType = opType; } - public String getOp() { - return op; - } - - public void setOp(String op) { - this.op = op; - } - public String getOpTs() { return opTs; } @@ -155,6 +155,39 @@ public class Record implements Serializable { this.opTs = opTs; } + public String getEncode() { + return encode; + } + + public void setEncode(String encode) { + this.encode = encode; + } + + @Override + public String toString() { + return "Info{" + + "schema='" + schema + '\'' + + ", table='" + table + '\'' + + ", opType='" + opType + '\'' + + ", opTs='" + opTs + '\'' + + ", encode='" + encode + '\'' + + '}'; + } + } + + public static class Statement implements Serializable { + // Version + + private String version; + + // DML + + private Map before; + private Map after; + + public Statement() { + } + public String getVersion() { return version; } @@ -182,16 +215,10 @@ public class Record implements Serializable { @Override public String toString() { return "Statement{" + - "schema='" + schema + '\'' + - ", table='" + table + '\'' + - ", opStatement='" + opStatement + '\'' + - ", opType='" + opType + '\'' + - ", op='" + op + '\'' + - ", opTs='" + opTs + '\'' + - ", version='" + version + '\'' + - ", before=" + before + - ", after=" + after + - '}'; + "version='" + version + '\'' + + ", before=" + before + + ", after=" + after + + '}'; } } } diff --git a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/utils/RecordHelper.java b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/utils/RecordHelper.java index 53b9be3..5a29dc8 100644 --- a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/utils/RecordHelper.java +++ b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/utils/RecordHelper.java @@ -1,9 +1,16 @@ package com.lanyuanxiaoyao.service.common.utils; +import cn.hutool.core.util.StrUtil; +import cn.hutool.json.JSONObject; +import cn.hutool.json.JSONUtil; import com.lanyuanxiaoyao.service.common.Constants; import com.lanyuanxiaoyao.service.common.entity.Record; import com.lanyuanxiaoyao.service.common.entity.TableMeta; -import java.util.*; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; import java.util.function.Function; import java.util.stream.Collectors; @@ -28,9 +35,8 @@ public class RecordHelper { } public static Boolean isVersionUpdateRecord(Record record) { - // Record{source=Source{sourceId='versionUpdate', sourceType='null', sourcePos='null', currentTs='2022-11-15 22:17:44'}, statement=Statement{schema='crm_ivpn_cust', table='customer', opStatement='null', opType='version', op='null', opTs='2022-11-15 00:17:43', version='20220925', before=null, after=null}} return Constants.VERSION_UPDATE_KEY.equals(record.getSource().getSourceId()) - && Constants.VERSION_KEY.equals(record.getStatement().getOpType()); + && Constants.VERSION_KEY.equals(record.getInfo().getOpType()); } /** @@ -73,7 +79,7 @@ public class RecordHelper { } public static Map addExtraMetadata(Map current, TableMeta tableMeta, Record record) { - String operationType = record.getStatement().getOpType(); + String operationType = record.getInfo().getOpType(); return addExtraMetadata(current, tableMeta, record, Constants.DELETE.equals(operationType)); } @@ -81,7 +87,7 @@ public class RecordHelper { Map newMap = new HashMap<>(current); newMap.put(Constants.UNION_KEY_NAME, RecordHelper.createUnionKey(tableMeta, current)); newMap.put(Constants.UPDATE_TIMESTAMP_KEY_NAME, SnowFlakeHelper.next()); - newMap.put(Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME, record.getStatement().getOpTs()); + newMap.put(Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME, record.getInfo().getOpTs()); newMap.put(Constants.HUDI_DELETE_KEY_NAME, isDelete); return newMap; } @@ -124,4 +130,22 @@ public class RecordHelper { return primaryKey + "_" + partitionKey; } } + + public static Record parse(String json) { + JSONObject root = JSONUtil.parseObj(json); + if (!root.containsKey("info")) { + throw new RuntimeException(StrUtil.format("Info statement not found ({})", json)); + } + JSONObject info = root.getJSONObject("info"); + if (!info.containsKey("encode")) { + throw new RuntimeException(StrUtil.format("encode not found ({})", info.toString())); + } + String encode = info.getStr("encode"); + if (StrUtil.equals(encode, Record.Info.ENCODE_ENCRYPT_STATEMENT)) { + String statementText = SecureHelper.decryptStatement(root.getStr("statement")); + JSONObject statement = JSONUtil.parseObj(statementText); + root.putOpt("statement", statement); + } + return root.toBean(Record.class); + } } diff --git a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/utils/SecureHelper.java b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/utils/SecureHelper.java new file mode 100644 index 0000000..da7a431 --- /dev/null +++ b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/utils/SecureHelper.java @@ -0,0 +1,17 @@ +package com.lanyuanxiaoyao.service.common.utils; + +import cn.hutool.core.util.StrUtil; +import cn.hutool.crypto.Mode; +import cn.hutool.crypto.Padding; +import cn.hutool.crypto.symmetric.AES; + +/** + * @author lanyuanxiaoyao + */ +public class SecureHelper { + private static final AES aes = new AES(Mode.ECB, Padding.PKCS5Padding, StrUtil.bytes("6fa22c779ec14b98")); + + public static String decryptStatement(String text) { + return aes.decryptStr(text); + } +} diff --git a/service-common/src/test/java/com/lanyuanxiaoyao/service/common/TestRecordParse.java b/service-common/src/test/java/com/lanyuanxiaoyao/service/common/TestRecordParse.java new file mode 100644 index 0000000..2fa053e --- /dev/null +++ b/service-common/src/test/java/com/lanyuanxiaoyao/service/common/TestRecordParse.java @@ -0,0 +1,301 @@ +package com.lanyuanxiaoyao.service.common; + +import cn.hutool.core.util.StrUtil; +import cn.hutool.json.JSONObject; +import cn.hutool.json.JSONUtil; +import com.lanyuanxiaoyao.service.common.entity.Record; +import com.lanyuanxiaoyao.service.common.utils.RecordHelper; +import com.lanyuanxiaoyao.service.common.utils.SecureHelper; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Record解析 + * + * @author lanyuanxiaoyao + */ +public class TestRecordParse { + // language=JSON + private static final String dmlJsonPlainText = "{\n" + + " \"source\": {\n" + + " \"sourceId\": \"set_7778798043_kafka\",\n" + + " \"sourcePos\": \"mysql-bin.014567:1015413544\",\n" + + " \"sourceType\": null,\n" + + " \"currentTs\": \"2024-06-24 00:00:29\"\n" + + " },\n" + + " \"info\": {\n" + + " \"opTs\": \"2024-06-24 00:00:29\",\n" + + " \"opType\": \"U\",\n" + + " \"schema\": \"acct_dg\",\n" + + " \"table\": \"acct_item\",\n" + + " \"encode\": \"0\"\n" + + " },\n" + + " \"statement\": {\n" + + " \"before\": {\n" + + " \"REGION_ID\": \"12123174\",\n" + + " \"CUST_ID\": \"0\",\n" + + " \"AMOUNT_FIX\": \"10456\",\n" + + " \"ONE_ACCT_ITEM_ID\": \"0\",\n" + + " \"BILL_ID\": \"0\",\n" + + " \"PARTY_ROLE_ID\": \"100\"\n" + + " },\n" + + " \"after\": {\n" + + " \"REGION_ID\": \"12123174\",\n" + + " \"CUST_ID\": \"0\",\n" + + " \"AMOUNT_FIX\": \"10456\",\n" + + " \"ONE_ACCT_ITEM_ID\": \"0\",\n" + + " \"BILL_ID\": \"0\",\n" + + " \"PARTY_ROLE_ID\": \"100\"\n" + + " }\n" + + " }\n" + + "}"; + + // language=JSON + private static final String dmlJsonEncodeText = "{\n" + + " \"source\": {\n" + + " \"sourceId\": \"set_7778798043_kafka\",\n" + + " \"sourcePos\": \"mysql-bin.014567:1015413544\",\n" + + " \"sourceType\": null,\n" + + " \"currentTs\": \"2024-06-24 00:00:29\"\n" + + " },\n" + + " \"info\": {\n" + + " \"opTs\": \"2024-06-24 00:00:29\",\n" + + " \"opType\": \"U\",\n" + + " \"schema\": \"acct_dg\",\n" + + " \"table\": \"acct_item\",\n" + + " \"encode\": \"1\"\n" + + " },\n" + + " \"statement\": \"I1bXXe4yjPK3fbfcMfYAdmcpLWGeewvGTxPf8MbVcQDFJXkEFpQrdG4gF/pslug/HMTFbG9Ks8SHfrnkZBqETjC5htDu+0aFXKhJllrYuPJUFY6gJBBrA2CuOAK7TMtfuQB7q58WLo4jkJ/8TM3EX47QPQfqHjLxSDIEXVkrcAHcjlO4fNIEzZe0iQSWIpqkuzLq99EEozK8Y0fDVUuuw2qxwcy1TZNyw2cfchP+KVscrNchGhfwc8hcQ+fiV5ml8ei0ADOsLYMOrbJxgkqlP63P6y40GWHmSCwkGXVnN/9JziMwI/izXzSFjfkLy8H2q3MwDEreclGmbEtibV7LocwzZLHnpHg1tLpS65A9v9E\\u003d\"\n" + + "}"; + + // language=JSON + private static final String versionJsonPlainText = "{\n" + + " \"source\": {\n" + + " \"currentTs\": \"2024-06-24 16:52:07\",\n" + + " \"sourceId\": \"versionUpdate\",\n" + + " \"sourcePos\": null,\n" + + " \"sourceType\": null\n" + + " },\n" + + " \"info\": {\n" + + " \"opTs\": \"2024-06-24 16:52:07\",\n" + + " \"opType\": \"version\",\n" + + " \"schema\": \"schema\",\n" + + " \"table\": \"table\",\n" + + " \"encode\": \"0\"\n" + + " },\n" + + " \"statement\": {\n" + + " \"version\": \"20240623\"\n" + + " }\n" + + "}"; + + // language=JSON + private static final String checkJsonPlainText = "{\n" + + " \"source\": {\n" + + " \"sourceId\": \"set_1351341338064\",\n" + + " \"sourceType\": null,\n" + + " \"sourcePos\": \"mysql-bin.008440:466807502\",\n" + + " \"currentTs\": \"2020-11-24 22:17:44\"\n" + + " },\n" + + " \"info\": {\n" + + " \"opType\": \"check\",\n" + + " \"schema\": \"schema1\",\n" + + " \"table\": \"table1\",\n" + + " \"opTs\": \"2024-06-24 16:52:07\",\n" + + " \"encode\": \"0\"\n" + + " },\n" + + " \"statement\": {\n" + + " \"checkNum\": \"\",\n" + + " \"topic\": \"topic1\",\n" + + " \"I\": \"2\",\n" + + " \"U\": \"3\",\n" + + " \"D\": \"4\",\n" + + " \"DDL\": \"0\"\n" + + " }\n" + + "}"; + + // language=JSON + private static final String ddlJsonPlainText = "{\n" + + " \"source\": {\n" + + " \"sourceId\": \"set_1021031048027_kafka\",\n" + + " \"sourceType\": null,\n" + + " \"sourcePos\": \"mysql-bin.017236:426272072\",\n" + + " \"currentTs\": \"2024-06-18 23:25:29\"\n" + + " },\n" + + " \"info\": {\n" + + " \"opTs\": \"2024-06-18 23:25:25\",\n" + + " \"opType\": \"ddl\",\n" + + " \"schema\": \"crm_cfguse\",\n" + + " \"table\": \"tar_grp\",\n" + + " \"encode\": \"0\"\n" + + " },\n" + + " \"statement\": {\n" + + " \"sql\": \"alter table crm_cfguse.tar_grp add column is_zx_spec varchar(10) NOT NULL DEFAULT '' COMMENT '是否包含直销客户特殊标签';\",\n" + + " \"op\": \"add_column\",\n" + + " \"tablePk\": null,\n" + + " \"tableComent\": null,\n" + + " \"column\": {\n" + + " \"before\": null,\n" + + " \"after\": {\n" + + " \"isNotNull\": \"true\",\n" + + " \"columnType\": \"string\",\n" + + " \"columnLength\": \"10\",\n" + + " \"comment\": \"'是否包含直销客户特殊标签'\",\n" + + " \"isPrimaryKey\": \"false\",\n" + + " \"columnName\": \"is_zx_spec\"\n" + + " }\n" + + " }\n" + + " }\n" + + "}"; + + private static void timeRunnable(String name, Runnable runnable) { + long start = System.currentTimeMillis(); + runnable.run(); + System.out.println(StrUtil.format("{} \tcost time: {}s", name, (System.currentTimeMillis() - start) / 1000.0)); + } + + public static void main(String[] args) { + int times = 10_000_000; + timeRunnable("Plain", () -> { + for (int index = 0; index < times; index++) { + RecordHelper.parse(dmlJsonPlainText); + } + }); + timeRunnable("Encode", () -> { + for (int index = 0; index < times; index++) { + RecordHelper.parse(dmlJsonEncodeText); + } + }); + } + + private void testSource(JSONObject root) { + assertTrue(root.containsKey("sourceId")); + assertTrue(root.containsKey("sourcePos")); + assertTrue(root.containsKey("sourceType")); + assertTrue(root.containsKey("currentTs")); + } + + private void testInfo(JSONObject root) { + assertTrue(root.containsKey("opTs")); + assertTrue(root.containsKey("opType")); + assertTrue(root.containsKey("schema")); + assertTrue(root.containsKey("table")); + assertTrue(root.containsKey("encode")); + } + + private void testDmlStatement(JSONObject root) { + assertTrue(root.containsKey("before")); + assertTrue(root.containsKey("after")); + } + + private void testVersionStatement(JSONObject root) { + assertTrue(root.containsKey("version")); + } + + private void testCheckStatement(JSONObject root) { + } + + private void testDdlStatement(JSONObject root) { + } + + @Test + public void testRecordDMLParse() { + assertDoesNotThrow(() -> {JSONUtil.parseObj(dmlJsonPlainText);}); + JSONObject root = JSONUtil.parseObj(dmlJsonPlainText); + assertTrue(root.containsKey("source")); + testSource(root.getJSONObject("source")); + assertTrue(root.containsKey("info")); + testInfo(root.getJSONObject("info")); + assertTrue(root.containsKey("statement")); + testDmlStatement(root.getJSONObject("statement")); + + Record record = RecordHelper.parse(dmlJsonPlainText); + assertNotNull(record); + assertNotNull(record.getSource()); + assertNotNull(record.getInfo()); + assertNotNull(record.getStatement()); + assertNotNull(record.getStatement().getBefore()); + assertNotNull(record.getStatement().getAfter()); + } + + @Test + public void testRecordDMLEncodeParse() { + assertDoesNotThrow(() -> {JSONUtil.parseObj(dmlJsonEncodeText);}); + JSONObject root = JSONUtil.parseObj(dmlJsonEncodeText); + assertTrue(root.containsKey("source")); + testSource(root.getJSONObject("source")); + assertTrue(root.containsKey("info")); + testInfo(root.getJSONObject("info")); + assertTrue(root.containsKey("statement")); + String statementText = root.getStr("statement"); + assertTrue(StrUtil.isNotBlank(statementText)); + assertEquals("{\"before\":{\"REGION_ID\":\"12123174\",\"CUST_ID\":\"0\",\"AMOUNT_FIX\":\"10456\",\"ONE_ACCT_ITEM_ID\":\"0\",\"BILL_ID\":\"0\",\"PARTY_ROLE_ID\":\"100\"},\"after\":{\"REGION_ID\":\"12123174\",\"CUST_ID\":\"0\",\"AMOUNT_FIX\":\"10456\",\"ONE_ACCT_ITEM_ID\":\"0\",\"BILL_ID\":\"0\",\"PARTY_ROLE_ID\":\"100\"}}", SecureHelper.decryptStatement(statementText)); + testDmlStatement(JSONUtil.parseObj(SecureHelper.decryptStatement(statementText))); + + Record record = RecordHelper.parse(dmlJsonEncodeText); + assertNotNull(record); + assertNotNull(record.getSource()); + assertNotNull(record.getInfo()); + assertNotNull(record.getStatement()); + assertNotNull(record.getStatement().getBefore()); + assertNotNull(record.getStatement().getAfter()); + } + + @Test + public void textRecordVersionParse() { + assertDoesNotThrow(() -> {JSONUtil.parseObj(versionJsonPlainText);}); + JSONObject root = JSONUtil.parseObj(versionJsonPlainText); + assertTrue(root.containsKey("source")); + testSource(root.getJSONObject("source")); + assertTrue(root.containsKey("info")); + testInfo(root.getJSONObject("info")); + assertTrue(root.containsKey("statement")); + testVersionStatement(root.getJSONObject("statement")); + + Record record = RecordHelper.parse(versionJsonPlainText); + assertNotNull(record); + assertNotNull(record.getSource()); + assertNotNull(record.getInfo()); + assertNotNull(record.getStatement()); + assertNotNull(record.getStatement().getVersion()); + } + + @Test + public void textRecordCheckParse() { + assertDoesNotThrow(() -> {JSONUtil.parseObj(checkJsonPlainText);}); + JSONObject root = JSONUtil.parseObj(checkJsonPlainText); + assertTrue(root.containsKey("source")); + testSource(root.getJSONObject("source")); + assertTrue(root.containsKey("info")); + testInfo(root.getJSONObject("info")); + assertTrue(root.containsKey("statement")); + testCheckStatement(root.getJSONObject("statement")); + + Record record = RecordHelper.parse(checkJsonPlainText); + assertNotNull(record); + assertNotNull(record.getSource()); + assertNotNull(record.getInfo()); + assertNotNull(record.getStatement()); + } + + @Test + public void textRecordDDLParse() { + assertDoesNotThrow(() -> {JSONUtil.parseObj(ddlJsonPlainText);}); + JSONObject root = JSONUtil.parseObj(ddlJsonPlainText); + assertTrue(root.containsKey("source")); + testSource(root.getJSONObject("source")); + assertTrue(root.containsKey("info")); + testInfo(root.getJSONObject("info")); + assertTrue(root.containsKey("statement")); + testDdlStatement(root.getJSONObject("statement")); + + Record record = RecordHelper.parse(ddlJsonPlainText); + assertNotNull(record); + assertNotNull(record.getSource()); + assertNotNull(record.getInfo()); + assertNotNull(record.getStatement()); + } +} diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/police/PulsarMessage2Prisoner.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/police/PulsarMessage2Prisoner.java index e6437c7..ac74f29 100644 --- a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/police/PulsarMessage2Prisoner.java +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/police/PulsarMessage2Prisoner.java @@ -4,17 +4,15 @@ import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import com.lanyuanxiaoyao.service.common.Constants; import com.lanyuanxiaoyao.service.common.entity.Record; -import com.lanyuanxiaoyao.service.executor.core.TaskContext; +import com.lanyuanxiaoyao.service.common.utils.RecordHelper; import com.lanyuanxiaoyao.service.executor.task.entity.Prisoner; import com.lanyuanxiaoyao.service.executor.task.entity.RecordView; -import com.lanyuanxiaoyao.service.executor.task.helper.JacksonHelper; import java.time.LocalDateTime; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; import java.util.Map; import java.util.regex.Pattern; import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.eclipse.collections.api.factory.Lists; /** @@ -23,7 +21,6 @@ import org.eclipse.collections.api.factory.Lists; public class PulsarMessage2Prisoner extends RichMapFunction { private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); private static final Pattern OPTS_PATTERN = Pattern.compile("^\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}$"); - private final ObjectMapper mapper = JacksonHelper.getMapper(); private final long startTime; private final long endTime; @@ -41,18 +38,19 @@ public class PulsarMessage2Prisoner extends RichMapFunction fields; - if (StrUtil.equalsAny(statement.getOpType(), Constants.INSERT, Constants.UPDATE)) { + if (StrUtil.equalsAny(info.getOpType(), Constants.INSERT, Constants.UPDATE)) { fields = record.getStatement().getAfter(); - } else if (StrUtil.equals(statement.getOpType(), Constants.DELETE)) { + } else if (StrUtil.equals(info.getOpType(), Constants.DELETE)) { fields = record.getStatement().getBefore(); } else { return new Prisoner(value.getFile(), StrUtil.format("Invalid opType: {}", value.getData())); @@ -78,6 +76,6 @@ public class PulsarMessage2Prisoner extends RichMapFunction { @Override public boolean filter(Record record) { - String opType = record.getStatement().getOpType(); + String opType = record.getInfo().getOpType(); switch (opType) { case Constants.INSERT: insertCounter.inc(); @@ -68,6 +68,6 @@ public class OperationTypeFilter extends RichFilterFunction { default: unknownCounter.inc(); } - return !Constants.DDL.equals(record.getStatement().getOpType()); + return !Constants.DDL.equals(record.getInfo().getOpType()); } } diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/PulsarMessage2RecordFunction.java b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/PulsarMessage2RecordFunction.java index 8881261..7897503 100644 --- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/PulsarMessage2RecordFunction.java +++ b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/PulsarMessage2RecordFunction.java @@ -7,7 +7,6 @@ import com.lanyuanxiaoyao.service.common.entity.Record; import com.lanyuanxiaoyao.service.common.entity.TableMeta; import com.lanyuanxiaoyao.service.common.utils.RecordHelper; import com.lanyuanxiaoyao.service.sync.configuration.GlobalConfiguration; -import com.lanyuanxiaoyao.service.sync.utils.JacksonUtils; import com.lanyuanxiaoyao.service.sync.utils.StatusUtils; import java.time.LocalDateTime; import java.time.ZoneOffset; @@ -18,7 +17,6 @@ import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,7 +38,6 @@ public class PulsarMessage2RecordFunction extends RichMapFunction> result = ListUtil.list(false); if (RecordHelper.isVersionUpdateRecord(record)) { + Record.Info info = record.getInfo(); Record.Statement statement = record.getStatement(); - LogHelper.info(logger, VERSION_UPDATE, "{} {} version: {}", mapper.writeValueAsString(statement.getSchema()), statement.getVersion(), statement.getVersion()); + LogHelper.info(logger, VERSION_UPDATE, "{} {} version: {}", mapper.writeValueAsString(info.getSchema()), statement.getVersion(), statement.getVersion()); LogHelper.info(logger, VERSION_UPDATE, "Raw: {}", mapper.writeValueAsString(record)); - StatusUtils.versionUpdate(globalConfiguration, flinkJob, tableMeta, record.getStatement().getVersion(), statement.getOpTs()); + StatusUtils.versionUpdate(globalConfiguration, flinkJob, tableMeta, record.getStatement().getVersion(), info.getOpTs()); return ListUtil.empty(); } diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/ValidateRecordFilter.java b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/ValidateRecordFilter.java index 0e9e434..961709b 100644 --- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/ValidateRecordFilter.java +++ b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/ValidateRecordFilter.java @@ -31,6 +31,10 @@ public class ValidateRecordFilter extends RichFilterFunction { logger.warn("Record Source is null"); return false; } + if (ObjectUtil.isNull(record.getInfo())) { + logger.warn("Record Info is null"); + return false; + } if (ObjectUtil.isNull(record.getStatement())) { logger.warn("Record Statement is null"); return false; diff --git a/utils/sync/src/test/java/com/lanyuanxiaoyao/service/sync/MessageParseTest.java b/utils/sync/src/test/java/com/lanyuanxiaoyao/service/sync/MessageParseTest.java index 78a7235..53b84bf 100644 --- a/utils/sync/src/test/java/com/lanyuanxiaoyao/service/sync/MessageParseTest.java +++ b/utils/sync/src/test/java/com/lanyuanxiaoyao/service/sync/MessageParseTest.java @@ -3,6 +3,7 @@ package com.lanyuanxiaoyao.service.sync; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.lanyuanxiaoyao.service.common.entity.Record; +import com.lanyuanxiaoyao.service.common.utils.RecordHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -99,8 +100,7 @@ public class MessageParseTest { " }\n" + " }\n" + "}"; - ObjectMapper mapper = new ObjectMapper(); - Record record = mapper.readValue(message, Record.class); + Record record = RecordHelper.parse(message); logger.info("Record: {}", record); } }