系统架构文档
双向流式语音对话系统,支持 LLM 纯对话和 Agent 工具调用两种模式。
架构总览
graph LR
FE["前端\nBrowser"]
SRV["服务器\nFastAPI"]
BUS["MessageBus\n会话级消息队列"]
ASR["ASR\nWhisper / 阿里云"]
LLM["LLM\nOpenAI Compatible"]
MCP["MCP 服务器\n外部工具(SSE / stdio)"]
INT["内部工具\nrun_command / run_sub_agents"]
TTS["TTS 集群\nGPT-SoVITS × N"]
SK["Skills\n开放技能生态"]
FE <-->|"WebSocket\n(Opus / JSON)"| BUS
BUS <--> SRV
SRV -->|"PCM"| ASR
ASR -->|"text"| SRV
SRV <-->|"stream"| LLM
SRV <-->|"SSE / stdio"| MCP
SRV <-->|"主进程内直接调用"| INT
INT -.->|"实时输出"| BUS
SRV -->|"HTTP"| TTS
TTS -->|"Opus (LB编码)\n或 WAV (直连)"| SRV
SK -.->|"SKILL.md\n描述注入"| SRV
核心数据结构
classDiagram
class LLMChunk {
+str text # TTS用文本(无标签,已剥离voice/意图标签)
+str raw_text # 原始输出(含标签:voice/no_tool/need_tool)
+bool voice # 是否带<voice>标签(ChatHandler根据use_voice决定是否TTS)
+ToolCallInfo tool_call # 工具调用(可选)
+dict hook # Hook 注入事件(可选,持久 hook 专用)
+bool round_end # 本轮LLM调用的最后一个chunk
}
class ToolCallInfo {
+str name
+dict arguments
+str result
+bool approval_required
}
class TTSResult {
+bool success
+str text
+str raw_text
+bytes audio_data
+str audio_format # "wav" 或 "opus"
+bool is_end
+bool is_text_only
+dict tool_call # 工具调用信息(可选)
+dict hook # Hook 注入信息(可选)
+bool is_round_end # 轮次结束信号
}
LLMChunk "1" --> "0..1" ToolCallInfo
TTSResult ..> LLMChunk : 由ChatHandler转换
作用域分层
系统中存在三层生命周期截然不同的状态,对应三种资源:
| 作用域 |
生命周期 |
标识符 |
示例 |
管理者 |
| 请求 |
一次 WS 消息处理 |
session_id |
审批 / TTS 任务 / 子进程 / 表单 |
session_manager、5 个 on_cancel 回调 |
| 对话 |
用户"新建对话"到"清空对话" |
conversation_id |
Playwright Chrome profile / per-conv MCP subprocess |
sandbox_registry(不在 cancel 链上) |
| 全局 |
服务器进程生命周期 |
— |
shared MCP 客户端 / LLM client / TTS client / 技能目录 |
lifespan 管理 |
前端在 localStorage 里持久化 conversation_id,每条 WS 消息带上。后端 session_manager 拒绝同 conversation_id 的并发 session_id("该对话已有活跃会话")。Sandbox 销毁只发生于 conv_id 轮换后 LRU 驱逐、或服务器 shutdown。
时序图
1. 语音输入 → AI语音回复
sequenceDiagram
actor User
participant Browser
participant WS as WebSocket Server
participant Bus as MessageBus
participant ASR
participant CH as ChatHandler
participant LLM as LLMClient
participant TTS as TTSClient
User->>Browser: 说话(触发VAD)
Browser->>Browser: MediaRecorder 录制 Opus
Browser->>Browser: 检测到静音 → stop()
Browser->>WS: voice_input {session_id, audio_base64, format:opus}
Note over WS: 创建 Bus + 消费者 Task
WS->>WS: opus_to_pcm()(在 WS 层解码)
WS->>CH: handle_voice_chat(audio_array, messages)
CH->>ASR: transcribe(audio_array)
ASR-->>CH: ASRResult(text)
CH->>Bus: bus.push({type:asr_result, text})
Bus->>Browser: WebSocket 推送
Browser->>Browser: 显示用户消息
loop 双层流水线
CH->>LLM: chat_stream(messages)
LLM-->>CH: LLMChunk(text, voice=true)
CH->>TTS: submit(task_id, text) → Future
Note over CH: 并发:LLM产句 & TTS合成同时进行
TTS-->>CH: TTSResult(audio_data)
CH->>Bus: 按task_id有序 bus.push()
Bus->>Browser: {type:audio, audio:base64, format:opus}
Browser->>Browser: audioQueue入队 → 顺序播放
Browser->>User: 播放语音
end
CH->>TTS: submit(is_end=True)
TTS-->>CH: TTSResult(is_end=True)
CH->>Bus: bus.push({type:end})
Bus->>Browser: {type:end}
Browser->>Browser: checkAndSaveHistory()
Browser->>Browser: 返回监听状态
2. Agent 工具调用流
sequenceDiagram
actor User
participant Browser
participant WS as WebSocket Server
participant Bus as MessageBus
participant CH as ChatHandler
participant AM as ApprovalManager
participant AGC as AgentClient
participant MCP as MCPClient
participant INT as 内部工具<br/>(run_command等)
participant LLM as LLM API
User->>Browser: 输入请求
Browser->>WS: chat {use_agent:true, messages}
Note over WS: 创建 Bus + 消费者 Task
Note over WS: selected_client = agent_client<br/>(use_agent 由 WS 决定,通过依赖注入传给 CH)
WS->>CH: ChatHandler(llm_client=agent_client, ...)
WS->>CH: handle_text_chat(messages)
Note over AGC,LLM: ── 第 0 轮:规划轮(不传 tools,意图识别) ──
CH->>AGC: chat_stream(messages)
AGC->>AGC: full_messages.append(planning_prompt)
AGC->>LLM: completions(无 tools,无工具描述)
alt LLM 输出 <no_tool>(闲聊/问答)
LLM-->>AGC: 流式输出 "<voice>你好呀,有什么...<no_tool>"
AGC-->>CH: LLMChunk(voice=true, text="你好呀,有什么...")
CH->>Bus: {type:audio} 语音回复(use_voice=true时)
AGC-->>CH: LLMChunk(round_end=true)
CH->>Bus: {type:round_end}
Note over Browser: historySegments 插入 separator
AGC->>AGC: 流结束后检测到 <no_tool>,拔出 planning_prompt,清理意图标签,补 assistant + [continue]
Note over AGC: 续写轮(_run_no_tool_continuation)
AGC->>AGC: inject no_tool_continuation_prompt
AGC->>LLM: completions(无 tools)
LLM-->>AGC: 流式输出完整回答
AGC-->>CH: LLMChunk(分句 yield)
AGC-->>CH: LLMChunk(round_end=true)
AGC->>AGC: 拔出 continuation_prompt
Bus->>Browser: {type:end}
else LLM 输出 <need_tool>(需要工具)
LLM-->>AGC: 流式输出 "<voice>好的,我来帮你查看...<need_tool>"
AGC-->>CH: LLMChunk(voice=true, text="好的,我来帮你查看...")
CH->>Bus: {type:audio} 语音播报计划
AGC-->>CH: LLMChunk(round_end=true)
CH->>Bus: {type:round_end}
Note over Browser: historySegments 插入 separator
AGC->>AGC: 流结束后检测到 <need_tool>,拔出 planning_prompt,清理意图标签,补 assistant + [continue]
loop Agent 循环(最多 max_agent_steps 次)
AGC->>LLM: completions (带tools定义)
LLM-->>AGC: 流式文本 + tool_calls
AGC-->>CH: LLMChunk(tool_call: start)
CH->>Bus: {type:tool_call, state:start}
AGC-->>CH: LLMChunk(tool_call: approval_required)
CH->>AM: register(session_id, task_id)
CH->>Bus: {type:tool_call, state:approval_required}
CH->>CH: create_task(approval_manager.wait(key))
Note over CH: 非阻塞,主循环继续收集 TTS 结果
alt 用户批准 / auto-approve
Browser->>WS: POST /api/tool-approve {approved:true}
WS->>AM: respond(key, True) → Event.set()
else 用户拒绝
Browser->>WS: POST /api/tool-approve {approved:false}
WS->>AM: cancel() → Event.set()
CH->>Bus: {type:tool_rejected}
CH->>Bus: {type:interrupted}
Note over CH: 中断会话
end
alt 内部工具 (run_command, run_sub_agents 等)
AGC->>INT: handler(arguments, session_id)
Note over INT: 主进程内直接执行<br/>可实时推送输出到 Bus
INT-->>AGC: result
else MCP 工具
AGC->>MCP: call_tool(name, arguments)
MCP-->>AGC: result
end
AGC->>LLM: 追加 tool 结果,继续循环
LLM-->>AGC: 流式最终回复文本
AGC-->>CH: LLMChunk(voice=true, "结果是...")
CH->>Bus: {type:audio} 语音播报结果
AGC-->>CH: LLMChunk(round_end=true)
CH->>Bus: {type:round_end}
end
Bus->>Browser: {type:end}
end
3. 用户中断流
sequenceDiagram
actor User
participant Browser
participant WS as WebSocket Server
participant SM as SessionManager
participant AM as ApprovalManager
participant CH as ChatHandler
participant LLM as LLMClient / AgentClient
participant TTS as TTSClient Worker
Note over Browser,TTS: 正在进行对话(LLM流式 + TTS合成中)
User->>Browser: 点击"中断"按钮
Browser->>Browser: 进入「中断中…」状态(interrupting=true,按钮灰红禁用)
Browser->>WS: POST /api/interrupt {session_id}
Note over Browser: 15 秒超时兜底;期间音频继续正常播放消费
WS->>SM: was_active = cancel(session_id)
SM->>SM: existed = session_id in _sessions<br/>sessions[id].cancelled = True(若 existed)
SM->>AM: on_cancel 回调 → cancel_session(session_id)
AM->>AM: 唤醒所有 pending Event (approved=False)
Note over SM: 其他回调: TTSClient 清零计数,<br/>ProcessManager SIGTERM 子Agent,<br/>CommandManager SIGTERM run_command,<br/>FormManager 取消 pending 表单
WS-->>Browser: {success, was_active}
alt was_active == false (后端 handler 已结束)
Note over Browser: 不会再推 interrupted,清 15s timeout<br/>立即 resetSession() + 显示「已中断对话」
end
Note over CH,LLM: 各层在下一次轮询时检测到中断
LLM->>SM: is_cancelled(session_id) → True
LLM-->>LLM: 退出流式输出循环 (async for chunk 中断)
Note over CH: approval_wait_task.done() 返回 False → 中断
CH->>SM: is_cancelled(session_id) → True
CH-->>CH: 退出主循环
TTS->>SM: is_cancelled(session_id) → True
TTS-->>TTS: Worker 跳过剩余 TTS 任务
alt active_tool_info 有值(审批或执行阶段)
CH->>CH: 读取 bus.tool_output_cache 作为 partial_result
WS->>Browser: {type:tool_rejected, reason: ..., partial_result: ...}
Note over Browser: 消息过滤器放行 tool_rejected<br/>session guard 检查 session_id<br/>匹配则写入 historySegments(含 partial_result)
end
WS->>Browser: {type:interrupted}
Note over Browser: 消息过滤器放行 interrupted<br/>session guard 检查 session_id
Browser->>Browser: resetSession():停止音频播放 (audioPlayer.pause)
Browser->>Browser: 清空 audioQueue,重置 approvalPendingTaskId
Browser->>Browser: 保存正在播放内容到 historySegments
Browser->>Browser: flushHistorySegments() 落盘到 conversationHistory
Browser->>Browser: 状态机归零 + 恢复 UI + 显示系统消息
Browser->>Browser: 返回监听/输入注:ChatHandler 不再直接操作 WebSocket,所有推送经 SessionMessageBus 的 FIFO 队列。中断时 ChatHandler 先推 tool_rejected(如果有活跃工具,reason 取决于 approval_pending;执行阶段的中断还会附带 partial_result,从 bus.tool_output_cache 读取并剥离 _id 字段),再推 interrupted。前端消息过滤器对这两种类型放行(不丢弃),但各自的处理器会先做 session guard 检查(currentSessionId !== data.session_id),若新会话已开始则丢弃/跳过,防止旧会话收尾消息污染新会话状态。
前端两阶段中断:用户点击中断按钮后,前端进入「中断中…」中间状态(interrupting=true,按钮灰红禁用,阻止新消息发送),期间音频继续正常播放消费。直到后端返回 interrupted 事件后,才触发 resetSession():停止音频、清空队列、将 tool_rejected 等 segment 落盘到 conversationHistory、状态归零、恢复 UI。设有 15 秒超时兜底,防止后端异常时前端卡死。这保证了内部工具的部分结果缓存(tool_output_cache)不会因为前端过早发送新消息而被丢弃。语音模式下「中断中…」同步反映到 voice 状态文本(红色禁用样式,最高优先级覆盖 playingStarted/asrResult 等并发状态回调)。
was_active 短路收尾:/api/interrupt 响应中 was_active=false 表示后端已无此会话(handler 已结束,仅前端还在播残留音频),此时后端不会再推 interrupted。前端检测到后清 15s timeout,立即 resetSession() 并显示「已中断对话」,避免卡死在「中断中…」状态 15 秒。
session_manager.cancel() 的回调链包含 5 个回调:ApprovalManager(唤醒审批)、TTSClient(清零队列计数)、ProcessManager(SIGTERM 子 Agent 进程)、CommandManager(SIGTERM run_command 子进程)、FormManager(取消 pending 表单 future 并 push form_cancel)。详见 CANCEL_FLOW.md。SandboxRegistry 故意不在此链上——Sandbox 是对话作用域(跨多 session),用户中断后浏览器登录态等应继续保留。
前端系统消息:用户点击「中断」→ interrupted 处理器中显示 已中断对话;用户点击「拒绝」→ 显示 已中断会话;审批超时 → interrupted 处理器显示 工具审批超时,已自动拒绝调用并中断会话。用户主动操作时设置 cancelledSessionId,后续 interrupted 经过滤器放行后 userInitiatedInterrupt=true,不重复显示。
LLM 层中断检查点(轮询式,非实时):
| 检查位置 |
组件 |
说明 |
| 规划轮流式循环 |
AgentClient |
每个 stream chunk 前检查 |
| Agent 循环入口 |
AgentClient |
每轮开始前检查 |
| Agent 流式循环 |
AgentClient |
每个 stream chunk 前检查 |
| 工具执行前 |
AgentClient |
每个工具调用前检查 |
| 审批等待后 |
AgentClient |
审批返回后检查是否被取消 |
| LLM 流式循环 |
LLMClient |
每个 stream chunk 前检查(非Agent模式) |
4. TTS 负载均衡 + 并发控制
sequenceDiagram
participant CH as ChatHandler
participant TC as TTSClient
participant SEM as Semaphore(3)
participant Q as GlobalQueue
participant W1 as Worker-1
participant W2 as Worker-2
participant LB as LoadBalancer:9880
participant B1 as TTS后端:9881
participant B2 as TTS后端:9882
CH->>TC: submit(task_id=0, text="句子A")
TC->>Q: put {task_id:0, future_A}
CH->>TC: submit(task_id=1, text="句子B")
TC->>Q: put {task_id:1, future_B}
CH->>TC: submit(task_id=2, text="句子C")
TC->>Q: put {task_id:2, future_C}
W1->>Q: get → task_id=0
W2->>Q: get → task_id=1
W1->>SEM: acquire(已有0个)
W2->>SEM: acquire(已有1个)
W1->>LB: GET /tts?text=句子A
LB->>B1: Round-robin → :9881
W2->>LB: GET /tts?text=句子B
LB->>B2: Round-robin → :9882
B1-->>LB: WAV
LB-->>LB: wav_to_opus() 编码
LB-->>W1: Opus (audio/webm)
W1->>SEM: release
W1-->>CH: future_A.set_result(TTSResult, format=opus)
B2-->>LB: WAV
LB-->>W2: Opus (audio/webm)
W2->>SEM: release
W2-->>CH: future_B.set_result(TTSResult, format=opus)
Note over CH: next_id=0 → 先推送A,再推送B(保序)
Note over CH: audio_format=opus → 直接透传,跳过二次编码队列深度兜底降级
GPT-SoVITS 后端串行处理请求,当TTS模型变慢时 global_queue 会无限堆积,导致雪崩式超时。TTSClient.submit() 在入队前检查 active_depth(所有活跃会话的待处理计数之和),超过 max_queue_depth(默认 num_workers × 20)时直接返回 TTSResult(is_text_only=True),前端只显示文字不播放语音。
_pending_counts 字典:按 session_id 统计待处理任务数。Worker 取出任务时递减,计数归零时删除 key,避免内存泄漏。
- cancel 回调:
session_manager.on_cancel 注册 _on_session_cancel,会话中断时立即清零该 session 的计数,防止已中断会话的残留任务虚占深度导致新会话被误降级。
- Semaphore 内二次检查:Worker 在获取 semaphore 后、发起 HTTP 请求前再次检查
is_cancelled,避免已中断会话的任务浪费并发槽位。
- 不重试:
_do_tts() 失败后不重试,避免慢请求长时间占用 semaphore。
5. 多模态(图片附件)
用户可通过前端附件面板上传图片(点击/拖拽/粘贴),图片以 base64 data URI 内联在消息中发送。
消息格式
content 字段支持两种格式:
// 纯文本(传统格式)
{ "role": "user", "content": "你好" }
// 多模态(图片 + 文本)
{ "role": "user", "content": [
{ "type": "text", "text": "这张图是什么?" },
{ "type": "image_url", "image_url": { "url": "data:image/png;base64,..." } }
]}
处理流程
sequenceDiagram
participant FE as 前端
participant WS as WebSocket Server
participant LC as LLMClient / AgentClient
participant API as LLM API
FE->>WS: content: [text, image_url]
WS->>WS: validate_attachments()<br/>(MIME白名单 + 10MB上限)
alt 校验失败
WS-->>FE: {type:error, error:"不支持的图片格式"}
else 校验通过
WS->>LC: messages(含多模态 content)
LC->>LC: _build_messages()
LC->>LC: sanitize_messages()<br/>(兼容 list content)
alt multimodal = True
LC->>API: 原样发送(保留 image_url)
else multimodal = False
LC->>LC: strip_images()<br/>(移除 image_url,退化为字符串)
LC->>API: 发送纯文本 messages
end
API-->>LC: 流式响应
LC-->>WS: LLMChunk
WS-->>FE: 文本 / 音频
endProvider 多模态能力
llm/providers.py 中每个 provider 配置 "multimodal": True/False:
| 模型 |
multimodal |
说明 |
| claude-sonnet-4-6, claude-haiku-4-5 |
True |
原生支持图片 |
| kimi-k2.5 |
True |
原生支持图片 |
| glm-5, qwen3.5-plus, deepseek-v3.2 |
False |
不支持,静默过滤 |
不支持多模态的模型在 _build_messages() 中调用 strip_images() 静默移除 image_url 项,list content 退化回字符串,前端和用户无感知。
6. Skills 技能系统
支持开放 skill 生态(兼容 Codex 模式),通过读取 SKILL.md 的 frontmatter 摘要注入 system prompt,LLM 自行遵循技能指引。Skill 可通过 hooks/ 目录声明式注入提示词到 Agent 流程的 5 个检查点(详见 HOOK_SYSTEM.md)。
架构
skills/
├── self-improving-agent-3.0.4/
│ ├── _meta.json # version 等元数据
│ ├── SKILL.md # frontmatter (name, description) + 完整说明
│ └── ...
├── find-skills-0.1.0/
│ ├── _meta.json
│ ├── SKILL.md
│ └── ...
关键组件
| 文件 |
职责 |
skills_engine/skills_config.py |
SkillInfo 数据类 + scan_skills() 扫描 skills/ 目录读取 frontmatter |
skills_engine/skills_context_manager.py |
SkillsContextManager:聚合启用 skill 的描述文本,生成注入用的 system 块 |
config.py |
LLMConfig.skills_prompt_suffix:Skills 注入的提示词前缀 |
数据流
sequenceDiagram
participant FE as 前端
participant WS as WebSocket Server
participant SCM as SkillsContextManager
participant LLM as LLM API
FE->>WS: {enabled_skills: ["fabric-query"], use_agent: true}
WS->>SCM: SkillsContextManager(enabled_skills, suffix)
SCM->>SCM: scan_skills() → 读取 SKILL.md frontmatter
SCM-->>WS: get_session_prompt() → 聚合文本
WS->>WS: messages.insert(1, {role: "system", content: skills_prompt})
WS->>LLM: messages(含 skills system 块)
LLM-->>WS: 流式响应(LLM 自行遵循 skill 指引)注入仅在 Agent 模式下生效(use_agent=True)。前端通过 GET /api/skills 获取可用列表,用户选择的 skill 持久化到 localStorage。
上下文截断白名单
MCP 工具 get_skill 返回 skill 的完整目录结构和 SKILL.md 内容。前端 buildMessagesForSend() 中,NO_TRUNCATE_TOOLS 白名单内的工具结果不做截断,保证 LLM 能看到完整的 skill 说明。
评论区