diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/OperationTypeFilter.java b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/OperationTypeFilter.java index 5592447..b6fcea3 100644 --- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/OperationTypeFilter.java +++ b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/OperationTypeFilter.java @@ -1,22 +1,17 @@ package com.lanyuanxiaoyao.service.sync.functions; -import cn.hutool.core.collection.ListUtil; import cn.hutool.core.map.MapUtil; import com.lanyuanxiaoyao.service.common.Constants; import com.lanyuanxiaoyao.service.common.entity.FlinkJob; import com.lanyuanxiaoyao.service.common.entity.Record; import com.lanyuanxiaoyao.service.common.entity.TableMeta; import com.lanyuanxiaoyao.service.sync.configuration.GlobalConfiguration; -import com.lanyuanxiaoyao.service.sync.metrics.CountMetric; import com.lanyuanxiaoyao.service.sync.utils.MetricsUtils; -import java.util.List; import java.util.Map; import java.util.function.Function; import org.apache.flink.api.common.functions.RichFilterFunction; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.state.FunctionInitializationContext; -import org.apache.flink.runtime.state.FunctionSnapshotContext; -import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.metrics.Counter; /** * 操作类型过滤算子 @@ -24,37 +19,34 @@ import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; * @author ZhangJiacheng * @date 2022-06-12 */ -public class OperationTypeFilter extends RichFilterFunction implements CheckpointedFunction { - private final CountMetric insertRateMetric; - private final CountMetric updateRateMetric; - private final CountMetric deleteRateMetric; - private final CountMetric ddlRateMetric; - private final CountMetric unknownRateMetric; - private final List metrics; - private final GlobalConfiguration globalConfiguration; +public class OperationTypeFilter extends RichFilterFunction { + private final FlinkJob flinkJob; + private final TableMeta tableMeta; + + private transient Counter insertCounter; + private transient Counter updateCounter; + private transient Counter deleteCounter; + private transient Counter ddlCounter; + private transient Counter unknownCounter; public OperationTypeFilter(GlobalConfiguration globalConfiguration, FlinkJob flinkJob, TableMeta tableMeta) { - this.globalConfiguration = globalConfiguration; - - Function> fillTags = operator -> MapUtil.builder() - .put(Constants.METRICS_LABEL_TYPE, operator) - .build(); - - insertRateMetric = new CountMetric(globalConfiguration, Constants.METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE, flinkJob, tableMeta, fillTags.apply(Constants.INSERT)); - updateRateMetric = new CountMetric(globalConfiguration, Constants.METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE, flinkJob, tableMeta, fillTags.apply(Constants.UPDATE)); - deleteRateMetric = new CountMetric(globalConfiguration, Constants.METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE, flinkJob, tableMeta, fillTags.apply(Constants.DELETE)); - ddlRateMetric = new CountMetric(globalConfiguration, Constants.METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE, flinkJob, tableMeta, fillTags.apply(Constants.DDL)); - unknownRateMetric = new CountMetric(globalConfiguration, Constants.METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE, flinkJob, tableMeta, fillTags.apply(Constants.UNKNOWN)); - - metrics = ListUtil.toList(insertRateMetric, updateRateMetric, deleteRateMetric, ddlRateMetric, unknownRateMetric); + this.flinkJob = flinkJob; + this.tableMeta = tableMeta; } @Override public void open(Configuration parameters) throws Exception { super.open(parameters); + Function> fillTags = operator -> MapUtil.builder() + .put(Constants.METRICS_LABEL_TYPE, operator) + .build(); // 初始化指标 - MetricsUtils.createMakePointTimer(globalConfiguration, metrics); + insertCounter = MetricsUtils.counter(getRuntimeContext(), flinkJob, tableMeta, Constants.METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE, fillTags.apply(Constants.INSERT)); + updateCounter = MetricsUtils.counter(getRuntimeContext(), flinkJob, tableMeta, Constants.METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE, fillTags.apply(Constants.UPDATE)); + deleteCounter = MetricsUtils.counter(getRuntimeContext(), flinkJob, tableMeta, Constants.METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE, fillTags.apply(Constants.DELETE)); + ddlCounter = MetricsUtils.counter(getRuntimeContext(), flinkJob, tableMeta, Constants.METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE, fillTags.apply(Constants.DDL)); + unknownCounter = MetricsUtils.counter(getRuntimeContext(), flinkJob, tableMeta, Constants.METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE, fillTags.apply(Constants.UNKNOWN)); } @Override @@ -62,29 +54,20 @@ public class OperationTypeFilter extends RichFilterFunction implements C String opType = record.getStatement().getOpType(); switch (opType) { case Constants.INSERT: - insertRateMetric.increment(); + insertCounter.inc(); break; case Constants.UPDATE: - updateRateMetric.increment(); + updateCounter.inc(); break; case Constants.DELETE: - deleteRateMetric.increment(); + deleteCounter.inc(); break; case Constants.DDL: - ddlRateMetric.increment(); + ddlCounter.inc(); break; default: - unknownRateMetric.increment(); + unknownCounter.inc(); } return !Constants.DDL.equals(record.getStatement().getOpType()); } - - @Override - public void initializeState(FunctionInitializationContext context) { - } - - @Override - public void snapshotState(FunctionSnapshotContext context) { - MetricsUtils.publishAllMetrics(metrics); - } } diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/PulsarMessageSourceReader.java b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/PulsarMessageSourceReader.java index 595015d..5e82422 100644 --- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/PulsarMessageSourceReader.java +++ b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/PulsarMessageSourceReader.java @@ -10,8 +10,6 @@ import com.lanyuanxiaoyao.service.common.utils.LogHelper; import com.lanyuanxiaoyao.service.common.utils.NameHelper; import com.lanyuanxiaoyao.service.common.utils.RecordHelper; import com.lanyuanxiaoyao.service.sync.configuration.GlobalConfiguration; -import com.lanyuanxiaoyao.service.sync.metrics.MessageSizeSizeMetric; -import com.lanyuanxiaoyao.service.sync.metrics.RateMetric; import com.lanyuanxiaoyao.service.sync.utils.LoadBalance; import com.lanyuanxiaoyao.service.sync.utils.MetricsUtils; import com.lanyuanxiaoyao.service.sync.utils.StatusUtils; @@ -25,6 +23,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import org.apache.flink.api.common.state.CheckpointListener; import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Counter; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; @@ -32,7 +31,6 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.impl.schema.StringSchema; import org.apache.pulsar.client.internal.DefaultImplementation; @@ -40,7 +38,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static com.lanyuanxiaoyao.service.common.utils.LogHelper.LogPoint.CHECKPOINT_COMPLETE; -import static com.lanyuanxiaoyao.service.common.utils.LogHelper.LogPoint.CHECKPOINT_INITIAL; import static com.lanyuanxiaoyao.service.common.utils.LogHelper.LogPoint.CHECKPOINT_INITIAL_MESSAGE_ID; import static com.lanyuanxiaoyao.service.common.utils.LogHelper.LogPoint.CHECKPOINT_START; @@ -64,12 +61,11 @@ public class PulsarMessageSourceReader extends RichParallelSourceFunction lastMessageId = new AtomicReference<>(); private final AtomicLong latestPublishTime = new AtomicLong(0); private final AtomicLong latestReceiveTime = new AtomicLong(0); - private final RateMetric messageReceiveMetric; - private final MessageSizeSizeMetric messageSizeReceiveMetric; private final Map messageIdMap = new ConcurrentHashMap<>(); private boolean running = true; - private PulsarClient client = null; - private Reader reader = null; + + private transient Counter messageReceiveCounter; + private transient Counter messageReceiveBytesCounter; public PulsarMessageSourceReader(GlobalConfiguration globalConfiguration, FlinkJob flinkJob, TableMeta tableMeta) { logger.info("Use PulsarMessageSourceReader"); @@ -77,27 +73,6 @@ public class PulsarMessageSourceReader extends RichParallelSourceFunction context) throws Exception { String queryUrl = StrUtil.format( "{}/api/message_id?flink_job_id={}&alias={}", LoadBalance.getCustomPublishUrl(globalConfiguration), flinkJob.getId(), tableMeta.getAlias() ); - logger.info("Query url: {}", queryUrl); String messageId = Failsafe.with(MESSAGE_ID_RETRY) .onFailure(event -> { if (ObjectUtil.isNotNull(event.getException())) { @@ -130,45 +107,57 @@ public class PulsarMessageSourceReader extends RichParallelSourceFunction context) throws Exception { - String currentValue = null; - while (running) { - synchronized (context.getCheckpointLock()) { - Message message; - try { - message = reader.readNext(); - if (ObjectUtil.isNotNull(message)) { - String value = message.getValue(); - currentValue = value; - if (ObjectUtil.isEmpty(value)) { - logger.warn("{} {}", message.getValue(), message.getMessageId()); - } - synchronized (context.getCheckpointLock()) { - context.collect(value); - } - - if (RecordHelper.isNotVersionUpdateRecord(value)) { - latestPublishTime.set(message.getPublishTime()); - latestReceiveTime.set(Instant.now().toEpochMilli()); - } - lastMessageId.set(message.getMessageId()); - - messageReceiveMetric.increment(); + try (PulsarClient client = PulsarClient.builder() + .serviceUrl(tableMeta.getPulsarAddress()) + .build()) { + try (Reader reader = client.newReader(new StringSchema()) + .topic(topic) + .receiverQueueSize(10000) + .subscriptionName(NameHelper.pulsarSubscriptionName(flinkJob.getId(), tableMeta.getAlias(), globalConfiguration.getSignature())) + .startMessageId(lastMessageId.get()) + .startMessageIdInclusive() + .create()) { + String currentValue = null; + while (running) { + synchronized (context.getCheckpointLock()) { + Message message; try { - messageSizeReceiveMetric.increment(message.getValue().getBytes().length); + message = reader.readNext(); + if (ObjectUtil.isNotNull(message)) { + String value = message.getValue(); + currentValue = value; + if (ObjectUtil.isEmpty(value)) { + logger.warn("{} {}", message.getValue(), message.getMessageId()); + } + synchronized (context.getCheckpointLock()) { + context.collect(value); + } + + if (RecordHelper.isNotVersionUpdateRecord(value)) { + latestPublishTime.set(message.getPublishTime()); + latestReceiveTime.set(Instant.now().toEpochMilli()); + } + lastMessageId.set(message.getMessageId()); + + messageReceiveCounter.inc(); + try { + messageReceiveBytesCounter.inc(message.getValue().getBytes().length); + } catch (Throwable t) { + logger.warn("Parse message size failure", t); + } + } } catch (Throwable t) { - logger.warn("Parse message size failure", t); + throw new Exception("Read message failure, current value: " + currentValue, t); } } - } catch (Throwable t) { - throw new Exception("Read message failure, current value: " + currentValue, t); } } + } catch (Throwable exception) { + logger.error(StrUtil.format("Any error ({})", tableMeta.getAlias()), exception); + throw exception; } } @@ -177,25 +166,8 @@ public class PulsarMessageSourceReader extends RichParallelSourceFunction> implements CheckpointedFunction { +public class Record2RowDataFunction extends RichMapFunction> { private static final Logger logger = LoggerFactory.getLogger(Record2RowDataFunction.class); private final GlobalConfiguration globalConfiguration; - private final TableMeta tableMeta; - private final CountMetric changeFilterMetric; - private final CountMetric changePartitionMetric; - private final ObjectMapper mapper = JacksonUtils.getMapper(); private final FlinkJob flinkJob; + private final TableMeta tableMeta; + private final ObjectMapper mapper = JacksonUtils.getMapper(); private Schema schema; + private transient Counter changeFilterCounter; + private transient Counter changePartitionCounter; + public Record2RowDataFunction(GlobalConfiguration globalConfiguration, FlinkJob flinkJob, TableMeta tableMeta) { this.globalConfiguration = globalConfiguration; this.flinkJob = flinkJob; this.tableMeta = tableMeta; - - changeFilterMetric = new CountMetric( - globalConfiguration, - Constants.METRICS_SYNC_SOURCE_CHANGE_FILTER, - flinkJob, tableMeta - ); - changePartitionMetric = new CountMetric( - globalConfiguration, - Constants.METRICS_SYNC_SOURCE_CHANGE_PARTITION, - flinkJob, tableMeta - ); - } - - @Override - public void initializeState(FunctionInitializationContext context) throws Exception { - schema = SyncUtils.avroSchemaWithExtraFields(tableMeta); } @Override @@ -79,32 +59,10 @@ public class Record2RowDataFunction extends RichMapFunction current) { - List fields = schema.getFields(); - GenericRowData data = new GenericRowData(fields.size()); - for (int index = 0; index < fields.size(); index++) { - Schema.Field field = fields.get(index); - // 如果是telepg的话,字段名就要统一改成小写,上游不规范,下游擦屁股 - Object value = current.getOrDefault(Constants.FIELD_COVERT.apply(tableMeta, field.name()), null); - if (field.schema().getType().equals(Schema.Type.STRING) - || (field.schema().isUnion() && field.schema().getTypes().contains(Schema.create(Schema.Type.STRING))) - || value instanceof String) { - data.setField(index, StringData.fromString((String) value)); - } else { - data.setField(index, value); - } - } - return data; + schema = SyncUtils.avroSchemaWithExtraFields(tableMeta); } private Boolean isFilterOut(TableMeta tableMeta, Map current) { @@ -143,14 +101,14 @@ public class Record2RowDataFunction extends RichMapFunction PUBLISH_RETRY = RetryPolicy.builder() - .handle(Exception.class) - .withDelay(Duration.ofSeconds(1)) - .withMaxAttempts(5) - .build(); - private final GlobalConfiguration globalConfiguration; - private final List lineCache = ListUtil.toList(); - private final LongAdder autoPublishCount = new LongAdder(); - private List requests = new ArrayList<>(); - - public AbstractMetric(GlobalConfiguration globalConfiguration) { - this.globalConfiguration = globalConfiguration; - } - - public void setRequests(HttpMetricsRequest... requests) { - setRequests(ListUtil.toList(requests)); - } - - public void setRequests(List requests) { - this.requests = requests; - } - - public void addRequest(HttpMetricsRequest request) { - this.requests.add(request); - } - - public void addRequests(HttpMetricsRequest... requests) { - addRequests(ListUtil.toList(requests)); - } - - public void addRequests(List requests) { - this.requests.addAll(requests); - } - - @Override - public void addTag(String key, String value) { - requests.forEach(request -> request.addTag(key, value)); - } - - @Override - public void makePoint(boolean autoPublish, int batch) { - if (!globalConfiguration.getMetricEnable()) { - return; - } - if (autoPublish) { - if (autoPublishCount.sum() >= batch) { - publish(); - } - autoPublishCount.increment(); - } - makePoint(); - } - - public synchronized void publish() { - if (!globalConfiguration.getMetricEnable()) { - return; - } - try { - requests.stream() - .filter(request -> !request.isEmpty()) - .map(request -> { - try { - String data = MAPPER.writeValueAsString(request); - request.clear(); - return data; - } catch (JsonProcessingException e) { - logger.warn("Parse metrics failure: " + request, e); - } - return null; - }) - .filter(Objects::nonNull) - .forEach(lineCache::add); - if (lineCache.isEmpty()) { - return; - } - String lines = String.join("\n", lineCache); - logger.debug("Push metrics: \n{}", lines); - HttpResponse response = Failsafe.with(PUBLISH_RETRY) - .get(() -> HttpUtil.createPost(globalConfiguration.getMetricPublishUrl()) - .body(lines) - .basicAuth(Constants.VICTORIA_USERNAME, Constants.VICTORIA_PASSWORD) - .timeout(globalConfiguration.getMetricPublishTimeout()) - .execute()); - if (response.isOk()) { - logger.debug("Metrics push success"); - } - } catch (Throwable throwable) { - logger.warn("Push metrics failure, url: " + globalConfiguration.getMetricPublishUrl(), throwable); - } finally { - lineCache.clear(); - autoPublishCount.reset(); - } - } -} diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/metrics/CountMetric.java b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/metrics/CountMetric.java deleted file mode 100644 index 3cc9055..0000000 --- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/metrics/CountMetric.java +++ /dev/null @@ -1,67 +0,0 @@ -package com.lanyuanxiaoyao.service.sync.metrics; - -import cn.hutool.core.collection.ListUtil; -import cn.hutool.core.map.MapUtil; -import com.lanyuanxiaoyao.service.common.entity.FlinkJob; -import com.lanyuanxiaoyao.service.common.entity.TableMeta; -import com.lanyuanxiaoyao.service.sync.configuration.GlobalConfiguration; -import com.lanyuanxiaoyao.service.sync.utils.MetricsUtils; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.LongAdder; - -/** - * 基础类 - * - * @author ZhangJiacheng - * @date 2022-06-13 - */ -public class CountMetric extends AbstractMetric { - private final LongAdder count = new LongAdder(); - - private final HttpMetricsRequest countMetrics; - - public CountMetric(GlobalConfiguration globalConfiguration, String name) { - this(globalConfiguration, name, MapUtil.empty()); - } - - public CountMetric(GlobalConfiguration globalConfiguration, String name, FlinkJob job, TableMeta meta) { - this(globalConfiguration, name, MapUtil.builder(MetricsUtils.commonTags(job, meta)).build()); - } - - public CountMetric(GlobalConfiguration globalConfiguration, String name, FlinkJob job, TableMeta meta, String extraTagKey, String extraTagValue) { - this(globalConfiguration, name, job, meta, MapUtil.of(extraTagKey, extraTagValue)); - } - - public CountMetric(GlobalConfiguration globalConfiguration, String name, FlinkJob job, TableMeta meta, Map tags) { - this(globalConfiguration, name, MapUtil.builder(MetricsUtils.commonTags(job, meta)) - .putAll(tags) - .build()); - } - - public CountMetric(GlobalConfiguration globalConfiguration, String name, Map tags) { - super(globalConfiguration); - countMetrics = new HttpMetricsRequest( - name + "_count", - MapUtil.builder().putAll(tags).build() - ); - setRequests(countMetrics); - } - - public void increment() { - count.increment(); - } - - @Override - public void makePoint() { - double count = this.count.doubleValue(); - if (count != 0) { - countMetrics.addMetric(count); - } - } - - @Override - public List getMetrics() { - return ListUtil.toList(countMetrics); - } -} diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/metrics/HttpMetricsRequest.java b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/metrics/HttpMetricsRequest.java deleted file mode 100644 index 43cd4d7..0000000 --- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/metrics/HttpMetricsRequest.java +++ /dev/null @@ -1,88 +0,0 @@ -package com.lanyuanxiaoyao.service.sync.metrics; - -import cn.hutool.core.collection.ListUtil; -import cn.hutool.core.map.MapUtil; -import java.beans.Transient; -import java.io.Serializable; -import java.time.Instant; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -/** - * 指标实体类 - * - * @author ZhangJiacheng - * @date 2022-06-13 - */ -public class HttpMetricsRequest implements Serializable { - private final Map metric; - private final List values; - private final List timestamps; - - private final Lock lock = new ReentrantLock(); - - public HttpMetricsRequest(String name, Map metrics) { - this.metric = MapUtil.builder() - .put("__name__", name) - .build(); - this.metric.putAll(metrics); - this.values = Collections.synchronizedList(ListUtil.list(true)); - this.timestamps = Collections.synchronizedList(ListUtil.list(true)); - } - - public void addTag(String key, String value) { - this.metric.put(key, value); - } - - public void addMetric(Double value) { - addMetric(value, Instant.now().toEpochMilli()); - } - - public void addMetric(Double value, Long timestamp) { - synchronized (this) { - values.add(value); - timestamps.add(timestamp); - } - } - - public void clear() { - synchronized (this) { - this.values.clear(); - this.timestamps.clear(); - } - } - - @Transient - public boolean isEmpty() { - return this.values.isEmpty() && this.timestamps.isEmpty(); - } - - @Transient - public boolean isNonEmpty() { - return !isEmpty(); - } - - public Map getMetric() { - return metric; - } - - public List getValues() { - return values; - } - - public List getTimestamps() { - return timestamps; - } - - @Override - public String toString() { - return "MetricsItem{" + - "metrics=" + metric + - ", values=" + values + - ", timestamps=" + timestamps + - '}'; - } -} diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/metrics/MessageSizeSizeMetric.java b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/metrics/MessageSizeSizeMetric.java deleted file mode 100644 index 9acf771..0000000 --- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/metrics/MessageSizeSizeMetric.java +++ /dev/null @@ -1,77 +0,0 @@ -package com.lanyuanxiaoyao.service.sync.metrics; - -import cn.hutool.core.collection.ListUtil; -import cn.hutool.core.map.MapUtil; -import com.lanyuanxiaoyao.service.common.entity.FlinkJob; -import com.lanyuanxiaoyao.service.common.entity.TableMeta; -import com.lanyuanxiaoyao.service.sync.configuration.GlobalConfiguration; -import com.lanyuanxiaoyao.service.sync.utils.MetricsUtils; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.DoubleAdder; -import java.util.concurrent.atomic.LongAdder; - -/** - * 基础类 - * - * @author ZhangJiacheng - * @date 2022-06-13 - */ -public class MessageSizeSizeMetric extends AbstractMetric { - private final LongAdder count = new LongAdder(); - private final DoubleAdder size = new DoubleAdder(); - - private final HttpMetricsRequest sizeMetrics; - private final HttpMetricsRequest perMessageSizeMetrics; - - public MessageSizeSizeMetric(GlobalConfiguration globalConfiguration, String name) { - this(globalConfiguration, name, MapUtil.empty()); - } - - public MessageSizeSizeMetric(GlobalConfiguration globalConfiguration, String name, FlinkJob job, TableMeta meta) { - this(globalConfiguration, name, MapUtil.builder(MetricsUtils.commonTags(job, meta)).build()); - } - - public MessageSizeSizeMetric(GlobalConfiguration globalConfiguration, String name, FlinkJob job, TableMeta meta, String extraTagKey, String extraTagValue) { - this(globalConfiguration, name, job, meta, MapUtil.of(extraTagKey, extraTagValue)); - } - - public MessageSizeSizeMetric(GlobalConfiguration globalConfiguration, String name, FlinkJob job, TableMeta meta, Map tags) { - this(globalConfiguration, name, MapUtil.builder(MetricsUtils.commonTags(job, meta)) - .putAll(tags) - .build()); - } - - public MessageSizeSizeMetric(GlobalConfiguration globalConfiguration, String name, Map tags) { - super(globalConfiguration); - sizeMetrics = new HttpMetricsRequest( - name + "_total", - MapUtil.builder().putAll(tags).build() - ); - perMessageSizeMetrics = new HttpMetricsRequest( - name + "_per_message", - MapUtil.builder().putAll(tags).build() - ); - setRequests(sizeMetrics, perMessageSizeMetrics); - } - - public void increment(long size) { - this.count.increment(); - this.size.add(size); - } - - @Override - public void makePoint() { - double count = this.count.doubleValue(); - double size = this.size.doubleValue(); - if (size != 0 && count != 0) { - sizeMetrics.addMetric(size); - perMessageSizeMetrics.addMetric(size / count); - } - } - - @Override - public List getMetrics() { - return ListUtil.toList(sizeMetrics); - } -} diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/metrics/Metric.java b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/metrics/Metric.java deleted file mode 100644 index 145b539..0000000 --- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/metrics/Metric.java +++ /dev/null @@ -1,20 +0,0 @@ -package com.lanyuanxiaoyao.service.sync.metrics; - -import java.io.Serializable; -import java.util.List; - -/** - * 指标类定义 - * - * @author ZhangJiacheng - * @date 2022-06-13 - */ -public interface Metric extends Serializable { - void addTag(String key, String value); - - void makePoint(boolean autoPublish, int batch); - - void makePoint(); - - List getMetrics(); -} diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/metrics/RateMetric.java b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/metrics/RateMetric.java deleted file mode 100644 index 173e152..0000000 --- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/metrics/RateMetric.java +++ /dev/null @@ -1,87 +0,0 @@ -package com.lanyuanxiaoyao.service.sync.metrics; - -import cn.hutool.core.collection.ListUtil; -import cn.hutool.core.map.MapUtil; -import com.lanyuanxiaoyao.service.common.entity.FlinkJob; -import com.lanyuanxiaoyao.service.common.entity.TableMeta; -import com.lanyuanxiaoyao.service.sync.configuration.GlobalConfiguration; -import com.lanyuanxiaoyao.service.sync.utils.MetricsUtils; -import java.time.Duration; -import java.time.Instant; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.LongAdder; - -/** - * 基础类 - * - * @author ZhangJiacheng - * @date 2022-06-13 - */ -public class RateMetric extends AbstractMetric { - private final LongAdder count = new LongAdder(); - - private final HttpMetricsRequest countMetrics; - private final HttpMetricsRequest millisecondMetrics; - private final HttpMetricsRequest perSecondMetrics; - - private final Instant startInstant; - - public RateMetric(GlobalConfiguration globalConfiguration, String name) { - this(globalConfiguration, name, MapUtil.empty()); - } - - public RateMetric(GlobalConfiguration globalConfiguration, String name, FlinkJob job, TableMeta meta) { - this(globalConfiguration, name, MapUtil.builder(MetricsUtils.commonTags(job, meta)).build()); - } - - public RateMetric(GlobalConfiguration globalConfiguration, String name, FlinkJob job, TableMeta meta, String extraTagKey, String extraTagValue) { - this(globalConfiguration, name, job, meta, MapUtil.of(extraTagKey, extraTagValue)); - } - - public RateMetric(GlobalConfiguration globalConfiguration, String name, FlinkJob job, TableMeta meta, Map tags) { - this(globalConfiguration, name, MapUtil.builder(MetricsUtils.commonTags(job, meta)) - .putAll(tags) - .build()); - } - - public RateMetric(GlobalConfiguration globalConfiguration, String name, Map tags) { - super(globalConfiguration); - - startInstant = Instant.now(); - - countMetrics = new HttpMetricsRequest( - name + "_count", - MapUtil.builder().putAll(tags).build() - ); - millisecondMetrics = new HttpMetricsRequest( - name + "_millisecond", - MapUtil.builder().putAll(tags).build() - ); - perSecondMetrics = new HttpMetricsRequest( - name + "_per_millisecond", - MapUtil.builder().putAll(tags).build() - ); - setRequests(countMetrics, millisecondMetrics, perSecondMetrics); - } - - public void increment() { - count.increment(); - } - - @Override - public void makePoint() { - double count = this.count.doubleValue(); - if (count != 0) { - long millis = Duration.between(startInstant, Instant.now()).toMillis(); - countMetrics.addMetric(count); - millisecondMetrics.addMetric((double) millis); - perSecondMetrics.addMetric(count / millis); - } - } - - @Override - public List getMetrics() { - return ListUtil.toList(countMetrics, millisecondMetrics, perSecondMetrics); - } -} diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/utils/MetricsUtils.java b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/utils/MetricsUtils.java index b089628..45e2fe8 100644 --- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/utils/MetricsUtils.java +++ b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/utils/MetricsUtils.java @@ -1,17 +1,14 @@ package com.lanyuanxiaoyao.service.sync.utils; -import cn.hutool.core.collection.ListUtil; import cn.hutool.core.map.MapUtil; import com.lanyuanxiaoyao.service.common.Constants; import com.lanyuanxiaoyao.service.common.entity.FlinkJob; import com.lanyuanxiaoyao.service.common.entity.TableMeta; -import com.lanyuanxiaoyao.service.sync.configuration.GlobalConfiguration; -import com.lanyuanxiaoyao.service.sync.metrics.AbstractMetric; import java.io.Serializable; -import java.util.List; import java.util.Map; -import java.util.Timer; -import java.util.TimerTask; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.MetricGroup; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -24,41 +21,20 @@ import org.slf4j.LoggerFactory; public class MetricsUtils implements Serializable { private static final Logger logger = LoggerFactory.getLogger(MetricsUtils.class); - public static Map commonTags(FlinkJob job, TableMeta meta) { - return MapUtil.builder() - .put(Constants.METRICS_LABEL_FLINK_JOB_ID, job.getId().toString()) - .put(Constants.METRICS_LABEL_FLINK_JOB_NAME, job.getName()) - .put(Constants.METRICS_LABEL_SCHEMA, meta.getSchema()) - .put(Constants.METRICS_LABEL_TABLE, meta.getTable()) - .put(Constants.METRICS_LABEL_ALIAS, meta.getAlias()) - .build(); + public static Counter counter(RuntimeContext context, FlinkJob job, TableMeta meta, String name) { + return counter(context, job, meta, name, MapUtil.empty()); } - @SafeVarargs - public static void createMakePointTimer(GlobalConfiguration globalConfiguration, T... metrics) { - createMakePointTimer(globalConfiguration, ListUtil.toList(metrics)); - } - - public static void createMakePointTimer(GlobalConfiguration globalConfiguration, List metrics) { - logger.info("Create timer: {}", metrics); - new Timer().schedule(new TimerTask() { - @Override - public void run() { - for (AbstractMetric metric : metrics) { - metric.makePoint(true, globalConfiguration.getMetricPublishBatch()); - } - } - }, globalConfiguration.getMetricPublishDelay(), globalConfiguration.getMetricPublishPeriod()); - } - - @SafeVarargs - public static void publishAllMetrics(T... metrics) { - publishAllMetrics(ListUtil.toList(metrics)); - } - - public static void publishAllMetrics(List metrics) { - for (AbstractMetric metric : metrics) { - metric.publish(); + public static Counter counter(RuntimeContext context, FlinkJob job, TableMeta meta, String name, Map tags) { + MetricGroup metricGroup = context.getMetricGroup() + .addGroup(Constants.METRICS_LABEL_FLINK_JOB_ID, job.getId().toString()) + .addGroup(Constants.METRICS_LABEL_FLINK_JOB_NAME, job.getName()) + .addGroup(Constants.METRICS_LABEL_SCHEMA, meta.getSchema()) + .addGroup(Constants.METRICS_LABEL_TABLE, meta.getTable()) + .addGroup(Constants.METRICS_LABEL_ALIAS, meta.getAlias()); + for (Map.Entry entry : tags.entrySet()) { + metricGroup.addGroup(entry.getKey(), entry.getValue()); } + return metricGroup.counter(name); } }