From d4e161459a2213b5f1238102d6ef9c650d9f4ae2 Mon Sep 17 00:00:00 2001 From: lanyuanxiaoyao Date: Tue, 5 Mar 2024 12:21:38 +0800 Subject: [PATCH] =?UTF-8?q?feat(monitor):=20=E5=A2=9E=E5=8A=A0=E6=8C=87?= =?UTF-8?q?=E6=A0=87=E8=BE=93=E5=87=BA=E6=A8=A1=E5=9D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 一些外部指标查询通过指标输出模块输出,避免对原业务模块产生影响 --- .idea/httpRequests/http-requests-log.http | 17 +-- bin/build-monitor.sh | 6 + pom.xml | 1 + .../src/main/resources/application.yml | 6 + .../service/common/Constants.java | 11 +- service-monitor/pom.xml | 41 +++++++ .../service/monitor/MonitorApplication.java | 27 +++++ .../service/monitor/metric/Metrics.java | 9 ++ .../service/monitor/metric/PulsarMetrics.java | 76 +++++++++++++ .../src/main/resources/application.yml | 5 + .../src/main/resources/logback-spring.xml | 52 +++++++++ .../queue/controller/QueueOperator.java | 7 +- test/test.http | 3 + .../service/sync/MessageParseTest.java | 106 ++++++++++++++++++ 14 files changed, 355 insertions(+), 12 deletions(-) create mode 100755 bin/build-monitor.sh create mode 100644 service-monitor/pom.xml create mode 100644 service-monitor/src/main/java/com/lanyuanxiaoyao/service/monitor/MonitorApplication.java create mode 100644 service-monitor/src/main/java/com/lanyuanxiaoyao/service/monitor/metric/Metrics.java create mode 100644 service-monitor/src/main/java/com/lanyuanxiaoyao/service/monitor/metric/PulsarMetrics.java create mode 100644 service-monitor/src/main/resources/application.yml create mode 100644 service-monitor/src/main/resources/logback-spring.xml create mode 100644 utils/sync/src/test/java/com/lanyuanxiaoyao/service/sync/MessageParseTest.java diff --git a/.idea/httpRequests/http-requests-log.http b/.idea/httpRequests/http-requests-log.http index b5443a2..ecb8171 100644 --- a/.idea/httpRequests/http-requests-log.http +++ b/.idea/httpRequests/http-requests-log.http @@ -1,3 +1,12 @@ +GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s15.hdp.dc:21685/pulsar/backlog?name=main&topic=persistent://odcp/grid/grid_serv_staff&subscription=Hudi_Sync_Pulsar_Reader_1552408245762723840_grid_grid_serv_staff_b_20230425 +Connection: Keep-Alive +User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10) +Accept-Encoding: br,deflate,gzip,x-gzip + +<> 2024-03-05T111533.200.json + +### + GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/service-exporter/exporter/un_running_flink_job Connection: Keep-Alive User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) @@ -398,11 +407,3 @@ Accept-Encoding: br,deflate,gzip,x-gzip ### -GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/queue/queue/clear?name=compaction-queue-pre -Connection: Keep-Alive -User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) -Cookie: JSESSIONID=9AB8D98C10FACE15EA1CB758D79F8877 -Accept-Encoding: br,deflate,gzip,x-gzip - -### - diff --git a/bin/build-monitor.sh b/bin/build-monitor.sh new file mode 100755 index 0000000..2639de6 --- /dev/null +++ b/bin/build-monitor.sh @@ -0,0 +1,6 @@ +#!/bin/bash +root_path=$(dirname $(cd $(dirname $0);pwd)) +source $root_path/bin/library.sh +mvn -pl service-common,service-dependencies,service-configuration clean deploy -D skipTests -P local -s ~/.m2/settings-development.xml +mvn -pl service-monitor clean package spring-boot:repackage -D skipTests -s ~/.m2/settings-development.xml +upload $root_path/service-monitor/target/service-monitor-1.0.0-SNAPSHOT.jar \ No newline at end of file diff --git a/pom.xml b/pom.xml index 55c0c6c..8c1e8c7 100644 --- a/pom.xml +++ b/pom.xml @@ -26,6 +26,7 @@ service-info-query service-launcher service-loki-query + service-monitor service-pulsar-query service-queue service-scheduler diff --git a/service-cli/service-cli-runner/src/main/resources/application.yml b/service-cli/service-cli-runner/src/main/resources/application.yml index 2768a08..b6ae907 100644 --- a/service-cli/service-cli-runner/src/main/resources/application.yml +++ b/service-cli/service-cli-runner/src/main/resources/application.yml @@ -168,3 +168,9 @@ deploy: - "service" source-jar: service-exporter-1.0.0-SNAPSHOT.jar replicas: 3 + service-monitor: + order: 5 + groups: + - "service" + source-jar: service-monitor-1.0.0-SNAPSHOT.jar + replicas: 1 diff --git a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java index 3ef059f..f2701c2 100644 --- a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java +++ b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java @@ -84,10 +84,12 @@ public interface Constants { String DATA_TIME = "data-time"; String DATA_PARENT_PATH = "data-parent-path"; - String METRICS_PREFIX = "metrics_hudi"; + String METRICS_PREFIX = "service"; + String METRICS_YARN_PREFIX = METRICS_PREFIX + "_yarn"; String METRICS_YARN_JOB = METRICS_YARN_PREFIX + "_job"; String METRICS_YARN_TABLE = METRICS_YARN_PREFIX + "_table"; + String METRICS_SYNC_PREFIX = METRICS_PREFIX + "_sync"; String METRICS_SYNC_SOURCE_LATENCY = METRICS_SYNC_PREFIX + "_source_latency"; String METRICS_SYNC_LATENCY = METRICS_SYNC_PREFIX + "_latency"; @@ -100,6 +102,12 @@ public interface Constants { String METRICS_SYNC_SOURCE_CHANGE_PARTITION = METRICS_SYNC_PREFIX + "_source_change_partition"; String METRICS_SYNC_SOURCE_BACK_LOGS = METRICS_SYNC_PREFIX + "_source_back_logs"; + String METRICS_QUEUE_PREFIX = METRICS_PREFIX + "_queue"; + String METRICS_QUEUE_SIZE = METRICS_QUEUE_PREFIX + "_size"; + + String METRICS_PULSAR_PREFIX = METRICS_PREFIX + "_pulsar"; + String METRICS_PULSAR_BACKLOG = METRICS_PULSAR_PREFIX + "_backlog"; + String METRICS_LABEL_FLINK_JOB_ID = "flink_job_id"; String METRICS_LABEL_FLINK_JOB_NAME = "flink_job_name"; String METRICS_LABEL_FLINK_NATIVE_JOB_ID = "flink_native_job_id"; @@ -124,6 +132,7 @@ public interface Constants { String METRICS_STATUS_STOPPED = "stopped"; String METRICS_LABEL_TYPE = "type"; + String METRICS_LABEL_NAME = "name"; String LOKI_PUSH_URL = "loki_push_url"; diff --git a/service-monitor/pom.xml b/service-monitor/pom.xml new file mode 100644 index 0000000..85ca2e5 --- /dev/null +++ b/service-monitor/pom.xml @@ -0,0 +1,41 @@ + + + 4.0.0 + + com.lanyuanxiaoyao + hudi-service + 1.0.0-SNAPSHOT + + + service-monitor + + + + com.lanyuanxiaoyao + service-dependencies + + + com.lanyuanxiaoyao + service-configuration + + + com.lanyuanxiaoyao + service-forest + + + + + + + org.apache.maven.plugins + maven-source-plugin + + + org.springframework.boot + spring-boot-maven-plugin + + + + \ No newline at end of file diff --git a/service-monitor/src/main/java/com/lanyuanxiaoyao/service/monitor/MonitorApplication.java b/service-monitor/src/main/java/com/lanyuanxiaoyao/service/monitor/MonitorApplication.java new file mode 100644 index 0000000..2d736b0 --- /dev/null +++ b/service-monitor/src/main/java/com/lanyuanxiaoyao/service/monitor/MonitorApplication.java @@ -0,0 +1,27 @@ +package com.lanyuanxiaoyao.service.monitor; + +import com.ulisesbocchio.jasyptspringboot.annotation.EnableEncryptableProperties; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.cloud.client.discovery.EnableDiscoveryClient; +import org.springframework.retry.annotation.EnableRetry; +import org.springframework.scheduling.annotation.EnableScheduling; + +/** + * @author lanyuanxiaoyao + * @date 2024-03-05 + */ +@EnableDiscoveryClient +@SpringBootApplication( + scanBasePackages = {"com.lanyuanxiaoyao.service"} +) +@EnableConfigurationProperties +@EnableEncryptableProperties +@EnableRetry +@EnableScheduling +public class MonitorApplication { + public static void main(String[] args) { + SpringApplication.run(MonitorApplication.class, args); + } +} diff --git a/service-monitor/src/main/java/com/lanyuanxiaoyao/service/monitor/metric/Metrics.java b/service-monitor/src/main/java/com/lanyuanxiaoyao/service/monitor/metric/Metrics.java new file mode 100644 index 0000000..8dda949 --- /dev/null +++ b/service-monitor/src/main/java/com/lanyuanxiaoyao/service/monitor/metric/Metrics.java @@ -0,0 +1,9 @@ +package com.lanyuanxiaoyao.service.monitor.metric; + +/** + * @author lanyuanxiaoyao + * @date 2024-03-05 + */ +public abstract class Metrics { + abstract void update(); +} diff --git a/service-monitor/src/main/java/com/lanyuanxiaoyao/service/monitor/metric/PulsarMetrics.java b/service-monitor/src/main/java/com/lanyuanxiaoyao/service/monitor/metric/PulsarMetrics.java new file mode 100644 index 0000000..f0d1458 --- /dev/null +++ b/service-monitor/src/main/java/com/lanyuanxiaoyao/service/monitor/metric/PulsarMetrics.java @@ -0,0 +1,76 @@ +package com.lanyuanxiaoyao.service.monitor.metric; + +import cn.hutool.core.util.StrUtil; +import com.lanyuanxiaoyao.service.common.Constants; +import com.lanyuanxiaoyao.service.common.utils.NameHelper; +import com.lanyuanxiaoyao.service.configuration.ExecutorProvider; +import com.lanyuanxiaoyao.service.forest.service.InfoService; +import com.lanyuanxiaoyao.service.forest.service.PulsarService; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tag; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; +import org.eclipse.collections.api.factory.Lists; +import org.eclipse.collections.api.factory.Maps; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; + +import static com.lanyuanxiaoyao.service.common.Constants.MINUTE; + +/** + * Pulsar + * + * @author lanyuanxiaoyao + * @date 2024-03-05 + */ +@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") +@Service +public class PulsarMetrics extends Metrics { + private static final Logger logger = LoggerFactory.getLogger(PulsarMetrics.class); + + private final MeterRegistry registry; + private final InfoService infoService; + private final PulsarService pulsarService; + + private final Map backlogMap; + + public PulsarMetrics(MeterRegistry registry, InfoService infoService, PulsarService pulsarService) { + this.registry = registry; + this.infoService = infoService; + this.pulsarService = pulsarService; + + backlogMap = Maps.mutable.empty(); + } + + @Scheduled(fixedDelay = MINUTE, initialDelay = MINUTE) + @Override + void update() { + infoService.tableMetaList() + // .asParallel(ExecutorProvider.EXECUTORS, 50) + .reject(meta -> StrUtil.isBlank(meta.getPulsarAddress())) + .forEach(meta -> { + try { + String name = pulsarService.name(meta.getPulsarAddress()); + Long backlog = pulsarService.backlog(name, meta.getTopic(), NameHelper.pulsarSubscriptionName(meta.getJob().getId(), meta.getAlias())); + AtomicLong backlogCache = backlogMap.getOrDefault( + meta.getAlias(), + registry.gauge( + Constants.METRICS_PULSAR_BACKLOG, + Lists.immutable.of( + Tag.of(Constants.METRICS_LABEL_FLINK_JOB_ID, meta.getJob().getId().toString()), + Tag.of(Constants.METRICS_LABEL_ALIAS, meta.getAlias()), + Tag.of(Constants.METRICS_LABEL_SCHEMA, meta.getSchema()), + Tag.of(Constants.METRICS_LABEL_TABLE, meta.getTable()) + ), + new AtomicLong() + ) + ); + backlogCache.set(backlog); + } catch (Exception e) { + logger.warn("Something bad for " + meta.getAlias(), e); + } + }); + } +} diff --git a/service-monitor/src/main/resources/application.yml b/service-monitor/src/main/resources/application.yml new file mode 100644 index 0000000..84176e3 --- /dev/null +++ b/service-monitor/src/main/resources/application.yml @@ -0,0 +1,5 @@ +spring: + application: + name: service-monitor + profiles: + include: random-port,common,discovery,metrics,forest \ No newline at end of file diff --git a/service-monitor/src/main/resources/logback-spring.xml b/service-monitor/src/main/resources/logback-spring.xml new file mode 100644 index 0000000..f272f36 --- /dev/null +++ b/service-monitor/src/main/resources/logback-spring.xml @@ -0,0 +1,52 @@ + + + + + + + + + + + true + + ${LOKI_PUSH_URL:-http://localhost/loki/api/v1/push} + + + + + %d{yyyy-MM-dd HH:mm:ss.SSS} %p [${HOSTNAME}] [%t] %logger #@# %m%n%wEx + + true + + + + + + %d{yyyy-MM-dd HH:mm:ss.SSS} %clr(%5p) %clr([${HOSTNAME}]){yellow} %clr([%t]){magenta} %clr(%logger{40}){cyan} #@# %m%n%wEx + + + + + ${LOGGING_PARENT:-.}/${APP_NAME:-run}.log + + ${LOGGING_PARENT:-.}/archive/${APP_NAME:-run}-%d{yyyy-MM-dd}.gz + 7 + + + %d{yyyy-MM-dd HH:mm:ss.SSS} %p [${HOSTNAME}] [%t] %logger #@# %m%n%wEx + + + + + + + + + + + + \ No newline at end of file diff --git a/service-queue/src/main/java/com/lanyuanxiaoyao/service/queue/controller/QueueOperator.java b/service-queue/src/main/java/com/lanyuanxiaoyao/service/queue/controller/QueueOperator.java index cb05c28..9be7ad3 100644 --- a/service-queue/src/main/java/com/lanyuanxiaoyao/service/queue/controller/QueueOperator.java +++ b/service-queue/src/main/java/com/lanyuanxiaoyao/service/queue/controller/QueueOperator.java @@ -1,5 +1,6 @@ package com.lanyuanxiaoyao.service.queue.controller; +import com.lanyuanxiaoyao.service.common.Constants; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Tag; import java.util.Collection; @@ -17,10 +18,10 @@ public abstract class QueueOperator { protected void updateMetrics(MeterRegistry registry, String name, Map> map) { map.forEach((queue, collection) -> registry.gaugeCollectionSize( - "service_queue_size", + Constants.METRICS_QUEUE_SIZE, Lists.immutable.of( - Tag.of("type", name), - Tag.of("name", queue) + Tag.of(Constants.METRICS_LABEL_TYPE, name), + Tag.of(Constants.METRICS_LABEL_NAME, queue) ), collection )); diff --git a/test/test.http b/test/test.http index d034cbc..da78a9e 100644 --- a/test/test.http +++ b/test/test.http @@ -93,3 +93,6 @@ GET {{web-url}}/test ### 获取未运行的同步任务 GET {{exporter-url}}/exporter/un_running_flink_job + +### Pulsar backlog +GET http://{{username}}:{{password}}@b12s15.hdp.dc:21685/pulsar/backlog?name=main&topic=persistent://odcp/grid/grid_serv_staff&subscription=Hudi_Sync_Pulsar_Reader_1552408245762723840_grid_grid_serv_staff_b_20230425 diff --git a/utils/sync/src/test/java/com/lanyuanxiaoyao/service/sync/MessageParseTest.java b/utils/sync/src/test/java/com/lanyuanxiaoyao/service/sync/MessageParseTest.java new file mode 100644 index 0000000..78a7235 --- /dev/null +++ b/utils/sync/src/test/java/com/lanyuanxiaoyao/service/sync/MessageParseTest.java @@ -0,0 +1,106 @@ +package com.lanyuanxiaoyao.service.sync; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.lanyuanxiaoyao.service.common.entity.Record; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author lanyuanxiaoyao + * @date 2024-03-04 + */ +public class MessageParseTest { + private static final Logger logger = LoggerFactory.getLogger(MessageParseTest.class); + + public static void main(String[] args) throws JsonProcessingException { + // language=JSON + String message = "{\n" + + " \"source\": {\n" + + " \"sourceId\": \"set_4846458104\",\n" + + " \"sourceType\": null,\n" + + " \"sourcePos\": \"mysql-bin.013514:245945070\",\n" + + " \"currentTs\": \"2024-03-02 06:29:04\"\n" + + " },\n" + + " \"statement\": {\n" + + " \"schema\": \"acct_sz\",\n" + + " \"table\": \"acct_item\",\n" + + " \"opStatement\": null,\n" + + " \"opType\": \"U\",\n" + + " \"op\": null,\n" + + " \"opTs\": \"2024-03-02 06:29:04\",\n" + + " \"before\": {\n" + + " \"REGION_ID\": \"940271349\",\n" + + " \"CUST_ID\": \"0\",\n" + + " \"AMOUNT_FIX\": \"-2224\",\n" + + " \"ONE_ACCT_ITEM_ID\": \"0\",\n" + + " \"BILL_ID\": \"0\",\n" + + " \"PARTY_ROLE_ID\": \"100\",\n" + + " \"AMOUNT\": \"-2224\",\n" + + " \"ITEM_SOURCE_ID\": \"30145122\",\n" + + " \"HAD_INVOICE_AMOUNT\": \"0\",\n" + + " \"DUE_DATE\": \"2024-04-06\",\n" + + " \"ACCT_ID\": \"310000038469672\",\n" + + " \"DUE_TYPE\": \"0\",\n" + + " \"FEE_CYCLE_ID\": \"20240201\",\n" + + " \"ARREAR_DATE\": \"2024-04-06\",\n" + + " \"PROD_INST_ID\": \"420002870683658\",\n" + + " \"PAYMENT_METHOD\": \"0\",\n" + + " \"STATUS_DATE\": \"2024-03-01 09:42:21\",\n" + + " \"PAY_CYCLE_ID\": \"20240301\",\n" + + " \"NO_INVOICE_AMOUNT\": \"0\",\n" + + " \"ACC_NUM\": \"13302956536\",\n" + + " \"PRESENT_AMOUNT\": \"0\",\n" + + " \"GROUP_ID\": \"270002513937230\",\n" + + " \"CREATE_DATE\": \"2024-03-01 09:42:21\",\n" + + " \"CITY_ID\": \"755\",\n" + + " \"ACCT_ITEM_TYPE_ID\": \"16150\",\n" + + " \"PAY_START_DATE\": \"2024-03-03\",\n" + + " \"ACCT_ITEM_ID\": \"27000036749650554\",\n" + + " \"BILLING_CYCLE_ID\": \"20240201\",\n" + + " \"OFFER_INST_ID\": \"420001788058165\",\n" + + " \"STATUS_CD\": \"1\",\n" + + " \"ORI_ACCT_ITEM_ID\": \"0\",\n" + + " \"CUSTOM_ITEM\": \"0000000000000000\"\n" + + " },\n" + + " \"after\": {\n" + + " \"REGION_ID\": \"940271349\",\n" + + " \"CUST_ID\": \"0\",\n" + + " \"AMOUNT_FIX\": \"-2224\",\n" + + " \"ONE_ACCT_ITEM_ID\": \"0\",\n" + + " \"BILL_ID\": \"0\",\n" + + " \"PARTY_ROLE_ID\": \"100\",\n" + + " \"AMOUNT\": \"-2224\",\n" + + " \"ITEM_SOURCE_ID\": \"30145122\",\n" + + " \"HAD_INVOICE_AMOUNT\": \"0\",\n" + + " \"DUE_DATE\": \"2024-04-06\",\n" + + " \"ACCT_ID\": \"310000038469672\",\n" + + " \"DUE_TYPE\": \"0\",\n" + + " \"FEE_CYCLE_ID\": \"20240201\",\n" + + " \"ARREAR_DATE\": \"2024-04-06\",\n" + + " \"PROD_INST_ID\": \"420002870683658\",\n" + + " \"PAYMENT_METHOD\": \"0\",\n" + + " \"STATUS_DATE\": \"2024-03-01 09:42:21\",\n" + + " \"PAY_CYCLE_ID\": \"20240301\",\n" + + " \"NO_INVOICE_AMOUNT\": \"0\",\n" + + " \"ACC_NUM\": \"13302956536\",\n" + + " \"PRESENT_AMOUNT\": \"0\",\n" + + " \"GROUP_ID\": \"270002513937230\",\n" + + " \"CREATE_DATE\": \"2024-03-01 09:42:21\",\n" + + " \"CITY_ID\": \"755\",\n" + + " \"ACCT_ITEM_TYPE_ID\": \"16150\",\n" + + " \"PAY_START_DATE\": \"2024-03-03\",\n" + + " \"ACCT_ITEM_ID\": \"27000036749650554\",\n" + + " \"BILLING_CYCLE_ID\": \"20240201\",\n" + + " \"OFFER_INST_ID\": \"420001788058165\",\n" + + " \"STATUS_CD\": \"1\",\n" + + " \"ORI_ACCT_ITEM_ID\": \"0\",\n" + + " \"CUSTOM_ITEM\": \"1000000000000000\"\n" + + " }\n" + + " }\n" + + "}"; + ObjectMapper mapper = new ObjectMapper(); + Record record = mapper.readValue(message, Record.class); + logger.info("Record: {}", record); + } +}