diff --git a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java index eddc1b9..34e7aa8 100644 --- a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java +++ b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java @@ -101,6 +101,11 @@ public interface Constants { String METRICS_SYNC_SOURCE_MESSAGE_RECEIVE = METRICS_SYNC_PREFIX + "_source_message_receive"; String METRICS_SYNC_SOURCE_MESSAGE_SIZE_RECEIVE_BYTES = METRICS_SYNC_PREFIX + "_source_message_receive_bytes"; String METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE = METRICS_SYNC_PREFIX + "_source_operation_type_receive"; + String METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE_INSERT = METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE + "_insert"; + String METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE_UPDATE = METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE + "_update"; + String METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE_DELETE = METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE + "_delete"; + String METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE_DDL = METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE + "_ddl"; + String METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE_UNKNOWN = METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE + "_unknown"; String METRICS_SYNC_SOURCE_CHANGE_FILTER = METRICS_SYNC_PREFIX + "_source_change_filter"; String METRICS_SYNC_SOURCE_CHANGE_PARTITION = METRICS_SYNC_PREFIX + "_source_change_partition"; String METRICS_SYNC_SOURCE_BACK_LOGS = METRICS_SYNC_PREFIX + "_source_back_logs"; diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/OperationTypeFilter.java b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/OperationTypeFilter.java index b6fcea3..32bc89f 100644 --- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/OperationTypeFilter.java +++ b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/OperationTypeFilter.java @@ -38,15 +38,12 @@ public class OperationTypeFilter extends RichFilterFunction { public void open(Configuration parameters) throws Exception { super.open(parameters); - Function> fillTags = operator -> MapUtil.builder() - .put(Constants.METRICS_LABEL_TYPE, operator) - .build(); // 初始化指标 - insertCounter = MetricsUtils.counter(getRuntimeContext(), flinkJob, tableMeta, Constants.METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE, fillTags.apply(Constants.INSERT)); - updateCounter = MetricsUtils.counter(getRuntimeContext(), flinkJob, tableMeta, Constants.METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE, fillTags.apply(Constants.UPDATE)); - deleteCounter = MetricsUtils.counter(getRuntimeContext(), flinkJob, tableMeta, Constants.METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE, fillTags.apply(Constants.DELETE)); - ddlCounter = MetricsUtils.counter(getRuntimeContext(), flinkJob, tableMeta, Constants.METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE, fillTags.apply(Constants.DDL)); - unknownCounter = MetricsUtils.counter(getRuntimeContext(), flinkJob, tableMeta, Constants.METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE, fillTags.apply(Constants.UNKNOWN)); + insertCounter = MetricsUtils.counter(getRuntimeContext(), flinkJob, tableMeta, Constants.METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE_INSERT); + updateCounter = MetricsUtils.counter(getRuntimeContext(), flinkJob, tableMeta, Constants.METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE_UPDATE); + deleteCounter = MetricsUtils.counter(getRuntimeContext(), flinkJob, tableMeta, Constants.METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE_DELETE); + ddlCounter = MetricsUtils.counter(getRuntimeContext(), flinkJob, tableMeta, Constants.METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE_DDL); + unknownCounter = MetricsUtils.counter(getRuntimeContext(), flinkJob, tableMeta, Constants.METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE_UNKNOWN); } @Override