1
0

fix: 加入事务方式数据冲突

This commit is contained in:
2025-09-25 15:39:45 +08:00
parent 3d428d9d0a
commit edd18061eb
4 changed files with 54 additions and 36 deletions

View File

@@ -37,7 +37,11 @@ public abstract class TaskRunner {
taskRepository.saveAndFlush(task); taskRepository.saveAndFlush(task);
try { try {
var result = process(params, step -> taskRepository.updateStepById(task.getId(), step)); var result = process(params, step -> {
synchronized (task) {
taskRepository.updateStepById(task.getId(), step);
}
});
task.setStatus(Task.Status.SUCCESS); task.setStatus(Task.Status.SUCCESS);
task.setStep(1.0); task.setStep(1.0);

View File

@@ -10,15 +10,17 @@ import com.lanyuanxiaoyao.leopard.core.repository.StockRepository;
import com.lanyuanxiaoyao.leopard.server.helper.NumberHelper; import com.lanyuanxiaoyao.leopard.server.helper.NumberHelper;
import com.lanyuanxiaoyao.leopard.server.service.TuShareService; import com.lanyuanxiaoyao.leopard.server.service.TuShareService;
import java.time.LocalDate; import java.time.LocalDate;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.support.TransactionTemplate;
/** /**
* 更新日线数据 * 更新日线数据
@@ -32,16 +34,18 @@ public class UpdateDailyTask extends TaskRunner {
private final StockRepository stockRepository; private final StockRepository stockRepository;
private final DailyRepository dailyRepository; private final DailyRepository dailyRepository;
private final TransactionTemplate transactionTemplate;
private final TuShareService tuShareService; private final TuShareService tuShareService;
protected UpdateDailyTask(ApplicationContext context, StockRepository stockRepository, DailyRepository dailyRepository, TuShareService tuShareService) { protected UpdateDailyTask(ApplicationContext context, StockRepository stockRepository, DailyRepository dailyRepository, TransactionTemplate transactionTemplate, TuShareService tuShareService) {
super(context); super(context);
this.stockRepository = stockRepository; this.stockRepository = stockRepository;
this.dailyRepository = dailyRepository; this.dailyRepository = dailyRepository;
this.transactionTemplate = transactionTemplate;
this.tuShareService = tuShareService; this.tuShareService = tuShareService;
} }
@Transactional(rollbackFor = Throwable.class)
@Override @Override
public String process(Map<String, Object> params, StepUpdater updater) throws Exception { public String process(Map<String, Object> params, StepUpdater updater) throws Exception {
var tradeDates = new HashSet<LocalDate>(); var tradeDates = new HashSet<LocalDate>();
@@ -60,15 +64,18 @@ public class UpdateDailyTask extends TaskRunner {
.filter(date -> date.isBefore(nowDate) || date.isEqual(nowDate)) .filter(date -> date.isBefore(nowDate) || date.isEqual(nowDate))
.filter(date -> !existsTradeDates.contains(date)) .filter(date -> !existsTradeDates.contains(date))
.toList(); .toList();
for (int index = 0, total = targetTradeDates.size(); index < total; index++) { var total = targetTradeDates.size();
var tradeDate = targetTradeDates.get(index); var finished = new AtomicInteger(0);
targetTradeDates.parallelStream()
.forEach(tradeDate -> {
var factorResponse = tuShareService.factorList(tradeDate); var factorResponse = tuShareService.factorList(tradeDate);
var factorMap = new HashMap<String, Double>(); var factorMap = new HashMap<String, Double>();
for (List<String> item : factorResponse.data().items()) { for (List<String> item : factorResponse.data().items()) {
factorMap.put(item.get(0), Double.valueOf(item.get(2))); factorMap.put(item.get(0), Double.valueOf(item.get(2)));
} }
transactionTemplate.execute(status -> {
var response = tuShareService.dailyList(tradeDate); var response = tuShareService.dailyList(tradeDate);
var dailies = new ArrayList<Daily>();
for (List<String> item : response.data().items()) { for (List<String> item : response.data().items()) {
var code = item.get(0); var code = item.get(0);
if (stocksMap.containsKey(code)) { if (stocksMap.containsKey(code)) {
@@ -87,12 +94,14 @@ public class UpdateDailyTask extends TaskRunner {
daily.setTurnover(NumberUtil.parseDouble(item.get(10))); daily.setTurnover(NumberUtil.parseDouble(item.get(10)));
daily.setFactor(factor); daily.setFactor(factor);
daily.setStock(stock); daily.setStock(stock);
dailyRepository.save(daily); dailies.add(daily);
} }
} }
updater.update(index * 1.0 / total); dailyRepository.saveAll(dailies);
} return null;
});
updater.update(finished.incrementAndGet() * 1.0 / total);
});
return null; return null;
} }
} }

View File

@@ -17,6 +17,7 @@ import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
/** /**
* 更新财务指标数据 * 更新财务指标数据
@@ -39,6 +40,7 @@ public class UpdateFinanceIndicatorTask extends TaskRunner {
this.tuShareService = tuShareService; this.tuShareService = tuShareService;
} }
@Transactional(rollbackFor = Throwable.class)
@Override @Override
public String process(Map<String, Object> params, StepUpdater updater) throws JsonProcessingException { public String process(Map<String, Object> params, StepUpdater updater) throws JsonProcessingException {
var stocks = stockRepository.findAll(); var stocks = stockRepository.findAll();

View File

@@ -0,0 +1,3 @@
spring:
datasource:
url: jdbc:postgresql://127.0.0.1:6785/leopard_dev