feat: 增加日线数据更新进度
This commit is contained in:
@@ -1,25 +1,28 @@
|
|||||||
package com.lanyuanxiaoyao.leopard.server.service.task;
|
package com.lanyuanxiaoyao.leopard.server.service.task;
|
||||||
|
|
||||||
|
import cn.hutool.core.util.NumberUtil;
|
||||||
import cn.hutool.core.util.ObjectUtil;
|
import cn.hutool.core.util.ObjectUtil;
|
||||||
import cn.hutool.core.util.StrUtil;
|
import cn.hutool.core.util.StrUtil;
|
||||||
import com.lanyuanxiaoyao.leopard.core.entity.Daily;
|
import com.lanyuanxiaoyao.leopard.core.entity.Daily;
|
||||||
import com.lanyuanxiaoyao.leopard.core.entity.Stock;
|
import com.lanyuanxiaoyao.leopard.core.entity.Stock;
|
||||||
import com.lanyuanxiaoyao.leopard.core.repository.DailyRepository;
|
import com.lanyuanxiaoyao.leopard.core.repository.DailyRepository;
|
||||||
import com.lanyuanxiaoyao.leopard.core.repository.StockRepository;
|
import com.lanyuanxiaoyao.leopard.core.repository.StockRepository;
|
||||||
|
import com.lanyuanxiaoyao.leopard.server.helper.NumberHelper;
|
||||||
|
import com.lanyuanxiaoyao.leopard.server.service.TaskService;
|
||||||
import com.lanyuanxiaoyao.leopard.server.service.TuShareService;
|
import com.lanyuanxiaoyao.leopard.server.service.TuShareService;
|
||||||
import com.yomahub.liteflow.annotation.LiteflowComponent;
|
import com.yomahub.liteflow.annotation.LiteflowComponent;
|
||||||
import com.yomahub.liteflow.core.NodeComponent;
|
|
||||||
import java.time.LocalDate;
|
import java.time.LocalDate;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.transaction.support.TransactionTemplate;
|
import org.springframework.transaction.support.TransactionTemplate;
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@LiteflowComponent("update_daily")
|
@LiteflowComponent("update_daily")
|
||||||
public class UpdateDailyNode extends NodeComponent {
|
public class UpdateDailyNode extends TaskNodeComponent {
|
||||||
private final StockRepository stockRepository;
|
private final StockRepository stockRepository;
|
||||||
private final DailyRepository dailyRepository;
|
private final DailyRepository dailyRepository;
|
||||||
|
|
||||||
@@ -27,14 +30,14 @@ public class UpdateDailyNode extends NodeComponent {
|
|||||||
|
|
||||||
private final TransactionTemplate transactionTemplate;
|
private final TransactionTemplate transactionTemplate;
|
||||||
|
|
||||||
public UpdateDailyNode(StockRepository stockRepository, DailyRepository dailyRepository, TuShareService tuShareService, TransactionTemplate transactionTemplate) {
|
public UpdateDailyNode(TaskService taskService, StockRepository stockRepository, DailyRepository dailyRepository, TuShareService tuShareService, TransactionTemplate transactionTemplate) {
|
||||||
|
super(taskService);
|
||||||
this.stockRepository = stockRepository;
|
this.stockRepository = stockRepository;
|
||||||
this.dailyRepository = dailyRepository;
|
this.dailyRepository = dailyRepository;
|
||||||
this.tuShareService = tuShareService;
|
this.tuShareService = tuShareService;
|
||||||
this.transactionTemplate = transactionTemplate;
|
this.transactionTemplate = transactionTemplate;
|
||||||
}
|
}
|
||||||
|
|
||||||
// @Transactional(rollbackOn = Throwable.class)
|
|
||||||
@Override
|
@Override
|
||||||
public void process() {
|
public void process() {
|
||||||
var tradeDates = new HashSet<LocalDate>();
|
var tradeDates = new HashSet<LocalDate>();
|
||||||
@@ -50,11 +53,13 @@ public class UpdateDailyNode extends NodeComponent {
|
|||||||
var nowDate = LocalDate.now();
|
var nowDate = LocalDate.now();
|
||||||
var stocks = stockRepository.findAll();
|
var stocks = stockRepository.findAll();
|
||||||
var stocksMap = stocks.stream().collect(Collectors.toMap(Stock::getCode, stock -> stock));
|
var stocksMap = stocks.stream().collect(Collectors.toMap(Stock::getCode, stock -> stock));
|
||||||
tradeDates.stream()
|
var allTradeDates = tradeDates.stream()
|
||||||
.filter(date -> date.isBefore(nowDate) || date.isEqual(nowDate))
|
.filter(date -> date.isBefore(nowDate) || date.isEqual(nowDate))
|
||||||
.filter(date -> !existsTradeDates.contains(date))
|
.filter(date -> !existsTradeDates.contains(date))
|
||||||
.sorted()
|
.sorted()
|
||||||
.parallel()
|
.toList();
|
||||||
|
var total = new AtomicInteger(allTradeDates.size());
|
||||||
|
allTradeDates.parallelStream()
|
||||||
.forEach(tradeDate -> {
|
.forEach(tradeDate -> {
|
||||||
var factorResponse = tuShareService.factorList(tradeDate);
|
var factorResponse = tuShareService.factorList(tradeDate);
|
||||||
var factorMap = new HashMap<String, Double>();
|
var factorMap = new HashMap<String, Double>();
|
||||||
@@ -65,30 +70,27 @@ public class UpdateDailyNode extends NodeComponent {
|
|||||||
var response = tuShareService.dailyList(tradeDate);
|
var response = tuShareService.dailyList(tradeDate);
|
||||||
transactionTemplate.execute(status -> {
|
transactionTemplate.execute(status -> {
|
||||||
try {
|
try {
|
||||||
var count = 0;
|
|
||||||
for (List<String> item : response.data().items()) {
|
for (List<String> item : response.data().items()) {
|
||||||
var code = item.get(0);
|
var code = item.get(0);
|
||||||
if (stocksMap.containsKey(code)) {
|
if (stocksMap.containsKey(code)) {
|
||||||
count++;
|
|
||||||
var stock = stocksMap.get(code);
|
var stock = stocksMap.get(code);
|
||||||
var factor = factorMap.get(code);
|
var factor = factorMap.get(code);
|
||||||
var daily = new Daily();
|
var daily = new Daily();
|
||||||
daily.setTradeDate(LocalDate.parse(item.get(1), TuShareService.TRADE_FORMAT));
|
daily.setTradeDate(tradeDate);
|
||||||
daily.setOpen(item.get(2) == null ? null : Double.parseDouble(item.get(2)));
|
daily.setOpen(NumberHelper.parseDouble(item.get(2)));
|
||||||
daily.setHigh(item.get(3) == null ? null : Double.parseDouble(item.get(3)));
|
daily.setHigh(NumberUtil.parseDouble(item.get(3)));
|
||||||
daily.setLow(item.get(4) == null ? null : Double.parseDouble(item.get(4)));
|
daily.setLow(NumberUtil.parseDouble(item.get(4)));
|
||||||
daily.setClose(item.get(5) == null ? null : Double.parseDouble(item.get(5)));
|
daily.setClose(NumberUtil.parseDouble(item.get(5)));
|
||||||
daily.setPreviousClose(item.get(6) == null ? null : Double.parseDouble(item.get(6)));
|
daily.setPreviousClose(NumberUtil.parseDouble(item.get(6)));
|
||||||
daily.setPriceChangeAmount(item.get(7) == null ? null : Double.parseDouble(item.get(7)));
|
daily.setPriceChangeAmount(NumberUtil.parseDouble(item.get(7)));
|
||||||
daily.setPriceFluctuationRange(item.get(8) == null ? null : Double.parseDouble(item.get(8)));
|
daily.setPriceFluctuationRange(NumberUtil.parseDouble(item.get(8)));
|
||||||
daily.setVolume(item.get(9) == null ? null : Double.parseDouble(item.get(9)));
|
daily.setVolume(NumberUtil.parseDouble(item.get(9)));
|
||||||
daily.setTurnover(item.get(10) == null ? null : Double.parseDouble(item.get(10)) * 1000);
|
daily.setTurnover(NumberUtil.parseDouble(item.get(10)));
|
||||||
daily.setFactor(factor);
|
daily.setFactor(factor);
|
||||||
daily.setStock(stock);
|
daily.setStock(stock);
|
||||||
dailyRepository.save(daily);
|
dailyRepository.save(daily);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
log.info("Trade date: {} {}", tradeDate, count);
|
|
||||||
return true;
|
return true;
|
||||||
} catch (Exception exception) {
|
} catch (Exception exception) {
|
||||||
log.error("Error", exception);
|
log.error("Error", exception);
|
||||||
|
|||||||
Reference in New Issue
Block a user