From 616eb1e5144709106e7bc7b5705a1e2881938f48 Mon Sep 17 00:00:00 2001 From: lanyuanxiaoyao Date: Mon, 15 Jan 2024 10:41:01 +0800 Subject: [PATCH] =?UTF-8?q?feat(queue):=20=E8=BF=81=E7=A7=BBqueue=E5=88=B0?= =?UTF-8?q?hudi=20service=E9=A1=B9=E7=9B=AE=E4=B8=AD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bin/build-all.sh | 2 +- bin/build-queue.sh | 4 + pom.xml | 1 + .../src/main/resources/application.yml | 6 + .../configuration/entity/queue/QueueItem.java | 129 ++++++++++ .../service/forest/service/QueueService.java | 2 +- service-queue/pom.xml | 39 +++ .../service/queue/QueueApplication.java | 108 ++++++++ .../configuration/AutoSaveProperties.java | 39 +++ .../queue/configuration/Constants.java | 25 ++ .../queue/controller/DequeController.java | 241 ++++++++++++++++++ .../queue/controller/QueueController.java | 155 +++++++++++ .../queue/controller/QueueOperator.java | 17 ++ .../queue/service/AutoSaveService.java | 67 +++++ .../src/main/resources/application.yml | 11 + .../src/main/resources/logback-spring.xml | 51 ++++ 16 files changed, 895 insertions(+), 2 deletions(-) create mode 100755 bin/build-queue.sh create mode 100644 service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/queue/QueueItem.java create mode 100644 service-queue/pom.xml create mode 100644 service-queue/src/main/java/com/lanyuanxiaoyao/service/queue/QueueApplication.java create mode 100644 service-queue/src/main/java/com/lanyuanxiaoyao/service/queue/configuration/AutoSaveProperties.java create mode 100644 service-queue/src/main/java/com/lanyuanxiaoyao/service/queue/configuration/Constants.java create mode 100644 service-queue/src/main/java/com/lanyuanxiaoyao/service/queue/controller/DequeController.java create mode 100644 service-queue/src/main/java/com/lanyuanxiaoyao/service/queue/controller/QueueController.java create mode 100644 service-queue/src/main/java/com/lanyuanxiaoyao/service/queue/controller/QueueOperator.java create mode 100644 service-queue/src/main/java/com/lanyuanxiaoyao/service/queue/service/AutoSaveService.java create mode 100644 service-queue/src/main/resources/application.yml create mode 100644 service-queue/src/main/resources/logback-spring.xml diff --git a/bin/build-all.sh b/bin/build-all.sh index 7df8d43..670e97f 100644 --- a/bin/build-all.sh +++ b/bin/build-all.sh @@ -1,7 +1,7 @@ #!/bin/bash mvn -pl service-dependencies,service-configuration,service-forest,service-executor,service-executor/service-executor-core clean source:jar deploy -D skipTests -P local -s ~/.m2/settings-development.xml -mvn -pl service-gateway clean package spring-boot:repackage -D skipTests -s ~/.m2/settings-development.xml +mvn -pl service-gateway,service-queue clean package spring-boot:repackage -D skipTests -s ~/.m2/settings-development.xml mvn -pl service-flink-query,service-info-query,service-loki-query,service-pulsar-query,service-yarn-query,service-zookeeper-query,service-web clean package spring-boot:repackage -D skipTests -s ~/.m2/settings-development.xml -P b2b12 mvn -pl service-hudi-query,service-executor/service-executor-manager,service-executor/service-executor-task clean package -D skipTests -s ~/.m2/settings-development.xml -P b2b12 diff --git a/bin/build-queue.sh b/bin/build-queue.sh new file mode 100755 index 0000000..a81f7f4 --- /dev/null +++ b/bin/build-queue.sh @@ -0,0 +1,4 @@ +#!/bin/bash +mvn -pl service-dependencies,service-configuration clean deploy -D skipTests -P local -s ~/.m2/settings-development.xml +mvn -pl service-queue clean package spring-boot:repackage -D skipTests -s ~/.m2/settings-development.xml +ytp-transfer2 /Users/lanyuanxiaoyao/Project/IdeaProjects/hudi-service/service-queue/target/service-queue-1.0.0-SNAPSHOT.jar \ No newline at end of file diff --git a/pom.xml b/pom.xml index ba1e2fb..6682932 100644 --- a/pom.xml +++ b/pom.xml @@ -11,6 +11,7 @@ service-configuration service-gateway + service-queue service-hudi-query service-info-query service-pulsar-query diff --git a/service-cli/service-cli-runner/src/main/resources/application.yml b/service-cli/service-cli-runner/src/main/resources/application.yml index 8af8d02..779dc35 100644 --- a/service-cli/service-cli-runner/src/main/resources/application.yml +++ b/service-cli/service-cli-runner/src/main/resources/application.yml @@ -19,6 +19,12 @@ deploy: data_save_location: ${deploy.runtime.data-path} - name: service-gateway source-jar: service-gateway-1.0.0-SNAPSHOT.jar + - name: service-queue + source-jar: service-queue-1.0.0-SNAPSHOT.jar + replicas: 1 + arguments: + data_save_enable: true + data_save_location: ${deploy.runtime.data-path} - name: service-scheduler source-jar: service-scheduler-1.0.0-SNAPSHOT.jar replicas: 1 diff --git a/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/queue/QueueItem.java b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/queue/QueueItem.java new file mode 100644 index 0000000..f96908c --- /dev/null +++ b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/queue/QueueItem.java @@ -0,0 +1,129 @@ +package com.lanyuanxiaoyao.service.configuration.entity.queue; + +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.UUID; + +/** + * 队列对象 + * + * @author lanyuanxiaoyao + * @date 2023-05-06 + */ +public final class QueueItem implements Comparable> { + private final String traceId; + private final Long createTime; + + private String id; + private Map metadata; + private Integer priority; + private E data; + + public QueueItem() { + this.traceId = UUID.randomUUID().toString(); + this.createTime = Instant.now().toEpochMilli(); + } + + public QueueItem(String id) { + this(); + this.id = id; + } + + public QueueItem(String id, Integer priority) { + this(id); + this.priority = priority; + } + + public QueueItem(String id, Integer priority, E data) { + this(id, priority); + this.data = data; + } + + public QueueItem(String id, Map metadata, Integer priority, E data) { + this(id, priority, data); + this.metadata = metadata; + } + + public String getTraceId() { + return traceId; + } + + public Long getCreateTime() { + return createTime; + } + + public String getId() { + return id; + } + + public Map getMetadata() { + return metadata == null ? new HashMap<>() : metadata; + } + + public String getMetadata(String key) { + return metadata == null ? null : metadata.get(key); + } + + public String getMetadata(String key, String defaultValue) { + return metadata == null ? defaultValue : metadata.getOrDefault(key, defaultValue); + } + + public void setMetadata(String key, String value) { + if (metadata == null) { + metadata = new HashMap<>(); + } + metadata.put(key, value); + } + + public Integer getPriority() { + return priority; + } + + public E getData() { + return data; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + QueueItem queueItem = (QueueItem) o; + + return id.equals(queueItem.id); + } + + @Override + public int hashCode() { + return id.hashCode(); + } + + @Override + public String toString() { + return "QueueItem{" + + "traceId='" + traceId + '\'' + + ", createTime=" + createTime + + ", id='" + id + '\'' + + ", metadata=" + metadata + + ", priority=" + priority + + ", data=" + data + + '}'; + } + + @Override + public int compareTo(QueueItem o) { + Integer p1 = this.priority; + Integer p2 = o.priority; + if (Objects.equals(p1, p2)) { + return 0; + } else if (p1 == null) { + return 1; + } else if (p2 == null) { + return -1; + } else { + return Integer.compare(p2, p1); + } + } +} diff --git a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/QueueService.java b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/QueueService.java index e08f2cc..becd633 100644 --- a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/QueueService.java +++ b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/QueueService.java @@ -13,7 +13,7 @@ import org.eclipse.collections.api.list.ImmutableList; * @author lanyuanxiaoyao * @date 2023-05-07 */ -@BaseRequest(baseURL = "http://center-queue/queue") +@BaseRequest(baseURL = "http://service-queue/queue") public interface QueueService { @Get("/names") ImmutableList names(); diff --git a/service-queue/pom.xml b/service-queue/pom.xml new file mode 100644 index 0000000..ae58387 --- /dev/null +++ b/service-queue/pom.xml @@ -0,0 +1,39 @@ + + + 4.0.0 + + com.lanyuanxiaoyao + hudi-service + 1.0.0-SNAPSHOT + + + service-queue + + + + com.lanyuanxiaoyao + service-dependencies + 1.0.0-SNAPSHOT + + + com.lanyuanxiaoyao + service-configuration + 1.0.0-SNAPSHOT + + + + + + + org.apache.maven.plugins + maven-source-plugin + + + org.springframework.boot + spring-boot-maven-plugin + + + + \ No newline at end of file diff --git a/service-queue/src/main/java/com/lanyuanxiaoyao/service/queue/QueueApplication.java b/service-queue/src/main/java/com/lanyuanxiaoyao/service/queue/QueueApplication.java new file mode 100644 index 0000000..e256eaa --- /dev/null +++ b/service-queue/src/main/java/com/lanyuanxiaoyao/service/queue/QueueApplication.java @@ -0,0 +1,108 @@ +package com.lanyuanxiaoyao.service.queue; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.lanyuanxiaoyao.service.configuration.entity.queue.QueueItem; +import com.lanyuanxiaoyao.service.queue.configuration.AutoSaveProperties; +import com.lanyuanxiaoyao.service.queue.configuration.Constants; +import com.ulisesbocchio.jasyptspringboot.annotation.EnableEncryptableProperties; +import java.io.IOException; +import java.net.URLDecoder; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.PriorityBlockingQueue; +import javax.annotation.Resource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.cloud.client.discovery.DiscoveryClient; +import org.springframework.cloud.client.discovery.EnableDiscoveryClient; +import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +/** + * 启动类 + * + * @author lanyuanxiaoyao + * @date 2023-05-04 + */ +@EnableDiscoveryClient +@EnableConfigurationProperties +@EnableEncryptableProperties +@SpringBootApplication(scanBasePackages = {"com.lanyuanxiaoyao.service"}) +@EnableScheduling +@RestController +@RequestMapping("queue") +public class QueueApplication implements ApplicationRunner { + private static final Logger logger = LoggerFactory.getLogger(QueueApplication.class); + + public static void main(String[] args) { + SpringApplication.run(QueueApplication.class, args); + } + + @Resource + private AutoSaveProperties autoSaveProperties; + @Resource + private Jackson2ObjectMapperBuilder jackson2ObjectMapperBuilder; + @Resource + private DiscoveryClient client; + + @Override + public void run(ApplicationArguments args) throws Exception { + // 队列不支持分布式启动,只能启动一个 + if (client.getServices().contains("queue")) { + logger.warn("Wait for 15s"); + Thread.sleep(15000); + logger.error("Found queue exists, exit..."); + System.exit(1); + } + + if (autoSaveProperties.getLocation() != null && !"".equals(autoSaveProperties.getLocation())) { + ObjectMapper mapper = jackson2ObjectMapperBuilder.build(); + + String dequeData = load(Constants.DEQUE_SAVE_FILENAME); + if (Objects.nonNull(dequeData)) { + ConcurrentHashMap>> map = mapper.readValue(dequeData, new TypeReference>>>() { + }); + Constants.DEQUE_MAP.putAll(map); + } + + String queueData = load(Constants.QUEUE_SAVE_FILENAME); + if (Objects.nonNull(queueData)) { + ConcurrentHashMap>> map = mapper.readValue(queueData, new TypeReference>>>() { + }); + Constants.QUEUE_MAP.putAll(map); + } + + // 如果启用了自动保存,数据加载完成后才开始自动保存 + if (autoSaveProperties.getEnable()) { + Constants.AUTO_SAVE.set(true); + } else { + logger.warn("Auto save is disabled"); + } + } else { + logger.warn("Save path is not set"); + } + } + + private String load(String name) throws IOException { + Path saveDataPath = Paths.get(autoSaveProperties.getLocation(), name); + if (Files.exists(saveDataPath)) { + String data = new String(Files.readAllBytes(saveDataPath)); + data = URLDecoder.decode(data, "utf-8"); + return data; + } + logger.warn("Cannot found any data from {}", saveDataPath); + return null; + } +} diff --git a/service-queue/src/main/java/com/lanyuanxiaoyao/service/queue/configuration/AutoSaveProperties.java b/service-queue/src/main/java/com/lanyuanxiaoyao/service/queue/configuration/AutoSaveProperties.java new file mode 100644 index 0000000..a50a045 --- /dev/null +++ b/service-queue/src/main/java/com/lanyuanxiaoyao/service/queue/configuration/AutoSaveProperties.java @@ -0,0 +1,39 @@ +package com.lanyuanxiaoyao.service.queue.configuration; + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +/** + * @author lanyuanxiaoyao + * @date 2023-05-06 + */ +@Component +@ConfigurationProperties("data.save") +public class AutoSaveProperties { + private Boolean enable = false; + private String location = "data"; + + public void setEnable(Boolean enable) { + this.enable = enable; + } + + public void setLocation(String location) { + this.location = location; + } + + public Boolean getEnable() { + return enable; + } + + public String getLocation() { + return location; + } + + @Override + public String toString() { + return "AutoSaveProperty{" + + "enable=" + enable + + ", location='" + location + '\'' + + '}'; + } +} diff --git a/service-queue/src/main/java/com/lanyuanxiaoyao/service/queue/configuration/Constants.java b/service-queue/src/main/java/com/lanyuanxiaoyao/service/queue/configuration/Constants.java new file mode 100644 index 0000000..e7a382b --- /dev/null +++ b/service-queue/src/main/java/com/lanyuanxiaoyao/service/queue/configuration/Constants.java @@ -0,0 +1,25 @@ +package com.lanyuanxiaoyao.service.queue.configuration; + +import com.lanyuanxiaoyao.service.configuration.entity.queue.QueueItem; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * @author lanyuanxiaoyao + * @date 2023-05-06 + */ +public interface Constants { + AtomicBoolean AUTO_SAVE = new AtomicBoolean(false); + + ConcurrentHashMap>> DEQUE_MAP = new ConcurrentHashMap<>(10); + ConcurrentHashMap>> QUEUE_MAP = new ConcurrentHashMap<>(10); + + String DEQUE_SAVE_FILENAME = "save_data_deque"; + String QUEUE_SAVE_FILENAME = "save_data_deque"; + + TimeUnit TIMEOUT_UNIT = TimeUnit.SECONDS; + long POLL_TIMEOUT = 1; +} diff --git a/service-queue/src/main/java/com/lanyuanxiaoyao/service/queue/controller/DequeController.java b/service-queue/src/main/java/com/lanyuanxiaoyao/service/queue/controller/DequeController.java new file mode 100644 index 0000000..7babfdb --- /dev/null +++ b/service-queue/src/main/java/com/lanyuanxiaoyao/service/queue/controller/DequeController.java @@ -0,0 +1,241 @@ +package com.lanyuanxiaoyao.service.queue.controller; + +import com.lanyuanxiaoyao.service.configuration.entity.queue.QueueItem; +import com.lanyuanxiaoyao.service.queue.configuration.Constants; +import java.util.ArrayList; +import java.util.Enumeration; +import java.util.List; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.web.bind.annotation.*; + +/** + * 双向队列 + * + * @author lanyuanxiaoyao + * @date 2023-05-06 + */ +@RestController +@RequestMapping("deque") +public class DequeController implements QueueOperator { + private static final Logger logger = LoggerFactory.getLogger(DequeController.class); + + private ConcurrentLinkedDeque> generateDeque(String name) { + return new ConcurrentLinkedDeque<>(); + } + + private void makeVoid(String name, Consumer>> consumer) { + consumer.accept(Constants.DEQUE_MAP.computeIfAbsent(name, this::generateDeque)); + } + + private T makeReturn(String name, Function>, T> function) { + return function.apply(Constants.DEQUE_MAP.computeIfAbsent(name, this::generateDeque)); + } + + @GetMapping("/names") + @Override + public Enumeration names() { + return Constants.DEQUE_MAP.keys(); + } + + @GetMapping("/all/{name}") + @Override + public List> all(@PathVariable("name") String name) { + return makeReturn(name, ArrayList::new); + } + + @PostMapping("/add_first/{name}") + public void addFirst(@PathVariable("name") String name, @RequestBody QueueItem data) { + makeVoid(name, deque -> deque.addFirst(data)); + } + + @PostMapping("/add_last/{name}") + public void addLast(@PathVariable("name") String name, @RequestBody QueueItem data) { + makeVoid(name, deque -> deque.addLast(data)); + } + + @PostMapping("/offer_first/{name}") + public Boolean offerFirst(@PathVariable("name") String name, @RequestBody QueueItem data) { + return makeReturn(name, deque -> deque.offerFirst(data)); + } + + @PostMapping("/offer_last/{name}") + public Boolean offerLast(@PathVariable("name") String name, @RequestBody QueueItem data) { + return makeReturn(name, deque -> deque.offerLast(data)); + } + + @GetMapping("/remove_first/{name}") + public QueueItem removeFirst(@PathVariable("name") String name) { + return makeReturn(name, ConcurrentLinkedDeque::removeFirst); + } + + @GetMapping("/remove_last/{name}") + public QueueItem removeLast(@PathVariable("name") String name) { + return makeReturn(name, ConcurrentLinkedDeque::removeLast); + } + + @GetMapping("/poll_first/{name}") + public QueueItem pollFirst(@PathVariable("name") String name) { + return makeReturn(name, ConcurrentLinkedDeque::pollFirst); + } + + @GetMapping("/poll_last/{name}") + public QueueItem pollLast(@PathVariable("name") String name) { + return makeReturn(name, ConcurrentLinkedDeque::pollLast); + } + + @GetMapping("/get_first/{name}") + public QueueItem getFirst(@PathVariable("name") String name) { + return makeReturn(name, ConcurrentLinkedDeque::getFirst); + } + + @GetMapping("/get_last/{name}") + public QueueItem getLast(@PathVariable("name") String name) { + return makeReturn(name, ConcurrentLinkedDeque::getLast); + } + + @GetMapping("/peek_first/{name}") + public QueueItem peekFirst(@PathVariable("name") String name) { + return makeReturn(name, ConcurrentLinkedDeque::peekFirst); + } + + @GetMapping("/peek_last/{name}") + public QueueItem peekLast(@PathVariable("name") String name) { + return makeReturn(name, ConcurrentLinkedDeque::peekLast); + } + + @PostMapping("/remove_first_occurrence/{name}") + public Boolean removeFirstOccurrence(@PathVariable("name") String name, @RequestBody String data) { + return makeReturn(name, deque -> deque.removeFirstOccurrence(data)); + } + + @GetMapping("/remove_first_occurrence_id/{name}/{id}") + public Boolean removeFirstOccurrenceId(@PathVariable("name") String name, @PathVariable("id") String id) { + return makeReturn(name, deque -> deque.removeFirstOccurrence(new QueueItem<>(id))); + } + + @PostMapping("/remove_last_occurrence/{name}") + public Boolean removeLastOccurrence(@PathVariable("name") String name, @RequestBody String data) { + return makeReturn(name, deque -> deque.removeLastOccurrence(data)); + } + + @GetMapping("/remove_last_occurrence_id/{name}/{id}") + public Boolean removeLastOccurrenceId(@PathVariable("name") String name, @PathVariable("id") String id) { + return makeReturn(name, deque -> deque.removeLastOccurrence(new QueueItem<>(id))); + } + + @PostMapping("/add/{name}") + public Boolean add(@PathVariable("name") String name, @RequestBody QueueItem data) { + return makeReturn(name, deque -> deque.add(data)); + } + + @PostMapping("/offer/{name}") + public Boolean offer(@PathVariable("name") String name, @RequestBody QueueItem data) { + return makeReturn(name, deque -> deque.offer(data)); + } + + @GetMapping("/remove/{name}") + public QueueItem remove(@PathVariable("name") String name) { + return makeReturn(name, ConcurrentLinkedDeque::remove); + } + + @GetMapping("/poll/{name}") + public QueueItem poll(@PathVariable("name") String name) { + return makeReturn(name, ConcurrentLinkedDeque::poll); + } + + @GetMapping("/element/{name}") + public QueueItem element(@PathVariable("name") String name) { + return makeReturn(name, ConcurrentLinkedDeque::element); + } + + @GetMapping("/peek/{name}") + public QueueItem peek(@PathVariable("name") String name) { + return makeReturn(name, ConcurrentLinkedDeque::peek); + } + + @PostMapping("/push/{name}") + public void push(@PathVariable("name") String name, @RequestBody QueueItem data) { + makeVoid(name, deque -> deque.push(data)); + } + + @GetMapping("/pop/{name}") + public QueueItem pop(@PathVariable("name") String name) { + return makeReturn(name, ConcurrentLinkedDeque::pop); + } + + @PostMapping("/remove/{name}") + public Boolean remove(@PathVariable("name") String name, @RequestBody QueueItem data) { + return makeReturn(name, deque -> deque.remove(data)); + } + + @GetMapping("/remove_id/{name}/{id}") + public Boolean removeId(@PathVariable("name") String name, @PathVariable("id") String id) { + return makeReturn(name, deque -> deque.remove(new QueueItem<>(id))); + } + + @PostMapping("/contains_all/{name}") + public Boolean containsAll(@PathVariable("name") String name, @RequestBody List> data) { + return makeReturn(name, deque -> deque.containsAll(data)); + } + + @PostMapping("/contains_all_id/{name}") + public Boolean containsAllId(@PathVariable("name") String name, @RequestBody List ids) { + List> containsItems = ids.stream().map(QueueItem::new).collect(Collectors.toList()); + return makeReturn(name, deque -> deque.containsAll(containsItems)); + } + + @PostMapping("/add_all/{name}") + public Boolean addAll(@PathVariable("name") String name, @RequestBody List> data) { + return makeReturn(name, deque -> deque.addAll(data)); + } + + @PostMapping("/remove_all/{name}") + public Boolean removeAll(@PathVariable("name") String name, @RequestBody List> data) { + return makeReturn(name, deque -> deque.removeAll(data)); + } + + @PostMapping("/remove_all_id/{name}") + public Boolean removeAllId(@PathVariable("name") String name, @RequestBody List ids) { + return makeReturn(name, deque -> deque.removeIf(item -> ids.contains(item.getId()))); + } + + @GetMapping("/remove_all_id/{name}/{id}") + public Boolean removeAllId(@PathVariable("name") String name, @PathVariable String id) { + return makeReturn(name, deque -> deque.removeIf(item -> item.getId().equals(id))); + } + + @PostMapping("/retain_all/{name}") + public Boolean retainAll(@PathVariable("name") String name, @RequestBody List> data) { + return makeReturn(name, deque -> deque.retainAll(data)); + } + + @GetMapping("/clear/{name}") + public void clear(@PathVariable("name") String name) { + makeVoid(name, ConcurrentLinkedDeque::clear); + } + + @PostMapping("/contains/{name}") + public Boolean contains(@PathVariable("name") String name, @RequestBody QueueItem data) { + return makeReturn(name, deque -> deque.contains(data)); + } + + @GetMapping("/contains_id/{name}/{id}") + public Boolean containsId(@PathVariable("name") String name, @PathVariable("id") String id) { + return makeReturn(name, deque -> deque.contains(new QueueItem<>(id))); + } + + @GetMapping("/size/{name}") + public Integer size(@PathVariable("name") String name) { + return makeReturn(name, ConcurrentLinkedDeque::size); + } + + @GetMapping("/is_empty/{name}") + public Boolean isEmpty(@PathVariable("name") String name) { + return makeReturn(name, ConcurrentLinkedDeque::isEmpty); + } +} diff --git a/service-queue/src/main/java/com/lanyuanxiaoyao/service/queue/controller/QueueController.java b/service-queue/src/main/java/com/lanyuanxiaoyao/service/queue/controller/QueueController.java new file mode 100644 index 0000000..bef9fae --- /dev/null +++ b/service-queue/src/main/java/com/lanyuanxiaoyao/service/queue/controller/QueueController.java @@ -0,0 +1,155 @@ +package com.lanyuanxiaoyao.service.queue.controller; + +import com.lanyuanxiaoyao.service.configuration.entity.queue.QueueItem; +import com.lanyuanxiaoyao.service.queue.configuration.Constants; +import java.util.ArrayList; +import java.util.Enumeration; +import java.util.List; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.web.bind.annotation.*; + +/** + * @author lanyuanxiaoyao + * @date 2023-05-06 + */ +@RestController +@RequestMapping("queue") +public class QueueController implements QueueOperator { + private static final Logger logger = LoggerFactory.getLogger(QueueController.class); + + private PriorityBlockingQueue> generateDeque(String name) { + return new PriorityBlockingQueue<>(); + } + + private void makeVoid(String name, Consumer>> consumer) { + consumer.accept(Constants.QUEUE_MAP.computeIfAbsent(name, this::generateDeque)); + } + + private T makeReturn(String name, Function>, T> function) { + return function.apply(Constants.QUEUE_MAP.computeIfAbsent(name, this::generateDeque)); + } + + @GetMapping("/names") + @Override + public Enumeration names() { + return Constants.QUEUE_MAP.keys(); + } + + @GetMapping("/all/{name}") + @Override + public List> all(@PathVariable("name") String name) { + return makeReturn(name, ArrayList::new); + } + + @GetMapping("/size/{name}") + public Integer size(@PathVariable("name") String name) { + return makeReturn(name, PriorityBlockingQueue::size); + } + + @GetMapping("/is_empty/{name}") + public Boolean isEmpty(@PathVariable("name") String name) { + return makeReturn(name, PriorityBlockingQueue::isEmpty); + } + + @PostMapping("/contains/{name}") + public Boolean contains(@PathVariable("name") String name, @RequestBody QueueItem data) { + return makeReturn(name, queue -> queue.contains(data)); + } + + @GetMapping("/contains/{name}/{id}") + public Boolean containsId(@PathVariable("name") String name, @PathVariable("id") String id) { + return makeReturn(name, queue -> queue.contains(new QueueItem<>(id))); + } + + @PostMapping("/add/{name}") + public Boolean add(@PathVariable("name") String name, @RequestBody QueueItem data) { + return makeReturn(name, queue -> queue.add(data)); + } + + @PostMapping("/remove/{name}") + public Boolean remove(@PathVariable("name") String name, @RequestBody QueueItem data) { + return makeReturn(name, queue -> queue.remove(data)); + } + + @GetMapping("/remove_id/{name}/{id}") + public Boolean remove(@PathVariable("name") String name, @PathVariable("id") String id) { + return makeReturn(name, queue -> queue.remove(new QueueItem<>(id))); + } + + @PostMapping("/contains_all/{name}") + public Boolean containsAll(@PathVariable("name") String name, @RequestBody List> data) { + return makeReturn(name, queue -> queue.containsAll(data)); + } + + @PostMapping("/contains_all_id/{name}") + public Boolean containsAllId(@PathVariable("name") String name, @RequestBody List ids) { + List> containsItems = ids.stream().map(QueueItem::new).collect(Collectors.toList()); + return makeReturn(name, queue -> queue.containsAll(containsItems)); + } + + @PostMapping("/add_all/{name}") + public Boolean addAll(@PathVariable("name") String name, @RequestBody List> data) { + return makeReturn(name, queue -> queue.addAll(data)); + } + + @PostMapping("/remove_all/{name}") + public Boolean removeAll(@PathVariable("name") String name, @RequestBody List> data) { + return makeReturn(name, queue -> queue.removeAll(data)); + } + + @PostMapping("/remove_all_id/{name}") + public Boolean removeAllId(@PathVariable("name") String name, @RequestBody List ids) { + return makeReturn(name, queue -> queue.removeIf(item -> ids.contains(item.getId()))); + } + + @GetMapping("/remove_all_id/{name}/{id}") + public Boolean removeAllId(@PathVariable("name") String name, @PathVariable("id") String id) { + return makeReturn(name, queue -> queue.removeIf(item -> item.getId().equals(id))); + } + + @PostMapping("/retain_all/{name}") + public Boolean retainAll(@PathVariable("name") String name, @RequestBody List> data) { + return makeReturn(name, queue -> queue.retainAll(data)); + } + + @GetMapping("/clear/{name}") + public void clear(@PathVariable("name") String name) { + makeVoid(name, PriorityBlockingQueue::clear); + } + + @PostMapping("/offer/{name}") + public Boolean offer(@PathVariable("name") String name, @RequestBody QueueItem data) { + return makeReturn(name, queue -> queue.offer(data)); + } + + @GetMapping("/remove/{name}") + public QueueItem remove(@PathVariable("name") String name) { + return makeReturn(name, PriorityBlockingQueue::remove); + } + + @GetMapping("/poll/{name}") + public QueueItem poll(@PathVariable("name") String name) { + return makeReturn(name, queue -> { + try { + return queue.poll(Constants.POLL_TIMEOUT, Constants.TIMEOUT_UNIT); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + } + + @GetMapping("/element/{name}") + public QueueItem element(@PathVariable("name") String name) { + return makeReturn(name, PriorityBlockingQueue::element); + } + + @GetMapping("/peek/{name}") + public QueueItem peek(@PathVariable("name") String name) { + return makeReturn(name, PriorityBlockingQueue::peek); + } +} diff --git a/service-queue/src/main/java/com/lanyuanxiaoyao/service/queue/controller/QueueOperator.java b/service-queue/src/main/java/com/lanyuanxiaoyao/service/queue/controller/QueueOperator.java new file mode 100644 index 0000000..f0468ca --- /dev/null +++ b/service-queue/src/main/java/com/lanyuanxiaoyao/service/queue/controller/QueueOperator.java @@ -0,0 +1,17 @@ +package com.lanyuanxiaoyao.service.queue.controller; + +import com.lanyuanxiaoyao.service.configuration.entity.queue.QueueItem; +import java.util.Enumeration; +import java.util.List; + +/** + * 队列通用操作 + * + * @author lanyuanxiaoyao + * @date 2023-05-07 + */ +public interface QueueOperator { + Enumeration names(); + + List> all(String name); +} diff --git a/service-queue/src/main/java/com/lanyuanxiaoyao/service/queue/service/AutoSaveService.java b/service-queue/src/main/java/com/lanyuanxiaoyao/service/queue/service/AutoSaveService.java new file mode 100644 index 0000000..7fe26f2 --- /dev/null +++ b/service-queue/src/main/java/com/lanyuanxiaoyao/service/queue/service/AutoSaveService.java @@ -0,0 +1,67 @@ +package com.lanyuanxiaoyao.service.queue.service; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.lanyuanxiaoyao.service.queue.configuration.AutoSaveProperties; +import com.lanyuanxiaoyao.service.queue.configuration.Constants; +import java.io.IOException; +import java.net.URLEncoder; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.*; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; + +/** + * 自动保存 + * + * @author lanyuanxiaoyao + * @date 2023-05-06 + */ +@Service +public class AutoSaveService { + private static final Logger logger = LoggerFactory.getLogger(AutoSaveService.class); + + private final AutoSaveProperties autoSaveProperties; + private final Jackson2ObjectMapperBuilder jackson2ObjectMapperBuilder; + + public AutoSaveService(AutoSaveProperties autoSaveProperties, Jackson2ObjectMapperBuilder jackson2ObjectMapperBuilder) { + this.autoSaveProperties = autoSaveProperties; + this.jackson2ObjectMapperBuilder = jackson2ObjectMapperBuilder; + } + + @Scheduled(fixedDelay = 60, initialDelay = 60, timeUnit = TimeUnit.SECONDS) + public void autoSave() throws IOException { + // 用单独的变量确认是否开始自动保存,保证自动保存在初始化加载完后再启动,防止把空数据覆盖保存文件 + if (Constants.AUTO_SAVE.get()) { + if (autoSaveProperties.getLocation() != null && !"".equals(autoSaveProperties.getLocation())) { + save(Constants.DEQUE_MAP, Constants.DEQUE_SAVE_FILENAME); + save(Constants.QUEUE_MAP, Constants.QUEUE_SAVE_FILENAME); + return; + } + logger.warn("Save path is not set"); + } + } + + private void save(Map> o, String name) throws IOException { + if (o == null) { + return; + } + Map> map = new HashMap<>(o.size()); + o.forEach((k, q) -> map.put(k, new ArrayList<>(o.get(k)))); + + Files.createDirectories(Paths.get(autoSaveProperties.getLocation())); + Path saveDataPath = Paths.get(autoSaveProperties.getLocation(), name); + ObjectMapper mapper = jackson2ObjectMapperBuilder.build(); + String data = mapper.writeValueAsString(map); + + map.clear(); + + data = URLEncoder.encode(data, "utf-8"); + Files.write(saveDataPath, data.getBytes()); + } +} diff --git a/service-queue/src/main/resources/application.yml b/service-queue/src/main/resources/application.yml new file mode 100644 index 0000000..0a609a9 --- /dev/null +++ b/service-queue/src/main/resources/application.yml @@ -0,0 +1,11 @@ +server: + tomcat: + accept-count: 1000 + threads: + max: 500 + min-spare: 100 +spring: + application: + name: service-queue + profiles: + include: random-port,common,discovery,metrics diff --git a/service-queue/src/main/resources/logback-spring.xml b/service-queue/src/main/resources/logback-spring.xml new file mode 100644 index 0000000..f955921 --- /dev/null +++ b/service-queue/src/main/resources/logback-spring.xml @@ -0,0 +1,51 @@ + + + + + + + + + + + true + + ${LOKI_PUSH_URL:-http://localhost/loki/api/v1/push} + + + + + ${FILE_LOG_PATTERN:-%d{${LOG_DATEFORMAT_PATTERN:-yyyy-MM-dd HH:mm:ss.SSS}} [${HOSTNAME}] ${LOG_LEVEL_PATTERN:-%5p} ${PID:- } --- [%t] %-40.40logger{39} #@# : %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}} + + true + + + + + + ${CONSOLE_LOG_PATTERN:-%clr(%d{${LOG_DATEFORMAT_PATTERN:-yyyy-MM-dd HH:mm:ss.SSS}}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}} + + + + + ${LOGGING_PARENT:-.}/${APP_NAME:-run}.log + + ${LOGGING_PARENT:-.}/archive/${APP_NAME:-run}-%d{yyyy-MM-dd}.gz + 7 + + + ${FILE_LOG_PATTERN:-%d{${LOG_DATEFORMAT_PATTERN:-yyyy-MM-dd HH:mm:ss.SSS}} [${HOSTNAME}] ${LOG_LEVEL_PATTERN:-%5p} ${PID:- } --- [%t] %-40.40logger{39} #@# : %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}} + + + + + + + + + + + + \ No newline at end of file