From 63b81155b822f91f85457ed0f5d416238ca35d5c Mon Sep 17 00:00:00 2001 From: v-zhangjc9 Date: Tue, 30 Jul 2024 19:06:01 +0800 Subject: [PATCH] =?UTF-8?q?fix(sync):=20=E4=BF=AE=E5=A4=8D=E6=8C=87?= =?UTF-8?q?=E6=A0=87=E5=90=8D=E7=A7=B0=E9=87=8D=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../lanyuanxiaoyao/service/common/Constants.java | 5 +++++ .../service/sync/functions/OperationTypeFilter.java | 13 +++++-------- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java index eddc1b9..34e7aa8 100644 --- a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java +++ b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java @@ -101,6 +101,11 @@ public interface Constants { String METRICS_SYNC_SOURCE_MESSAGE_RECEIVE = METRICS_SYNC_PREFIX + "_source_message_receive"; String METRICS_SYNC_SOURCE_MESSAGE_SIZE_RECEIVE_BYTES = METRICS_SYNC_PREFIX + "_source_message_receive_bytes"; String METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE = METRICS_SYNC_PREFIX + "_source_operation_type_receive"; + String METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE_INSERT = METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE + "_insert"; + String METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE_UPDATE = METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE + "_update"; + String METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE_DELETE = METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE + "_delete"; + String METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE_DDL = METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE + "_ddl"; + String METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE_UNKNOWN = METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE + "_unknown"; String METRICS_SYNC_SOURCE_CHANGE_FILTER = METRICS_SYNC_PREFIX + "_source_change_filter"; String METRICS_SYNC_SOURCE_CHANGE_PARTITION = METRICS_SYNC_PREFIX + "_source_change_partition"; String METRICS_SYNC_SOURCE_BACK_LOGS = METRICS_SYNC_PREFIX + "_source_back_logs"; diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/OperationTypeFilter.java b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/OperationTypeFilter.java index b6fcea3..32bc89f 100644 --- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/OperationTypeFilter.java +++ b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/OperationTypeFilter.java @@ -38,15 +38,12 @@ public class OperationTypeFilter extends RichFilterFunction { public void open(Configuration parameters) throws Exception { super.open(parameters); - Function> fillTags = operator -> MapUtil.builder() - .put(Constants.METRICS_LABEL_TYPE, operator) - .build(); // 初始化指标 - insertCounter = MetricsUtils.counter(getRuntimeContext(), flinkJob, tableMeta, Constants.METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE, fillTags.apply(Constants.INSERT)); - updateCounter = MetricsUtils.counter(getRuntimeContext(), flinkJob, tableMeta, Constants.METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE, fillTags.apply(Constants.UPDATE)); - deleteCounter = MetricsUtils.counter(getRuntimeContext(), flinkJob, tableMeta, Constants.METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE, fillTags.apply(Constants.DELETE)); - ddlCounter = MetricsUtils.counter(getRuntimeContext(), flinkJob, tableMeta, Constants.METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE, fillTags.apply(Constants.DDL)); - unknownCounter = MetricsUtils.counter(getRuntimeContext(), flinkJob, tableMeta, Constants.METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE, fillTags.apply(Constants.UNKNOWN)); + insertCounter = MetricsUtils.counter(getRuntimeContext(), flinkJob, tableMeta, Constants.METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE_INSERT); + updateCounter = MetricsUtils.counter(getRuntimeContext(), flinkJob, tableMeta, Constants.METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE_UPDATE); + deleteCounter = MetricsUtils.counter(getRuntimeContext(), flinkJob, tableMeta, Constants.METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE_DELETE); + ddlCounter = MetricsUtils.counter(getRuntimeContext(), flinkJob, tableMeta, Constants.METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE_DDL); + unknownCounter = MetricsUtils.counter(getRuntimeContext(), flinkJob, tableMeta, Constants.METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE_UNKNOWN); } @Override