feat: 增加日线数据检查流程
This commit is contained in:
@@ -31,7 +31,7 @@ public class LeopardServerApplication implements ApplicationRunner {
|
||||
|
||||
@Override
|
||||
public void run(ApplicationArguments args) {
|
||||
executor.execute2RespWithEL("THEN(update_daily)");
|
||||
// executor.execute2RespWithEL("THEN(update_daily)");
|
||||
// executor.execute2RespWithEL("THEN(update_stock)");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,77 @@
|
||||
package com.lanyuanxiaoyao.leopard.server.service.task;
|
||||
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.lanyuanxiaoyao.leopard.server.entity.Stock;
|
||||
import com.lanyuanxiaoyao.leopard.server.service.DailyService;
|
||||
import com.lanyuanxiaoyao.leopard.server.service.StockService;
|
||||
import com.lanyuanxiaoyao.leopard.server.service.TuShareService;
|
||||
import com.yomahub.liteflow.annotation.LiteflowComponent;
|
||||
import com.yomahub.liteflow.core.NodeComponent;
|
||||
import java.time.LocalDate;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Slf4j
|
||||
@LiteflowComponent("check_daily")
|
||||
public class CheckDailyNode extends NodeComponent {
|
||||
private final StockService stockService;
|
||||
private final DailyService dailyService;
|
||||
private final TuShareService tuShareService;
|
||||
|
||||
public CheckDailyNode(StockService stockService, DailyService dailyService, TuShareService tuShareService) {
|
||||
this.stockService = stockService;
|
||||
this.dailyService = dailyService;
|
||||
this.tuShareService = tuShareService;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process() {
|
||||
var reports = new ArrayList<MissedTradeReport>();
|
||||
var stocks = stockService.list();
|
||||
var exchanges = stocks.stream().map(Stock::getMarket).distinct().toList();
|
||||
for (Stock.Market exchange : exchanges) {
|
||||
var nowDate = LocalDate.now();
|
||||
var allTradeDates = tuShareService.tradeDateList(exchange.name())
|
||||
.data()
|
||||
.items()
|
||||
.stream()
|
||||
.map(item -> LocalDate.parse(item.get(0), TuShareService.TRADE_FORMAT))
|
||||
.filter(date -> date.isBefore(nowDate) || date.isEqual(nowDate))
|
||||
.toList();
|
||||
for (Stock stock : stocks) {
|
||||
if (exchange.equals(stock.getMarket())) {
|
||||
var existsTradeDates = dailyService.findDistinctTradeDateByStockId(stock.getId());
|
||||
var missedTradeDates = allTradeDates.stream()
|
||||
.filter(date -> date.isBefore(stock.getListedDate()))
|
||||
.filter(date -> !existsTradeDates.contains(date))
|
||||
.toList();
|
||||
if (ObjectUtil.isNotEmpty(missedTradeDates)) {
|
||||
reports.add(new MissedTradeReport(
|
||||
stock.getCode(),
|
||||
stock.getName(),
|
||||
missedTradeDates
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (ObjectUtil.isNotEmpty(reports)) {
|
||||
var context = getContextBean(TaskMonitorNodes.TaskMonitorContext.class);
|
||||
context.setTaskResult(
|
||||
reports.stream()
|
||||
.map(report -> StrUtil.format("{}({})缺少如下交易日数据:{}", report.name(), report.code(), report.missedTradeDates().stream().map(LocalDate::toString).collect(Collectors.joining(", "))))
|
||||
.collect(Collectors.joining("\n"))
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
public record MissedTradeReport(
|
||||
String code,
|
||||
String name,
|
||||
List<LocalDate> missedTradeDates
|
||||
) {
|
||||
}
|
||||
}
|
||||
@@ -7,4 +7,7 @@
|
||||
<chain id="update_daily_information">
|
||||
CATCH(THEN(task_start, update_daily, task_end)).DO(task_error)
|
||||
</chain>
|
||||
<chain id="check_daily">
|
||||
CATCH(THEN(task_start, check_daily, task_end)).DO(task_error)
|
||||
</chain>
|
||||
</flow>
|
||||
|
||||
Reference in New Issue
Block a user