refactor(sync): 移除自制的指标推送工具，接入 Flink 的指标体系

This commit is contained in:
v-zhangjc9
2024-06-12 15:16:32 +08:00
parent bc744e0fd2
commit 1f9b7fbd08
10 changed files with 106 additions and 701 deletions

View File

@@ -1,22 +1,17 @@
package com.lanyuanxiaoyao.service.sync.functions;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.map.MapUtil;
import com.lanyuanxiaoyao.service.common.Constants;
import com.lanyuanxiaoyao.service.common.entity.FlinkJob;
import com.lanyuanxiaoyao.service.common.entity.Record;
import com.lanyuanxiaoyao.service.common.entity.TableMeta;
import com.lanyuanxiaoyao.service.sync.configuration.GlobalConfiguration;
import com.lanyuanxiaoyao.service.sync.metrics.CountMetric;
import com.lanyuanxiaoyao.service.sync.utils.MetricsUtils;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.metrics.Counter;
/**
* 操作类型过滤算子
@@ -24,37 +19,34 @@ import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
* @author ZhangJiacheng
* @date 2022-06-12
*/
public class OperationTypeFilter extends RichFilterFunction<Record> implements CheckpointedFunction {
private final CountMetric insertRateMetric;
private final CountMetric updateRateMetric;
private final CountMetric deleteRateMetric;
private final CountMetric ddlRateMetric;
private final CountMetric unknownRateMetric;
private final List<CountMetric> metrics;
private final GlobalConfiguration globalConfiguration;
public class OperationTypeFilter extends RichFilterFunction<Record> {
private final FlinkJob flinkJob;
private final TableMeta tableMeta;
private transient Counter insertCounter;
private transient Counter updateCounter;
private transient Counter deleteCounter;
private transient Counter ddlCounter;
private transient Counter unknownCounter;
public OperationTypeFilter(GlobalConfiguration globalConfiguration, FlinkJob flinkJob, TableMeta tableMeta) {
this.globalConfiguration = globalConfiguration;
Function<String, Map<String, String>> fillTags = operator -> MapUtil.<String, String>builder()
.put(Constants.METRICS_LABEL_TYPE, operator)
.build();
insertRateMetric = new CountMetric(globalConfiguration, Constants.METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE, flinkJob, tableMeta, fillTags.apply(Constants.INSERT));
updateRateMetric = new CountMetric(globalConfiguration, Constants.METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE, flinkJob, tableMeta, fillTags.apply(Constants.UPDATE));
deleteRateMetric = new CountMetric(globalConfiguration, Constants.METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE, flinkJob, tableMeta, fillTags.apply(Constants.DELETE));
ddlRateMetric = new CountMetric(globalConfiguration, Constants.METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE, flinkJob, tableMeta, fillTags.apply(Constants.DDL));
unknownRateMetric = new CountMetric(globalConfiguration, Constants.METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE, flinkJob, tableMeta, fillTags.apply(Constants.UNKNOWN));
metrics = ListUtil.toList(insertRateMetric, updateRateMetric, deleteRateMetric, ddlRateMetric, unknownRateMetric);
this.flinkJob = flinkJob;
this.tableMeta = tableMeta;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
Function<String, Map<String, String>> fillTags = operator -> MapUtil.<String, String>builder()
.put(Constants.METRICS_LABEL_TYPE, operator)
.build();
// 初始化指标
MetricsUtils.createMakePointTimer(globalConfiguration, metrics);
insertCounter = MetricsUtils.counter(getRuntimeContext(), flinkJob, tableMeta, Constants.METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE, fillTags.apply(Constants.INSERT));
updateCounter = MetricsUtils.counter(getRuntimeContext(), flinkJob, tableMeta, Constants.METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE, fillTags.apply(Constants.UPDATE));
deleteCounter = MetricsUtils.counter(getRuntimeContext(), flinkJob, tableMeta, Constants.METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE, fillTags.apply(Constants.DELETE));
ddlCounter = MetricsUtils.counter(getRuntimeContext(), flinkJob, tableMeta, Constants.METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE, fillTags.apply(Constants.DDL));
unknownCounter = MetricsUtils.counter(getRuntimeContext(), flinkJob, tableMeta, Constants.METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE, fillTags.apply(Constants.UNKNOWN));
}
@Override
@@ -62,29 +54,20 @@ public class OperationTypeFilter extends RichFilterFunction<Record> implements C
String opType = record.getStatement().getOpType();
switch (opType) {
case Constants.INSERT:
insertRateMetric.increment();
insertCounter.inc();
break;
case Constants.UPDATE:
updateRateMetric.increment();
updateCounter.inc();
break;
case Constants.DELETE:
deleteRateMetric.increment();
deleteCounter.inc();
break;
case Constants.DDL:
ddlRateMetric.increment();
ddlCounter.inc();
break;
default:
unknownRateMetric.increment();
unknownCounter.inc();
}
return !Constants.DDL.equals(record.getStatement().getOpType());
}
@Override
public void initializeState(FunctionInitializationContext context) {
}
@Override
public void snapshotState(FunctionSnapshotContext context) {
MetricsUtils.publishAllMetrics(metrics);
}
}

View File

@@ -10,8 +10,6 @@ import com.lanyuanxiaoyao.service.common.utils.LogHelper;
import com.lanyuanxiaoyao.service.common.utils.NameHelper;
import com.lanyuanxiaoyao.service.common.utils.RecordHelper;
import com.lanyuanxiaoyao.service.sync.configuration.GlobalConfiguration;
import com.lanyuanxiaoyao.service.sync.metrics.MessageSizeSizeMetric;
import com.lanyuanxiaoyao.service.sync.metrics.RateMetric;
import com.lanyuanxiaoyao.service.sync.utils.LoadBalance;
import com.lanyuanxiaoyao.service.sync.utils.MetricsUtils;
import com.lanyuanxiaoyao.service.sync.utils.StatusUtils;
@@ -25,6 +23,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
@@ -32,7 +31,6 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.impl.schema.StringSchema;
import org.apache.pulsar.client.internal.DefaultImplementation;
@@ -40,7 +38,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static com.lanyuanxiaoyao.service.common.utils.LogHelper.LogPoint.CHECKPOINT_COMPLETE;
import static com.lanyuanxiaoyao.service.common.utils.LogHelper.LogPoint.CHECKPOINT_INITIAL;
import static com.lanyuanxiaoyao.service.common.utils.LogHelper.LogPoint.CHECKPOINT_INITIAL_MESSAGE_ID;
import static com.lanyuanxiaoyao.service.common.utils.LogHelper.LogPoint.CHECKPOINT_START;
@@ -64,12 +61,11 @@ public class PulsarMessageSourceReader extends RichParallelSourceFunction<String
private final AtomicReference<MessageId> lastMessageId = new AtomicReference<>();
private final AtomicLong latestPublishTime = new AtomicLong(0);
private final AtomicLong latestReceiveTime = new AtomicLong(0);
private final RateMetric messageReceiveMetric;
private final MessageSizeSizeMetric messageSizeReceiveMetric;
private final Map<Long, MessageId> messageIdMap = new ConcurrentHashMap<>();
private boolean running = true;
private PulsarClient client = null;
private Reader<String> reader = null;
private transient Counter messageReceiveCounter;
private transient Counter messageReceiveBytesCounter;
public PulsarMessageSourceReader(GlobalConfiguration globalConfiguration, FlinkJob flinkJob, TableMeta tableMeta) {
logger.info("Use PulsarMessageSourceReader");
@@ -77,27 +73,6 @@ public class PulsarMessageSourceReader extends RichParallelSourceFunction<String
this.flinkJob = flinkJob;
this.topic = tableMeta.getTopic();
this.tableMeta = tableMeta;
String messageId = tableMeta.getConfig().getMessageId();
logger.info("{} {}", Constants.LOG_POINT_PULSAR_SOURCE_BOOTSTRAP_MESSAGE_ID, messageId);
if (StrUtil.isNotBlank(messageId)) {
lastMessageId.set(parseMessageId(messageId));
} else {
logger.warn("Message id is empty");
lastMessageId.set(MessageId.earliest);
}
messageReceiveMetric = new RateMetric(
globalConfiguration,
Constants.METRICS_SYNC_SOURCE_MESSAGE_RECEIVE,
flinkJob,
tableMeta
);
messageSizeReceiveMetric = new MessageSizeSizeMetric(
globalConfiguration,
Constants.METRICS_SYNC_SOURCE_MESSAGE_SIZE_RECEIVE_BYTES,
flinkJob, tableMeta
);
}
private static MessageId parseMessageId(String messageIdText) {
@@ -105,15 +80,17 @@ public class PulsarMessageSourceReader extends RichParallelSourceFunction<String
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
LogHelper.info(logger, CHECKPOINT_INITIAL);
public void initializeState(FunctionInitializationContext context) {
}
@Override
public void run(SourceContext<String> context) throws Exception {
String queryUrl = StrUtil.format(
"{}/api/message_id?flink_job_id={}&alias={}",
LoadBalance.getCustomPublishUrl(globalConfiguration),
flinkJob.getId(),
tableMeta.getAlias()
);
logger.info("Query url: {}", queryUrl);
String messageId = Failsafe.with(MESSAGE_ID_RETRY)
.onFailure(event -> {
if (ObjectUtil.isNotNull(event.getException())) {
@@ -130,45 +107,57 @@ public class PulsarMessageSourceReader extends RichParallelSourceFunction<String
if (StrUtil.isNotBlank(messageId)) {
lastMessageId.set(parseMessageId(messageId));
} else {
logger.warn(StrUtil.format("{} Message id is empty, now message id is {}", Constants.LOG_POINT_MESSAGE_ID_EMPTY, lastMessageId.get()));
throw new Exception(StrUtil.format("Cannot get message id from: {}", queryUrl));
}
}
@Override
public void run(SourceContext<String> context) throws Exception {
String currentValue = null;
while (running) {
synchronized (context.getCheckpointLock()) {
Message<String> message;
try {
message = reader.readNext();
if (ObjectUtil.isNotNull(message)) {
String value = message.getValue();
currentValue = value;
if (ObjectUtil.isEmpty(value)) {
logger.warn("{} {}", message.getValue(), message.getMessageId());
}
synchronized (context.getCheckpointLock()) {
context.collect(value);
}
if (RecordHelper.isNotVersionUpdateRecord(value)) {
latestPublishTime.set(message.getPublishTime());
latestReceiveTime.set(Instant.now().toEpochMilli());
}
lastMessageId.set(message.getMessageId());
messageReceiveMetric.increment();
try (PulsarClient client = PulsarClient.builder()
.serviceUrl(tableMeta.getPulsarAddress())
.build()) {
try (Reader<String> reader = client.newReader(new StringSchema())
.topic(topic)
.receiverQueueSize(10000)
.subscriptionName(NameHelper.pulsarSubscriptionName(flinkJob.getId(), tableMeta.getAlias(), globalConfiguration.getSignature()))
.startMessageId(lastMessageId.get())
.startMessageIdInclusive()
.create()) {
String currentValue = null;
while (running) {
synchronized (context.getCheckpointLock()) {
Message<String> message;
try {
messageSizeReceiveMetric.increment(message.getValue().getBytes().length);
message = reader.readNext();
if (ObjectUtil.isNotNull(message)) {
String value = message.getValue();
currentValue = value;
if (ObjectUtil.isEmpty(value)) {
logger.warn("{} {}", message.getValue(), message.getMessageId());
}
synchronized (context.getCheckpointLock()) {
context.collect(value);
}
if (RecordHelper.isNotVersionUpdateRecord(value)) {
latestPublishTime.set(message.getPublishTime());
latestReceiveTime.set(Instant.now().toEpochMilli());
}
lastMessageId.set(message.getMessageId());
messageReceiveCounter.inc();
try {
messageReceiveBytesCounter.inc(message.getValue().getBytes().length);
} catch (Throwable t) {
logger.warn("Parse message size failure", t);
}
}
} catch (Throwable t) {
logger.warn("Parse message size failure", t);
throw new Exception("Read message failure, current value: " + currentValue, t);
}
}
} catch (Throwable t) {
throw new Exception("Read message failure, current value: " + currentValue, t);
}
}
} catch (Throwable exception) {
logger.error(StrUtil.format("Any error ({})", tableMeta.getAlias()), exception);
throw exception;
}
}
@@ -177,25 +166,8 @@ public class PulsarMessageSourceReader extends RichParallelSourceFunction<String
super.open(configuration);
// 初始化指标
MetricsUtils.createMakePointTimer(globalConfiguration, messageReceiveMetric);
MetricsUtils.createMakePointTimer(globalConfiguration, messageSizeReceiveMetric);
try {
client = PulsarClient.builder()
.serviceUrl(tableMeta.getPulsarAddress())
.build();
reader = client.newReader(new StringSchema())
.topic(topic)
.receiverQueueSize(10000)
.subscriptionName(NameHelper.pulsarSubscriptionName(flinkJob.getId(), tableMeta.getAlias(), globalConfiguration.getSignature()))
.startMessageId(lastMessageId.get())
.startMessageIdInclusive()
.create();
} catch (Exception exception) {
logger.error(StrUtil.format("Connect pulsar error ({} {})", tableMeta.getPulsarAddress(), topic), exception);
throw exception;
}
logger.info("Message id set to {}", lastMessageId.get());
messageReceiveCounter = MetricsUtils.counter(getRuntimeContext(), flinkJob, tableMeta, Constants.METRICS_SYNC_SOURCE_MESSAGE_RECEIVE);
messageReceiveBytesCounter = MetricsUtils.counter(getRuntimeContext(), flinkJob, tableMeta, Constants.METRICS_SYNC_SOURCE_MESSAGE_SIZE_RECEIVE_BYTES);
}
@Override
@@ -203,33 +175,11 @@ public class PulsarMessageSourceReader extends RichParallelSourceFunction<String
running = false;
}
@Override
public void close() throws Exception {
super.close();
if (reader != null) {
try {
reader.close();
} catch (PulsarClientException e) {
logger.error("Pulsar reader close error", e);
}
}
if (client != null) {
try {
client.close();
} catch (PulsarClientException e) {
logger.error("Pulsar client close error", e);
}
}
}
@Override
public void snapshotState(FunctionSnapshotContext context) {
MessageId messageId = lastMessageId.get();
messageIdMap.put(context.getCheckpointId(), messageId);
LogHelper.info(logger, CHECKPOINT_START, "Checkpoint start message id: {}, checkpoint id: {}", messageId, context.getCheckpointId());
messageReceiveMetric.publish();
messageSizeReceiveMetric.publish();
}
@Override

View File

@@ -11,7 +11,6 @@ import com.lanyuanxiaoyao.service.common.utils.RecordHelper;
import com.lanyuanxiaoyao.service.common.utils.TableMetaHelper;
import com.lanyuanxiaoyao.service.sync.configuration.GlobalConfiguration;
import com.lanyuanxiaoyao.service.sync.functions.type.TypeConverter;
import com.lanyuanxiaoyao.service.sync.metrics.CountMetric;
import com.lanyuanxiaoyao.service.sync.utils.JacksonUtils;
import com.lanyuanxiaoyao.service.sync.utils.MetricsUtils;
import com.lanyuanxiaoyao.service.sync.utils.StatusUtils;
@@ -22,13 +21,9 @@ import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.metrics.Counter;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.hudi.org.apache.avro.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,37 +36,22 @@ import static com.lanyuanxiaoyao.service.common.utils.LogHelper.LogPoint.VERSION
* @author ZhangJiacheng
* @date 2022-06-13
*/
public class Record2RowDataFunction extends RichMapFunction<Record, List<RowData>> implements CheckpointedFunction {
public class Record2RowDataFunction extends RichMapFunction<Record, List<RowData>> {
private static final Logger logger = LoggerFactory.getLogger(Record2RowDataFunction.class);
private final GlobalConfiguration globalConfiguration;
private final TableMeta tableMeta;
private final CountMetric changeFilterMetric;
private final CountMetric changePartitionMetric;
private final ObjectMapper mapper = JacksonUtils.getMapper();
private final FlinkJob flinkJob;
private final TableMeta tableMeta;
private final ObjectMapper mapper = JacksonUtils.getMapper();
private Schema schema;
private transient Counter changeFilterCounter;
private transient Counter changePartitionCounter;
public Record2RowDataFunction(GlobalConfiguration globalConfiguration, FlinkJob flinkJob, TableMeta tableMeta) {
this.globalConfiguration = globalConfiguration;
this.flinkJob = flinkJob;
this.tableMeta = tableMeta;
changeFilterMetric = new CountMetric(
globalConfiguration,
Constants.METRICS_SYNC_SOURCE_CHANGE_FILTER,
flinkJob, tableMeta
);
changePartitionMetric = new CountMetric(
globalConfiguration,
Constants.METRICS_SYNC_SOURCE_CHANGE_PARTITION,
flinkJob, tableMeta
);
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
schema = SyncUtils.avroSchemaWithExtraFields(tableMeta);
}
@Override
@@ -79,32 +59,10 @@ public class Record2RowDataFunction extends RichMapFunction<Record, List<RowData
super.open(parameters);
// 初始化指标
MetricsUtils.createMakePointTimer(globalConfiguration, changeFilterMetric);
MetricsUtils.createMakePointTimer(globalConfiguration, changePartitionMetric);
}
changeFilterCounter = MetricsUtils.counter(getRuntimeContext(), flinkJob, tableMeta, Constants.METRICS_SYNC_SOURCE_CHANGE_FILTER);
changePartitionCounter = MetricsUtils.counter(getRuntimeContext(), flinkJob, tableMeta, Constants.METRICS_SYNC_SOURCE_CHANGE_PARTITION);
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
MetricsUtils.publishAllMetrics(changeFilterMetric);
MetricsUtils.publishAllMetrics(changePartitionMetric);
}
private GenericRowData covert2RowData(Schema schema, Map<String, Object> current) {
List<Schema.Field> fields = schema.getFields();
GenericRowData data = new GenericRowData(fields.size());
for (int index = 0; index < fields.size(); index++) {
Schema.Field field = fields.get(index);
// 如果是telepg的话字段名就要统一改成小写上游不规范下游擦屁股
Object value = current.getOrDefault(Constants.FIELD_COVERT.apply(tableMeta, field.name()), null);
if (field.schema().getType().equals(Schema.Type.STRING)
|| (field.schema().isUnion() && field.schema().getTypes().contains(Schema.create(Schema.Type.STRING)))
|| value instanceof String) {
data.setField(index, StringData.fromString((String) value));
} else {
data.setField(index, value);
}
}
return data;
schema = SyncUtils.avroSchemaWithExtraFields(tableMeta);
}
private Boolean isFilterOut(TableMeta tableMeta, Map<String, Object> current) {
@@ -143,14 +101,14 @@ public class Record2RowDataFunction extends RichMapFunction<Record, List<RowData
boolean isChangeFilter = RecordHelper.isChangeField(tableMeta, record, TableMetaHelper::getFilterField);
if (isChangeFilter) {
logger.info("Change filter: {}", mapper.writeValueAsString(record));
changeFilterMetric.increment();
changeFilterCounter.inc();
}
// 如果是 update 且 city_id 不相等就先删除旧记录
boolean isChangePartition = RecordHelper.isChangeField(tableMeta, record, TableMetaHelper::getPartitionField);
if (isChangePartition) {
logger.info("Change partition field: {}", mapper.writeValueAsString(record));
changePartitionMetric.increment();
changePartitionCounter.inc();
}
if (isChangeFilter || isChangePartition) {

View File

@@ -1,123 +0,0 @@
package com.lanyuanxiaoyao.service.sync.metrics;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.http.HttpResponse;
import cn.hutool.http.HttpUtil;
import com.lanyuanxiaoyao.service.common.Constants;
import com.lanyuanxiaoyao.service.sync.configuration.GlobalConfiguration;
import com.lanyuanxiaoyao.service.sync.utils.JacksonUtils;
import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.LongAdder;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 指标基础类
*
* @author ZhangJiacheng
* @date 2022-06-13
*/
public abstract class AbstractMetric implements Metric {
private static final Logger logger = LoggerFactory.getLogger(AbstractMetric.class);
private static final ObjectMapper MAPPER = JacksonUtils.getMapper();
private static final RetryPolicy<HttpResponse> PUBLISH_RETRY = RetryPolicy.<HttpResponse>builder()
.handle(Exception.class)
.withDelay(Duration.ofSeconds(1))
.withMaxAttempts(5)
.build();
private final GlobalConfiguration globalConfiguration;
private final List<String> lineCache = ListUtil.toList();
private final LongAdder autoPublishCount = new LongAdder();
private List<HttpMetricsRequest> requests = new ArrayList<>();
public AbstractMetric(GlobalConfiguration globalConfiguration) {
this.globalConfiguration = globalConfiguration;
}
public void setRequests(HttpMetricsRequest... requests) {
setRequests(ListUtil.toList(requests));
}
public void setRequests(List<HttpMetricsRequest> requests) {
this.requests = requests;
}
public void addRequest(HttpMetricsRequest request) {
this.requests.add(request);
}
public void addRequests(HttpMetricsRequest... requests) {
addRequests(ListUtil.toList(requests));
}
public void addRequests(List<HttpMetricsRequest> requests) {
this.requests.addAll(requests);
}
@Override
public void addTag(String key, String value) {
requests.forEach(request -> request.addTag(key, value));
}
@Override
public void makePoint(boolean autoPublish, int batch) {
if (!globalConfiguration.getMetricEnable()) {
return;
}
if (autoPublish) {
if (autoPublishCount.sum() >= batch) {
publish();
}
autoPublishCount.increment();
}
makePoint();
}
public synchronized void publish() {
if (!globalConfiguration.getMetricEnable()) {
return;
}
try {
requests.stream()
.filter(request -> !request.isEmpty())
.map(request -> {
try {
String data = MAPPER.writeValueAsString(request);
request.clear();
return data;
} catch (JsonProcessingException e) {
logger.warn("Parse metrics failure: " + request, e);
}
return null;
})
.filter(Objects::nonNull)
.forEach(lineCache::add);
if (lineCache.isEmpty()) {
return;
}
String lines = String.join("\n", lineCache);
logger.debug("Push metrics: \n{}", lines);
HttpResponse response = Failsafe.with(PUBLISH_RETRY)
.get(() -> HttpUtil.createPost(globalConfiguration.getMetricPublishUrl())
.body(lines)
.basicAuth(Constants.VICTORIA_USERNAME, Constants.VICTORIA_PASSWORD)
.timeout(globalConfiguration.getMetricPublishTimeout())
.execute());
if (response.isOk()) {
logger.debug("Metrics push success");
}
} catch (Throwable throwable) {
logger.warn("Push metrics failure, url: " + globalConfiguration.getMetricPublishUrl(), throwable);
} finally {
lineCache.clear();
autoPublishCount.reset();
}
}
}

View File

@@ -1,67 +0,0 @@
package com.lanyuanxiaoyao.service.sync.metrics;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.map.MapUtil;
import com.lanyuanxiaoyao.service.common.entity.FlinkJob;
import com.lanyuanxiaoyao.service.common.entity.TableMeta;
import com.lanyuanxiaoyao.service.sync.configuration.GlobalConfiguration;
import com.lanyuanxiaoyao.service.sync.utils.MetricsUtils;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;
/**
* 基础类
*
* @author ZhangJiacheng
* @date 2022-06-13
*/
public class CountMetric extends AbstractMetric {
private final LongAdder count = new LongAdder();
private final HttpMetricsRequest countMetrics;
public CountMetric(GlobalConfiguration globalConfiguration, String name) {
this(globalConfiguration, name, MapUtil.empty());
}
public CountMetric(GlobalConfiguration globalConfiguration, String name, FlinkJob job, TableMeta meta) {
this(globalConfiguration, name, MapUtil.builder(MetricsUtils.commonTags(job, meta)).build());
}
public CountMetric(GlobalConfiguration globalConfiguration, String name, FlinkJob job, TableMeta meta, String extraTagKey, String extraTagValue) {
this(globalConfiguration, name, job, meta, MapUtil.of(extraTagKey, extraTagValue));
}
public CountMetric(GlobalConfiguration globalConfiguration, String name, FlinkJob job, TableMeta meta, Map<String, String> tags) {
this(globalConfiguration, name, MapUtil.builder(MetricsUtils.commonTags(job, meta))
.putAll(tags)
.build());
}
public CountMetric(GlobalConfiguration globalConfiguration, String name, Map<String, String> tags) {
super(globalConfiguration);
countMetrics = new HttpMetricsRequest(
name + "_count",
MapUtil.<String, String>builder().putAll(tags).build()
);
setRequests(countMetrics);
}
public void increment() {
count.increment();
}
@Override
public void makePoint() {
double count = this.count.doubleValue();
if (count != 0) {
countMetrics.addMetric(count);
}
}
@Override
public List<HttpMetricsRequest> getMetrics() {
return ListUtil.toList(countMetrics);
}
}

View File

@@ -1,88 +0,0 @@
package com.lanyuanxiaoyao.service.sync.metrics;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.map.MapUtil;
import java.beans.Transient;
import java.io.Serializable;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* 指标实体类
*
* @author ZhangJiacheng
* @date 2022-06-13
*/
public class HttpMetricsRequest implements Serializable {
private final Map<String, String> metric;
private final List<Double> values;
private final List<Long> timestamps;
private final Lock lock = new ReentrantLock();
public HttpMetricsRequest(String name, Map<String, String> metrics) {
this.metric = MapUtil.<String, String>builder()
.put("__name__", name)
.build();
this.metric.putAll(metrics);
this.values = Collections.synchronizedList(ListUtil.list(true));
this.timestamps = Collections.synchronizedList(ListUtil.list(true));
}
public void addTag(String key, String value) {
this.metric.put(key, value);
}
public void addMetric(Double value) {
addMetric(value, Instant.now().toEpochMilli());
}
public void addMetric(Double value, Long timestamp) {
synchronized (this) {
values.add(value);
timestamps.add(timestamp);
}
}
public void clear() {
synchronized (this) {
this.values.clear();
this.timestamps.clear();
}
}
@Transient
public boolean isEmpty() {
return this.values.isEmpty() && this.timestamps.isEmpty();
}
@Transient
public boolean isNonEmpty() {
return !isEmpty();
}
public Map<String, String> getMetric() {
return metric;
}
public List<Double> getValues() {
return values;
}
public List<Long> getTimestamps() {
return timestamps;
}
@Override
public String toString() {
return "MetricsItem{" +
"metrics=" + metric +
", values=" + values +
", timestamps=" + timestamps +
'}';
}
}

View File

@@ -1,77 +0,0 @@
package com.lanyuanxiaoyao.service.sync.metrics;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.map.MapUtil;
import com.lanyuanxiaoyao.service.common.entity.FlinkJob;
import com.lanyuanxiaoyao.service.common.entity.TableMeta;
import com.lanyuanxiaoyao.service.sync.configuration.GlobalConfiguration;
import com.lanyuanxiaoyao.service.sync.utils.MetricsUtils;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.DoubleAdder;
import java.util.concurrent.atomic.LongAdder;
/**
* 基础类
*
* @author ZhangJiacheng
* @date 2022-06-13
*/
public class MessageSizeSizeMetric extends AbstractMetric {
private final LongAdder count = new LongAdder();
private final DoubleAdder size = new DoubleAdder();
private final HttpMetricsRequest sizeMetrics;
private final HttpMetricsRequest perMessageSizeMetrics;
public MessageSizeSizeMetric(GlobalConfiguration globalConfiguration, String name) {
this(globalConfiguration, name, MapUtil.empty());
}
public MessageSizeSizeMetric(GlobalConfiguration globalConfiguration, String name, FlinkJob job, TableMeta meta) {
this(globalConfiguration, name, MapUtil.builder(MetricsUtils.commonTags(job, meta)).build());
}
public MessageSizeSizeMetric(GlobalConfiguration globalConfiguration, String name, FlinkJob job, TableMeta meta, String extraTagKey, String extraTagValue) {
this(globalConfiguration, name, job, meta, MapUtil.of(extraTagKey, extraTagValue));
}
public MessageSizeSizeMetric(GlobalConfiguration globalConfiguration, String name, FlinkJob job, TableMeta meta, Map<String, String> tags) {
this(globalConfiguration, name, MapUtil.builder(MetricsUtils.commonTags(job, meta))
.putAll(tags)
.build());
}
public MessageSizeSizeMetric(GlobalConfiguration globalConfiguration, String name, Map<String, String> tags) {
super(globalConfiguration);
sizeMetrics = new HttpMetricsRequest(
name + "_total",
MapUtil.<String, String>builder().putAll(tags).build()
);
perMessageSizeMetrics = new HttpMetricsRequest(
name + "_per_message",
MapUtil.<String, String>builder().putAll(tags).build()
);
setRequests(sizeMetrics, perMessageSizeMetrics);
}
public void increment(long size) {
this.count.increment();
this.size.add(size);
}
@Override
public void makePoint() {
double count = this.count.doubleValue();
double size = this.size.doubleValue();
if (size != 0 && count != 0) {
sizeMetrics.addMetric(size);
perMessageSizeMetrics.addMetric(size / count);
}
}
@Override
public List<HttpMetricsRequest> getMetrics() {
return ListUtil.toList(sizeMetrics);
}
}

View File

@@ -1,20 +0,0 @@
package com.lanyuanxiaoyao.service.sync.metrics;
import java.io.Serializable;
import java.util.List;
/**
* 指标类定义
*
* @author ZhangJiacheng
* @date 2022-06-13
*/
public interface Metric extends Serializable {
void addTag(String key, String value);
void makePoint(boolean autoPublish, int batch);
void makePoint();
List<HttpMetricsRequest> getMetrics();
}

View File

@@ -1,87 +0,0 @@
package com.lanyuanxiaoyao.service.sync.metrics;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.map.MapUtil;
import com.lanyuanxiaoyao.service.common.entity.FlinkJob;
import com.lanyuanxiaoyao.service.common.entity.TableMeta;
import com.lanyuanxiaoyao.service.sync.configuration.GlobalConfiguration;
import com.lanyuanxiaoyao.service.sync.utils.MetricsUtils;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;
/**
 * Rate metric: counts events and tracks the wall-clock time elapsed since the
 * metric was created. Each {@link #makePoint()} snapshot feeds three series:
 * <ul>
 *   <li>{@code <name>_count} — cumulative event count</li>
 *   <li>{@code <name>_millisecond} — elapsed milliseconds since construction</li>
 *   <li>{@code <name>_per_millisecond} — average throughput (events per millisecond)</li>
 * </ul>
 *
 * @author ZhangJiacheng
 * @date 2022-06-13
 */
public class RateMetric extends AbstractMetric {
    // LongAdder scales better than AtomicLong when many threads increment concurrently.
    private final LongAdder count = new LongAdder();
    private final HttpMetricsRequest countMetrics;
    private final HttpMetricsRequest millisecondMetrics;
    // Renamed from "perSecondMetrics": the series name is "_per_millisecond" and the
    // pushed value is count / elapsedMillis, i.e. events per MILLISECOND, not per second.
    private final HttpMetricsRequest perMillisecondMetrics;
    // Reference point for all elapsed-time computations.
    private final Instant startInstant;

    /** Creates a rate metric with no tags. */
    public RateMetric(GlobalConfiguration globalConfiguration, String name) {
        this(globalConfiguration, name, MapUtil.empty());
    }

    /** Creates a rate metric tagged with the common job/table labels. */
    public RateMetric(GlobalConfiguration globalConfiguration, String name, FlinkJob job, TableMeta meta) {
        this(globalConfiguration, name, MapUtil.builder(MetricsUtils.commonTags(job, meta)).build());
    }

    /** Creates a rate metric with the common labels plus one extra tag. */
    public RateMetric(GlobalConfiguration globalConfiguration, String name, FlinkJob job, TableMeta meta, String extraTagKey, String extraTagValue) {
        this(globalConfiguration, name, job, meta, MapUtil.of(extraTagKey, extraTagValue));
    }

    /** Creates a rate metric with the common labels plus arbitrary extra tags. */
    public RateMetric(GlobalConfiguration globalConfiguration, String name, FlinkJob job, TableMeta meta, Map<String, String> tags) {
        this(globalConfiguration, name, MapUtil.builder(MetricsUtils.commonTags(job, meta))
                .putAll(tags)
                .build());
    }

    /**
     * Base constructor: wires up the three series and records the start instant.
     *
     * @param globalConfiguration publishing configuration, handed to {@link AbstractMetric}
     * @param name                metric base name; suffixes are appended per series
     * @param tags                tags attached to every series (defensively copied per request)
     */
    public RateMetric(GlobalConfiguration globalConfiguration, String name, Map<String, String> tags) {
        super(globalConfiguration);
        startInstant = Instant.now();
        // Each request gets its own copy of the tag map so later mutations cannot leak across series.
        countMetrics = new HttpMetricsRequest(
                name + "_count",
                MapUtil.<String, String>builder().putAll(tags).build()
        );
        millisecondMetrics = new HttpMetricsRequest(
                name + "_millisecond",
                MapUtil.<String, String>builder().putAll(tags).build()
        );
        perMillisecondMetrics = new HttpMetricsRequest(
                name + "_per_millisecond",
                MapUtil.<String, String>builder().putAll(tags).build()
        );
        setRequests(countMetrics, millisecondMetrics, perMillisecondMetrics);
    }

    /** Records one event. Thread-safe. */
    public void increment() {
        count.increment();
    }

    /**
     * Snapshots count, elapsed time and throughput into the three series.
     * Skips the snapshot while the count is still zero, and also when the
     * elapsed time is zero milliseconds — previously that case divided by
     * zero and pushed {@code Infinity} as the throughput value.
     */
    @Override
    public void makePoint() {
        // Use a distinct local name; the original shadowed the 'count' field.
        double total = this.count.doubleValue();
        long millis = Duration.between(startInstant, Instant.now()).toMillis();
        if (total != 0 && millis > 0) {
            countMetrics.addMetric(total);
            millisecondMetrics.addMetric((double) millis);
            perMillisecondMetrics.addMetric(total / millis);
        }
    }

    /** @return the three backing series: count, elapsed-milliseconds, per-millisecond rate */
    @Override
    public List<HttpMetricsRequest> getMetrics() {
        return ListUtil.toList(countMetrics, millisecondMetrics, perMillisecondMetrics);
    }
}

View File

@@ -1,17 +1,14 @@
package com.lanyuanxiaoyao.service.sync.utils;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.map.MapUtil;
import com.lanyuanxiaoyao.service.common.Constants;
import com.lanyuanxiaoyao.service.common.entity.FlinkJob;
import com.lanyuanxiaoyao.service.common.entity.TableMeta;
import com.lanyuanxiaoyao.service.sync.configuration.GlobalConfiguration;
import com.lanyuanxiaoyao.service.sync.metrics.AbstractMetric;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -24,41 +21,20 @@ import org.slf4j.LoggerFactory;
public class MetricsUtils implements Serializable {
private static final Logger logger = LoggerFactory.getLogger(MetricsUtils.class);
public static Map<String, String> commonTags(FlinkJob job, TableMeta meta) {
return MapUtil.<String, String>builder()
.put(Constants.METRICS_LABEL_FLINK_JOB_ID, job.getId().toString())
.put(Constants.METRICS_LABEL_FLINK_JOB_NAME, job.getName())
.put(Constants.METRICS_LABEL_SCHEMA, meta.getSchema())
.put(Constants.METRICS_LABEL_TABLE, meta.getTable())
.put(Constants.METRICS_LABEL_ALIAS, meta.getAlias())
.build();
/**
 * Registers a Flink {@link Counter} scoped by the common job/table labels,
 * with no extra tags.
 *
 * @param context the operator's runtime context
 * @param job     source of the job-id / job-name labels
 * @param meta    source of the schema / table / alias labels
 * @param name    counter name within the resulting metric group
 * @return the registered counter
 */
public static Counter counter(RuntimeContext context, FlinkJob job, TableMeta meta, String name) {
    return counter(context, job, meta, name, Map.of());
}
/**
 * Varargs convenience overload: schedules a periodic make-point task for the
 * given metrics. Delegates to {@link #createMakePointTimer(GlobalConfiguration, List)}.
 *
 * @param globalConfiguration supplies delay, period and batch settings
 * @param metrics             metrics to snapshot on each tick
 */
@SafeVarargs
public static <T extends AbstractMetric> void createMakePointTimer(GlobalConfiguration globalConfiguration, T... metrics) {
    createMakePointTimer(globalConfiguration, List.of(metrics));
}
/**
 * Schedules a periodic task that snapshots (and auto-publishes) each metric.
 * Delay, period and publish batch size come from {@code globalConfiguration}.
 *
 * @param globalConfiguration supplies delay, period and batch settings
 * @param metrics             metrics to snapshot on each tick
 */
public static <T extends AbstractMetric> void createMakePointTimer(GlobalConfiguration globalConfiguration, List<T> metrics) {
    logger.info("Create timer: {}", metrics);
    // Daemon timer with a name: the original non-daemon Timer was never cancelled
    // and would keep the JVM alive after the job finished.
    new Timer("metrics-make-point", true).schedule(new TimerTask() {
        @Override
        public void run() {
            for (AbstractMetric metric : metrics) {
                try {
                    metric.makePoint(true, globalConfiguration.getMetricPublishBatch());
                } catch (RuntimeException e) {
                    // An uncaught exception would kill the timer thread and silently
                    // stop publishing for ALL metrics; log and keep ticking instead.
                    logger.warn("Failed to make point for metric {}", metric, e);
                }
            }
        }
    }, globalConfiguration.getMetricPublishDelay(), globalConfiguration.getMetricPublishPeriod());
}
/**
 * Varargs convenience overload: publishes every given metric immediately.
 * Delegates to {@link #publishAllMetrics(List)}.
 *
 * @param metrics metrics to publish
 */
@SafeVarargs
public static <T extends AbstractMetric> void publishAllMetrics(T... metrics) {
    publishAllMetrics(List.of(metrics));
}
public static <T extends AbstractMetric> void publishAllMetrics(List<T> metrics) {
for (AbstractMetric metric : metrics) {
metric.publish();
/**
 * Registers a Flink {@link Counter} under a metric group labelled with the
 * common job/table tags plus any extra tags.
 *
 * @param context the operator's runtime context
 * @param job     source of the job-id / job-name labels
 * @param meta    source of the schema / table / alias labels
 * @param name    counter name within the resulting metric group
 * @param tags    extra label key/value pairs added as nested groups
 * @return the registered counter
 */
public static Counter counter(RuntimeContext context, FlinkJob job, TableMeta meta, String name, Map<String, String> tags) {
    MetricGroup metricGroup = context.getMetricGroup()
            .addGroup(Constants.METRICS_LABEL_FLINK_JOB_ID, job.getId().toString())
            .addGroup(Constants.METRICS_LABEL_FLINK_JOB_NAME, job.getName())
            .addGroup(Constants.METRICS_LABEL_SCHEMA, meta.getSchema())
            .addGroup(Constants.METRICS_LABEL_TABLE, meta.getTable())
            .addGroup(Constants.METRICS_LABEL_ALIAS, meta.getAlias());
    for (Map.Entry<String, String> entry : tags.entrySet()) {
        // addGroup returns a NEW child group; the original discarded that return
        // value, so the extra tags were never part of the counter's scope.
        metricGroup = metricGroup.addGroup(entry.getKey(), entry.getValue());
    }
    return metricGroup.counter(name);
}
}