本文从最简单的 LLM 纯对话模式入手,逐步引入 Agent 工具调用和子 Agent 进程管理,完整解析系统中断(Cancel)的实现原理。
1. LLM 模式中断:最简单的情况
1.1 LLM 模式的流式调用
在 LLM 模式下,系统只有两个核心组件参与:
- ChatHandler:主循环,负责驱动 LLM 流式输出和推送消息
- LLMClient:对 OpenAI API 的薄封装,
chat_stream()是一个 async generator
请求处理流程很直白:
前端 → WebSocket → ChatHandler.handle_chat()
↓
llm_iterator = llm_client.chat_stream(messages, session_id)
↓
主循环:每 10ms 一轮
├── llm_next_task = create_task(llm_iterator.__anext__())
├── 等待 llm_next_task 完成 → 拿到 LLMChunk
├── 提交 TTS / 推送文本
└── 按序推送结果到前端
整个过程只有一条 await 链,没有子进程,没有工具调用。
1.2 LLM 模式的中断路径
当用户点击中断按钮:
整个 LLM 模式的中断就三步:
| 步骤 | 动作 | 效果 |
|---|---|---|
| 1 | cancel() 设置布尔标志 |
标记会话为已中断 |
| 2 | ChatHandler 轮询 is_cancelled → llm_next_task.cancel() |
注入 CancelledError 到 generator 的当前 await 点 |
| 3 | CancelledError 传播到 async for chunk in stream |
HTTP 连接关闭,OpenAI 停止生成,节省 token |
这里有两个关键点需要理解:
CancelledError 从哪来? 不是 SessionManager 产生的。是 ChatHandler 调用 llm_next_task.cancel() 产生的。asyncio.Task.cancel() 的原理是:向 Task 当前挂起的 await 点注入一个 CancelledError,就像在那个 await 位置凭空 raise CancelledError() 一样。
为什么能省 token? LLMClient 的 chat_stream() 内部是 async for chunk in stream,CancelledError 会打断这个 await,底层的 HTTP 连接随之关闭。OpenAI 服务端检测到客户端断连后会停止生成 token,不再计费。
1.3 LLM 模式中断的协程层次
websocket_chat() [协程 - FastAPI ASGI 框架驱动]
├── bus.run(websocket) [Task A - 独立消费队列推送 WebSocket]
└── ChatHandler.handle_chat() [协程 - websocket_chat 内 await]
└── llm_next_task [Task B - 包装 generator.__anext__()]
└── LLMClient.chat_stream() [async generator]
└── async for chunk in stream (← CancelledError 注入点)
└── HTTP 连接 → OpenAI API
中断时 CancelledError 的传播:llm_next_task.cancel() → async for chunk in stream → HTTP 断开 → Task 状态变为 CANCELLED → ChatHandler 捕获 except CancelledError: pass → 推送 interrupted → break。
非常简洁。但当引入工具调用后,事情变得复杂。
1.4 Voice 模式中断:ASR 阶段的特殊性
文本对话 / Agent 模式的 cancel 都从 ChatHandler 主循环出发——它在 await llm_next_task 上轮询 is_cancelled 然后注入 CancelledError。但语音模式多了一个 ASR 阶段,发生在 ChatHandler.handle_chat 被调用之前:用户开口 → 前端持续推 PCM 帧 → FunASR 服务端按 max_sentence_silence=500ms 切句、累积多个 _accumulated → 前端 1.3s 静音判定 turn-end 发 voice_stop → 后端 _ensure_stopped(timeout=0.5) flush 剩余 sentence_end → 这时才把累积全文 append 到 messages 调 handle_chat。
整个 ASR 阶段由 VoiceTurn._run(独立 task)跑,分两段阻塞:
# 段 1:等 turn-end(前端 voice_stop) / cancel
done, pending = await asyncio.wait([
asyncio.create_task(self._user_stopped.wait()), # 前端 voice_stop → on_user_stop()
asyncio.create_task(self._cancelled.wait()), # turn.cancel() 内部
asyncio.create_task(cancel_evt.wait()), # session_manager.cancel_event
], return_when=asyncio.FIRST_COMPLETED)
# 段 2:flush SDK,500ms 兜底(防 SDK 慢拖累整体延迟)
await self._ensure_stopped(timeout=0.5)
session_manager 在 register 时给每个 session 建了一个 cancel_event: asyncio.Event,cancel() 时 set。VoiceTurn 通过 get_cancel_event(sid) 拿到引用并并行 await。
1.4.1 取消路径
ASR 段 1 中断不需要 CancelledError 注入——wait 是协作式的,cancel_event.set() 直接让它返回,比注入异常更轻。这是相比 LLM/Agent 模式的简化:那两种模式下 generator 卡在 HTTP/MCP/审批等多种 await 上,必须靠 CancelledError 强制打断。
1.4.2 stop-flush 阶段(段 2)的二次 cancel 检查
段 2 的 await self._ensure_stopped(timeout=0.5) 是阻塞 await,不监听任何 event。如果用户恰好在前端 1.3s 静音判定之后、SDK flush 之前这个最多 500ms 窗口内按了中断按钮,cancel_event 被 set 但 _run 看不到。
为此 _run 在 stop return 后再查一次 cancel:
await self._ensure_stopped(timeout=0.5) # 段 2
if self._cancelled.is_set() or self._is_session_cancelled():
await self._bus.push({type: "interrupted", ...})
return # 不进 handle_chat
如果不查,stop return 后 _run 会继续 push asr_result + 进 handle_chat;handle_chat 内部 is_cancelled 检查会立刻 push 一次 interrupted 退出,但前端已经先收到 asr_result(导致 partial 气泡 commit)→ 紧接收到 interrupted(resetSession discardPartialBubble 时气泡已 commit 不再是 partial)→ UI 残留一条不该有的 user 消息。二次检查避免这个 UI 不一致。
1.4.3 LLM/TTS 阶段:复用现有机制
VoiceTurn._run 在二次 cancel 检查通过后调 await self._handler.handle_chat(...)。从这一刻起,turn 进入 LLM/TTS 流,跟文本/Agent 模式完全一致:handle_chat 内部的 is_cancelled 轮询点 + llm_next_task.cancel() 机制接管。
也就是说 voice 模式有三段互补的中断机制:
| 阶段 | 阻塞在哪 | 中断机制 |
|---|---|---|
| 段 1(等 turn-end) | asyncio.wait([_user_stopped, _cancelled, cancel_evt]) |
cancel_event.set() 协作式唤醒 |
| 段 2(stop flush) | await asyncio.wait_for(to_thread(stop), timeout=0.5) |
不响应 event;stop 自然完成(≤500ms)后二次检查 _cancelled |
| LLM/TTS(handle_chat 内) | llm_next_task 等 stream / 工具 / 审批 |
llm_next_task.cancel() 注入 CancelledError |
三段都 push interrupted 给前端,前端 queue.js 看到后走 resetSession → discardPartialBubble(清未 commit 的 partial 用户气泡)+ restoreUI → voice 状态回 listening。
1.4.4 cancel 函数的幂等键
VoiceTurn.cancel() 用 _cancelled.is_set() 做幂等检查(早期版本错误地用 _stopped,因为 _stopped 也被 _run 的段 2 设为 True,会让用户首次按中断按钮被 cancel 函数当成"已 cancel"直接 return,丢失中断信号)。
cancel 函数本身只做四件事,不再 cancel _task,靠 events + 段 2 二次检查让 _run 自己优雅退出:
async def cancel(self):
if self._cancelled.is_set():
return
self._cancelled.set()
self._started.set() # 兜底唤醒排队的 feed
self._user_stopped.set() # 兜底唤醒段 1 的 wait
await self._ensure_stopped() # 幂等 stop SDK
不主动 _task.cancel() 的原因:CancelledError 会打断 _run 正在 await bus.push interrupted 的过程,导致前端收不到 interrupted(push_end 哨兵会先到 consumer)。让 _run 自己看到 events 后走中断分支推 interrupted 再 return,更可靠。
1.4.5 Bridge 跨线程的取消语义
VoiceTurn 的 Bridge 持有 dashscope SDK 后台线程 → 主 loop 的转发器。cancel 流程中:
cancel()/ 段 2 都通过_ensure_stopped()幂等调recognition.stop()(asyncio.to_thread)→ SDK 内部清理后台线程;_recognition_stopped标志防 cancel 与 _run 重复 stop- 后台线程在 stop 完成前可能还会触发
on_event→run_coroutine_threadsafe(bus.push, loop);段 2 的 to_thread 让出主 loop 时这些 dispatch 被 FIFO 跑完_accumulated += text(同步赋值在 await bus.push 之前),保证 stop return 时数据完整 - bus 已 closed 时 push 抛
ConnectionAbortedError→ 在 SDK 线程的 future 里被吞掉,无害 _startedEvent 在 cancel 时强制 set,让feed()中排队的await self._started.wait()能返回,再被_stopped拦掉,避免协程永久卡住
整套设计让 SDK 线程"自然消亡"而不是强制 kill,规避了第三方 SDK 内部状态损坏的风险。
2. Agent 模式中断:引入工具调用
2.1 Agent 模式与 LLM 模式的核心区别
Agent 模式使用 AgentClient 替代 LLMClient。两者都暴露 chat_stream() 接口,ChatHandler 不关心内部差异。但 AgentClient.chat_stream() 内部多了:
- 规划轮(第 0 轮):纯文本,判断是否需要工具
- 续写轮(
<no_tool>路径):纯文本,完整回答用户问题(不进入 Agent 循环) - Agent 循环(
<need_tool>路径,第 1 轮起):LLM 调用 → 工具执行 → 结果返回 → 再次 LLM 调用 → ... - 审批等待:工具执行前需要用户确认
- 子 Agent:
run_sub_agents工具会启动子进程
ChatHandler 的主循环代码完全不变——它只是从 llm_iterator 拿 LLMChunk。复杂度全部在 AgentClient.chat_stream() 这个 generator 内部。
2.2 AgentClient 内部的中断检查点
AgentClient.chat_stream() 在 8 个位置检查 is_cancelled:
# 检查点 1: 规划轮开始前
if session_id and await session_manager.is_cancelled(session_id):
return
# 检查点 2: 规划轮流式接收中(每个 chunk)
if session_id and await session_manager.is_cancelled(session_id):
return
# 检查点 3: <no_tool> 续写轮开始前(_run_no_tool_continuation 入口)
if session_id and await session_manager.is_cancelled(session_id):
return
# 检查点 4: <no_tool> 续写轮流式接收中(每个 chunk)
if session_id and await session_manager.is_cancelled(session_id):
return
# 检查点 5: Agent 循环入口(每轮开始)
if session_id and await session_manager.is_cancelled(session_id):
return
# 检查点 6: Agent 循环流式接收中(每个 chunk)
if session_id and await session_manager.is_cancelled(session_id):
return
# 检查点 7: 工具执行前(每个工具调用前)
if session_id and await session_manager.is_cancelled(session_id):
return
# 检查点 8: 审批等待后(审批返回后检查是否被取消)
if session_id and await session_manager.is_cancelled(session_id):
return
这些是协作式检查:generator 主动查询标志然后 return,让 generator 优雅关闭。这与 CancelledError 的被动注入形成互补——如果 generator 正好卡在一个长时间 await(比如等待工具执行),CancelledError 能立即打断它。
2.3 Agent 模式中断的完整时序
tool_rejected 的 reason 取决于中断时工具所处阶段:approval_pending=True → "用户拒绝了此工具调用"(无 partial_result),否则(executing 阶段)→ "工具调用被中断"(附带 partial_result,从 bus.tool_output_cache 读取并剥离 _id 字段)。
前端 session guard:tool_rejected 处理器检查 ctx.currentSessionId !== data.session_id,若新会话已开始则丢弃。interrupted 处理器的 guard 条件为 ctx.currentSessionId && !ctx.interrupting && ctx.currentSessionId !== data.session_id——额外的 !ctx.interrupting 保证:中断等待期间(按钮禁用,不可能有新会话)的 interrupted 事件不被误跳过。
前端系统消息:三种中断场景各显示不同的系统提示:
| 触发场景 | 系统消息 | 显示位置 |
|---|---|---|
| 用户点击「中断」按钮 | 已中断对话 |
interrupted 处理器中(userInitiatedInterrupt 时)显示 |
| 用户点击「拒绝」按钮 | 已中断会话 |
approveTool(false) 中立即显示 |
| 审批超时(300s) | 工具审批超时,已自动拒绝调用并中断会话 |
interrupted 处理器中(!userInitiatedInterrupt 时)显示 |
用户主动操作(中断/拒绝)时,前端先设置 cancelledSessionId。后续到达的 interrupted 走消息过滤器的放行路径,userInitiatedInterrupt=true,不重复显示系统消息。
前端两阶段中断:用户点击中断后,前端进入「中断中…」中间状态(interrupting=true,按钮灰红禁用,阻止新消息发送),期间音频继续正常播放消费。直到后端返回 interrupted 事件后,才触发 resetSession()(停音频、清队列、保存播放中内容、flush、状态归零、恢复 UI)。设有 15 秒超时兜底。这保证了内部工具的部分结果缓存不会因为前端过早发送新消息而被丢弃。
语音模式下「中断中…」同步反映到 voice 状态文本(updateVoiceStateText('interrupting'),红色/禁用样式),并在 updateVoiceStateText 入口对 context.interrupting 做最高优先级覆盖,防止并发的 playingStarted / asrResult / sessionEnded 回调把状态重置回「对话进行中…」。
was_active 短路:/api/interrupt 响应新增 was_active 字段(来自 SessionManager.cancel() 的返回值,表示 session 是否还在 _sessions 里)。当后端 handler 已经结束、只剩前端音频队列在消费时(典型场景:LLM/工具已全部跑完,前端还在按序播 TTS 音频),后端不会再推 interrupted 事件,若没有这条短路就会卡满 15 秒超时。前端检测到 was_active === false && context.interrupting 时:清 _interruptTimeout → resetSession() → 显示「已中断对话」,立即收尾。
关键理解:与 LLM 模式相比,Agent 模式的 generator 可能卡在三种不同的 await 上。CancelledError 注入的位置不同,效果也不同:
| generator 卡在 | CancelledError 效果 |
|---|---|
async for chunk in stream |
HTTP 关闭,OpenAI 停止生成 |
await mcp_client.call_tool() |
客户端停止等待,MCP 服务端工具继续执行到完成 |
await session_manager.is_cancelled() |
generator 自己 return(协作式退出) |
3. 子 Agent 中断:引入子进程和回调链
3.1 系统启动:回调链注册
在理解子 Agent 取消之前,先看清系统启动时谁把谁连在了一起。
启动完成后,session_manager._cancel_callbacks 中有五个回调:
| 序号 | 回调 | 注册位置 | 作用 |
|---|---|---|---|
| 1 | approval_manager.cancel_session |
managers/__init__.py:13 |
唤醒所有卡在审批等待的协程 |
| 2 | tts_client._on_session_cancel |
tts_client_singleton.py:93 |
清零该 session 的 TTS 待处理计数,防止已中断会话的残留任务虚占队列深度导致新会话被误降级 |
| 3 | pm.cancel_group |
sub_agent/__init__.py:126 |
向所有子 Agent 子进程发 SIGTERM |
| 4 | cm.cancel_session |
internal_tools/__init__.py:28 |
向所有 run_command 前台/后台子进程发 SIGTERM |
| 5 | fm.cancel_session |
internal_tools/__init__.py:155 |
取消该 session 所有 pending 表单 future(ask_user_question 工具),触发 form_cancel 推送通知前端 dismiss |
注意:纯 LLM 模式中回调 3、4、5 不存在(因为未注册 register_sub_agent_tool / register_command_tools / register_form_tool)。回调 1 在 Agent 模式下才有意义(LLM 模式没有审批流程)。回调 2 始终存在(TTSClient 在初始化时注册)。
SandboxRegistry 故意不在此链上(详见 SANDBOX.md)。这五个回调对应的都是会话作用域资源(审批、TTS 任务计数、子进程、命令、表单),随 session 结束一起释放是正确的;而 Sandbox 是对话作用域资源,跨多个 session 存活——用户中断当前请求但下一轮期望浏览器登录态还在,所以 cancel 必须不触碰 sandbox。Sandbox 的销毁只发生于:conv_id 轮换 / LRU 驱逐 / 服务器 shutdown。
3.2 正常请求:协程与子进程启动链
以一次涉及子 Agent 的完整请求为例,标注每一层的协程/Task/进程关系:
3.3 协程/Task/进程层次总结
websocket_chat() [协程 - FastAPI ASGI 框架驱动,非我们创建的 Task]
│
├── bus.run(websocket) [Task A - asyncio.create_task, 独立消费队列推送 WebSocket]
│
├── ChatHandler.handle_chat() [协程 - websocket_chat 内 await,共享同一调用栈]
│ │
│ ├── llm_next_task [Task B - asyncio.create_task, 包装 generator.__anext__()]
│ │ └── AgentClient.chat_stream() [async generator - 被 Task B 驱动]
│ │ ├── await stream (LLM 流式)
│ │ └── await _execute_tool() → spawn_group()
│ │ ├── _monitor(proc0) [Task C0 - asyncio.create_task]
│ │ ├── _monitor(proc1) [Task C1 - asyncio.create_task]
│ │ └── await asyncio.wait(monitor_tasks)
│ │
│ └── approval_wait_task [Task D - asyncio.create_task, 等待审批 Event]
│
└── 子进程 (OS 级别,不在 asyncio 事件循环内)
└── python -m sub_agent.runner [子进程]
└── node @playwright/mcp [孙进程, start_new_session=True]
└── Chrome [曾孙进程]
Task 与协程的区别:Task(A/B/C/D)是用 asyncio.create_task() 创建的独立调度单元,有自己的调用栈,可以被单独 .cancel()。协程(websocket_chat、handle_chat)是通过 await 串在一起的,共享调用栈,不能单独取消。
关键理解:ChatHandler → llm_next_task → generator → _execute_tool → spawn_group → asyncio.wait 是一条连续的 await 链。ChatHandler 的主循环在等 llm_next_task 完成,而 llm_next_task 内部在等 spawn_group,spawn_group 在等子进程。整条链在同一个 asyncio 事件循环中,cancel 一个 Task 会沿链传播 CancelledError。
4. 子 Agent 取消:完整传递链
4.1 全局时序图
4.2 两条路径的关系
取消信号通过两条独立路径到达子进程,形成双保险。子 Agent(ProcessManager)和命令执行(CommandManager)使用完全相同的双路径模式:
| 路径 A(回调直接软终止) | 路径 B(CancelledError 传播) | |
|---|---|---|
| 触发时机 | cancel() 内立即执行 |
ChatHandler ~10ms 后轮询到 |
| 谁发信号 | cancel_group() / cm.cancel_session() |
_terminate_and_kill() |
| 强杀兜底 | 无 | 有(POSIX 10 秒后 SIGKILL;Windows 直接 taskkill /F) |
| 能打断 LLM 流式 | 不能(只管子进程) | 能(cancel Task 断开 HTTP 连接) |
| 能打断审批等待 | 不能(由回调1 负责) | 不需要(回调1 已处理) |
路径 A 更快,路径 B 更彻底。两条都走整树软终止(POSIX killpg / Windows taskkill /T),幂等,重复无害。
进程树整组杀的必要性:单纯 proc.terminate() 在 POSIX 上只杀 shell(/bin/sh -c),在 Windows 上只杀 cmd.exe,会留下 python/git/chrome 等真正干活的孙进程成孤儿。修复方案是在创建子进程时用 start_new_session=True(POSIX)让子进程成为新进程组组长,杀时用 os.killpg(pgid, signum) 整组杀;Windows 上用 taskkill /T /PID 按父子关系递归杀整棵树。
Windows 软阶段跳过:CLI 进程不响应 taskkill 不带 /F 时发的 WM_CLOSE,软阶段实际等于 no-op。_terminate_and_kill 在 Windows 上直接走 taskkill /F /T 强杀整树 + 5 秒短等待,避免浪费 10 秒。POSIX 保留两阶段 SIGTERM → wait → SIGKILL,给进程优雅退出机会。
CommandManager 与 ProcessManager 的取消对比:
| ProcessManager(子 Agent) | CommandManager(run_command) | |
|---|---|---|
| 路径 A 回调 | cancel_group(): 整树软终止 |
cancel_session(): 前台/后台进程树软终止 + cancel monitor_task |
| 路径 B 捕获点 | spawn_group() 的 except CancelledError |
前台: run_foreground() 的 except CancelledError |
后台: _monitor_output() 的 except CancelledError |
||
| 路径 B 清理 | cancel monitor tasks + _terminate_and_kill |
前台: drain reader tasks + _terminate_and_kill |
后台: _terminate_and_kill(仅目标 bg.proc) |
||
| 子进程退出方式 | 信号处理器设 _cancelled → 优雅退出 |
被 SIGTERM/SIGKILL 直接终止 |
| 后台进程 | 无(全部前台等待) | 有独立的 _bg_procs 字典管理,cancel 时遍历 |
前台 vs 后台 路径 B 的触发差异:前台的 CancelledError 是从 ChatHandler → llm_next_task → ... → run_foreground 的 await 链传播过来的(同一条 Task 的 cancel);后台 monitor 不在这条 await 链上,它的 CancelledError 来自 cancel_session 里主动 bg.monitor_task.cancel()(独立 Task 被定向取消)。两者最终都进入 except CancelledError 块走 _terminate_and_kill 兜底强杀,避免后台进程吞 SIGTERM 后变僵尸。
5. CancelledError 的产生与传播
5.1 CancelledError 从哪来?
无论是 LLM 模式还是 Agent 模式,CancelledError 的产生者都是同一个——ChatHandler:
# chat_handler.py, handle_chat 主循环 section 0
llm_next_task.cancel() # ← 这一行产生 CancelledError
asyncio.Task.cancel() 的原理:向 Task 当前挂起的 await 点注入一个 CancelledError 异常。就像在 Task 内部那个 await 的位置凭空 raise CancelledError() 一样。
5.2 CancelledError 注入到哪?
取决于 llm_next_task 此刻卡在哪个 await 上。
LLM 模式下,llm_next_task 包装的是 LLMClient.chat_stream().__anext__(),generator 内部只有一个 await:
async for chunk in stream ← CancelledError 注入点(唯一)
Agent 模式下,llm_next_task 包装的是 AgentClient.chat_stream().__anext__(),generator 内部可能卡在多个 await:
5.3 传播链详解(场景 C:子 Agent 执行中)
这是最复杂的场景。关键点:CancelledError 不是直接打到 readline() 上的。monitor_tasks 是独立的 Task(Task C),不在 llm_next_task(Task B)的调用栈里。CancelledError 只能沿 Task B 自己的 await 链传播,打中 asyncio.wait(),然后由 spawn_group 手动发起第二波 cancel 给 monitor tasks。
为什么不是直接打在 readline() 上? 因为 asyncio.Task.cancel() 只能注入到该 Task 自己当前挂起的 await 点。Task B 的 await 链是:
Task B (llm_next_task)
→ __anext__()
→ chat_stream() generator
→ _execute_tool()
→ spawn_group()
→ asyncio.wait() ← Task B 的最深 await 点,CancelledError 打这里
而 Task C(monitor)有自己的独立调用栈:
Task C0 (_monitor)
→ readline() ← Task C0 的 await 点,Task B 的 cancel 打不到这里
spawn_group 的 except CancelledError 块就是中转站——接住 Task B 的取消,翻译成对 Task C 的取消,再 raise 继续往上抛。
5.4 场景 A:LLM 流式接收中(LLM 模式和 Agent 模式通用)
llm_next_task.cancel()
│
▼
LLMClient/AgentClient.chat_stream()
│ async for chunk in stream: # ← CancelledError 注入点
│ # HTTP 连接被关闭
│ # OpenAI 服务端检测到断连,停止生成
│
│ CancelledError 向上传播
▼
ChatHandler: except CancelledError: pass
效果:OpenAI 停止生成 token,节省费用。这在 LLM 模式和 Agent 模式中效果完全一致。
5.5 如果此时没有子 Agent 在运行?
那 spawn_group 不在调用栈中,CancelledError 只需要穿过 async for chunk in stream 或 await mcp_client.call_tool(),传播链更短,行为更简单。
6. 子进程的取消响应
子进程不参与 asyncio 事件循环,也不会收到 CancelledError。它通过 OS 信号(SIGTERM) 接收取消通知。
6.1 信号处理
# sub_agent/agent_loop.py:45-58
_cancelled = False
def _sigterm_handler(signum, frame):
global _cancelled
_cancelled = True # 只设标志,不立即退出
signal.signal(signal.SIGTERM, _sigterm_handler)
6.2 检查点
# agent_loop.py
for step in range(max_steps):
if _cancelled: # 检查点 1:轮次入口
return _result("cancelled")
# ... LLM 调用 ...
# ... 工具执行(期间不检查)...
if _cancelled: # 检查点 2:工具执行后
return _result("cancelled")
关键设计:工具执行期间不中断。MCP 工具(如 browser_navigate)可能有副作用,中途打断可能导致状态不一致。等当前工具跑完再退出。
6.3 退出清理
# runner.py:201-208
for client in mcp_clients:
await client.close() # keep_alive=False → 触发完整清理
# → disconnect() → killpg() → Chrome 关闭
print(f"{label} 完成")
# asyncio.run(main()) 返回,子进程退出
7. 流程图总结
7.1 从简单到复杂:三种中断模式对比
LLM 模式(最简单)
──────────────────
cancel() → 标志 → ChatHandler 轮询 → llm_next_task.cancel()
→ CancelledError → HTTP 关闭
只涉及: asyncio 事件循环内的一条 await 链
Agent 模式(无子 Agent)
──────────────────────
cancel() → 标志 + 回调1(审批)
→ ChatHandler 轮询 → llm_next_task.cancel()
→ CancelledError → HTTP 关闭 / MCP 打断
多了: 审批回调 + 工具执行打断
Agent 模式(有子 Agent)
──────────────────────
cancel() → 标志 + 回调1(审批) + 回调3(PM SIGTERM) ← 路径 A
→ ChatHandler 轮询 → llm_next_task.cancel()
→ CancelledError → spawn_group 捕获
→ _terminate_and_kill (SIGTERM+SIGKILL) ← 路径 B
→ 子进程 _cancelled 标志 → 优雅退出
多了: 跨进程 SIGTERM + 双路径保险
Agent 模式(run_command 前台执行中)
──────────────────────────────────
cancel() → 标志 + 回调4(CM SIGTERM) ← 路径 A
→ ChatHandler 轮询 → llm_next_task.cancel()
→ CancelledError → run_foreground 捕获
→ _terminate_and_kill (SIGTERM+SIGKILL) ← 路径 B
与子 Agent 相同的双路径模式,但子进程无信号处理器,直接被终止
7.2 取消信号传播全景
7.3 各层的取消机制对比
| 层级 | 取消机制 | 取消粒度 | 延迟 |
|---|---|---|---|
| SessionManager | 布尔标志 cancelled=True + cancel_event.set() |
即时设置 | 0ms |
| ApprovalManager | event.set() 唤醒 |
即时唤醒 | 0ms |
| TTSClient | _pending_counts.pop() 清零计数 |
即时清零 | 0ms |
| VoiceTurn (ASR 阶段) | cancel_event.wait() 协作式唤醒 → push interrupted → return |
即时唤醒 | 0ms |
| ProcessManager (路径A) | 整树软终止(killpg / taskkill /T) |
fire-and-forget | 0ms |
| CommandManager (路径A) | 整树软终止,前台+后台(killpg / taskkill /T) |
fire-and-forget | 0ms |
| ChatHandler | 轮询 is_cancelled |
每 10ms 一次 | 0~10ms |
| LLMClient/AgentClient | llm_next_task.cancel() → CancelledError |
打断当前 await | 0ms |
| ProcessManager (路径B) | _terminate_and_kill() 整树 |
POSIX: SIGTERM→10s→SIGKILL;Windows: 直接 taskkill /F /T+5s |
POSIX 0 |
| CommandManager (路径B) | _terminate_and_kill() 整树(前台 run_foreground + 后台 _monitor_output 都覆盖) |
POSIX: SIGTERM→10s→SIGKILL;Windows: 直接 taskkill /F /T+5s |
POSIX 0 |
| 子 Agent 进程 agent_loop | _cancelled 标志 |
当前工具执行完后 | 0~30s |
| Chrome | os.killpg() 进程组终止(runner 退出时由 MCPClient 触发) |
即时 | 0ms |
7.4 为什么不用 CancelledError 一路到底?
因为 CancelledError 只在 asyncio 事件循环内有效,跨不了进程边界。子进程有自己的事件循环,主进程的 CancelledError 传不进去。所以:
- 主进程内部:CancelledError(asyncio 原生机制)
- 跨进程:SIGTERM(OS 原生机制)
- 子进程内部:布尔标志轮询(最简单可靠)
三种机制,各管一段,拼成完整的取消链路。
run_command 的子进程(如 pip install、python script.py)不像子 Agent 那样有信号处理器,收到 SIGTERM 后直接退出。如果 SIGTERM 无效(进程卡死),路径 B 的 _terminate_and_kill 在 POSIX 上 10 秒后发 SIGKILL 强制终止;Windows 上 CLI 进程对 taskkill 不带 /F 时发的 WM_CLOSE 不响应,软阶段 no-op,因此直接走 taskkill /F /T 整树强杀 + 5 秒短等待。
整树杀的实现:主进程在 spawn 子进程时,POSIX 下用 start_new_session=True 让子进程成为新进程组组长,杀时 os.killpg(pgid, signum) 一次终止整组(含孙、曾孙);Windows 下不需要特殊 flag,taskkill /T /PID 利用系统记录的父子关系递归杀整棵树。这保证了 /bin/sh -c <cmd> 下的实际命令、cmd.exe /c <cmd> 下的 python.exe、runner 下的 node + chrome 都能被一并清理,不留孤儿。
8. 内部工具部分结果缓存(tool_output_cache)
内部工具(run_sub_agents、run_command)执行时间较长,中断时可能已有部分结果。SessionMessageBus.tool_output_cache 用于实时缓存工具的中间输出,中断时由 ChatHandler 读取并附加到 tool_rejected 消息的 partial_result 字段。
8.1 缓存结构
单值缓存(dict 或 None),内嵌 _id 字段用于写入校验:
# None:无缓存(初始状态 / executing 入口清空后)
bus.tool_output_cache = None
# run_sub_agents 缓存(预填 pending 占位,监控期间滚动刷新)
bus.tool_output_cache = {
"_id": group_id,
"total": N,
"completed": 0, "cancelled": 0, "failed": 0,
"results": [
{"agent_id": "sub_0", "uuid": "...", "status": "pending|running|completed|cancelled|error",
"result": "<已收集进展或终态结果>", "steps": N},
...
],
}
# run_command 缓存
bus.tool_output_cache = {"_id": command_id, "output": "..."}
8.2 生命周期
executing 状态入口 → cache = None(清空上一轮残留)
↓
工具 handler 启动 → cache = {"_id": xxx, ...}(在首个 await 之前初始化)
↓
工具执行中 → 协程实时追加数据(写入前校验 cache._id == my_id)
↓
(run_sub_agents 专属)轮边界 step_snapshot → _monitor 同步写入 results[i].result
(受 _TERMINAL_STATUSES 守卫保护,stdout 已 replace 终态后不回滚;不推送 bus)
↓
中断 → ChatHandler 读取 cache,剥离 _id,附加到 tool_rejected.partial_result
或
正常完成 → 下一个 executing 入口清空 cache
run_sub_agents 的 SIGKILL 兜底:子 Agent 子进程被强杀(POSIX 10s grace 到点 / Windows 直接 taskkill /F)时 stdout 终态 JSON 来不及写出,常规路径下 cache 会停留在 running 且 result=""。agent_loop 每轮工具批执行完后向 stderr emit step_snapshot 事件(载荷为 _collect_progress(messages),拼接所有 assistant content),父端 _monitor 把这份快照同步写入 cache.results[i].result。强杀发生时,partial_result 拿到的是上一轮工具执行完时的进展,不再为空。详见 SUB_AGENT_DESIGN.md §4.2。
8.3 _id 校验机制
asyncio 单线程模型中,同步 dict 操作在两个 await 之间是原子的。但取消后过期协程可能在新工具开始执行后才完成最后的写入。_id 校验防止这种情况:
# 写入侧(ProcessManager / CommandManager)
cache = bus.tool_output_cache
if cache is not None and cache.get("_id") == my_id:
cache["results"].append(result) # 校验通过才写入
# 读取侧(ChatHandler._build_tool_rejected_payload,同步函数)
# 同步 staticmethod,保证 cache 读取 → _id 剥离 在同一事件循环帧内完成
if not approval_pending:
cache = bus.tool_output_cache
if cache:
payload["partial_result"] = {k: v for k, v in cache.items() if k != "_id"}
8.4 前端处理
前端收到含 partial_result 的 tool_rejected 后,在 flushHistorySegments() 中将部分结果拼接在 tool 消息内容前面:
{partial_result JSON}...
工具调用被中断
这样下一轮对话时 LLM 能看到工具的部分执行结果,避免完全丢失已完成的工作。
评论区