feat(queue): 迁移queue到hudi service项目中

This commit is contained in:
2024-01-15 10:41:01 +08:00
parent f5b1845d97
commit 616eb1e514
16 changed files with 895 additions and 2 deletions

39
service-queue/pom.xml Normal file
View File

@@ -0,0 +1,39 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.lanyuanxiaoyao</groupId>
<artifactId>hudi-service</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<artifactId>service-queue</artifactId>
<dependencies>
<dependency>
<groupId>com.lanyuanxiaoyao</groupId>
<artifactId>service-dependencies</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.lanyuanxiaoyao</groupId>
<artifactId>service-configuration</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

View File

@@ -0,0 +1,108 @@
package com.lanyuanxiaoyao.service.queue;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.lanyuanxiaoyao.service.configuration.entity.queue.QueueItem;
import com.lanyuanxiaoyao.service.queue.configuration.AutoSaveProperties;
import com.lanyuanxiaoyao.service.queue.configuration.Constants;
import com.ulisesbocchio.jasyptspringboot.annotation.EnableEncryptableProperties;
import java.io.IOException;
import java.net.URLDecoder;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.PriorityBlockingQueue;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* 启动类
*
* @author lanyuanxiaoyao
* @date 2023-05-04
*/
@EnableDiscoveryClient
@EnableConfigurationProperties
@EnableEncryptableProperties
@SpringBootApplication(scanBasePackages = {"com.lanyuanxiaoyao.service"})
@EnableScheduling
@RestController
@RequestMapping("queue")
public class QueueApplication implements ApplicationRunner {
private static final Logger logger = LoggerFactory.getLogger(QueueApplication.class);
public static void main(String[] args) {
SpringApplication.run(QueueApplication.class, args);
}
@Resource
private AutoSaveProperties autoSaveProperties;
@Resource
private Jackson2ObjectMapperBuilder jackson2ObjectMapperBuilder;
@Resource
private DiscoveryClient client;
@Override
public void run(ApplicationArguments args) throws Exception {
// 队列不支持分布式启动,只能启动一个
if (client.getServices().contains("queue")) {
logger.warn("Wait for 15s");
Thread.sleep(15000);
logger.error("Found queue exists, exit...");
System.exit(1);
}
if (autoSaveProperties.getLocation() != null && !"".equals(autoSaveProperties.getLocation())) {
ObjectMapper mapper = jackson2ObjectMapperBuilder.build();
String dequeData = load(Constants.DEQUE_SAVE_FILENAME);
if (Objects.nonNull(dequeData)) {
ConcurrentHashMap<String, ConcurrentLinkedDeque<QueueItem<?>>> map = mapper.readValue(dequeData, new TypeReference<ConcurrentHashMap<String, ConcurrentLinkedDeque<QueueItem<?>>>>() {
});
Constants.DEQUE_MAP.putAll(map);
}
String queueData = load(Constants.QUEUE_SAVE_FILENAME);
if (Objects.nonNull(queueData)) {
ConcurrentHashMap<String, PriorityBlockingQueue<QueueItem<?>>> map = mapper.readValue(queueData, new TypeReference<ConcurrentHashMap<String, PriorityBlockingQueue<QueueItem<?>>>>() {
});
Constants.QUEUE_MAP.putAll(map);
}
// 如果启用了自动保存,数据加载完成后才开始自动保存
if (autoSaveProperties.getEnable()) {
Constants.AUTO_SAVE.set(true);
} else {
logger.warn("Auto save is disabled");
}
} else {
logger.warn("Save path is not set");
}
}
private String load(String name) throws IOException {
Path saveDataPath = Paths.get(autoSaveProperties.getLocation(), name);
if (Files.exists(saveDataPath)) {
String data = new String(Files.readAllBytes(saveDataPath));
data = URLDecoder.decode(data, "utf-8");
return data;
}
logger.warn("Cannot found any data from {}", saveDataPath);
return null;
}
}

View File

@@ -0,0 +1,39 @@
package com.lanyuanxiaoyao.service.queue.configuration;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* @author lanyuanxiaoyao
* @date 2023-05-06
*/
@Component
@ConfigurationProperties("data.save")
public class AutoSaveProperties {
private Boolean enable = false;
private String location = "data";
public void setEnable(Boolean enable) {
this.enable = enable;
}
public void setLocation(String location) {
this.location = location;
}
public Boolean getEnable() {
return enable;
}
public String getLocation() {
return location;
}
@Override
public String toString() {
return "AutoSaveProperty{" +
"enable=" + enable +
", location='" + location + '\'' +
'}';
}
}

View File

@@ -0,0 +1,25 @@
package com.lanyuanxiaoyao.service.queue.configuration;
import com.lanyuanxiaoyao.service.configuration.entity.queue.QueueItem;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* @author lanyuanxiaoyao
* @date 2023-05-06
*/
public interface Constants {
AtomicBoolean AUTO_SAVE = new AtomicBoolean(false);
ConcurrentHashMap<String, ConcurrentLinkedDeque<QueueItem<?>>> DEQUE_MAP = new ConcurrentHashMap<>(10);
ConcurrentHashMap<String, PriorityBlockingQueue<QueueItem<?>>> QUEUE_MAP = new ConcurrentHashMap<>(10);
String DEQUE_SAVE_FILENAME = "save_data_deque";
String QUEUE_SAVE_FILENAME = "save_data_deque";
TimeUnit TIMEOUT_UNIT = TimeUnit.SECONDS;
long POLL_TIMEOUT = 1;
}

View File

@@ -0,0 +1,241 @@
package com.lanyuanxiaoyao.service.queue.controller;
import com.lanyuanxiaoyao.service.configuration.entity.queue.QueueItem;
import com.lanyuanxiaoyao.service.queue.configuration.Constants;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.*;
/**
* 双向队列
*
* @author lanyuanxiaoyao
* @date 2023-05-06
*/
@RestController
@RequestMapping("deque")
public class DequeController implements QueueOperator {
private static final Logger logger = LoggerFactory.getLogger(DequeController.class);
private ConcurrentLinkedDeque<QueueItem<?>> generateDeque(String name) {
return new ConcurrentLinkedDeque<>();
}
private void makeVoid(String name, Consumer<ConcurrentLinkedDeque<QueueItem<?>>> consumer) {
consumer.accept(Constants.DEQUE_MAP.computeIfAbsent(name, this::generateDeque));
}
private <T> T makeReturn(String name, Function<ConcurrentLinkedDeque<QueueItem<?>>, T> function) {
return function.apply(Constants.DEQUE_MAP.computeIfAbsent(name, this::generateDeque));
}
@GetMapping("/names")
@Override
public Enumeration<String> names() {
return Constants.DEQUE_MAP.keys();
}
@GetMapping("/all/{name}")
@Override
public List<QueueItem<?>> all(@PathVariable("name") String name) {
return makeReturn(name, ArrayList::new);
}
@PostMapping("/add_first/{name}")
public void addFirst(@PathVariable("name") String name, @RequestBody QueueItem<?> data) {
makeVoid(name, deque -> deque.addFirst(data));
}
@PostMapping("/add_last/{name}")
public void addLast(@PathVariable("name") String name, @RequestBody QueueItem<?> data) {
makeVoid(name, deque -> deque.addLast(data));
}
@PostMapping("/offer_first/{name}")
public Boolean offerFirst(@PathVariable("name") String name, @RequestBody QueueItem<?> data) {
return makeReturn(name, deque -> deque.offerFirst(data));
}
@PostMapping("/offer_last/{name}")
public Boolean offerLast(@PathVariable("name") String name, @RequestBody QueueItem<?> data) {
return makeReturn(name, deque -> deque.offerLast(data));
}
@GetMapping("/remove_first/{name}")
public QueueItem<?> removeFirst(@PathVariable("name") String name) {
return makeReturn(name, ConcurrentLinkedDeque::removeFirst);
}
@GetMapping("/remove_last/{name}")
public QueueItem<?> removeLast(@PathVariable("name") String name) {
return makeReturn(name, ConcurrentLinkedDeque::removeLast);
}
@GetMapping("/poll_first/{name}")
public QueueItem<?> pollFirst(@PathVariable("name") String name) {
return makeReturn(name, ConcurrentLinkedDeque::pollFirst);
}
@GetMapping("/poll_last/{name}")
public QueueItem<?> pollLast(@PathVariable("name") String name) {
return makeReturn(name, ConcurrentLinkedDeque::pollLast);
}
@GetMapping("/get_first/{name}")
public QueueItem<?> getFirst(@PathVariable("name") String name) {
return makeReturn(name, ConcurrentLinkedDeque::getFirst);
}
@GetMapping("/get_last/{name}")
public QueueItem<?> getLast(@PathVariable("name") String name) {
return makeReturn(name, ConcurrentLinkedDeque::getLast);
}
@GetMapping("/peek_first/{name}")
public QueueItem<?> peekFirst(@PathVariable("name") String name) {
return makeReturn(name, ConcurrentLinkedDeque::peekFirst);
}
@GetMapping("/peek_last/{name}")
public QueueItem<?> peekLast(@PathVariable("name") String name) {
return makeReturn(name, ConcurrentLinkedDeque::peekLast);
}
@PostMapping("/remove_first_occurrence/{name}")
public Boolean removeFirstOccurrence(@PathVariable("name") String name, @RequestBody String data) {
return makeReturn(name, deque -> deque.removeFirstOccurrence(data));
}
@GetMapping("/remove_first_occurrence_id/{name}/{id}")
public Boolean removeFirstOccurrenceId(@PathVariable("name") String name, @PathVariable("id") String id) {
return makeReturn(name, deque -> deque.removeFirstOccurrence(new QueueItem<>(id)));
}
@PostMapping("/remove_last_occurrence/{name}")
public Boolean removeLastOccurrence(@PathVariable("name") String name, @RequestBody String data) {
return makeReturn(name, deque -> deque.removeLastOccurrence(data));
}
@GetMapping("/remove_last_occurrence_id/{name}/{id}")
public Boolean removeLastOccurrenceId(@PathVariable("name") String name, @PathVariable("id") String id) {
return makeReturn(name, deque -> deque.removeLastOccurrence(new QueueItem<>(id)));
}
@PostMapping("/add/{name}")
public Boolean add(@PathVariable("name") String name, @RequestBody QueueItem<?> data) {
return makeReturn(name, deque -> deque.add(data));
}
@PostMapping("/offer/{name}")
public Boolean offer(@PathVariable("name") String name, @RequestBody QueueItem<?> data) {
return makeReturn(name, deque -> deque.offer(data));
}
@GetMapping("/remove/{name}")
public QueueItem<?> remove(@PathVariable("name") String name) {
return makeReturn(name, ConcurrentLinkedDeque::remove);
}
@GetMapping("/poll/{name}")
public QueueItem<?> poll(@PathVariable("name") String name) {
return makeReturn(name, ConcurrentLinkedDeque::poll);
}
@GetMapping("/element/{name}")
public QueueItem<?> element(@PathVariable("name") String name) {
return makeReturn(name, ConcurrentLinkedDeque::element);
}
@GetMapping("/peek/{name}")
public QueueItem<?> peek(@PathVariable("name") String name) {
return makeReturn(name, ConcurrentLinkedDeque::peek);
}
@PostMapping("/push/{name}")
public void push(@PathVariable("name") String name, @RequestBody QueueItem<?> data) {
makeVoid(name, deque -> deque.push(data));
}
@GetMapping("/pop/{name}")
public QueueItem<?> pop(@PathVariable("name") String name) {
return makeReturn(name, ConcurrentLinkedDeque::pop);
}
@PostMapping("/remove/{name}")
public Boolean remove(@PathVariable("name") String name, @RequestBody QueueItem<?> data) {
return makeReturn(name, deque -> deque.remove(data));
}
@GetMapping("/remove_id/{name}/{id}")
public Boolean removeId(@PathVariable("name") String name, @PathVariable("id") String id) {
return makeReturn(name, deque -> deque.remove(new QueueItem<>(id)));
}
@PostMapping("/contains_all/{name}")
public Boolean containsAll(@PathVariable("name") String name, @RequestBody List<QueueItem<?>> data) {
return makeReturn(name, deque -> deque.containsAll(data));
}
@PostMapping("/contains_all_id/{name}")
public Boolean containsAllId(@PathVariable("name") String name, @RequestBody List<String> ids) {
List<QueueItem<?>> containsItems = ids.stream().map(QueueItem::new).collect(Collectors.toList());
return makeReturn(name, deque -> deque.containsAll(containsItems));
}
@PostMapping("/add_all/{name}")
public Boolean addAll(@PathVariable("name") String name, @RequestBody List<QueueItem<?>> data) {
return makeReturn(name, deque -> deque.addAll(data));
}
@PostMapping("/remove_all/{name}")
public Boolean removeAll(@PathVariable("name") String name, @RequestBody List<QueueItem<?>> data) {
return makeReturn(name, deque -> deque.removeAll(data));
}
@PostMapping("/remove_all_id/{name}")
public Boolean removeAllId(@PathVariable("name") String name, @RequestBody List<String> ids) {
return makeReturn(name, deque -> deque.removeIf(item -> ids.contains(item.getId())));
}
@GetMapping("/remove_all_id/{name}/{id}")
public Boolean removeAllId(@PathVariable("name") String name, @PathVariable String id) {
return makeReturn(name, deque -> deque.removeIf(item -> item.getId().equals(id)));
}
@PostMapping("/retain_all/{name}")
public Boolean retainAll(@PathVariable("name") String name, @RequestBody List<QueueItem<?>> data) {
return makeReturn(name, deque -> deque.retainAll(data));
}
@GetMapping("/clear/{name}")
public void clear(@PathVariable("name") String name) {
makeVoid(name, ConcurrentLinkedDeque::clear);
}
@PostMapping("/contains/{name}")
public Boolean contains(@PathVariable("name") String name, @RequestBody QueueItem<?> data) {
return makeReturn(name, deque -> deque.contains(data));
}
@GetMapping("/contains_id/{name}/{id}")
public Boolean containsId(@PathVariable("name") String name, @PathVariable("id") String id) {
return makeReturn(name, deque -> deque.contains(new QueueItem<>(id)));
}
@GetMapping("/size/{name}")
public Integer size(@PathVariable("name") String name) {
return makeReturn(name, ConcurrentLinkedDeque::size);
}
@GetMapping("/is_empty/{name}")
public Boolean isEmpty(@PathVariable("name") String name) {
return makeReturn(name, ConcurrentLinkedDeque::isEmpty);
}
}

View File

@@ -0,0 +1,155 @@
package com.lanyuanxiaoyao.service.queue.controller;
import com.lanyuanxiaoyao.service.configuration.entity.queue.QueueItem;
import com.lanyuanxiaoyao.service.queue.configuration.Constants;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.*;
/**
* @author lanyuanxiaoyao
* @date 2023-05-06
*/
@RestController
@RequestMapping("queue")
public class QueueController implements QueueOperator {
private static final Logger logger = LoggerFactory.getLogger(QueueController.class);
private PriorityBlockingQueue<QueueItem<?>> generateDeque(String name) {
return new PriorityBlockingQueue<>();
}
private void makeVoid(String name, Consumer<PriorityBlockingQueue<QueueItem<?>>> consumer) {
consumer.accept(Constants.QUEUE_MAP.computeIfAbsent(name, this::generateDeque));
}
private <T> T makeReturn(String name, Function<PriorityBlockingQueue<QueueItem<?>>, T> function) {
return function.apply(Constants.QUEUE_MAP.computeIfAbsent(name, this::generateDeque));
}
@GetMapping("/names")
@Override
public Enumeration<String> names() {
return Constants.QUEUE_MAP.keys();
}
@GetMapping("/all/{name}")
@Override
public List<QueueItem<?>> all(@PathVariable("name") String name) {
return makeReturn(name, ArrayList::new);
}
@GetMapping("/size/{name}")
public Integer size(@PathVariable("name") String name) {
return makeReturn(name, PriorityBlockingQueue::size);
}
@GetMapping("/is_empty/{name}")
public Boolean isEmpty(@PathVariable("name") String name) {
return makeReturn(name, PriorityBlockingQueue::isEmpty);
}
@PostMapping("/contains/{name}")
public Boolean contains(@PathVariable("name") String name, @RequestBody QueueItem<?> data) {
return makeReturn(name, queue -> queue.contains(data));
}
@GetMapping("/contains/{name}/{id}")
public Boolean containsId(@PathVariable("name") String name, @PathVariable("id") String id) {
return makeReturn(name, queue -> queue.contains(new QueueItem<>(id)));
}
@PostMapping("/add/{name}")
public Boolean add(@PathVariable("name") String name, @RequestBody QueueItem<?> data) {
return makeReturn(name, queue -> queue.add(data));
}
@PostMapping("/remove/{name}")
public Boolean remove(@PathVariable("name") String name, @RequestBody QueueItem<?> data) {
return makeReturn(name, queue -> queue.remove(data));
}
@GetMapping("/remove_id/{name}/{id}")
public Boolean remove(@PathVariable("name") String name, @PathVariable("id") String id) {
return makeReturn(name, queue -> queue.remove(new QueueItem<>(id)));
}
@PostMapping("/contains_all/{name}")
public Boolean containsAll(@PathVariable("name") String name, @RequestBody List<QueueItem<?>> data) {
return makeReturn(name, queue -> queue.containsAll(data));
}
@PostMapping("/contains_all_id/{name}")
public Boolean containsAllId(@PathVariable("name") String name, @RequestBody List<String> ids) {
List<QueueItem<?>> containsItems = ids.stream().map(QueueItem::new).collect(Collectors.toList());
return makeReturn(name, queue -> queue.containsAll(containsItems));
}
@PostMapping("/add_all/{name}")
public Boolean addAll(@PathVariable("name") String name, @RequestBody List<QueueItem<?>> data) {
return makeReturn(name, queue -> queue.addAll(data));
}
@PostMapping("/remove_all/{name}")
public Boolean removeAll(@PathVariable("name") String name, @RequestBody List<QueueItem<?>> data) {
return makeReturn(name, queue -> queue.removeAll(data));
}
@PostMapping("/remove_all_id/{name}")
public Boolean removeAllId(@PathVariable("name") String name, @RequestBody List<String> ids) {
return makeReturn(name, queue -> queue.removeIf(item -> ids.contains(item.getId())));
}
@GetMapping("/remove_all_id/{name}/{id}")
public Boolean removeAllId(@PathVariable("name") String name, @PathVariable("id") String id) {
return makeReturn(name, queue -> queue.removeIf(item -> item.getId().equals(id)));
}
@PostMapping("/retain_all/{name}")
public Boolean retainAll(@PathVariable("name") String name, @RequestBody List<QueueItem<?>> data) {
return makeReturn(name, queue -> queue.retainAll(data));
}
@GetMapping("/clear/{name}")
public void clear(@PathVariable("name") String name) {
makeVoid(name, PriorityBlockingQueue::clear);
}
@PostMapping("/offer/{name}")
public Boolean offer(@PathVariable("name") String name, @RequestBody QueueItem<?> data) {
return makeReturn(name, queue -> queue.offer(data));
}
@GetMapping("/remove/{name}")
public QueueItem<?> remove(@PathVariable("name") String name) {
return makeReturn(name, PriorityBlockingQueue::remove);
}
@GetMapping("/poll/{name}")
public QueueItem<?> poll(@PathVariable("name") String name) {
return makeReturn(name, queue -> {
try {
return queue.poll(Constants.POLL_TIMEOUT, Constants.TIMEOUT_UNIT);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
}
@GetMapping("/element/{name}")
public QueueItem<?> element(@PathVariable("name") String name) {
return makeReturn(name, PriorityBlockingQueue::element);
}
@GetMapping("/peek/{name}")
public QueueItem<?> peek(@PathVariable("name") String name) {
return makeReturn(name, PriorityBlockingQueue::peek);
}
}

View File

@@ -0,0 +1,17 @@
package com.lanyuanxiaoyao.service.queue.controller;
import com.lanyuanxiaoyao.service.configuration.entity.queue.QueueItem;
import java.util.Enumeration;
import java.util.List;
/**
* 队列通用操作
*
* @author lanyuanxiaoyao
* @date 2023-05-07
*/
public interface QueueOperator {
Enumeration<String> names();
List<QueueItem<?>> all(String name);
}

View File

@@ -0,0 +1,67 @@
package com.lanyuanxiaoyao.service.queue.service;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.lanyuanxiaoyao.service.queue.configuration.AutoSaveProperties;
import com.lanyuanxiaoyao.service.queue.configuration.Constants;
import java.io.IOException;
import java.net.URLEncoder;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
/**
* 自动保存
*
* @author lanyuanxiaoyao
* @date 2023-05-06
*/
@Service
public class AutoSaveService {
private static final Logger logger = LoggerFactory.getLogger(AutoSaveService.class);
private final AutoSaveProperties autoSaveProperties;
private final Jackson2ObjectMapperBuilder jackson2ObjectMapperBuilder;
public AutoSaveService(AutoSaveProperties autoSaveProperties, Jackson2ObjectMapperBuilder jackson2ObjectMapperBuilder) {
this.autoSaveProperties = autoSaveProperties;
this.jackson2ObjectMapperBuilder = jackson2ObjectMapperBuilder;
}
@Scheduled(fixedDelay = 60, initialDelay = 60, timeUnit = TimeUnit.SECONDS)
public void autoSave() throws IOException {
// 用单独的变量确认是否开始自动保存,保证自动保存在初始化加载完后再启动,防止把空数据覆盖保存文件
if (Constants.AUTO_SAVE.get()) {
if (autoSaveProperties.getLocation() != null && !"".equals(autoSaveProperties.getLocation())) {
save(Constants.DEQUE_MAP, Constants.DEQUE_SAVE_FILENAME);
save(Constants.QUEUE_MAP, Constants.QUEUE_SAVE_FILENAME);
return;
}
logger.warn("Save path is not set");
}
}
private void save(Map<String, ? extends Queue<?>> o, String name) throws IOException {
if (o == null) {
return;
}
Map<String, List<?>> map = new HashMap<>(o.size());
o.forEach((k, q) -> map.put(k, new ArrayList<>(o.get(k))));
Files.createDirectories(Paths.get(autoSaveProperties.getLocation()));
Path saveDataPath = Paths.get(autoSaveProperties.getLocation(), name);
ObjectMapper mapper = jackson2ObjectMapperBuilder.build();
String data = mapper.writeValueAsString(map);
map.clear();
data = URLEncoder.encode(data, "utf-8");
Files.write(saveDataPath, data.getBytes());
}
}

View File

@@ -0,0 +1,11 @@
server:
tomcat:
accept-count: 1000
threads:
max: 500
min-spare: 100
spring:
application:
name: service-queue
profiles:
include: random-port,common,discovery,metrics

View File

@@ -0,0 +1,51 @@
<configuration>
<conversionRule conversionWord="clr" converterClass="org.springframework.boot.logging.logback.ColorConverter" />
<conversionRule conversionWord="wex" converterClass="org.springframework.boot.logging.logback.WhitespaceThrowableProxyConverter" />
<conversionRule conversionWord="wEx" converterClass="org.springframework.boot.logging.logback.ExtendedWhitespaceThrowableProxyConverter" />
<springProperty scope="context" name="LOKI_PUSH_URL" source="loki.url"/>
<springProperty scope="context" name="LOGGING_PARENT" source="logging.parent"/>
<springProperty scope="context" name="APP_NAME" source="spring.application.name"/>
<appender name="Loki" class="com.github.loki4j.logback.Loki4jAppender">
<metricsEnabled>true</metricsEnabled>
<http class="com.github.loki4j.logback.ApacheHttpSender">
<url>${LOKI_PUSH_URL:-http://localhost/loki/api/v1/push}</url>
</http>
<format>
<label>
<pattern>app=${APP_NAME:- },host=${HOSTNAME},level=%level</pattern>
</label>
<message>
<pattern>${FILE_LOG_PATTERN:-%d{${LOG_DATEFORMAT_PATTERN:-yyyy-MM-dd HH:mm:ss.SSS}} [${HOSTNAME}] ${LOG_LEVEL_PATTERN:-%5p} ${PID:- } -&#45;&#45; [%t] %-40.40logger{39} #@# : %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}</pattern>
</message>
<sortByTime>true</sortByTime>
</format>
</appender>
<appender name="Console" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>${CONSOLE_LOG_PATTERN:-%clr(%d{${LOG_DATEFORMAT_PATTERN:-yyyy-MM-dd HH:mm:ss.SSS}}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}</pattern>
</encoder>
</appender>
<appender name="RollingFile" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOGGING_PARENT:-.}/${APP_NAME:-run}.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${LOGGING_PARENT:-.}/archive/${APP_NAME:-run}-%d{yyyy-MM-dd}.gz</fileNamePattern>
<MaxHistory>7</MaxHistory>
</rollingPolicy>
<encoder>
<pattern>${FILE_LOG_PATTERN:-%d{${LOG_DATEFORMAT_PATTERN:-yyyy-MM-dd HH:mm:ss.SSS}} [${HOSTNAME}] ${LOG_LEVEL_PATTERN:-%5p} ${PID:- } -&#45;&#45; [%t] %-40.40logger{39} #@# : %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}</pattern>
</encoder>
</appender>
<logger name="com.zaxxer.hikari" level="ERROR"/>
<logger name="com.netflix.discovery.shared.resolver.aws.ConfigClusterResolver" level="WARN"/>
<root level="INFO">
<appender-ref ref="Loki"/>
<appender-ref ref="Console"/>
<appender-ref ref="RollingFile"/>
</root>
</configuration>