diff --git a/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/controller/chat/ChatController.java b/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/controller/chat/ChatController.java index a45a28e..7c5abb1 100644 --- a/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/controller/chat/ChatController.java +++ b/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/controller/chat/ChatController.java @@ -6,10 +6,13 @@ import com.lanyuanxiaoyao.service.ai.web.entity.vo.MessageVO; import com.lanyuanxiaoyao.service.ai.web.tools.ChartTool; import com.lanyuanxiaoyao.service.ai.web.tools.TableTool; import com.lanyuanxiaoyao.service.ai.web.tools.YarnTool; +import com.lanyuanxiaoyao.service.configuration.ExecutorProvider; +import jakarta.servlet.http.HttpServletResponse; import java.io.IOException; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.Optional; +import java.util.concurrent.TimeoutException; import org.eclipse.collections.api.list.ImmutableList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -98,24 +101,34 @@ public class ChatController { @PostMapping("async") public SseEmitter chatAsync( - @RequestBody ImmutableList messages + @RequestBody ImmutableList messages, + HttpServletResponse httpResponse ) { - SseEmitter emitter = new SseEmitter(); - buildRequest(messages) - .stream() - .chatResponse() - .subscribe( - response -> { - try { - emitter.send(toMessage(response)); - } catch (IOException e) { - emitter.completeWithError(e); - throw new RuntimeException(e); - } - }, - emitter::completeWithError, - emitter::complete - ); + httpResponse.setHeader("X-Accel-Buffering", "no"); + + SseEmitter emitter = new SseEmitter(20 * 1000L); + ExecutorProvider.EXECUTORS.submit(() -> { + buildRequest(messages) + .stream() + .chatResponse() + .subscribe( + response -> { + try { + emitter.send( + SseEmitter.event() + .data(toMessage(response)) + .reconnectTime(5000) + .build() + ); + } catch (IOException e) { + emitter.completeWithError(e); + } + }, + emitter::completeWithError, + emitter::complete + ); + }); + emitter.onTimeout(() -> emitter.completeWithError(new TimeoutException("SseEmitter Timeout"))); return emitter; }