From b0c5d04476bbb370e0c3fe3be2d49f44d2a94d1b Mon Sep 17 00:00:00 2001 From: v-zhangjc9 Date: Mon, 29 Jul 2024 15:42:06 +0800 Subject: [PATCH] =?UTF-8?q?revert(common):=20=E5=9B=9E=E6=BB=9ARecord?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E7=BB=93=E6=9E=84=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit 6c9f43d3109d8fed65af03dad7998f70b82fbc81. --- service-common/pom.xml | 6 - .../service/common/entity/Record.java | 131 +++----- .../service/common/utils/RecordHelper.java | 34 +- .../service/common/utils/SecureHelper.java | 17 - .../service/common/TestRecordParse.java | 301 ------------------ .../police/PulsarMessage2Prisoner.java | 20 +- .../sync/functions/OperationTypeFilter.java | 4 +- .../PulsarMessage2RecordFunction.java | 9 +- .../functions/Record2RowDataFunction.java | 5 +- .../sync/functions/ValidateRecordFilter.java | 4 - .../service/sync/MessageParseTest.java | 4 +- 11 files changed, 80 insertions(+), 455 deletions(-) delete mode 100644 service-common/src/main/java/com/lanyuanxiaoyao/service/common/utils/SecureHelper.java delete mode 100644 service-common/src/test/java/com/lanyuanxiaoyao/service/common/TestRecordParse.java diff --git a/service-common/pom.xml b/service-common/pom.xml index 5867723..032b1e8 100644 --- a/service-common/pom.xml +++ b/service-common/pom.xml @@ -23,12 +23,6 @@ io.github.dragons96 sql-builder - - org.junit.jupiter - junit-jupiter - 5.7.2 - test - diff --git a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/entity/Record.java b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/entity/Record.java index 97627bb..c73ea5e 100644 --- a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/entity/Record.java +++ b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/entity/Record.java @@ -12,7 +12,6 @@ import java.util.Map; */ public class Record implements Serializable { private Source source; - private Info info; private Statement statement; public Record() { @@ -26,14 +25,6 @@ public class Record implements Serializable { this.source = source; } - public Info getInfo() { - return info; - } - - public void setInfo(Info info) { - this.info = info; - } - public Statement getStatement() { return statement; } @@ -45,16 +36,15 @@ public class Record implements Serializable { @Override public String toString() { return "Record{" + - "source=" + source + - ", info=" + info + - ", statement=" + statement + - '}'; + "source=" + source + + ", statement=" + statement + + '}'; } public static class Source implements Serializable { private String sourceId; - private String sourcePos; private String sourceType; + private String sourcePos; private String currentTs; public Source() { @@ -68,14 +58,6 @@ public class Record implements Serializable { this.sourceId = sourceId; } - public String getSourcePos() { - return sourcePos; - } - - public void setSourcePos(String sourcePos) { - this.sourcePos = sourcePos; - } - public String getSourceType() { return sourceType; } @@ -84,6 +66,14 @@ public class Record implements Serializable { this.sourceType = sourceType; } + public String getSourcePos() { + return sourcePos; + } + + public void setSourcePos(String sourcePos) { + this.sourcePos = sourcePos; + } + public String getCurrentTs() { return currentTs; } @@ -95,33 +85,27 @@ public class Record implements Serializable { @Override public String toString() { return "Source{" + - "sourceId='" + sourceId + '\'' + - ", sourcePos='" + sourcePos + '\'' + - ", sourceType='" + sourceType + '\'' + - ", currentTs='" + currentTs + '\'' + - '}'; + "sourceId='" + sourceId + '\'' + + ", sourceType='" + sourceType + '\'' + + ", sourcePos='" + sourcePos + '\'' + + ", currentTs='" + currentTs + '\'' + + '}'; } } - public static class Info implements Serializable { - /** - * 无加密 - */ - public static final String ENCODE_ENCRYPT_NONE = "0"; - /** - * 加密 - */ - public static final String ENCODE_ENCRYPT_STATEMENT = "1"; - + public static class Statement implements Serializable { private String schema; private String table; + private String opStatement; private String opType; + private String op; private String opTs; - /** - * 0: 无加密 {@link Info#ENCODE_ENCRYPT_NONE} - * 1: 加密 {@link Info#ENCODE_ENCRYPT_STATEMENT} - */ - private String encode; + private String version; + private Map before; + private Map after; + + public Statement() { + } public String getSchema() { return schema; @@ -139,6 +123,14 @@ public class Record implements Serializable { this.table = table; } + public String getOpStatement() { + return opStatement; + } + + public void setOpStatement(String opStatement) { + this.opStatement = opStatement; + } + public String getOpType() { return opType; } @@ -147,6 +139,14 @@ public class Record implements Serializable { this.opType = opType; } + public String getOp() { + return op; + } + + public void setOp(String op) { + this.op = op; + } + public String getOpTs() { return opTs; } @@ -155,39 +155,6 @@ public class Record implements Serializable { this.opTs = opTs; } - public String getEncode() { - return encode; - } - - public void setEncode(String encode) { - this.encode = encode; - } - - @Override - public String toString() { - return "Info{" + - "schema='" + schema + '\'' + - ", table='" + table + '\'' + - ", opType='" + opType + '\'' + - ", opTs='" + opTs + '\'' + - ", encode='" + encode + '\'' + - '}'; - } - } - - public static class Statement implements Serializable { - // Version - - private String version; - - // DML - - private Map before; - private Map after; - - public Statement() { - } - public String getVersion() { return version; } @@ -215,10 +182,16 @@ public class Record implements Serializable { @Override public String toString() { return "Statement{" + - "version='" + version + '\'' + - ", before=" + before + - ", after=" + after + - '}'; + "schema='" + schema + '\'' + + ", table='" + table + '\'' + + ", opStatement='" + opStatement + '\'' + + ", opType='" + opType + '\'' + + ", op='" + op + '\'' + + ", opTs='" + opTs + '\'' + + ", version='" + version + '\'' + + ", before=" + before + + ", after=" + after + + '}'; } } } diff --git a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/utils/RecordHelper.java b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/utils/RecordHelper.java index 5a29dc8..53b9be3 100644 --- a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/utils/RecordHelper.java +++ b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/utils/RecordHelper.java @@ -1,16 +1,9 @@ package com.lanyuanxiaoyao.service.common.utils; -import cn.hutool.core.util.StrUtil; -import cn.hutool.json.JSONObject; -import cn.hutool.json.JSONUtil; import com.lanyuanxiaoyao.service.common.Constants; import com.lanyuanxiaoyao.service.common.entity.Record; import com.lanyuanxiaoyao.service.common.entity.TableMeta; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; +import java.util.*; import java.util.function.Function; import java.util.stream.Collectors; @@ -35,8 +28,9 @@ public class RecordHelper { } public static Boolean isVersionUpdateRecord(Record record) { + // Record{source=Source{sourceId='versionUpdate', sourceType='null', sourcePos='null', currentTs='2022-11-15 22:17:44'}, statement=Statement{schema='crm_ivpn_cust', table='customer', opStatement='null', opType='version', op='null', opTs='2022-11-15 00:17:43', version='20220925', before=null, after=null}} return Constants.VERSION_UPDATE_KEY.equals(record.getSource().getSourceId()) - && Constants.VERSION_KEY.equals(record.getInfo().getOpType()); + && Constants.VERSION_KEY.equals(record.getStatement().getOpType()); } /** @@ -79,7 +73,7 @@ public class RecordHelper { } public static Map addExtraMetadata(Map current, TableMeta tableMeta, Record record) { - String operationType = record.getInfo().getOpType(); + String operationType = record.getStatement().getOpType(); return addExtraMetadata(current, tableMeta, record, Constants.DELETE.equals(operationType)); } @@ -87,7 +81,7 @@ public class RecordHelper { Map newMap = new HashMap<>(current); newMap.put(Constants.UNION_KEY_NAME, RecordHelper.createUnionKey(tableMeta, current)); newMap.put(Constants.UPDATE_TIMESTAMP_KEY_NAME, SnowFlakeHelper.next()); - newMap.put(Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME, record.getInfo().getOpTs()); + newMap.put(Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME, record.getStatement().getOpTs()); newMap.put(Constants.HUDI_DELETE_KEY_NAME, isDelete); return newMap; } @@ -130,22 +124,4 @@ public class RecordHelper { return primaryKey + "_" + partitionKey; } } - - public static Record parse(String json) { - JSONObject root = JSONUtil.parseObj(json); - if (!root.containsKey("info")) { - throw new RuntimeException(StrUtil.format("Info statement not found ({})", json)); - } - JSONObject info = root.getJSONObject("info"); - if (!info.containsKey("encode")) { - throw new RuntimeException(StrUtil.format("encode not found ({})", info.toString())); - } - String encode = info.getStr("encode"); - if (StrUtil.equals(encode, Record.Info.ENCODE_ENCRYPT_STATEMENT)) { - String statementText = SecureHelper.decryptStatement(root.getStr("statement")); - JSONObject statement = JSONUtil.parseObj(statementText); - root.putOpt("statement", statement); - } - return root.toBean(Record.class); - } } diff --git a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/utils/SecureHelper.java b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/utils/SecureHelper.java deleted file mode 100644 index da7a431..0000000 --- a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/utils/SecureHelper.java +++ /dev/null @@ -1,17 +0,0 @@ -package com.lanyuanxiaoyao.service.common.utils; - -import cn.hutool.core.util.StrUtil; -import cn.hutool.crypto.Mode; -import cn.hutool.crypto.Padding; -import cn.hutool.crypto.symmetric.AES; - -/** - * @author lanyuanxiaoyao - */ -public class SecureHelper { - private static final AES aes = new AES(Mode.ECB, Padding.PKCS5Padding, StrUtil.bytes("6fa22c779ec14b98")); - - public static String decryptStatement(String text) { - return aes.decryptStr(text); - } -} diff --git a/service-common/src/test/java/com/lanyuanxiaoyao/service/common/TestRecordParse.java b/service-common/src/test/java/com/lanyuanxiaoyao/service/common/TestRecordParse.java deleted file mode 100644 index 2fa053e..0000000 --- a/service-common/src/test/java/com/lanyuanxiaoyao/service/common/TestRecordParse.java +++ /dev/null @@ -1,301 +0,0 @@ -package com.lanyuanxiaoyao.service.common; - -import cn.hutool.core.util.StrUtil; -import cn.hutool.json.JSONObject; -import cn.hutool.json.JSONUtil; -import com.lanyuanxiaoyao.service.common.entity.Record; -import com.lanyuanxiaoyao.service.common.utils.RecordHelper; -import com.lanyuanxiaoyao.service.common.utils.SecureHelper; -import org.junit.jupiter.api.Test; - -import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertTrue; - -/** - * Record解析 - * - * @author lanyuanxiaoyao - */ -public class TestRecordParse { - // language=JSON - private static final String dmlJsonPlainText = "{\n" + - " \"source\": {\n" + - " \"sourceId\": \"set_7778798043_kafka\",\n" + - " \"sourcePos\": \"mysql-bin.014567:1015413544\",\n" + - " \"sourceType\": null,\n" + - " \"currentTs\": \"2024-06-24 00:00:29\"\n" + - " },\n" + - " \"info\": {\n" + - " \"opTs\": \"2024-06-24 00:00:29\",\n" + - " \"opType\": \"U\",\n" + - " \"schema\": \"acct_dg\",\n" + - " \"table\": \"acct_item\",\n" + - " \"encode\": \"0\"\n" + - " },\n" + - " \"statement\": {\n" + - " \"before\": {\n" + - " \"REGION_ID\": \"12123174\",\n" + - " \"CUST_ID\": \"0\",\n" + - " \"AMOUNT_FIX\": \"10456\",\n" + - " \"ONE_ACCT_ITEM_ID\": \"0\",\n" + - " \"BILL_ID\": \"0\",\n" + - " \"PARTY_ROLE_ID\": \"100\"\n" + - " },\n" + - " \"after\": {\n" + - " \"REGION_ID\": \"12123174\",\n" + - " \"CUST_ID\": \"0\",\n" + - " \"AMOUNT_FIX\": \"10456\",\n" + - " \"ONE_ACCT_ITEM_ID\": \"0\",\n" + - " \"BILL_ID\": \"0\",\n" + - " \"PARTY_ROLE_ID\": \"100\"\n" + - " }\n" + - " }\n" + - "}"; - - // language=JSON - private static final String dmlJsonEncodeText = "{\n" + - " \"source\": {\n" + - " \"sourceId\": \"set_7778798043_kafka\",\n" + - " \"sourcePos\": \"mysql-bin.014567:1015413544\",\n" + - " \"sourceType\": null,\n" + - " \"currentTs\": \"2024-06-24 00:00:29\"\n" + - " },\n" + - " \"info\": {\n" + - " \"opTs\": \"2024-06-24 00:00:29\",\n" + - " \"opType\": \"U\",\n" + - " \"schema\": \"acct_dg\",\n" + - " \"table\": \"acct_item\",\n" + - " \"encode\": \"1\"\n" + - " },\n" + - " \"statement\": \"I1bXXe4yjPK3fbfcMfYAdmcpLWGeewvGTxPf8MbVcQDFJXkEFpQrdG4gF/pslug/HMTFbG9Ks8SHfrnkZBqETjC5htDu+0aFXKhJllrYuPJUFY6gJBBrA2CuOAK7TMtfuQB7q58WLo4jkJ/8TM3EX47QPQfqHjLxSDIEXVkrcAHcjlO4fNIEzZe0iQSWIpqkuzLq99EEozK8Y0fDVUuuw2qxwcy1TZNyw2cfchP+KVscrNchGhfwc8hcQ+fiV5ml8ei0ADOsLYMOrbJxgkqlP63P6y40GWHmSCwkGXVnN/9JziMwI/izXzSFjfkLy8H2q3MwDEreclGmbEtibV7LocwzZLHnpHg1tLpS65A9v9E\\u003d\"\n" + - "}"; - - // language=JSON - private static final String versionJsonPlainText = "{\n" + - " \"source\": {\n" + - " \"currentTs\": \"2024-06-24 16:52:07\",\n" + - " \"sourceId\": \"versionUpdate\",\n" + - " \"sourcePos\": null,\n" + - " \"sourceType\": null\n" + - " },\n" + - " \"info\": {\n" + - " \"opTs\": \"2024-06-24 16:52:07\",\n" + - " \"opType\": \"version\",\n" + - " \"schema\": \"schema\",\n" + - " \"table\": \"table\",\n" + - " \"encode\": \"0\"\n" + - " },\n" + - " \"statement\": {\n" + - " \"version\": \"20240623\"\n" + - " }\n" + - "}"; - - // language=JSON - private static final String checkJsonPlainText = "{\n" + - " \"source\": {\n" + - " \"sourceId\": \"set_1351341338064\",\n" + - " \"sourceType\": null,\n" + - " \"sourcePos\": \"mysql-bin.008440:466807502\",\n" + - " \"currentTs\": \"2020-11-24 22:17:44\"\n" + - " },\n" + - " \"info\": {\n" + - " \"opType\": \"check\",\n" + - " \"schema\": \"schema1\",\n" + - " \"table\": \"table1\",\n" + - " \"opTs\": \"2024-06-24 16:52:07\",\n" + - " \"encode\": \"0\"\n" + - " },\n" + - " \"statement\": {\n" + - " \"checkNum\": \"\",\n" + - " \"topic\": \"topic1\",\n" + - " \"I\": \"2\",\n" + - " \"U\": \"3\",\n" + - " \"D\": \"4\",\n" + - " \"DDL\": \"0\"\n" + - " }\n" + - "}"; - - // language=JSON - private static final String ddlJsonPlainText = "{\n" + - " \"source\": {\n" + - " \"sourceId\": \"set_1021031048027_kafka\",\n" + - " \"sourceType\": null,\n" + - " \"sourcePos\": \"mysql-bin.017236:426272072\",\n" + - " \"currentTs\": \"2024-06-18 23:25:29\"\n" + - " },\n" + - " \"info\": {\n" + - " \"opTs\": \"2024-06-18 23:25:25\",\n" + - " \"opType\": \"ddl\",\n" + - " \"schema\": \"crm_cfguse\",\n" + - " \"table\": \"tar_grp\",\n" + - " \"encode\": \"0\"\n" + - " },\n" + - " \"statement\": {\n" + - " \"sql\": \"alter table crm_cfguse.tar_grp add column is_zx_spec varchar(10) NOT NULL DEFAULT '' COMMENT '是否包含直销客户特殊标签';\",\n" + - " \"op\": \"add_column\",\n" + - " \"tablePk\": null,\n" + - " \"tableComent\": null,\n" + - " \"column\": {\n" + - " \"before\": null,\n" + - " \"after\": {\n" + - " \"isNotNull\": \"true\",\n" + - " \"columnType\": \"string\",\n" + - " \"columnLength\": \"10\",\n" + - " \"comment\": \"'是否包含直销客户特殊标签'\",\n" + - " \"isPrimaryKey\": \"false\",\n" + - " \"columnName\": \"is_zx_spec\"\n" + - " }\n" + - " }\n" + - " }\n" + - "}"; - - private static void timeRunnable(String name, Runnable runnable) { - long start = System.currentTimeMillis(); - runnable.run(); - System.out.println(StrUtil.format("{} \tcost time: {}s", name, (System.currentTimeMillis() - start) / 1000.0)); - } - - public static void main(String[] args) { - int times = 10_000_000; - timeRunnable("Plain", () -> { - for (int index = 0; index < times; index++) { - RecordHelper.parse(dmlJsonPlainText); - } - }); - timeRunnable("Encode", () -> { - for (int index = 0; index < times; index++) { - RecordHelper.parse(dmlJsonEncodeText); - } - }); - } - - private void testSource(JSONObject root) { - assertTrue(root.containsKey("sourceId")); - assertTrue(root.containsKey("sourcePos")); - assertTrue(root.containsKey("sourceType")); - assertTrue(root.containsKey("currentTs")); - } - - private void testInfo(JSONObject root) { - assertTrue(root.containsKey("opTs")); - assertTrue(root.containsKey("opType")); - assertTrue(root.containsKey("schema")); - assertTrue(root.containsKey("table")); - assertTrue(root.containsKey("encode")); - } - - private void testDmlStatement(JSONObject root) { - assertTrue(root.containsKey("before")); - assertTrue(root.containsKey("after")); - } - - private void testVersionStatement(JSONObject root) { - assertTrue(root.containsKey("version")); - } - - private void testCheckStatement(JSONObject root) { - } - - private void testDdlStatement(JSONObject root) { - } - - @Test - public void testRecordDMLParse() { - assertDoesNotThrow(() -> {JSONUtil.parseObj(dmlJsonPlainText);}); - JSONObject root = JSONUtil.parseObj(dmlJsonPlainText); - assertTrue(root.containsKey("source")); - testSource(root.getJSONObject("source")); - assertTrue(root.containsKey("info")); - testInfo(root.getJSONObject("info")); - assertTrue(root.containsKey("statement")); - testDmlStatement(root.getJSONObject("statement")); - - Record record = RecordHelper.parse(dmlJsonPlainText); - assertNotNull(record); - assertNotNull(record.getSource()); - assertNotNull(record.getInfo()); - assertNotNull(record.getStatement()); - assertNotNull(record.getStatement().getBefore()); - assertNotNull(record.getStatement().getAfter()); - } - - @Test - public void testRecordDMLEncodeParse() { - assertDoesNotThrow(() -> {JSONUtil.parseObj(dmlJsonEncodeText);}); - JSONObject root = JSONUtil.parseObj(dmlJsonEncodeText); - assertTrue(root.containsKey("source")); - testSource(root.getJSONObject("source")); - assertTrue(root.containsKey("info")); - testInfo(root.getJSONObject("info")); - assertTrue(root.containsKey("statement")); - String statementText = root.getStr("statement"); - assertTrue(StrUtil.isNotBlank(statementText)); - assertEquals("{\"before\":{\"REGION_ID\":\"12123174\",\"CUST_ID\":\"0\",\"AMOUNT_FIX\":\"10456\",\"ONE_ACCT_ITEM_ID\":\"0\",\"BILL_ID\":\"0\",\"PARTY_ROLE_ID\":\"100\"},\"after\":{\"REGION_ID\":\"12123174\",\"CUST_ID\":\"0\",\"AMOUNT_FIX\":\"10456\",\"ONE_ACCT_ITEM_ID\":\"0\",\"BILL_ID\":\"0\",\"PARTY_ROLE_ID\":\"100\"}}", SecureHelper.decryptStatement(statementText)); - testDmlStatement(JSONUtil.parseObj(SecureHelper.decryptStatement(statementText))); - - Record record = RecordHelper.parse(dmlJsonEncodeText); - assertNotNull(record); - assertNotNull(record.getSource()); - assertNotNull(record.getInfo()); - assertNotNull(record.getStatement()); - assertNotNull(record.getStatement().getBefore()); - assertNotNull(record.getStatement().getAfter()); - } - - @Test - public void textRecordVersionParse() { - assertDoesNotThrow(() -> {JSONUtil.parseObj(versionJsonPlainText);}); - JSONObject root = JSONUtil.parseObj(versionJsonPlainText); - assertTrue(root.containsKey("source")); - testSource(root.getJSONObject("source")); - assertTrue(root.containsKey("info")); - testInfo(root.getJSONObject("info")); - assertTrue(root.containsKey("statement")); - testVersionStatement(root.getJSONObject("statement")); - - Record record = RecordHelper.parse(versionJsonPlainText); - assertNotNull(record); - assertNotNull(record.getSource()); - assertNotNull(record.getInfo()); - assertNotNull(record.getStatement()); - assertNotNull(record.getStatement().getVersion()); - } - - @Test - public void textRecordCheckParse() { - assertDoesNotThrow(() -> {JSONUtil.parseObj(checkJsonPlainText);}); - JSONObject root = JSONUtil.parseObj(checkJsonPlainText); - assertTrue(root.containsKey("source")); - testSource(root.getJSONObject("source")); - assertTrue(root.containsKey("info")); - testInfo(root.getJSONObject("info")); - assertTrue(root.containsKey("statement")); - testCheckStatement(root.getJSONObject("statement")); - - Record record = RecordHelper.parse(checkJsonPlainText); - assertNotNull(record); - assertNotNull(record.getSource()); - assertNotNull(record.getInfo()); - assertNotNull(record.getStatement()); - } - - @Test - public void textRecordDDLParse() { - assertDoesNotThrow(() -> {JSONUtil.parseObj(ddlJsonPlainText);}); - JSONObject root = JSONUtil.parseObj(ddlJsonPlainText); - assertTrue(root.containsKey("source")); - testSource(root.getJSONObject("source")); - assertTrue(root.containsKey("info")); - testInfo(root.getJSONObject("info")); - assertTrue(root.containsKey("statement")); - testDdlStatement(root.getJSONObject("statement")); - - Record record = RecordHelper.parse(ddlJsonPlainText); - assertNotNull(record); - assertNotNull(record.getSource()); - assertNotNull(record.getInfo()); - assertNotNull(record.getStatement()); - } -} diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/police/PulsarMessage2Prisoner.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/police/PulsarMessage2Prisoner.java index ac74f29..e6437c7 100644 --- a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/police/PulsarMessage2Prisoner.java +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/police/PulsarMessage2Prisoner.java @@ -4,15 +4,17 @@ import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import com.lanyuanxiaoyao.service.common.Constants; import com.lanyuanxiaoyao.service.common.entity.Record; -import com.lanyuanxiaoyao.service.common.utils.RecordHelper; +import com.lanyuanxiaoyao.service.executor.core.TaskContext; import com.lanyuanxiaoyao.service.executor.task.entity.Prisoner; import com.lanyuanxiaoyao.service.executor.task.entity.RecordView; +import com.lanyuanxiaoyao.service.executor.task.helper.JacksonHelper; import java.time.LocalDateTime; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; import java.util.Map; import java.util.regex.Pattern; import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.eclipse.collections.api.factory.Lists; /** @@ -21,6 +23,7 @@ import org.eclipse.collections.api.factory.Lists; public class PulsarMessage2Prisoner extends RichMapFunction { private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); private static final Pattern OPTS_PATTERN = Pattern.compile("^\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}$"); + private final ObjectMapper mapper = JacksonHelper.getMapper(); private final long startTime; private final long endTime; @@ -38,19 +41,18 @@ public class PulsarMessage2Prisoner extends RichMapFunction fields; - if (StrUtil.equalsAny(info.getOpType(), Constants.INSERT, Constants.UPDATE)) { + if (StrUtil.equalsAny(statement.getOpType(), Constants.INSERT, Constants.UPDATE)) { fields = record.getStatement().getAfter(); - } else if (StrUtil.equals(info.getOpType(), Constants.DELETE)) { + } else if (StrUtil.equals(statement.getOpType(), Constants.DELETE)) { fields = record.getStatement().getBefore(); } else { return new Prisoner(value.getFile(), StrUtil.format("Invalid opType: {}", value.getData())); @@ -76,6 +78,6 @@ public class PulsarMessage2Prisoner extends RichMapFunction { @Override public boolean filter(Record record) { - String opType = record.getInfo().getOpType(); + String opType = record.getStatement().getOpType(); switch (opType) { case Constants.INSERT: insertCounter.inc(); @@ -68,6 +68,6 @@ public class OperationTypeFilter extends RichFilterFunction { default: unknownCounter.inc(); } - return !Constants.DDL.equals(record.getInfo().getOpType()); + return !Constants.DDL.equals(record.getStatement().getOpType()); } } diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/PulsarMessage2RecordFunction.java b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/PulsarMessage2RecordFunction.java index 7897503..8881261 100644 --- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/PulsarMessage2RecordFunction.java +++ b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/PulsarMessage2RecordFunction.java @@ -7,6 +7,7 @@ import com.lanyuanxiaoyao.service.common.entity.Record; import com.lanyuanxiaoyao.service.common.entity.TableMeta; import com.lanyuanxiaoyao.service.common.utils.RecordHelper; import com.lanyuanxiaoyao.service.sync.configuration.GlobalConfiguration; +import com.lanyuanxiaoyao.service.sync.utils.JacksonUtils; import com.lanyuanxiaoyao.service.sync.utils.StatusUtils; import java.time.LocalDateTime; import java.time.ZoneOffset; @@ -17,6 +18,7 @@ import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,6 +40,7 @@ public class PulsarMessage2RecordFunction extends RichMapFunction> result = ListUtil.list(false); if (RecordHelper.isVersionUpdateRecord(record)) { - Record.Info info = record.getInfo(); Record.Statement statement = record.getStatement(); - LogHelper.info(logger, VERSION_UPDATE, "{} {} version: {}", mapper.writeValueAsString(info.getSchema()), statement.getVersion(), statement.getVersion()); + LogHelper.info(logger, VERSION_UPDATE, "{} {} version: {}", mapper.writeValueAsString(statement.getSchema()), statement.getVersion(), statement.getVersion()); LogHelper.info(logger, VERSION_UPDATE, "Raw: {}", mapper.writeValueAsString(record)); - StatusUtils.versionUpdate(globalConfiguration, flinkJob, tableMeta, record.getStatement().getVersion(), info.getOpTs()); + StatusUtils.versionUpdate(globalConfiguration, flinkJob, tableMeta, record.getStatement().getVersion(), statement.getOpTs()); return ListUtil.empty(); } diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/ValidateRecordFilter.java b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/ValidateRecordFilter.java index 961709b..0e9e434 100644 --- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/ValidateRecordFilter.java +++ b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/ValidateRecordFilter.java @@ -31,10 +31,6 @@ public class ValidateRecordFilter extends RichFilterFunction { logger.warn("Record Source is null"); return false; } - if (ObjectUtil.isNull(record.getInfo())) { - logger.warn("Record Info is null"); - return false; - } if (ObjectUtil.isNull(record.getStatement())) { logger.warn("Record Statement is null"); return false; diff --git a/utils/sync/src/test/java/com/lanyuanxiaoyao/service/sync/MessageParseTest.java b/utils/sync/src/test/java/com/lanyuanxiaoyao/service/sync/MessageParseTest.java index 53b84bf..78a7235 100644 --- a/utils/sync/src/test/java/com/lanyuanxiaoyao/service/sync/MessageParseTest.java +++ b/utils/sync/src/test/java/com/lanyuanxiaoyao/service/sync/MessageParseTest.java @@ -3,7 +3,6 @@ package com.lanyuanxiaoyao.service.sync; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.lanyuanxiaoyao.service.common.entity.Record; -import com.lanyuanxiaoyao.service.common.utils.RecordHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -100,7 +99,8 @@ public class MessageParseTest { " }\n" + " }\n" + "}"; - Record record = RecordHelper.parse(message); + ObjectMapper mapper = new ObjectMapper(); + Record record = mapper.readValue(message, Record.class); logger.info("Record: {}", record); } }