fix(sync): 修复指标名称重复
This commit is contained in:
@@ -101,6 +101,11 @@ public interface Constants {
|
|||||||
String METRICS_SYNC_SOURCE_MESSAGE_RECEIVE = METRICS_SYNC_PREFIX + "_source_message_receive";
|
String METRICS_SYNC_SOURCE_MESSAGE_RECEIVE = METRICS_SYNC_PREFIX + "_source_message_receive";
|
||||||
String METRICS_SYNC_SOURCE_MESSAGE_SIZE_RECEIVE_BYTES = METRICS_SYNC_PREFIX + "_source_message_receive_bytes";
|
String METRICS_SYNC_SOURCE_MESSAGE_SIZE_RECEIVE_BYTES = METRICS_SYNC_PREFIX + "_source_message_receive_bytes";
|
||||||
String METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE = METRICS_SYNC_PREFIX + "_source_operation_type_receive";
|
String METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE = METRICS_SYNC_PREFIX + "_source_operation_type_receive";
|
||||||
|
String METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE_INSERT = METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE + "_insert";
|
||||||
|
String METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE_UPDATE = METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE + "_update";
|
||||||
|
String METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE_DELETE = METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE + "_delete";
|
||||||
|
String METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE_DDL = METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE + "_ddl";
|
||||||
|
String METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE_UNKNOWN = METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE + "_unknown";
|
||||||
String METRICS_SYNC_SOURCE_CHANGE_FILTER = METRICS_SYNC_PREFIX + "_source_change_filter";
|
String METRICS_SYNC_SOURCE_CHANGE_FILTER = METRICS_SYNC_PREFIX + "_source_change_filter";
|
||||||
String METRICS_SYNC_SOURCE_CHANGE_PARTITION = METRICS_SYNC_PREFIX + "_source_change_partition";
|
String METRICS_SYNC_SOURCE_CHANGE_PARTITION = METRICS_SYNC_PREFIX + "_source_change_partition";
|
||||||
String METRICS_SYNC_SOURCE_BACK_LOGS = METRICS_SYNC_PREFIX + "_source_back_logs";
|
String METRICS_SYNC_SOURCE_BACK_LOGS = METRICS_SYNC_PREFIX + "_source_back_logs";
|
||||||
|
|||||||
@@ -38,15 +38,12 @@ public class OperationTypeFilter extends RichFilterFunction<Record> {
|
|||||||
public void open(Configuration parameters) throws Exception {
|
public void open(Configuration parameters) throws Exception {
|
||||||
super.open(parameters);
|
super.open(parameters);
|
||||||
|
|
||||||
Function<String, Map<String, String>> fillTags = operator -> MapUtil.<String, String>builder()
|
|
||||||
.put(Constants.METRICS_LABEL_TYPE, operator)
|
|
||||||
.build();
|
|
||||||
// 初始化指标
|
// 初始化指标
|
||||||
insertCounter = MetricsUtils.counter(getRuntimeContext(), flinkJob, tableMeta, Constants.METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE, fillTags.apply(Constants.INSERT));
|
insertCounter = MetricsUtils.counter(getRuntimeContext(), flinkJob, tableMeta, Constants.METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE_INSERT);
|
||||||
updateCounter = MetricsUtils.counter(getRuntimeContext(), flinkJob, tableMeta, Constants.METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE, fillTags.apply(Constants.UPDATE));
|
updateCounter = MetricsUtils.counter(getRuntimeContext(), flinkJob, tableMeta, Constants.METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE_UPDATE);
|
||||||
deleteCounter = MetricsUtils.counter(getRuntimeContext(), flinkJob, tableMeta, Constants.METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE, fillTags.apply(Constants.DELETE));
|
deleteCounter = MetricsUtils.counter(getRuntimeContext(), flinkJob, tableMeta, Constants.METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE_DELETE);
|
||||||
ddlCounter = MetricsUtils.counter(getRuntimeContext(), flinkJob, tableMeta, Constants.METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE, fillTags.apply(Constants.DDL));
|
ddlCounter = MetricsUtils.counter(getRuntimeContext(), flinkJob, tableMeta, Constants.METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE_DDL);
|
||||||
unknownCounter = MetricsUtils.counter(getRuntimeContext(), flinkJob, tableMeta, Constants.METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE, fillTags.apply(Constants.UNKNOWN));
|
unknownCounter = MetricsUtils.counter(getRuntimeContext(), flinkJob, tableMeta, Constants.METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE_UNKNOWN);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
Reference in New Issue
Block a user