feat(common): Record消息结构更新
新增info字段用于存一些公共字段,statement加入消息加密能力
This commit is contained in:
@@ -51,7 +51,7 @@ public class OperationTypeFilter extends RichFilterFunction<Record> {
|
||||
|
||||
@Override
|
||||
public boolean filter(Record record) {
|
||||
String opType = record.getStatement().getOpType();
|
||||
String opType = record.getInfo().getOpType();
|
||||
switch (opType) {
|
||||
case Constants.INSERT:
|
||||
insertCounter.inc();
|
||||
@@ -68,6 +68,6 @@ public class OperationTypeFilter extends RichFilterFunction<Record> {
|
||||
default:
|
||||
unknownCounter.inc();
|
||||
}
|
||||
return !Constants.DDL.equals(record.getStatement().getOpType());
|
||||
return !Constants.DDL.equals(record.getInfo().getOpType());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,7 +7,6 @@ import com.lanyuanxiaoyao.service.common.entity.Record;
|
||||
import com.lanyuanxiaoyao.service.common.entity.TableMeta;
|
||||
import com.lanyuanxiaoyao.service.common.utils.RecordHelper;
|
||||
import com.lanyuanxiaoyao.service.sync.configuration.GlobalConfiguration;
|
||||
import com.lanyuanxiaoyao.service.sync.utils.JacksonUtils;
|
||||
import com.lanyuanxiaoyao.service.sync.utils.StatusUtils;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.ZoneOffset;
|
||||
@@ -18,7 +17,6 @@ import org.apache.flink.api.common.functions.RichMapFunction;
|
||||
import org.apache.flink.runtime.state.FunctionInitializationContext;
|
||||
import org.apache.flink.runtime.state.FunctionSnapshotContext;
|
||||
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@@ -40,7 +38,6 @@ public class PulsarMessage2RecordFunction extends RichMapFunction<String, Record
|
||||
private final GlobalConfiguration globalConfiguration;
|
||||
private final FlinkJob flinkJob;
|
||||
private final TableMeta tableMeta;
|
||||
private final ObjectMapper mapper = JacksonUtils.getMapper();
|
||||
|
||||
public PulsarMessage2RecordFunction(GlobalConfiguration globalConfiguration, FlinkJob flinkJob, TableMeta tableMeta) {
|
||||
this.globalConfiguration = globalConfiguration;
|
||||
@@ -52,12 +49,12 @@ public class PulsarMessage2RecordFunction extends RichMapFunction<String, Record
|
||||
public Record map(String message) throws JsonProcessingException {
|
||||
Record record = null;
|
||||
try {
|
||||
record = mapper.readValue(message, Record.class);
|
||||
record = RecordHelper.parse(message);
|
||||
if (RecordHelper.isNotVersionUpdateRecord(record)) {
|
||||
latestOperationTime.set(record.getStatement().getOpTs());
|
||||
latestOperationTime.set(record.getInfo().getOpTs());
|
||||
}
|
||||
} catch (Exception exception) {
|
||||
logger.error("Message json parse failure", exception);
|
||||
logger.error(StrUtil.format("Message json parse failure: {}", message), exception);
|
||||
}
|
||||
return record;
|
||||
}
|
||||
|
||||
@@ -84,10 +84,11 @@ public class Record2RowDataFunction extends RichMapFunction<Record, List<RowData
|
||||
List<Map<String, Object>> result = ListUtil.list(false);
|
||||
|
||||
if (RecordHelper.isVersionUpdateRecord(record)) {
|
||||
Record.Info info = record.getInfo();
|
||||
Record.Statement statement = record.getStatement();
|
||||
LogHelper.info(logger, VERSION_UPDATE, "{} {} version: {}", mapper.writeValueAsString(statement.getSchema()), statement.getVersion(), statement.getVersion());
|
||||
LogHelper.info(logger, VERSION_UPDATE, "{} {} version: {}", mapper.writeValueAsString(info.getSchema()), statement.getVersion(), statement.getVersion());
|
||||
LogHelper.info(logger, VERSION_UPDATE, "Raw: {}", mapper.writeValueAsString(record));
|
||||
StatusUtils.versionUpdate(globalConfiguration, flinkJob, tableMeta, record.getStatement().getVersion(), statement.getOpTs());
|
||||
StatusUtils.versionUpdate(globalConfiguration, flinkJob, tableMeta, record.getStatement().getVersion(), info.getOpTs());
|
||||
return ListUtil.empty();
|
||||
}
|
||||
|
||||
|
||||
@@ -31,6 +31,10 @@ public class ValidateRecordFilter extends RichFilterFunction<Record> {
|
||||
logger.warn("Record Source is null");
|
||||
return false;
|
||||
}
|
||||
if (ObjectUtil.isNull(record.getInfo())) {
|
||||
logger.warn("Record Info is null");
|
||||
return false;
|
||||
}
|
||||
if (ObjectUtil.isNull(record.getStatement())) {
|
||||
logger.warn("Record Statement is null");
|
||||
return false;
|
||||
|
||||
@@ -3,6 +3,7 @@ package com.lanyuanxiaoyao.service.sync;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.lanyuanxiaoyao.service.common.entity.Record;
|
||||
import com.lanyuanxiaoyao.service.common.utils.RecordHelper;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@@ -99,8 +100,7 @@ public class MessageParseTest {
|
||||
" }\n" +
|
||||
" }\n" +
|
||||
"}";
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
Record record = mapper.readValue(message, Record.class);
|
||||
Record record = RecordHelper.parse(message);
|
||||
logger.info("Record: {}", record);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user