系统架构文档

双向流式语音对话系统,支持 LLM 纯对话和 Agent 工具调用两种模式。


架构总览

graph LR FE["前端\nBrowser"] SRV["服务器\nFastAPI"] BUS["MessageBus\n会话级消息队列"] ASR["ASR\nWhisper / 阿里云"] LLM["LLM\nOpenAI Compatible"] MCP["MCP 服务器\n外部工具(SSE / stdio)"] INT["内部工具\nrun_command / run_sub_agents"] TTS["TTS 集群\nGPT-SoVITS × N"] SK["Skills\n开放技能生态"] FE <-->|"WebSocket\n(Opus / JSON)"| BUS BUS <--> SRV SRV -->|"PCM"| ASR ASR -->|"text"| SRV SRV <-->|"stream"| LLM SRV <-->|"SSE / stdio"| MCP SRV <-->|"主进程内直接调用"| INT INT -.->|"实时输出"| BUS SRV -->|"HTTP"| TTS TTS -->|"Opus (LB编码)\n或 WAV (直连)"| SRV SK -.->|"SKILL.md\n描述注入"| SRV

核心数据结构

classDiagram class LLMChunk { +str text # TTS用文本(无标签,已剥离voice/意图标签) +str raw_text # 原始输出(含标签:voice/no_tool/need_tool) +bool voice # 是否带<voice>标签(ChatHandler根据use_voice决定是否TTS) +ToolCallInfo tool_call # 工具调用(可选) +dict hook # Hook 注入事件(可选,持久 hook 专用) +bool round_end # 本轮LLM调用的最后一个chunk } class ToolCallInfo { +str name +dict arguments +str result +bool approval_required } class TTSResult { +bool success +str text +str raw_text +bytes audio_data +str audio_format # "wav" 或 "opus" +bool is_end +bool is_text_only +dict tool_call # 工具调用信息(可选) +dict hook # Hook 注入信息(可选) +bool is_round_end # 轮次结束信号 } LLMChunk "1" --> "0..1" ToolCallInfo TTSResult ..> LLMChunk : 由ChatHandler转换

作用域分层

系统中存在三层生命周期截然不同的状态,对应三种资源:

作用域 生命周期 标识符 示例 管理者
请求 一次 WS 消息处理 session_id 审批 / TTS 任务 / 子进程 / 表单 session_manager、5 个 on_cancel 回调
对话 用户"新建对话"到"清空对话" conversation_id Playwright Chrome profile / per-conv MCP subprocess sandbox_registry在 cancel 链上)
全局 服务器进程生命周期 shared MCP 客户端 / LLM client / TTS client / 技能目录 lifespan 管理

前端在 localStorage 里持久化 conversation_id,每条 WS 消息带上。后端 session_manager 拒绝同 conversation_id 的并发 session_id("该对话已有活跃会话")。Sandbox 销毁只发生于 conv_id 轮换后 LRU 驱逐、或服务器 shutdown。


时序图

1. 语音输入 → AI语音回复

sequenceDiagram actor User participant Browser participant WS as WebSocket Server participant Bus as MessageBus participant ASR participant CH as ChatHandler participant LLM as LLMClient participant TTS as TTSClient User->>Browser: 说话(触发VAD) Browser->>Browser: MediaRecorder 录制 Opus Browser->>Browser: 检测到静音 → stop() Browser->>WS: voice_input {session_id, audio_base64, format:opus} Note over WS: 创建 Bus + 消费者 Task WS->>WS: opus_to_pcm()(在 WS 层解码) WS->>CH: handle_voice_chat(audio_array, messages) CH->>ASR: transcribe(audio_array) ASR-->>CH: ASRResult(text) CH->>Bus: bus.push({type:asr_result, text}) Bus->>Browser: WebSocket 推送 Browser->>Browser: 显示用户消息 loop 双层流水线 CH->>LLM: chat_stream(messages) LLM-->>CH: LLMChunk(text, voice=true) CH->>TTS: submit(task_id, text) → Future Note over CH: 并发:LLM产句 & TTS合成同时进行 TTS-->>CH: TTSResult(audio_data) CH->>Bus: 按task_id有序 bus.push() Bus->>Browser: {type:audio, audio:base64, format:opus} Browser->>Browser: audioQueue入队 → 顺序播放 Browser->>User: 播放语音 end CH->>TTS: submit(is_end=True) TTS-->>CH: TTSResult(is_end=True) CH->>Bus: bus.push({type:end}) Bus->>Browser: {type:end} Browser->>Browser: checkAndSaveHistory() Browser->>Browser: 返回监听状态

2. Agent 工具调用流

sequenceDiagram actor User participant Browser participant WS as WebSocket Server participant Bus as MessageBus participant CH as ChatHandler participant AM as ApprovalManager participant AGC as AgentClient participant MCP as MCPClient participant INT as 内部工具<br/>(run_command等) participant LLM as LLM API User->>Browser: 输入请求 Browser->>WS: chat {use_agent:true, messages} Note over WS: 创建 Bus + 消费者 Task Note over WS: selected_client = agent_client<br/>(use_agent 由 WS 决定,通过依赖注入传给 CH) WS->>CH: ChatHandler(llm_client=agent_client, ...) WS->>CH: handle_text_chat(messages) Note over AGC,LLM: ── 第 0 轮:规划轮(不传 tools,意图识别) ── CH->>AGC: chat_stream(messages) AGC->>AGC: full_messages.append(planning_prompt) AGC->>LLM: completions(无 tools,无工具描述) alt LLM 输出 <no_tool>(闲聊/问答) LLM-->>AGC: 流式输出 "<voice>你好呀,有什么...<no_tool>" AGC-->>CH: LLMChunk(voice=true, text="你好呀,有什么...") CH->>Bus: {type:audio} 语音回复(use_voice=true时) AGC-->>CH: LLMChunk(round_end=true) CH->>Bus: {type:round_end} Note over Browser: historySegments 插入 separator AGC->>AGC: 流结束后检测到 <no_tool>,拔出 planning_prompt,清理意图标签,补 assistant + [continue] Note over AGC: 续写轮(_run_no_tool_continuation) AGC->>AGC: inject no_tool_continuation_prompt AGC->>LLM: completions(无 tools) LLM-->>AGC: 流式输出完整回答 AGC-->>CH: LLMChunk(分句 yield) AGC-->>CH: LLMChunk(round_end=true) AGC->>AGC: 拔出 continuation_prompt Bus->>Browser: {type:end} else LLM 输出 <need_tool>(需要工具) LLM-->>AGC: 流式输出 "<voice>好的,我来帮你查看...<need_tool>" AGC-->>CH: LLMChunk(voice=true, text="好的,我来帮你查看...") CH->>Bus: {type:audio} 语音播报计划 AGC-->>CH: LLMChunk(round_end=true) CH->>Bus: {type:round_end} Note over Browser: historySegments 插入 separator AGC->>AGC: 流结束后检测到 <need_tool>,拔出 planning_prompt,清理意图标签,补 assistant + [continue] loop Agent 循环(最多 max_agent_steps 次) AGC->>LLM: completions (带tools定义) LLM-->>AGC: 流式文本 + tool_calls AGC-->>CH: LLMChunk(tool_call: start) CH->>Bus: {type:tool_call, state:start} AGC-->>CH: LLMChunk(tool_call: approval_required) CH->>AM: register(session_id, task_id) CH->>Bus: {type:tool_call, state:approval_required} CH->>CH: create_task(approval_manager.wait(key)) Note over CH: 非阻塞,主循环继续收集 TTS 结果 alt 用户批准 / auto-approve Browser->>WS: POST /api/tool-approve {approved:true} WS->>AM: respond(key, True) → Event.set() else 用户拒绝 Browser->>WS: POST /api/tool-approve {approved:false} WS->>AM: cancel() → Event.set() CH->>Bus: {type:tool_rejected} CH->>Bus: {type:interrupted} Note over CH: 中断会话 end alt 内部工具 (run_command, run_sub_agents 等) AGC->>INT: handler(arguments, session_id) Note over INT: 主进程内直接执行<br/>可实时推送输出到 Bus INT-->>AGC: result else MCP 工具 AGC->>MCP: call_tool(name, arguments) MCP-->>AGC: result end AGC->>LLM: 追加 tool 结果,继续循环 LLM-->>AGC: 流式最终回复文本 AGC-->>CH: LLMChunk(voice=true, "结果是...") CH->>Bus: {type:audio} 语音播报结果 AGC-->>CH: LLMChunk(round_end=true) CH->>Bus: {type:round_end} end Bus->>Browser: {type:end} end

3. 用户中断流

sequenceDiagram actor User participant Browser participant WS as WebSocket Server participant SM as SessionManager participant AM as ApprovalManager participant CH as ChatHandler participant LLM as LLMClient / AgentClient participant TTS as TTSClient Worker Note over Browser,TTS: 正在进行对话(LLM流式 + TTS合成中) User->>Browser: 点击"中断"按钮 Browser->>Browser: 进入「中断中…」状态(interrupting=true,按钮灰红禁用) Browser->>WS: POST /api/interrupt {session_id} Note over Browser: 15 秒超时兜底;期间音频继续正常播放消费 WS->>SM: was_active = cancel(session_id) SM->>SM: existed = session_id in _sessions<br/>sessions[id].cancelled = True(若 existed) SM->>AM: on_cancel 回调 → cancel_session(session_id) AM->>AM: 唤醒所有 pending Event (approved=False) Note over SM: 其他回调: TTSClient 清零计数,<br/>ProcessManager SIGTERM 子Agent,<br/>CommandManager SIGTERM run_command,<br/>FormManager 取消 pending 表单 WS-->>Browser: {success, was_active} alt was_active == false (后端 handler 已结束) Note over Browser: 不会再推 interrupted,清 15s timeout<br/>立即 resetSession() + 显示「已中断对话」 end Note over CH,LLM: 各层在下一次轮询时检测到中断 LLM->>SM: is_cancelled(session_id) → True LLM-->>LLM: 退出流式输出循环 (async for chunk 中断) Note over CH: approval_wait_task.done() 返回 False → 中断 CH->>SM: is_cancelled(session_id) → True CH-->>CH: 退出主循环 TTS->>SM: is_cancelled(session_id) → True TTS-->>TTS: Worker 跳过剩余 TTS 任务 alt active_tool_info 有值(审批或执行阶段) CH->>CH: 读取 bus.tool_output_cache 作为 partial_result WS->>Browser: {type:tool_rejected, reason: ..., partial_result: ...} Note over Browser: 消息过滤器放行 tool_rejected<br/>session guard 检查 session_id<br/>匹配则写入 historySegments(含 partial_result) end WS->>Browser: {type:interrupted} Note over Browser: 消息过滤器放行 interrupted<br/>session guard 检查 session_id Browser->>Browser: resetSession():停止音频播放 (audioPlayer.pause) Browser->>Browser: 清空 audioQueue,重置 approvalPendingTaskId Browser->>Browser: 保存正在播放内容到 historySegments Browser->>Browser: flushHistorySegments() 落盘到 conversationHistory Browser->>Browser: 状态机归零 + 恢复 UI + 显示系统消息 Browser->>Browser: 返回监听/输入

注:ChatHandler 不再直接操作 WebSocket,所有推送经 SessionMessageBus 的 FIFO 队列。中断时 ChatHandler 先推 tool_rejected(如果有活跃工具,reason 取决于 approval_pending;执行阶段的中断还会附带 partial_result,从 bus.tool_output_cache 读取并剥离 _id 字段),再推 interrupted。前端消息过滤器对这两种类型放行(不丢弃),但各自的处理器会先做 session guard 检查(currentSessionId !== data.session_id),若新会话已开始则丢弃/跳过,防止旧会话收尾消息污染新会话状态。

前端两阶段中断:用户点击中断按钮后,前端进入「中断中…」中间状态(interrupting=true,按钮灰红禁用,阻止新消息发送),期间音频继续正常播放消费。直到后端返回 interrupted 事件后,才触发 resetSession():停止音频、清空队列、将 tool_rejected 等 segment 落盘到 conversationHistory、状态归零、恢复 UI。设有 15 秒超时兜底,防止后端异常时前端卡死。这保证了内部工具的部分结果缓存(tool_output_cache)不会因为前端过早发送新消息而被丢弃。语音模式下「中断中…」同步反映到 voice 状态文本(红色禁用样式,最高优先级覆盖 playingStarted/asrResult 等并发状态回调)。

was_active 短路收尾/api/interrupt 响应中 was_active=false 表示后端已无此会话(handler 已结束,仅前端还在播残留音频),此时后端不会再推 interrupted。前端检测到后清 15s timeout,立即 resetSession() 并显示「已中断对话」,避免卡死在「中断中…」状态 15 秒。

session_manager.cancel() 的回调链包含 5 个回调:ApprovalManager(唤醒审批)、TTSClient(清零队列计数)、ProcessManager(SIGTERM 子 Agent 进程)、CommandManager(SIGTERM run_command 子进程)、FormManager(取消 pending 表单 future 并 push form_cancel)。详见 CANCEL_FLOW.mdSandboxRegistry 故意不在此链上——Sandbox 是对话作用域(跨多 session),用户中断后浏览器登录态等应继续保留。

前端系统消息:用户点击「中断」→ interrupted 处理器中显示 已中断对话;用户点击「拒绝」→ 显示 已中断会话;审批超时 → interrupted 处理器显示 工具审批超时,已自动拒绝调用并中断会话。用户主动操作时设置 cancelledSessionId,后续 interrupted 经过滤器放行后 userInitiatedInterrupt=true,不重复显示。

LLM 层中断检查点(轮询式,非实时):

检查位置 组件 说明
规划轮流式循环 AgentClient 每个 stream chunk 前检查
Agent 循环入口 AgentClient 每轮开始前检查
Agent 流式循环 AgentClient 每个 stream chunk 前检查
工具执行前 AgentClient 每个工具调用前检查
审批等待后 AgentClient 审批返回后检查是否被取消
LLM 流式循环 LLMClient 每个 stream chunk 前检查(非Agent模式)

4. TTS 负载均衡 + 并发控制

sequenceDiagram participant CH as ChatHandler participant TC as TTSClient participant SEM as Semaphore(3) participant Q as GlobalQueue participant W1 as Worker-1 participant W2 as Worker-2 participant LB as LoadBalancer:9880 participant B1 as TTS后端:9881 participant B2 as TTS后端:9882 CH->>TC: submit(task_id=0, text="句子A") TC->>Q: put {task_id:0, future_A} CH->>TC: submit(task_id=1, text="句子B") TC->>Q: put {task_id:1, future_B} CH->>TC: submit(task_id=2, text="句子C") TC->>Q: put {task_id:2, future_C} W1->>Q: get → task_id=0 W2->>Q: get → task_id=1 W1->>SEM: acquire(已有0个) W2->>SEM: acquire(已有1个) W1->>LB: GET /tts?text=句子A LB->>B1: Round-robin → :9881 W2->>LB: GET /tts?text=句子B LB->>B2: Round-robin → :9882 B1-->>LB: WAV LB-->>LB: wav_to_opus() 编码 LB-->>W1: Opus (audio/webm) W1->>SEM: release W1-->>CH: future_A.set_result(TTSResult, format=opus) B2-->>LB: WAV LB-->>W2: Opus (audio/webm) W2->>SEM: release W2-->>CH: future_B.set_result(TTSResult, format=opus) Note over CH: next_id=0 → 先推送A,再推送B(保序) Note over CH: audio_format=opus → 直接透传,跳过二次编码

队列深度兜底降级

GPT-SoVITS 后端串行处理请求,当TTS模型变慢时 global_queue 会无限堆积,导致雪崩式超时。TTSClient.submit() 在入队前检查 active_depth(所有活跃会话的待处理计数之和),超过 max_queue_depth(默认 num_workers × 20)时直接返回 TTSResult(is_text_only=True),前端只显示文字不播放语音。

  • _pending_counts 字典:按 session_id 统计待处理任务数。Worker 取出任务时递减,计数归零时删除 key,避免内存泄漏。
  • cancel 回调session_manager.on_cancel 注册 _on_session_cancel,会话中断时立即清零该 session 的计数,防止已中断会话的残留任务虚占深度导致新会话被误降级。
  • Semaphore 内二次检查:Worker 在获取 semaphore 后、发起 HTTP 请求前再次检查 is_cancelled,避免已中断会话的任务浪费并发槽位。
  • 不重试_do_tts() 失败后不重试,避免慢请求长时间占用 semaphore。

5. 多模态(图片附件)

用户可通过前端附件面板上传图片(点击/拖拽/粘贴),图片以 base64 data URI 内联在消息中发送。

消息格式

content 字段支持两种格式:

// 纯文本(传统格式)
{ "role": "user", "content": "你好" }

// 多模态(图片 + 文本)
{ "role": "user", "content": [
    { "type": "text", "text": "这张图是什么?" },
    { "type": "image_url", "image_url": { "url": "data:image/png;base64,..." } }
]}

处理流程

sequenceDiagram participant FE as 前端 participant WS as WebSocket Server participant LC as LLMClient / AgentClient participant API as LLM API FE->>WS: content: [text, image_url] WS->>WS: validate_attachments()<br/>(MIME白名单 + 10MB上限) alt 校验失败 WS-->>FE: {type:error, error:"不支持的图片格式"} else 校验通过 WS->>LC: messages(含多模态 content) LC->>LC: _build_messages() LC->>LC: sanitize_messages()<br/>(兼容 list content) alt multimodal = True LC->>API: 原样发送(保留 image_url) else multimodal = False LC->>LC: strip_images()<br/>(移除 image_url,退化为字符串) LC->>API: 发送纯文本 messages end API-->>LC: 流式响应 LC-->>WS: LLMChunk WS-->>FE: 文本 / 音频 end

Provider 多模态能力

llm/providers.py 中每个 provider 配置 "multimodal": True/False

模型 multimodal 说明
claude-sonnet-4-6, claude-haiku-4-5 True 原生支持图片
kimi-k2.5 True 原生支持图片
glm-5, qwen3.5-plus, deepseek-v3.2 False 不支持,静默过滤

不支持多模态的模型在 _build_messages() 中调用 strip_images() 静默移除 image_url 项,list content 退化回字符串,前端和用户无感知。


6. Skills 技能系统

支持开放 skill 生态(兼容 Codex 模式),通过读取 SKILL.md 的 frontmatter 摘要注入 system prompt,LLM 自行遵循技能指引。Skill 可通过 hooks/ 目录声明式注入提示词到 Agent 流程的 5 个检查点(详见 HOOK_SYSTEM.md)。

架构

skills/
├── self-improving-agent-3.0.4/
│   ├── _meta.json          # version 等元数据
│   ├── SKILL.md            # frontmatter (name, description) + 完整说明
│   └── ...
├── find-skills-0.1.0/
│   ├── _meta.json
│   ├── SKILL.md
│   └── ...

关键组件

文件 职责
skills_engine/skills_config.py SkillInfo 数据类 + scan_skills() 扫描 skills/ 目录读取 frontmatter
skills_engine/skills_context_manager.py SkillsContextManager:聚合启用 skill 的描述文本,生成注入用的 system 块
config.py LLMConfig.skills_prompt_suffix:Skills 注入的提示词前缀

数据流

sequenceDiagram participant FE as 前端 participant WS as WebSocket Server participant SCM as SkillsContextManager participant LLM as LLM API FE->>WS: {enabled_skills: ["fabric-query"], use_agent: true} WS->>SCM: SkillsContextManager(enabled_skills, suffix) SCM->>SCM: scan_skills() → 读取 SKILL.md frontmatter SCM-->>WS: get_session_prompt() → 聚合文本 WS->>WS: messages.insert(1, {role: "system", content: skills_prompt}) WS->>LLM: messages(含 skills system 块) LLM-->>WS: 流式响应(LLM 自行遵循 skill 指引)

注入仅在 Agent 模式下生效(use_agent=True)。前端通过 GET /api/skills 获取可用列表,用户选择的 skill 持久化到 localStorage。

上下文截断白名单

MCP 工具 get_skill 返回 skill 的完整目录结构和 SKILL.md 内容。前端 buildMessagesForSend() 中,NO_TRUNCATE_TOOLS 白名单内的工具结果不做截断,保证 LLM 能看到完整的 skill 说明。