This article walks through integrating AnythingLLM with Spring Boot to build a high-performance streaming AI chat relay service.

Technical Architecture

Frontend → Spring Boot relay service → AnythingLLM → LLM
     ↓ streaming response
   Real-time AI chat

Core Technology Choices

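  • Spring Boot MVC: the main framework
  • Reactor Core: reactive stream handling (Spring Boot manages its version, but spring-boot-starter-web does not pull it in, so it must be declared)
  • OkHttp: HTTP client with good support for reading streamed responses
  • Jackson: JSON processing (included with spring-boot-starter-web)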

Implementation Steps

1. Add Dependencies

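<dependencies>
    <!-- Spring Boot Web Starter (already includes Jackson) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- Reactor Core for Flux; not included in starter-web, version managed by Spring Boot -->
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId>
    </dependency>

    <!-- OkHttp for streaming HTTP requests -->
    <dependency>
        <groupId>com.squareup.okhttp3</groupId>
        <artifactId>okhttp</artifactId>
        <version>4.12.0</version>
    </dependency>

    <!-- Lombok for @Data and @Slf4j used in the code below -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>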

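Notes:

  • There is no need to add spring-boot-starter-webflux; we use plain Spring MVC together with Reactor's Flux.
  • reactor-core must be declared explicitly because spring-boot-starter-web does not include it; Spring Boot's dependency management supplies the version.
  • There is no need to add jackson-databind separately; spring-boot-starter-web already includes it.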

2. Configuration File

# application.yml
rag:
  server:
    url: http://your-anythingllm-server:3001/api
    slug: your_workspace_slug
    auth:
      apikey: YOUR_API_KEY
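The three rag.server.* keys end up in two places in the service: the stream-chat URL and the Authorization header. As a minimal sketch, assuming a hypothetical RagServerProperties holder (the article itself injects the values with @Value), this shows how they combine. The trailing-slash handling is an addition of this sketch, not something the original code does.

```java
import java.util.Objects;

// Hypothetical holder for the rag.server.* values from application.yml.
// The article injects them with @Value; this plain class only shows how they
// combine into the AnythingLLM request.
final class RagServerProperties {
    private final String url;    // rag.server.url, e.g. http://host:3001/api
    private final String slug;   // rag.server.slug
    private final String apiKey; // rag.server.auth.apikey

    RagServerProperties(String url, String slug, String apiKey) {
        this.url = Objects.requireNonNull(url, "url");
        this.slug = Objects.requireNonNull(slug, "slug");
        this.apiKey = Objects.requireNonNull(apiKey, "apiKey");
    }

    /** Same path the service builds: {url}/v1/workspace/{slug}/stream-chat */
    String streamChatUrl() {
        // Drop one trailing slash so ".../api/" and ".../api" give the same result
        String base = url.endsWith("/") ? url.substring(0, url.length() - 1) : url;
        return base + "/v1/workspace/" + slug + "/stream-chat";
    }

    /** Value of the Authorization header sent to AnythingLLM */
    String authorizationHeader() {
        return "Bearer " + apiKey;
    }
}
```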

3. Request DTO

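import lombok.Data;

@Data // Lombok generates getters, setters, equals/hashCode and toString
public class ChatRequest {
    /**
     * The user's message
     */
    private String message;
}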

4. Core Service Implementation

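import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import okhttp3.Call;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import okhttp3.ResponseBody;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

@Slf4j
@Service
public class RagChatService {

    private static final MediaType JSON = MediaType.get("application/json; charset=utf-8");

    private final OkHttpClient httpClient;
    private final ObjectMapper objectMapper;

    @Value("${rag.server.url}")
    private String ragServerUrl;

    @Value("${rag.server.slug}")
    private String workspaceSlug;

    @Value("${rag.server.auth.apikey}")
    private String apiKey;

    @Autowired
    public RagChatService(ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
        this.httpClient = new OkHttpClient.Builder()
                .connectTimeout(10, TimeUnit.SECONDS)
                .readTimeout(30, TimeUnit.SECONDS) // maximum silence between two reads, not the total duration
                .build();
    }

    /**
     * Streaming chat: returns a Flux that emits one JSON string per SSE data line.
     */
    public Flux<String> processStreamChat(String message) {
        return Flux.<String>create(sink -> {
                    log.info("Starting streaming chat, message: {}", message);
                    try {
                        // Build the request URL
                        String url = ragServerUrl + "/v1/workspace/" + workspaceSlug + "/stream-chat";

                        // Build the request body
                        Map<String, Object> requestBody = new HashMap<>();
                        requestBody.put("message", message);
                        requestBody.put("mode", "query"); // query mode keeps no conversation history
                        String requestJson = objectMapper.writeValueAsString(requestBody);

                        // OkHttp 4.x signature: RequestBody.create(content, mediaType); it also sets Content-Type
                        Request request = new Request.Builder()
                                .url(url)
                                .post(RequestBody.create(requestJson, JSON))
                                .addHeader("Authorization", "Bearer " + apiKey)
                                .addHeader("Accept", "text/event-stream")
                                .build();

                        Call call = httpClient.newCall(request);
                        // If the client disconnects, cancel the upstream call so a blocked readLine() returns
                        sink.onCancel(call::cancel);

                        // Execute the streaming request
                        try (Response response = call.execute()) {
                            if (!response.isSuccessful()) {
                                sink.error(new IOException("Request failed: HTTP " + response.code()));
                                return;
                            }

                            ResponseBody responseBody = response.body();
                            if (responseBody == null) {
                                sink.error(new IOException("Empty response body"));
                                return;
                            }

                            // Key point: read the stream line by line (explicitly as UTF-8) and forward each line at once
                            try (BufferedReader reader = new BufferedReader(
                                    new InputStreamReader(responseBody.byteStream(), StandardCharsets.UTF_8))) {
                                String line;
                                while (!sink.isCancelled() && (line = reader.readLine()) != null) {
                                    String processedData = processLine(line);
                                    if (processedData != null) {
                                        sink.next(processedData); // sent to the client immediately
                                    }
                                }
                            }
                        }
                        sink.complete();
                    } catch (Exception e) {
                        // After a cancel, the IOException caused by call.cancel() is expected and not reported
                        if (!sink.isCancelled()) {
                            sink.error(e);
                        }
                    }
                })
                .subscribeOn(Schedulers.boundedElastic()) // run the blocking I/O on Reactor's bounded elastic pool
                .publishOn(Schedulers.parallel()); // optional: downstream signals move to the parallel pool, still one at a time
    }

    /**
     * Handles one SSE line; returns the JSON payload, or null if the line should be skipped.
     */
    private String processLine(String line) {
        if (line == null || line.trim().isEmpty()) {
            return null;
        }

        // AnythingLLM sends standard SSE lines: "data: {json}".
        // Per the SSE spec the single space after "data:" is optional, so accept both forms.
        if (!line.startsWith("data:")) {
            return null; // ignore non-data lines (comments, event:, id:)
        }
        String jsonData = line.substring(5);
        if (jsonData.startsWith(" ")) {
            jsonData = jsonData.substring(1);
        }
        if (jsonData.trim().isEmpty()) {
            return null;
        }

        try {
            objectMapper.readTree(jsonData); // validate the JSON
            return jsonData; // plain JSON; Spring MVC adds the SSE "data:" framing
        } catch (Exception e) {
            log.error("Skipping malformed data line: {}", e.getMessage());
            return null;
        }
    }
}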
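The service posts a body with two fields, message and mode ("query" here). Jackson does the serialization in the real code; the stdlib-only sketch below exists only to show the exact shape on the wire and why user text must be JSON-escaped. StreamChatBody is a hypothetical name, and its minimal escaper is not a replacement for Jackson.

```java
// Hypothetical stdlib-only sketch of the JSON body the service sends to
// /v1/workspace/{slug}/stream-chat. Keep using Jackson in the service; this
// only illustrates the shape and the escaping of the message field.
final class StreamChatBody {
    private StreamChatBody() {}

    /** Builds {"message":"...","mode":"query"} */
    static String build(String message) {
        return "{\"message\":" + quote(message) + ",\"mode\":\"query\"}";
    }

    /** Minimal JSON string escaping: quotes, backslash and control characters. */
    static String quote(String s) {
        StringBuilder sb = new StringBuilder(s.length() + 2).append('"');
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c)); // other control characters
                    } else {
                        sb.append(c); // non-ASCII text such as Chinese passes through unchanged
                    }
            }
        }
        return sb.append('"').toString();
    }
}
```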

5. Controller Implementation

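import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

import java.util.LinkedHashMap;
import java.util.Map;

@Slf4j
@RestController
@RequestMapping("/api/rag")
public class RagChatController {

    private final RagChatService chatService;
    private final ObjectMapper objectMapper;

    @Autowired
    public RagChatController(RagChatService chatService, ObjectMapper objectMapper) {
        this.chatService = chatService;
        this.objectMapper = objectMapper;
    }

    /**
     * Streaming chat endpoint.
     * Key point: produces = MediaType.TEXT_EVENT_STREAM_VALUE
     */
    @PostMapping(value = "/stream-chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> streamChat(@RequestBody ChatRequest request) {
        log.info("Received streaming chat request, message: {}", request.getMessage());

        // Validate input
        if (!StringUtils.hasText(request.getMessage())) {
            return Flux.error(new IllegalArgumentException("Message must not be empty"));
        }

        // Delegate to the service and return the reactive stream
        return chatService.processStreamChat(request.getMessage())
                .doOnNext(data -> log.debug("Sending data to client: {}", data))
                .doOnError(error -> log.error("Streaming chat failed", error))
                .doOnComplete(() -> log.info("Streaming chat completed"))
                // On error, send one final JSON event instead of breaking the stream
                .onErrorResume(error -> Flux.just(toErrorJson(error)));
    }

    // Serializing with Jackson escapes quotes or newlines in the error message,
    // which plain string concatenation would not
    private String toErrorJson(Throwable error) {
        Map<String, Object> payload = new LinkedHashMap<>();
        payload.put("error", String.valueOf(error.getMessage()));
        payload.put("type", "abort");
        payload.put("close", true);
        try {
            return objectMapper.writeValueAsString(payload);
        } catch (JsonProcessingException e) {
            return "{\"error\":\"internal error\",\"type\":\"abort\",\"close\":true}";
        }
    }
}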
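Returning plain JSON strings works because, with produces = text/event-stream, Spring MVC writes each Flux element as an SSE event. The sketch below shows the framing the SSE specification defines (one data: line per payload line, then a blank line to end the event), which is what the frontend parser looks for. SseFraming is a hypothetical helper for illustration; Spring's exact output may differ in small details.

```java
// Hypothetical illustration of SSE event framing per the HTML Living Standard:
// every line of the payload becomes a "data:" line, and a blank line ends the event.
final class SseFraming {
    private SseFraming() {}

    static String frame(String payload) {
        StringBuilder sb = new StringBuilder();
        // A payload containing newlines has to be split across several data: lines;
        // single-line JSON (what Jackson produces by default) yields exactly one.
        for (String line : payload.split("\r\n|\r|\n", -1)) {
            sb.append("data:").append(line).append('\n');
        }
        return sb.append('\n').toString(); // the blank line dispatches the event
    }
}
```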

Key Technical Points

1. Using Reactive Programming Correctly

❌ Wrong approach:

@Async
public void processStreamChat(String message, SseEmitter emitter) {
    // Traditional blocking code plus the @Async annotation;
    // in our case this led to batched output instead of true streaming
}

✅ Right approach:

public Flux<String> processStreamChat(String message) {
    return Flux.create(sink -> {
        // Reactive stream: each line is pushed with sink.next() as soon as it is read
    });
}
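What makes the right approach stream is the loop inside Flux.create: every line is handed on as soon as readLine() returns, and the loop stops once the subscriber is gone. The stdlib-only sketch below isolates that loop; LineRelay is a hypothetical name, with a Consumer standing in for sink::next and a BooleanSupplier for sink::isCancelled.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

// Hypothetical stand-in for the read loop inside Flux.create:
// 'emit' plays sink::next, 'cancelled' plays sink::isCancelled.
final class LineRelay {
    private LineRelay() {}

    /** Pushes each line downstream as soon as it is read; returns how many were emitted. */
    static int relay(Reader source, Consumer<String> emit, BooleanSupplier cancelled) {
        int count = 0;
        try (BufferedReader reader = new BufferedReader(source)) {
            String line;
            // Check cancellation before each read so a disconnected client stops the loop
            while (!cancelled.getAsBoolean() && (line = reader.readLine()) != null) {
                emit.accept(line); // no collecting: the line leaves immediately
                count++;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return count;
    }
}
```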

2. Choosing an HTTP Client

We tried three options:

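  • RestTemplate. Pros: simple to use. Cons: blocking, not suited to streaming. Best for: non-streaming calls.
  • WebClient. Pros: part of the Spring WebFlux ecosystem. Cons: needs an extra dependency, and SSE handling is more involved. Best for: fully reactive stacks.
  • OkHttp + Reactor. Pros: good streaming support without WebFlux. Cons: needs an extra dependency. Best for: streaming (our first choice).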

3. Thread Scheduling

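.subscribeOn(Schedulers.boundedElastic()) // blocking I/O: the OkHttp read loop runs on the bounded elastic pool
.publishOn(Schedulers.parallel());        // downstream signals move to the pool meant for CPU-bound work; optional here, and signals stay sequential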
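Switching schedulers moves work between threads, but it does not make the stream parallel: items still arrive one at a time and in order. As a rough stdlib analogy (not Reactor itself), the hypothetical HandOff class below runs the blocking producer on one executor, the way subscribeOn does, and consumes on another, the way publishOn does, with a queue between them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Stdlib analogy for subscribeOn/publishOn: producer and consumer run on
// different threads, yet the consumer still sees items sequentially, in order.
final class HandOff {
    private HandOff() {}

    /** Returns "consumerThreadName:item" for each item, in the order received. */
    static List<String> run(List<String> upstream) {
        BlockingQueue<Optional<String>> queue = new LinkedBlockingQueue<>();
        ExecutorService io = Executors.newSingleThreadExecutor();         // plays subscribeOn(boundedElastic())
        ExecutorService downstream = Executors.newSingleThreadExecutor(); // plays publishOn(parallel())
        try {
            io.submit(() -> {
                for (String item : upstream) {
                    queue.add(Optional.of(item)); // a blocking readLine() would sit here
                }
                queue.add(Optional.empty());      // end-of-stream marker
            });
            return downstream.submit(() -> {
                List<String> out = new ArrayList<>();
                String name = Thread.currentThread().getName();
                for (Optional<String> item = queue.take(); item.isPresent(); item = queue.take()) {
                    out.add(name + ":" + item.get());
                }
                return out;
            }).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            io.shutdownNow();
            downstream.shutdownNow();
        }
    }
}
```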

4. Handling the SSE Data Format

AnythingLLM returns SSE data in this format:

data: {"uuid":"xxx","textResponse":"你好","close":false}
data: {"uuid":"xxx","textResponse":"!","close":false}
data: {"uuid":"xxx","textResponse":"","sources":[...],"close":true}

Our processing strategy:

  1. Match the data: prefix
  2. Extract the JSON payload
  3. Validate the JSON
  4. Return it to the client through the Flux
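Steps 1 and 2 can be made a little more robust by following the SSE specification's line rules: the space after "data:" is optional, lines starting with ":" are comments (often keep-alives), and several data lines before a blank line form one event joined with newlines. The sketch below is a hypothetical SseDataParser that handles only the data field, and it assumes events are separated by blank lines as in standard SSE (the sample above shows only the data lines).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical spec-style SSE parser restricted to the "data" field
// (HTML Living Standard, event stream interpretation).
final class SseDataParser {
    private final StringBuilder data = new StringBuilder();
    private boolean hasData = false;
    private final List<String> events = new ArrayList<>();

    /** Feeds one line, without its line terminator. */
    void accept(String line) {
        if (line.isEmpty()) {              // blank line: dispatch the pending event
            if (hasData) events.add(data.toString());
            data.setLength(0);
            hasData = false;
            return;
        }
        if (line.startsWith(":")) return;  // comment or keep-alive line
        int colon = line.indexOf(':');
        String field = colon < 0 ? line : line.substring(0, colon);
        String value = colon < 0 ? "" : line.substring(colon + 1);
        if (value.startsWith(" ")) value = value.substring(1); // strip exactly one space
        if (field.equals("data")) {
            if (hasData) data.append('\n'); // multi-line data is joined with '\n'
            data.append(value);
            hasData = true;
        }
        // Other fields (event, id, retry) are ignored in this sketch
    }

    /** Payloads of all events dispatched so far. */
    List<String> events() {
        return events;
    }
}
```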

Frontend Integration Example

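// displayText(text) is assumed to be defined by your page; it appends text to the chat view
function sendMessage(message) {
    return fetch('/api/rag/stream-chat', {
        method: 'POST',
        headers: {
            'Content-Type': 'application/json',
            'Accept': 'text/event-stream',
        },
        body: JSON.stringify({message: message})
    }).then(response => {
        if (!response.ok) {
            throw new Error('HTTP ' + response.status);
        }
        const reader = response.body.getReader();
        const decoder = new TextDecoder();
        let buffer = '';

        function handleLine(line) {
            if (!line.startsWith('data:')) return;
            const jsonData = line.substring(5).trim();
            if (!jsonData) return;
            const data = JSON.parse(jsonData);
            if (data.error) {
                console.error('Stream error:', data.error);
            } else if (data.textResponse) {
                // Show the AI reply as it arrives
                displayText(data.textResponse);
            }
        }

        function readStream() {
            return reader.read().then(({done, value}) => {
                if (done) {
                    handleLine(buffer); // flush a last line that had no trailing newline
                    return;
                }

                buffer += decoder.decode(value, {stream: true});

                // A chunk can end mid-line: keep the incomplete last line in the buffer
                const lines = buffer.split('\n');
                buffer = lines.pop() || '';
                lines.forEach(handleLine);

                return readStream();
            });
        }

        return readStream();
    });
}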
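The buffering in the fetch reader matters because network chunks do not respect line boundaries. The same idea in Java, as a hypothetical ChunkLineBuffer helper for illustration (it is not part of the original service): complete lines are returned, and the unfinished tail waits for the next chunk, like lines.pop() in the JavaScript.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper mirroring the buffer/split/pop logic of the frontend reader.
final class ChunkLineBuffer {
    private final StringBuilder buffer = new StringBuilder();

    /** Appends a decoded chunk and returns every line that is now complete. */
    List<String> push(String chunk) {
        buffer.append(chunk);
        List<String> lines = new ArrayList<>();
        int start = 0;
        int nl;
        while ((nl = buffer.indexOf("\n", start)) >= 0) {
            lines.add(buffer.substring(start, nl));
            start = nl + 1;
        }
        buffer.delete(0, start); // keep only the unfinished tail
        return lines;
    }

    /** Text left over; a well-formed SSE stream ends with a newline, so normally empty. */
    String remainder() {
        return buffer.toString();
    }
}
```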

Performance Tuning Tips

1. Connection Pool Configuration

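private final OkHttpClient httpClient = new OkHttpClient.Builder()
    // ConnectionPool(maxIdleConnections, keepAliveDuration, unit): keep up to 50 idle connections for 5 minutes
    .connectionPool(new ConnectionPool(50, 5, TimeUnit.MINUTES))
    .connectTimeout(10, TimeUnit.SECONDS)
    .readTimeout(30, TimeUnit.SECONDS)
    .build();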

2. Backpressure Handling

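return Flux.create(sink -> {
    // Stop early if the client has already disconnected (this is cancellation handling)
    if (sink.isCancelled()) {
        return;
    }
    // processing logic...
}, FluxSink.OverflowStrategy.BUFFER); // BUFFER is the default for Flux.create and buffers without limit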
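Because BUFFER is unbounded, a slow client lets buffered chunks pile up in memory; real backpressure means the producer has to wait, or give up, when the consumer falls behind. The stdlib sketch below shows that behaviour with a bounded ArrayBlockingQueue; BoundedBuffer is a hypothetical name and Reactor is not involved. Within Reactor, bounded variants such as onBackpressureBuffer(int maxSize) exist for this purpose.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of bounded buffering: once 'capacity' items are
// waiting, the producer blocks for up to the timeout and then gives up.
final class BoundedBuffer {
    private final BlockingQueue<String> queue;

    BoundedBuffer(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    /** Returns false if no slot frees up within the timeout. */
    boolean offer(String item, long timeoutMillis) {
        try {
            return queue.offer(item, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Consumer side: takes the oldest item, or null if empty. */
    String poll() {
        return queue.poll();
    }
}
```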

3. Async Task Executor Configuration

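import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.web.servlet.config.annotation.AsyncSupportConfigurer;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

// Executor used by Spring MVC for async request handling, which includes
// writing items of streamed reactive return types to the response
@Configuration
public class AsyncConfig implements WebMvcConfigurer {

    @Bean
    public AsyncTaskExecutor asyncTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(50);
        executor.setQueueCapacity(100);
        executor.initialize();
        return executor;
    }

    @Override
    public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
        configurer.setTaskExecutor(asyncTaskExecutor());
        // Async request timeout in milliseconds: a stream still running after 30 s is cut off,
        // so raise it if AI answers can take longer
        configurer.setDefaultTimeout(30000);
    }
}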
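The pool settings interact in a way that is easy to misread: Spring's ThreadPoolTaskExecutor delegates to a java.util.concurrent.ThreadPoolExecutor, which only grows beyond the core size once the queue is full. The stdlib sketch below (the PoolLimits name is hypothetical) builds a plain ThreadPoolExecutor with the same numbers and fills it with tasks that block; the default abort policy rejects the task that arrives after 50 running plus 100 queued.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Stdlib sketch: core 10, max 50, queue 100, default AbortPolicy.
final class PoolLimits {
    private PoolLimits() {}

    /** Submits 'tasks' blocked tasks; returns {poolSize, queuedTasks, rejectedTasks}. */
    static int[] probe(int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                10, 50, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await(); // hold the thread, like a long-running stream
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++; // pool at max size and queue full
                }
            }
            return new int[] {pool.getPoolSize(), pool.getQueue().size(), rejected};
        } finally {
            release.countDown(); // let every task finish
            pool.shutdown();
        }
    }
}
```

With 20 tasks the pool stays at its 10 core threads and queues the other 10; extra threads only appear after the 100-slot queue is full.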

Troubleshooting

1. Streaming output arrives in batches

Cause: a blocking HTTP client, or reactive programming used incorrectly.
Fix: use OkHttp + Flux.create().

2. Garbled Chinese characters

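Cause: the wrong character encoding is used when reading the response.
Fix: read the byte stream explicitly as UTF-8: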

try (BufferedReader reader = new BufferedReader(
    new InputStreamReader(responseBody.byteStream(), StandardCharsets.UTF_8))) {
    // read line by line as in the service
}
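To see why this works, compare it with decoding each network chunk on its own. The stdlib sketch below (CharsetDemo is a hypothetical name) shows that the same UTF-8 bytes only come back as the original Chinese text when read as UTF-8, and that decoding chunks independently with new String(...) breaks characters whose bytes straddle a chunk boundary. An InputStreamReader keeps incomplete byte sequences between reads, so it does not have that problem.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class CharsetDemo {
    private CharsetDemo() {}

    /** Reads the first line through an InputStreamReader using the given charset. */
    static String readFirstLine(byte[] body, Charset charset) {
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(new ByteArrayInputStream(body), charset))) {
            return reader.readLine();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Anti-pattern: decodes every chunk separately, as if each network read stood alone. */
    static String decodeChunksIndependently(byte[] body, int chunkSize) {
        StringBuilder sb = new StringBuilder();
        for (int from = 0; from < body.length; from += chunkSize) {
            byte[] chunk = Arrays.copyOfRange(body, from, Math.min(body.length, from + chunkSize));
            // A multi-byte character split across two chunks turns into U+FFFD here
            sb.append(new String(chunk, StandardCharsets.UTF_8));
        }
        return sb.toString();
    }
}
```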

3. Connection timeouts

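Cause: the timeouts are too short (OkHttp defaults to 10 seconds for both connect and read).
Fix: set the timeouts deliberately. readTimeout limits the silence between two reads, so it must also cover the wait before the model's first token.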

.connectTimeout(10, TimeUnit.SECONDS)
.readTimeout(30, TimeUnit.SECONDS) // maximum silence between reads; AI responses, especially the first token, can be slow