feat(queue): 增加队列指标
This commit is contained in:
@@ -2,6 +2,7 @@ package com.lanyuanxiaoyao.service.queue.controller;
|
|||||||
|
|
||||||
import com.lanyuanxiaoyao.service.configuration.entity.queue.QueueItem;
|
import com.lanyuanxiaoyao.service.configuration.entity.queue.QueueItem;
|
||||||
import com.lanyuanxiaoyao.service.queue.configuration.Constants;
|
import com.lanyuanxiaoyao.service.queue.configuration.Constants;
|
||||||
|
import io.micrometer.core.instrument.MeterRegistry;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Enumeration;
|
import java.util.Enumeration;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@@ -11,6 +12,7 @@ import java.util.function.Function;
|
|||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.scheduling.annotation.Scheduled;
|
||||||
import org.springframework.web.bind.annotation.*;
|
import org.springframework.web.bind.annotation.*;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -21,8 +23,13 @@ import org.springframework.web.bind.annotation.*;
|
|||||||
*/
|
*/
|
||||||
@RestController
|
@RestController
|
||||||
@RequestMapping("deque")
|
@RequestMapping("deque")
|
||||||
public class DequeController implements QueueOperator {
|
public class DequeController extends QueueOperator {
|
||||||
private static final Logger logger = LoggerFactory.getLogger(DequeController.class);
|
private static final Logger logger = LoggerFactory.getLogger(DequeController.class);
|
||||||
|
private final MeterRegistry meterRegistry;
|
||||||
|
|
||||||
|
public DequeController(MeterRegistry meterRegistry) {
|
||||||
|
this.meterRegistry = meterRegistry;
|
||||||
|
}
|
||||||
|
|
||||||
private ConcurrentLinkedDeque<QueueItem<?>> generateDeque(String name) {
|
private ConcurrentLinkedDeque<QueueItem<?>> generateDeque(String name) {
|
||||||
return new ConcurrentLinkedDeque<>();
|
return new ConcurrentLinkedDeque<>();
|
||||||
@@ -36,14 +43,18 @@ public class DequeController implements QueueOperator {
|
|||||||
return function.apply(Constants.DEQUE_MAP.computeIfAbsent(name, this::generateDeque));
|
return function.apply(Constants.DEQUE_MAP.computeIfAbsent(name, this::generateDeque));
|
||||||
}
|
}
|
||||||
|
|
||||||
@GetMapping("/names")
|
@Scheduled(fixedDelay = 5_000, initialDelay = 15_000)
|
||||||
@Override
|
@Override
|
||||||
|
void updateMetrics() {
|
||||||
|
updateMetrics(meterRegistry, "deque", Constants.DEQUE_MAP);
|
||||||
|
}
|
||||||
|
|
||||||
|
@GetMapping("/names")
|
||||||
public Enumeration<String> names() {
|
public Enumeration<String> names() {
|
||||||
return Constants.DEQUE_MAP.keys();
|
return Constants.DEQUE_MAP.keys();
|
||||||
}
|
}
|
||||||
|
|
||||||
@GetMapping("/all")
|
@GetMapping("/all")
|
||||||
@Override
|
|
||||||
public List<QueueItem<?>> all(@RequestParam("name") String name) {
|
public List<QueueItem<?>> all(@RequestParam("name") String name) {
|
||||||
return makeReturn(name, ArrayList::new);
|
return makeReturn(name, ArrayList::new);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ package com.lanyuanxiaoyao.service.queue.controller;
|
|||||||
|
|
||||||
import com.lanyuanxiaoyao.service.configuration.entity.queue.QueueItem;
|
import com.lanyuanxiaoyao.service.configuration.entity.queue.QueueItem;
|
||||||
import com.lanyuanxiaoyao.service.queue.configuration.Constants;
|
import com.lanyuanxiaoyao.service.queue.configuration.Constants;
|
||||||
|
import io.micrometer.core.instrument.MeterRegistry;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Enumeration;
|
import java.util.Enumeration;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@@ -11,6 +12,7 @@ import java.util.function.Function;
|
|||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.scheduling.annotation.Scheduled;
|
||||||
import org.springframework.web.bind.annotation.*;
|
import org.springframework.web.bind.annotation.*;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -19,8 +21,13 @@ import org.springframework.web.bind.annotation.*;
|
|||||||
*/
|
*/
|
||||||
@RestController
|
@RestController
|
||||||
@RequestMapping("queue")
|
@RequestMapping("queue")
|
||||||
public class QueueController implements QueueOperator {
|
public class QueueController extends QueueOperator {
|
||||||
private static final Logger logger = LoggerFactory.getLogger(QueueController.class);
|
private static final Logger logger = LoggerFactory.getLogger(QueueController.class);
|
||||||
|
private final MeterRegistry meterRegistry;
|
||||||
|
|
||||||
|
public QueueController(MeterRegistry meterRegistry) {
|
||||||
|
this.meterRegistry = meterRegistry;
|
||||||
|
}
|
||||||
|
|
||||||
private PriorityBlockingQueue<QueueItem<?>> generateDeque(String name) {
|
private PriorityBlockingQueue<QueueItem<?>> generateDeque(String name) {
|
||||||
return new PriorityBlockingQueue<>();
|
return new PriorityBlockingQueue<>();
|
||||||
@@ -34,14 +41,18 @@ public class QueueController implements QueueOperator {
|
|||||||
return function.apply(Constants.QUEUE_MAP.computeIfAbsent(name, this::generateDeque));
|
return function.apply(Constants.QUEUE_MAP.computeIfAbsent(name, this::generateDeque));
|
||||||
}
|
}
|
||||||
|
|
||||||
@GetMapping("/names")
|
@Scheduled(fixedDelay = 5_000, initialDelay = 15_000)
|
||||||
@Override
|
@Override
|
||||||
|
public void updateMetrics() {
|
||||||
|
updateMetrics(meterRegistry, "queue", Constants.QUEUE_MAP);
|
||||||
|
}
|
||||||
|
|
||||||
|
@GetMapping("/names")
|
||||||
public Enumeration<String> names() {
|
public Enumeration<String> names() {
|
||||||
return Constants.QUEUE_MAP.keys();
|
return Constants.QUEUE_MAP.keys();
|
||||||
}
|
}
|
||||||
|
|
||||||
@GetMapping("/all")
|
@GetMapping("/all")
|
||||||
@Override
|
|
||||||
public List<QueueItem<?>> all(@RequestParam("name") String name) {
|
public List<QueueItem<?>> all(@RequestParam("name") String name) {
|
||||||
return makeReturn(name, ArrayList::new);
|
return makeReturn(name, ArrayList::new);
|
||||||
}
|
}
|
||||||
@@ -77,7 +88,7 @@ public class QueueController implements QueueOperator {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@GetMapping("/remove_id")
|
@GetMapping("/remove_id")
|
||||||
public Boolean remove(@RequestParam("name") String name, @RequestParam("id") String id) {
|
public Boolean removeId(@RequestParam("name") String name, @RequestParam("id") String id) {
|
||||||
return makeReturn(name, queue -> queue.remove(new QueueItem<>(id)));
|
return makeReturn(name, queue -> queue.remove(new QueueItem<>(id)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,8 +1,10 @@
|
|||||||
package com.lanyuanxiaoyao.service.queue.controller;
|
package com.lanyuanxiaoyao.service.queue.controller;
|
||||||
|
|
||||||
import com.lanyuanxiaoyao.service.configuration.entity.queue.QueueItem;
|
import io.micrometer.core.instrument.MeterRegistry;
|
||||||
import java.util.Enumeration;
|
import io.micrometer.core.instrument.Tag;
|
||||||
import java.util.List;
|
import java.util.Collection;
|
||||||
|
import java.util.Map;
|
||||||
|
import org.eclipse.collections.api.factory.Lists;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 队列通用操作
|
* 队列通用操作
|
||||||
@@ -10,8 +12,17 @@ import java.util.List;
|
|||||||
* @author lanyuanxiaoyao
|
* @author lanyuanxiaoyao
|
||||||
* @date 2023-05-07
|
* @date 2023-05-07
|
||||||
*/
|
*/
|
||||||
public interface QueueOperator {
|
public abstract class QueueOperator {
|
||||||
Enumeration<String> names();
|
abstract void updateMetrics();
|
||||||
|
|
||||||
List<QueueItem<?>> all(String name);
|
protected void updateMetrics(MeterRegistry registry, String name, Map<String, ? extends Collection<?>> map) {
|
||||||
|
map.forEach((queue, collection) -> registry.gaugeCollectionSize(
|
||||||
|
"service_queue_size",
|
||||||
|
Lists.immutable.of(
|
||||||
|
Tag.of("type", name),
|
||||||
|
Tag.of("name", queue)
|
||||||
|
),
|
||||||
|
collection
|
||||||
|
));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user