1
0

feat: 并行处理任务

This commit is contained in:
2025-09-07 21:26:58 +08:00
parent 9e2d2e3845
commit 769bd4a90b
2 changed files with 60 additions and 59 deletions

View File

@@ -41,22 +41,23 @@ public class CheckDailyNode extends NodeComponent {
.map(item -> LocalDate.parse(item.get(0), TuShareService.TRADE_FORMAT)) .map(item -> LocalDate.parse(item.get(0), TuShareService.TRADE_FORMAT))
.filter(date -> date.isBefore(nowDate) || date.isEqual(nowDate)) .filter(date -> date.isBefore(nowDate) || date.isEqual(nowDate))
.toList(); .toList();
for (Stock stock : stocks) { stocks.parallelStream()
if (exchange.equals(stock.getMarket())) { .forEach(stock -> {
var existsTradeDates = dailyService.findDistinctTradeDateByStockId(stock.getId()); if (exchange.equals(stock.getMarket())) {
var missedTradeDates = allTradeDates.stream() var existsTradeDates = dailyService.findDistinctTradeDateByStockId(stock.getId());
.filter(date -> date.isBefore(stock.getListedDate())) var missedTradeDates = allTradeDates.stream()
.filter(date -> !existsTradeDates.contains(date)) .filter(date -> date.isBefore(stock.getListedDate()))
.toList(); .filter(date -> !existsTradeDates.contains(date))
if (ObjectUtil.isNotEmpty(missedTradeDates)) { .toList();
reports.add(new MissedTradeReport( if (ObjectUtil.isNotEmpty(missedTradeDates)) {
stock.getCode(), reports.add(new MissedTradeReport(
stock.getName(), stock.getCode(),
missedTradeDates stock.getName(),
)); missedTradeDates
));
}
} }
} });
}
} }
if (ObjectUtil.isNotEmpty(reports)) { if (ObjectUtil.isNotEmpty(reports)) {
var context = getContextBean(TaskMonitorNodes.TaskMonitorContext.class); var context = getContextBean(TaskMonitorNodes.TaskMonitorContext.class);

View File

@@ -46,54 +46,54 @@ public class UpdateDailyNode extends NodeComponent {
} }
var existsTradeDates = dailyService.findDistinctTradeDate(); var existsTradeDates = dailyService.findDistinctTradeDate();
var nowDate = LocalDate.now(); var nowDate = LocalDate.now();
var targetDates = tradeDates.stream() var stocks = stockService.list();
var stocksMap = stocks.stream().collect(Collectors.toMap(Stock::getCode, stock -> stock));
tradeDates.stream()
.filter(date -> date.isBefore(nowDate) || date.isEqual(nowDate)) .filter(date -> date.isBefore(nowDate) || date.isEqual(nowDate))
.filter(date -> !existsTradeDates.contains(date)) .filter(date -> !existsTradeDates.contains(date))
.sorted() .sorted()
.toList(); .parallel()
var stocks = stockService.list(); .forEach(tradeDate -> {
var stocksMap = stocks.stream().collect(Collectors.toMap(Stock::getCode, stock -> stock)); var factorResponse = tuShareService.factorList(tradeDate);
for (LocalDate tradeDate : targetDates) { var factorMap = new HashMap<String, Double>();
var factorResponse = tuShareService.factorList(tradeDate); for (List<String> item : factorResponse.data().items()) {
var factorMap = new HashMap<String, Double>(); factorMap.put(item.get(0), Double.valueOf(item.get(2)));
for (List<String> item : factorResponse.data().items()) {
factorMap.put(item.get(0), Double.valueOf(item.get(2)));
}
var response = tuShareService.dailyList(tradeDate);
transactionTemplate.execute(status -> {
try {
var count = 0;
for (List<String> item : response.data().items()) {
var code = item.get(0);
if (stocksMap.containsKey(code)) {
count++;
var stock = stocksMap.get(code);
var factor = factorMap.get(code);
var daily = new Daily();
daily.setTradeDate(LocalDate.parse(item.get(1), TuShareService.TRADE_FORMAT));
daily.setOpen(item.get(2) == null ? null : Double.valueOf(item.get(2)));
daily.setHigh(item.get(3) == null ? null : Double.valueOf(item.get(3)));
daily.setLow(item.get(4) == null ? null : Double.valueOf(item.get(4)));
daily.setClose(item.get(5) == null ? null : Double.valueOf(item.get(5)));
daily.setPreviousClose(item.get(6) == null ? null : Double.valueOf(item.get(6)));
daily.setPriceChangeAmount(item.get(7) == null ? null : Double.valueOf(item.get(7)));
daily.setPriceFluctuationRange(item.get(8) == null ? null : Double.valueOf(item.get(8)));
daily.setVolume(item.get(9) == null ? null : Double.valueOf(item.get(9)));
daily.setTurnover(item.get(10) == null ? null : Double.valueOf(item.get(10)));
daily.setFactor(factor);
daily.setStock(stock);
dailyService.save(daily);
}
}
log.info("Trade date: {} {}", tradeDate, count);
return true;
} catch (Exception exception) {
log.error("Error", exception);
status.setRollbackOnly();
return false;
} }
var response = tuShareService.dailyList(tradeDate);
transactionTemplate.execute(status -> {
try {
var count = 0;
for (List<String> item : response.data().items()) {
var code = item.get(0);
if (stocksMap.containsKey(code)) {
count++;
var stock = stocksMap.get(code);
var factor = factorMap.get(code);
var daily = new Daily();
daily.setTradeDate(LocalDate.parse(item.get(1), TuShareService.TRADE_FORMAT));
daily.setOpen(item.get(2) == null ? null : Double.parseDouble(item.get(2)));
daily.setHigh(item.get(3) == null ? null : Double.parseDouble(item.get(3)));
daily.setLow(item.get(4) == null ? null : Double.parseDouble(item.get(4)));
daily.setClose(item.get(5) == null ? null : Double.parseDouble(item.get(5)));
daily.setPreviousClose(item.get(6) == null ? null : Double.parseDouble(item.get(6)));
daily.setPriceChangeAmount(item.get(7) == null ? null : Double.parseDouble(item.get(7)));
daily.setPriceFluctuationRange(item.get(8) == null ? null : Double.parseDouble(item.get(8)));
daily.setVolume(item.get(9) == null ? null : Double.parseDouble(item.get(9)));
daily.setTurnover(item.get(10) == null ? null : Double.parseDouble(item.get(10)) * 1000);
daily.setFactor(factor);
daily.setStock(stock);
dailyService.save(daily);
}
}
log.info("Trade date: {} {}", tradeDate, count);
return true;
} catch (Exception exception) {
log.error("Error", exception);
status.setRollbackOnly();
return false;
}
});
}); });
}
} }
} }