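This article walks through integrating AnythingLLM with Spring Boot to build a high-performance streaming relay service for AI chat.
Technical architecture
Frontend → Spring Boot relay service → AnythingLLM → LLM
↓ streaming response
Real-time AI chat
Core technology choices
- Spring Boot MVC: main framework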
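- Reactor Core: reactive stream processing (version managed by Spring Boot; spring-boot-starter-web does not pull it in transitively, so it has to be declared as a dependency)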
- OkHttp: HTTP client with good support for streaming responses
- Jackson: JSON processing (bundled with Spring Boot)
Implementation steps
1. Add dependencies
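<dependencies>
    <!-- Spring Boot Web Starter (includes Jackson) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Reactor Core for Flux; not pulled in by spring-boot-starter-web, version managed by Spring Boot -->
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId>
    </dependency>
    <!-- OkHttp for streaming HTTP requests -->
    <dependency>
        <groupId>com.squareup.okhttp3</groupId>
        <artifactId>okhttp</artifactId>
        <version>4.12.0</version>
    </dependency>
    <!-- Lombok for the @Data and @Slf4j annotations used below; version managed by Spring Boot -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>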
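Notes:
- There is no need to add spring-boot-starter-webflux: we use classic Spring MVC plus Reactor for the reactive parts. Because spring-boot-starter-web does not bring in reactor-core transitively, io.projectreactor:reactor-core has to be on the classpath (Spring Boot manages its version).
- There is no need to add jackson-databind separately: it is already included in spring-boot-starter-web.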
2. Configuration file
# application.yml
rag:
  server:
    url: http://your-anythingllm-server:3001/api
    slug: your_workspace_slug
    auth:
      apikey: YOUR_API_KEY
3. Request DTO
import lombok.Data;

@Data
public class ChatRequest {
    /**
     * User message
     */
    private String message;
}
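4. Core service layer
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import okhttp3.ResponseBody;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

import java.io.BufferedReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

@Slf4j
@Service
public class RagChatService {
    private final OkHttpClient httpClient;
    private final ObjectMapper objectMapper;

    @Value("${rag.server.url}")
    private String ragServerUrl;

    @Value("${rag.server.slug}")
    private String workspaceSlug;

    @Value("${rag.server.auth.apikey}")
    private String apiKey;

    @Autowired
    public RagChatService(ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
        this.httpClient = new OkHttpClient.Builder()
                .connectTimeout(10, TimeUnit.SECONDS)
                .readTimeout(30, TimeUnit.SECONDS)
                .build();
    }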
    /**
     * Handles a streaming chat request and returns a Flux of JSON chunks.
     */
    public Flux<String> processStreamChat(String message) {
        return Flux.<String>create(sink -> {
            try {
                log.info("Starting streaming chat, message: {}", message);
                // Build the request URL
                String url = ragServerUrl + "/v1/workspace/" + workspaceSlug + "/stream-chat";
                // Build the request body
                Map<String, Object> requestBody = new HashMap<>();
                requestBody.put("message", message);
                requestBody.put("mode", "query"); // query mode does not keep chat history
                String requestJson = objectMapper.writeValueAsString(requestBody);
                // Build the OkHttp request (OkHttp 4.x takes the content first, then the media type)
                RequestBody body = RequestBody.create(
                        requestJson,
                        MediaType.get("application/json")
                );
                Request request = new Request.Builder()
                        .url(url)
                        .post(body)
                        .addHeader("Authorization", "Bearer " + apiKey)
                        .addHeader("Accept", "text/event-stream")
                        .addHeader("Content-Type", "application/json")
                        .build();
                // Execute the streaming request
                try (Response response = httpClient.newCall(request).execute()) {
                    if (!response.isSuccessful()) {
                        sink.error(new IOException("Request failed: " + response.code()));
                        return;
                    }
                    ResponseBody responseBody = response.body();
                    if (responseBody == null) {
                        sink.error(new IOException("Response body is empty"));
                        return;
                    }
                    // Key point: read the streamed data line by line with a BufferedReader
                    try (BufferedReader reader = new BufferedReader(responseBody.charStream())) {
                        String line;
                        while ((line = reader.readLine()) != null && !sink.isCancelled()) {
                            String processedData = processLine(line);
                            if (processedData != null) {
                                sink.next(processedData); // emit immediately
                            }
                        }
                    }
                    sink.complete();
                } catch (Exception e) {
                    sink.error(e);
                }
            } catch (Exception e) {
                sink.error(e);
            }
        })
        .subscribeOn(Schedulers.boundedElastic()) // run the blocking I/O on the bounded elastic pool
        .publishOn(Schedulers.parallel());        // deliver downstream signals on the parallel scheduler
    }
    /**
     * Processes a single SSE line.
     */
    private String processLine(String line) {
        if (line == null || line.trim().isEmpty()) {
            return null;
        }
        try {
            // AnythingLLM returns standard SSE lines: "data: {json}"
            if (line.startsWith("data: ")) {
                String jsonData = line.substring(6); // strip the "data: " prefix
                if (jsonData.trim().isEmpty()) {
                    return null;
                }
                // Validate that the payload is well-formed JSON
                objectMapper.readTree(jsonData);
                return jsonData; // return the bare JSON; Spring MVC wraps each element as an SSE data event
            } else {
                return null; // ignore non-data lines
            }
        } catch (Exception e) {
            log.error("Failed to process data line: {}", e.getMessage());
            return null;
        }
    }
}
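5. Controller layer
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

@Slf4j
@RestController
@RequestMapping("/api/rag")
public class RagChatController {
    private final RagChatService chatService;

    @Autowired
    public RagChatController(RagChatService chatService) {
        this.chatService = chatService;
    }

    /**
     * Streaming chat endpoint.
     * Key point: produces = MediaType.TEXT_EVENT_STREAM_VALUE
     */
    @PostMapping(value = "/stream-chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> streamChat(@RequestBody ChatRequest request) {
        log.info("Received streaming chat request, message: {}", request.getMessage());
        // Validate the input
        if (!StringUtils.hasText(request.getMessage())) {
            return Flux.error(new IllegalArgumentException("message must not be empty"));
        }
        // Call the service layer and return the reactive stream
        return chatService.processStreamChat(request.getMessage())
                .doOnNext(data -> log.debug("Sending data to client: {}", data))
                .doOnError(error -> log.error("Streaming chat failed", error))
                .doOnComplete(() -> log.info("Streaming chat completed"))
                .onErrorResume(error -> {
                    // Error handling: emit an error JSON event
                    // (error.getMessage() is inserted as-is and is not JSON-escaped)
                    return Flux.just("{\"error\":\"" + error.getMessage() +
                            "\",\"type\":\"abort\",\"close\":true}");
                });
    }
}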
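The error branch above concatenates error.getMessage() straight into a JSON string, so a message containing a quote or a newline would produce invalid JSON for the client. The following is a minimal sketch of escaping the message by hand using only the JDK. The class name ErrorPayload and its methods are hypothetical and not part of the original code; in the actual service, serializing a Map with the already-injected ObjectMapper would likely be the simpler route.

```java
final class ErrorPayload {
    private ErrorPayload() {}

    /** Escapes the characters that must not appear raw inside a JSON string. */
    static String escape(String s) {
        StringBuilder out = new StringBuilder(s.length() + 8);
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"':  out.append("\\\""); break;
                case '\\': out.append("\\\\"); break;
                case '\n': out.append("\\n");  break;
                case '\r': out.append("\\r");  break;
                case '\t': out.append("\\t");  break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters become four-digit hex escapes
                        out.append(String.format("\\u%04x", (int) c));
                    } else {
                        out.append(c);
                    }
            }
        }
        return out.toString();
    }

    /** Builds the same error shape the controller emits, with the message escaped. */
    static String build(String message) {
        String safe = (message == null) ? "unknown error" : escape(message);
        return "{\"error\":\"" + safe + "\",\"type\":\"abort\",\"close\":true}";
    }
}
```

With this helper, the onErrorResume branch could return Flux.just(ErrorPayload.build(error.getMessage())), which also covers exceptions whose message is null.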
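Key technical points
1. Using reactive programming correctly
❌ Wrong approach:
@Async
public void processStreamChat(String message, SseEmitter emitter) {
    // Traditional blocking code plus the @Async annotation
    // This leads to batched output instead of true streaming
}
✅ Right approach:
public Flux<String> processStreamChat(String message) {
    return Flux.create(sink -> {
        // Reactive stream: each line is sent via sink.next() as soon as it is read
    });
}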
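The core of the right approach is a loop that hands every line to a callback the moment it is read and stops once the consumer has gone away. The sketch below isolates that loop using only JDK types, so it can be run and tested without Reactor. LineStreamer is a hypothetical name; the Consumer plays the role of sink.next() and the BooleanSupplier plays the role of sink.isCancelled().

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

final class LineStreamer {
    private LineStreamer() {}

    /**
     * Reads lines one at a time and passes each to onLine immediately.
     * Stops early once cancelled reports true.
     * Returns the number of lines delivered.
     */
    static int stream(Reader source, Consumer<String> onLine, BooleanSupplier cancelled) {
        int delivered = 0;
        try (BufferedReader reader = new BufferedReader(source)) {
            String line;
            // Check for cancellation before blocking on the next read
            while (!cancelled.getAsBoolean() && (line = reader.readLine()) != null) {
                onLine.accept(line); // push right away, never collect into a list first
                delivered++;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return delivered;
    }
}
```

Inside Flux.create, the equivalent call would pass sink::next and sink::isCancelled. Anything that collects the lines first and emits them afterwards would reintroduce the batched behavior.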
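2. Choosing an HTTP client
We tried three options:
Option | Strengths | Weaknesses | Best suited for |
---|---|---|---|
RestTemplate | Simple to use | Blocking, not suited to streaming | Non-streaming scenarios |
WebClient | Part of the Spring WebFlux ecosystem | Needs an extra dependency; SSE handling is more involved | Fully reactive stacks |
OkHttp + Reactor | Handles streaming well, no WebFlux required | Needs an extra dependency | First choice for streaming |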
3. Thread scheduling optimization
.subscribeOn(Schedulers.boundedElastic()) // I/O-bound work
.publishOn(Schedulers.parallel());        // CPU-bound work
4. Handling the SSE data format
SSE format returned by AnythingLLM:
data: {"uuid":"xxx","textResponse":"你好","close":false}
data: {"uuid":"xxx","textResponse":"!","close":false}
data: {"uuid":"xxx","textResponse":"","sources":[...],"close":true}
Our processing strategy:
- Parse the data: prefix
- Extract the JSON payload
- Validate the format
- Return it as a stream via Flux
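The first two steps of this strategy can be sketched in isolation. The service code matches the exact prefix "data: " with a trailing space; the SSE format also permits "data:" with no space and treats lines starting with ":" as comments, so the sketch below tolerates both. SseLineParser is a hypothetical helper and not part of the original service. JSON validation is left out so that the example needs nothing beyond the JDK.

```java
import java.util.Optional;

final class SseLineParser {
    private static final String DATA_FIELD = "data:";

    private SseLineParser() {}

    /**
     * Returns the payload of an SSE data line, or empty for blank lines,
     * comment lines, other fields (event:, id:, retry:) and empty payloads.
     */
    static Optional<String> extractData(String line) {
        if (line == null || line.isEmpty() || line.startsWith(":")) {
            return Optional.empty();
        }
        if (!line.startsWith(DATA_FIELD)) {
            return Optional.empty();
        }
        String payload = line.substring(DATA_FIELD.length());
        // A single space after the colon is part of the framing, not the payload
        if (payload.startsWith(" ")) {
            payload = payload.substring(1);
        }
        return payload.trim().isEmpty() ? Optional.empty() : Optional.of(payload);
    }
}
```

For the first sample line above, extractData returns the JSON object text without the prefix; a keep-alive comment such as ": ping" yields an empty Optional.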
Frontend integration example
function sendMessage(message) {
    fetch('/api/rag/stream-chat', {
        method: 'POST',
        headers: {
            'Content-Type': 'application/json',
            'Accept': 'text/event-stream',
        },
        body: JSON.stringify({message: message})
    }).then(response => {
        const reader = response.body.getReader();
        const decoder = new TextDecoder();
        let buffer = '';
        function readStream() {
            return reader.read().then(({done, value}) => {
                if (done) return;
                const chunk = decoder.decode(value, {stream: true});
                buffer += chunk;
                // Keep the last, possibly incomplete, line in the buffer
                const lines = buffer.split('\n');
                buffer = lines.pop() || '';
                lines.forEach(line => {
                    if (line.startsWith('data:')) {
                        const jsonData = line.substring(5).trim();
                        if (jsonData) {
                            const data = JSON.parse(jsonData);
                            if (data.textResponse) {
                                // Show the AI reply as it arrives
                                displayText(data.textResponse);
                            }
                        }
                    }
                });
                return readStream();
            });
        }
        return readStream();
    });
}
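The buffering step in the frontend code (split on newlines, keep the unfinished tail for the next chunk) is the part that matters most for correctness, because network chunks do not align with SSE lines. As an illustration, here is the same idea sketched in Java, which could serve a Java test client for the endpoint. LineBuffer is a hypothetical class and does not appear in the original article.

```java
import java.util.ArrayList;
import java.util.List;

final class LineBuffer {
    // Text received so far that has not yet been terminated by a newline
    private final StringBuilder pending = new StringBuilder();

    /** Appends a chunk and returns every line it completes, without line terminators. */
    List<String> feed(String chunk) {
        pending.append(chunk);
        List<String> lines = new ArrayList<>();
        int newline;
        while ((newline = pending.indexOf("\n")) >= 0) {
            String line = pending.substring(0, newline);
            if (line.endsWith("\r")) {
                line = line.substring(0, line.length() - 1); // tolerate CRLF
            }
            lines.add(line);
            pending.delete(0, newline + 1);
        }
        return lines;
    }

    /** Returns the incomplete tail that is still waiting for its newline. */
    String pending() {
        return pending.toString();
    }
}
```

Feeding a chunk that ends in the middle of a line returns only the completed lines; the remainder is prepended to whatever arrives next, which mirrors buffer = lines.pop() in the JavaScript version.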
Performance tuning suggestions
1. Connection pool configuration
private final OkHttpClient httpClient = new OkHttpClient.Builder()
        .connectionPool(new ConnectionPool(50, 5, TimeUnit.MINUTES))
        .connectTimeout(10, TimeUnit.SECONDS)
        .readTimeout(30, TimeUnit.SECONDS)
        .build();
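2. Backpressure handling
return Flux.create(sink -> {
    // Check whether the client has already disconnected
    if (sink.isCancelled()) {
        return;
    }
    // Processing logic...
}, FluxSink.OverflowStrategy.BUFFER); // BUFFER is also the default strategy for Flux.create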
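3. Async task executor configuration
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.web.servlet.config.annotation.AsyncSupportConfigurer;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

@Configuration
public class AsyncConfig implements WebMvcConfigurer {
    @Bean
    public AsyncTaskExecutor asyncTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(50);
        executor.setQueueCapacity(100);
        executor.initialize();
        return executor;
    }

    @Override
    public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
        configurer.setTaskExecutor(asyncTaskExecutor());
        // Async request timeout in milliseconds; a stream that runs longer is cut off
        configurer.setDefaultTimeout(30000);
    }
}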
Troubleshooting common problems
1. Streaming output arrives in batches
Cause: a blocking HTTP client or incorrect use of reactive programming.
Fix: use OkHttp + Flux.create().
2. Garbled Chinese characters
Cause: incorrect character encoding settings.
Fix:
try (BufferedReader reader = new BufferedReader(
        new InputStreamReader(responseBody.byteStream(), StandardCharsets.UTF_8))) {
    // Read logic
}
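One way garbling can arise in streaming code is decoding each network chunk separately, so that a multi-byte UTF-8 character split across two chunks is destroyed. This is offered as an illustration of why a single Reader over the byte stream is the safer choice, not as a diagnosis of the problem described above. The class Utf8ChunkDemo is hypothetical and uses only the JDK.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

final class Utf8ChunkDemo {
    private Utf8ChunkDemo() {}

    /** Decodes every chunk on its own; characters straddling a chunk boundary are lost. */
    static String decodePerChunk(byte[] data, int chunkSize) {
        StringBuilder out = new StringBuilder();
        for (int off = 0; off < data.length; off += chunkSize) {
            int len = Math.min(chunkSize, data.length - off);
            out.append(new String(data, off, len, StandardCharsets.UTF_8));
        }
        return out.toString();
    }

    /** Decodes through one InputStreamReader, which keeps partial byte sequences between reads. */
    static String decodeWithReader(byte[] data) {
        StringBuilder out = new StringBuilder();
        char[] buf = new char[16];
        try (Reader reader = new InputStreamReader(
                new ByteArrayInputStream(data), StandardCharsets.UTF_8)) {
            int n;
            while ((n = reader.read(buf)) != -1) {
                out.append(buf, 0, n);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toString();
    }
}
```

The string 你好 occupies six bytes in UTF-8, three per character. Decoding it per chunk with a chunk size of 4 cuts the second character in half and yields replacement characters, while decodeWithReader returns the original text.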
3. Connection timeouts
Cause: the default timeouts are too short.
Fix: set reasonable timeouts.
.connectTimeout(10, TimeUnit.SECONDS)
.readTimeout(30, TimeUnit.SECONDS) // AI responses can be slow