本文从最简单的 LLM 纯对话模式入手,逐步引入 Agent 工具调用和子 Agent 进程管理,完整解析系统中断(Cancel)的实现原理。


1. LLM 模式中断:最简单的情况

1.1 LLM 模式的流式调用

在 LLM 模式下,系统只有两个核心组件参与:

  • ChatHandler:主循环,负责驱动 LLM 流式输出和推送消息
  • LLMClient:对 OpenAI API 的薄封装,chat_stream() 是一个 async generator

请求处理流程很直白:

前端 → WebSocket → ChatHandler.handle_chat()
                        ↓
                   llm_iterator = llm_client.chat_stream(messages, session_id)
                        ↓
                   主循环:每 10ms 一轮
                   ├── llm_next_task = create_task(llm_iterator.__anext__())
                   ├── 等待 llm_next_task 完成 → 拿到 LLMChunk
                   ├── 提交 TTS / 推送文本
                   └── 按序推送结果到前端

整个过程只有一条 await 链,没有子进程,没有工具调用。

1.2 LLM 模式的中断路径

当用户点击中断按钮:

sequenceDiagram participant FE as 前端 participant API as POST /api/interrupt participant SM as SessionManager participant CH as ChatHandler<br/>主循环 participant TASK as llm_next_task<br/>[asyncio.Task] participant GEN as LLMClient<br/>.chat_stream() participant LLM as OpenAI API FE->>API: {session_id: "abc"} API->>SM: await cancel("abc") SM->>SM: sessions["abc"]["cancelled"] = True API-->>FE: 200 OK Note over CH: ~10ms 后,主循环轮询到 CH->>SM: await is_cancelled("abc") SM-->>CH: True CH->>TASK: llm_next_task.cancel() Note over TASK: CancelledError 被注入到 Task<br/>当前 await 点 TASK->>GEN: CancelledError GEN->>LLM: HTTP 连接被关闭 Note over LLM: 服务端检测到断连<br/>停止生成 token TASK-->>CH: CancelledError Note over CH: except CancelledError: pass CH->>FE: bus.push({type: "interrupted"}) CH->>CH: break 退出主循环

整个 LLM 模式的中断就三步

步骤 动作 效果
1 cancel() 设置布尔标志 标记会话为已中断
2 ChatHandler 轮询 is_cancelledllm_next_task.cancel() 注入 CancelledError 到 generator 的当前 await 点
3 CancelledError 传播到 async for chunk in stream HTTP 连接关闭,OpenAI 停止生成,节省 token

这里有两个关键点需要理解:

CancelledError 从哪来? 不是 SessionManager 产生的。是 ChatHandler 调用 llm_next_task.cancel() 产生的。asyncio.Task.cancel() 的原理是:向 Task 当前挂起的 await 点注入一个 CancelledError,就像在那个 await 位置凭空 raise CancelledError() 一样。

为什么能省 token? LLMClient 的 chat_stream() 内部是 async for chunk in stream,CancelledError 会打断这个 await,底层的 HTTP 连接随之关闭。OpenAI 服务端检测到客户端断连后会停止生成 token,不再计费。

1.3 LLM 模式中断的协程层次

websocket_chat() [协程 - FastAPI ASGI 框架驱动]
├── bus.run(websocket) [Task A - 独立消费队列推送 WebSocket]
└── ChatHandler.handle_chat() [协程 - websocket_chat 内 await]
    └── llm_next_task [Task B - 包装 generator.__anext__()]
        └── LLMClient.chat_stream() [async generator]
            └── async for chunk in stream (← CancelledError 注入点)
                └── HTTP 连接 → OpenAI API

中断时 CancelledError 的传播:llm_next_task.cancel()async for chunk in stream → HTTP 断开 → Task 状态变为 CANCELLED → ChatHandler 捕获 except CancelledError: pass → 推送 interrupted → break。

非常简洁。但当引入工具调用后,事情变得复杂。


1.4 Voice 模式中断:ASR 阶段的特殊性

文本对话 / Agent 模式的 cancel 都从 ChatHandler 主循环出发——它在 await llm_next_task 上轮询 is_cancelled 然后注入 CancelledError。但语音模式多了一个 ASR 阶段,发生在 ChatHandler.handle_chat 被调用之前:用户开口 → 前端持续推 PCM 帧 → FunASR 服务端按 max_sentence_silence=500ms 切句、累积多个 _accumulated → 前端 1.3s 静音判定 turn-end 发 voice_stop → 后端 _ensure_stopped(timeout=0.5) flush 剩余 sentence_end → 这时才把累积全文 append 到 messages 调 handle_chat。

整个 ASR 阶段由 VoiceTurn._run(独立 task)跑,分两段阻塞:

# 段 1:等 turn-end(前端 voice_stop) / cancel
done, pending = await asyncio.wait([
    asyncio.create_task(self._user_stopped.wait()),   # 前端 voice_stop → on_user_stop()
    asyncio.create_task(self._cancelled.wait()),       # turn.cancel() 内部
    asyncio.create_task(cancel_evt.wait()),            # session_manager.cancel_event
], return_when=asyncio.FIRST_COMPLETED)

# 段 2:flush SDK,500ms 兜底(防 SDK 慢拖累整体延迟)
await self._ensure_stopped(timeout=0.5)

session_managerregister 时给每个 session 建了一个 cancel_event: asyncio.Eventcancel() 时 set。VoiceTurn 通过 get_cancel_event(sid) 拿到引用并并行 await。

1.4.1 取消路径

sequenceDiagram participant FE as 前端 participant API as POST /api/interrupt participant SM as SessionManager participant VT as VoiceTurn._run<br/>[Task] participant ASR as FunASR SDK participant Bus as MessageBus Note over VT: 段 1:await asyncio.wait([_user_stopped, _cancelled, cancel_evt])<br/>阻塞,等任一事件 FE->>API: {session_id} API->>SM: cancel(session_id) SM->>SM: sessions[id]['cancelled'] = True<br/>sessions[id]['cancel_event'].set() SM->>SM: 释放 conv_id 槽位 Note over SM: 同步触发 5 个 on_cancel 回调(详见第 3 章) SM-->>VT: cancel_event.set() 唤醒 wait VT->>VT: cancel_evt.is_set() → True 走中断分支 VT->>Bus: push({type:interrupted, session_id}) VT-->>VT: _run 退出(return) Note over VT: 协调器 finally:turn.cancel() → unregister → push_end → unregister_bus VT->>ASR: recognition.stop() (asyncio.to_thread)

ASR 段 1 中断不需要 CancelledError 注入——wait 是协作式的,cancel_event.set() 直接让它返回,比注入异常更轻。这是相比 LLM/Agent 模式的简化:那两种模式下 generator 卡在 HTTP/MCP/审批等多种 await 上,必须靠 CancelledError 强制打断。

1.4.2 stop-flush 阶段(段 2)的二次 cancel 检查

段 2 的 await self._ensure_stopped(timeout=0.5) 是阻塞 await,不监听任何 event。如果用户恰好在前端 1.3s 静音判定之后、SDK flush 之前这个最多 500ms 窗口内按了中断按钮,cancel_event 被 set 但 _run 看不到。

为此 _run 在 stop return 后再查一次 cancel:

await self._ensure_stopped(timeout=0.5)   # 段 2
if self._cancelled.is_set() or self._is_session_cancelled():
    await self._bus.push({type: "interrupted", ...})
    return                                 # 不进 handle_chat

如果不查,stop return 后 _run 会继续 push asr_result + 进 handle_chat;handle_chat 内部 is_cancelled 检查会立刻 push 一次 interrupted 退出,但前端已经先收到 asr_result(导致 partial 气泡 commit)→ 紧接收到 interrupted(resetSession discardPartialBubble 时气泡已 commit 不再是 partial)→ UI 残留一条不该有的 user 消息。二次检查避免这个 UI 不一致。

1.4.3 LLM/TTS 阶段:复用现有机制

VoiceTurn._run 在二次 cancel 检查通过后调 await self._handler.handle_chat(...)。从这一刻起,turn 进入 LLM/TTS 流,跟文本/Agent 模式完全一致:handle_chat 内部的 is_cancelled 轮询点 + llm_next_task.cancel() 机制接管。

也就是说 voice 模式有三段互补的中断机制

阶段 阻塞在哪 中断机制
段 1(等 turn-end) asyncio.wait([_user_stopped, _cancelled, cancel_evt]) cancel_event.set() 协作式唤醒
段 2(stop flush) await asyncio.wait_for(to_thread(stop), timeout=0.5) 不响应 event;stop 自然完成(≤500ms)后二次检查 _cancelled
LLM/TTS(handle_chat 内) llm_next_task 等 stream / 工具 / 审批 llm_next_task.cancel() 注入 CancelledError

三段都 push interrupted 给前端,前端 queue.js 看到后走 resetSessiondiscardPartialBubble(清未 commit 的 partial 用户气泡)+ restoreUI → voice 状态回 listening。

1.4.4 cancel 函数的幂等键

VoiceTurn.cancel()_cancelled.is_set() 做幂等检查(早期版本错误地用 _stopped,因为 _stopped 也被 _run 的段 2 设为 True,会让用户首次按中断按钮被 cancel 函数当成"已 cancel"直接 return,丢失中断信号)。

cancel 函数本身只做四件事,不再 cancel _task,靠 events + 段 2 二次检查让 _run 自己优雅退出:

async def cancel(self):
    if self._cancelled.is_set():
        return
    self._cancelled.set()
    self._started.set()        # 兜底唤醒排队的 feed
    self._user_stopped.set()   # 兜底唤醒段 1 的 wait
    await self._ensure_stopped()  # 幂等 stop SDK

不主动 _task.cancel() 的原因:CancelledError 会打断 _run 正在 await bus.push interrupted 的过程,导致前端收不到 interrupted(push_end 哨兵会先到 consumer)。让 _run 自己看到 events 后走中断分支推 interrupted 再 return,更可靠。

1.4.5 Bridge 跨线程的取消语义

VoiceTurnBridge 持有 dashscope SDK 后台线程 → 主 loop 的转发器。cancel 流程中:

  • cancel() / 段 2 都通过 _ensure_stopped() 幂等调 recognition.stop()asyncio.to_thread)→ SDK 内部清理后台线程;_recognition_stopped 标志防 cancel 与 _run 重复 stop
  • 后台线程在 stop 完成前可能还会触发 on_eventrun_coroutine_threadsafe(bus.push, loop);段 2 的 to_thread 让出主 loop 时这些 dispatch 被 FIFO 跑完 _accumulated += text(同步赋值在 await bus.push 之前),保证 stop return 时数据完整
  • bus 已 closed 时 push 抛 ConnectionAbortedError → 在 SDK 线程的 future 里被吞掉,无害
  • _started Event 在 cancel 时强制 set,让 feed() 中排队的 await self._started.wait() 能返回,再被 _stopped 拦掉,避免协程永久卡住

整套设计让 SDK 线程"自然消亡"而不是强制 kill,规避了第三方 SDK 内部状态损坏的风险。


2. Agent 模式中断:引入工具调用

2.1 Agent 模式与 LLM 模式的核心区别

Agent 模式使用 AgentClient 替代 LLMClient。两者都暴露 chat_stream() 接口,ChatHandler 不关心内部差异。但 AgentClient.chat_stream() 内部多了:

  1. 规划轮(第 0 轮):纯文本,判断是否需要工具
  2. 续写轮<no_tool> 路径):纯文本,完整回答用户问题(不进入 Agent 循环)
  3. Agent 循环<need_tool> 路径,第 1 轮起):LLM 调用 → 工具执行 → 结果返回 → 再次 LLM 调用 → ...
  4. 审批等待:工具执行前需要用户确认
  5. 子 Agentrun_sub_agents 工具会启动子进程

ChatHandler 的主循环代码完全不变——它只是从 llm_iteratorLLMChunk。复杂度全部在 AgentClient.chat_stream() 这个 generator 内部。

2.2 AgentClient 内部的中断检查点

AgentClient.chat_stream() 在 8 个位置检查 is_cancelled

# 检查点 1: 规划轮开始前
if session_id and await session_manager.is_cancelled(session_id):
    return

# 检查点 2: 规划轮流式接收中(每个 chunk)
if session_id and await session_manager.is_cancelled(session_id):
    return

# 检查点 3: <no_tool> 续写轮开始前(_run_no_tool_continuation 入口)
if session_id and await session_manager.is_cancelled(session_id):
    return

# 检查点 4: <no_tool> 续写轮流式接收中(每个 chunk)
if session_id and await session_manager.is_cancelled(session_id):
    return

# 检查点 5: Agent 循环入口(每轮开始)
if session_id and await session_manager.is_cancelled(session_id):
    return

# 检查点 6: Agent 循环流式接收中(每个 chunk)
if session_id and await session_manager.is_cancelled(session_id):
    return

# 检查点 7: 工具执行前(每个工具调用前)
if session_id and await session_manager.is_cancelled(session_id):
    return

# 检查点 8: 审批等待后(审批返回后检查是否被取消)
if session_id and await session_manager.is_cancelled(session_id):
    return

这些是协作式检查:generator 主动查询标志然后 return,让 generator 优雅关闭。这与 CancelledError 的被动注入形成互补——如果 generator 正好卡在一个长时间 await(比如等待工具执行),CancelledError 能立即打断它。

2.3 Agent 模式中断的完整时序

sequenceDiagram participant FE as 前端 participant API as POST /api/interrupt participant SM as SessionManager participant AM as ApprovalManager participant CH as ChatHandler participant TASK as llm_next_task participant GEN as AgentClient<br/>.chat_stream() participant MCP as MCP 工具 FE->>FE: 进入「中断中…」状态(interrupting=true,按钮灰红禁用)<br/>voice 模式下状态文本切到「中断中…」 FE->>API: {session_id: "abc"} Note over FE: 15 秒超时兜底;期间音频继续正常播放消费 API->>SM: was_active = await cancel("abc") rect rgb(255, 235, 235) Note over SM: 设标志 + 回调;was_active 反映会话是否还在 _sessions 中 SM->>SM: existed = session_id in _sessions<br/>cancelled = True(若 existed) SM->>AM: cancel_session("abc") Note over AM: 唤醒审批等待 event.set() end API-->>FE: 200 OK {was_active} alt was_active == false (后端 handler 已结束,仅前端还在播音频) Note over FE: 不会再收到 interrupted 事件<br/>清 15s timeout,立即 resetSession()<br/>显示「已中断对话」 else was_active == true (后端会话仍在跑) Note over FE: 等待后端推 interrupted 事件后再 resetSession end rect rgb(235, 235, 255) Note over CH: ~10ms 后轮询到 CH->>SM: is_cancelled → True CH->>TASK: llm_next_task.cancel() end alt 场景 A: generator 在等 LLM 流式 TASK->>GEN: CancelledError GEN->>GEN: async for chunk in stream 被打断 Note over GEN: HTTP 关闭,省 token else 场景 B: generator 在等 MCP 工具 TASK->>GEN: CancelledError GEN->>MCP: await call_tool 被打断 Note over MCP: 客户端不再等待结果<br/>MCP 服务端工具继续执行到完成 else 场景 C: generator 在等审批 Note over AM: 回调已设 approved=False Note over GEN: generator 检查到 is_cancelled → return end TASK-->>CH: CancelledError / StopAsyncIteration alt 有 active_tool_info(审批或执行阶段) CH->>CH: 读取 bus.tool_output_cache 作为 partial_result(执行阶段,剥离 _id) CH->>FE: bus.push({type: "tool_rejected",<br/>reason: "用户拒绝了此工具调用" 或 "工具调用被中断",<br/>partial_result: ...}) Note over FE: session guard:若新会话已开始<br/>(currentSessionId ≠ data.session_id)→ 丢弃<br/>否则写入 historySegments(含 partial_result) end CH->>FE: bus.push({type: "interrupted"}) Note over FE: session guard:若新会话已开始 → 跳过 resetSession FE->>FE: resetSession():停音频、清队列、<br/>保存播放中内容、flush + 状态归零 + 恢复 UI FE->>FE: 显示系统消息

tool_rejected 的 reason 取决于中断时工具所处阶段approval_pending=True"用户拒绝了此工具调用"(无 partial_result),否则(executing 阶段)→ "工具调用被中断"(附带 partial_result,从 bus.tool_output_cache 读取并剥离 _id 字段)。

前端 session guardtool_rejected 处理器检查 ctx.currentSessionId !== data.session_id,若新会话已开始则丢弃。interrupted 处理器的 guard 条件为 ctx.currentSessionId && !ctx.interrupting && ctx.currentSessionId !== data.session_id——额外的 !ctx.interrupting 保证:中断等待期间(按钮禁用,不可能有新会话)的 interrupted 事件不被误跳过。

前端系统消息:三种中断场景各显示不同的系统提示:

触发场景 系统消息 显示位置
用户点击「中断」按钮 已中断对话 interrupted 处理器中(userInitiatedInterrupt 时)显示
用户点击「拒绝」按钮 已中断会话 approveTool(false) 中立即显示
审批超时(300s) 工具审批超时,已自动拒绝调用并中断会话 interrupted 处理器中(!userInitiatedInterrupt 时)显示

用户主动操作(中断/拒绝)时,前端先设置 cancelledSessionId。后续到达的 interrupted 走消息过滤器的放行路径,userInitiatedInterrupt=true,不重复显示系统消息。

前端两阶段中断:用户点击中断后,前端进入「中断中…」中间状态(interrupting=true,按钮灰红禁用,阻止新消息发送),期间音频继续正常播放消费。直到后端返回 interrupted 事件后,才触发 resetSession()(停音频、清队列、保存播放中内容、flush、状态归零、恢复 UI)。设有 15 秒超时兜底。这保证了内部工具的部分结果缓存不会因为前端过早发送新消息而被丢弃。

语音模式下「中断中…」同步反映到 voice 状态文本(updateVoiceStateText('interrupting'),红色/禁用样式),并在 updateVoiceStateText 入口对 context.interrupting 做最高优先级覆盖,防止并发的 playingStarted / asrResult / sessionEnded 回调把状态重置回「对话进行中…」。

was_active 短路/api/interrupt 响应新增 was_active 字段(来自 SessionManager.cancel() 的返回值,表示 session 是否还在 _sessions 里)。当后端 handler 已经结束、只剩前端音频队列在消费时(典型场景:LLM/工具已全部跑完,前端还在按序播 TTS 音频),后端不会再推 interrupted 事件,若没有这条短路就会卡满 15 秒超时。前端检测到 was_active === false && context.interrupting 时:清 _interruptTimeoutresetSession() → 显示「已中断对话」,立即收尾。

关键理解:与 LLM 模式相比,Agent 模式的 generator 可能卡在三种不同的 await 上。CancelledError 注入的位置不同,效果也不同:

generator 卡在 CancelledError 效果
async for chunk in stream HTTP 关闭,OpenAI 停止生成
await mcp_client.call_tool() 客户端停止等待,MCP 服务端工具继续执行到完成
await session_manager.is_cancelled() generator 自己 return(协作式退出)

3. 子 Agent 中断:引入子进程和回调链

3.1 系统启动:回调链注册

在理解子 Agent 取消之前,先看清系统启动时谁把谁连在了一起。

sequenceDiagram participant Init as managers/__init__.py participant SM as SessionManager participant AM as ApprovalManager participant TTS as TTSClient participant LS as websocket_server<br/>lifespan() participant AC as AgentClient participant REG as sub_agent/__init__.py<br/>register_sub_agent_tool() participant PM as ProcessManager participant CM_REG as internal_tools/__init__.py<br/>register_command_tools() participant CM as CommandManager Note over Init: 包导入时执行(全局单例) Init->>SM: SessionManager() Init->>AM: ApprovalManager() Init->>SM: on_cancel(approval_manager.cancel_session) Note over SM: _cancel_callbacks = [AM.cancel_session] Note over TTS: TTSClient.__init__() 单例初始化 TTS->>SM: on_cancel(tts_client._on_session_cancel) Note over SM: _cancel_callbacks = [AM.cancel_session, TTS._on_session_cancel] Note over LS: 服务启动时执行 LS->>AC: AgentClient(llm_config, agent_config) LS->>AC: await connect_mcp_servers() LS->>REG: register_sub_agent_tool(agent_client, session_manager) REG->>PM: ProcessManager(session_manager) REG->>SM: on_cancel(pm.cancel_group) Note over SM: _cancel_callbacks = [AM, TTS, PM.cancel_group] REG->>AC: register_internal_tool("run_sub_agents", handler) LS->>CM_REG: register_command_tools(agent_client, session_manager) CM_REG->>CM: CommandManager(session_manager) CM_REG->>SM: on_cancel(cm.cancel_session) Note over SM: _cancel_callbacks = [AM, TTS, PM, CM.cancel_session] CM_REG->>AC: register_internal_tool("run_command", handler) CM_REG->>AC: register_internal_tool("check_command", handler) LS->>FM_REG: register_form_tool(agent_client, session_manager) FM_REG->>FM: FormManager(session_manager) FM_REG->>SM: on_cancel(fm.cancel_session) Note over SM: _cancel_callbacks = [AM, TTS, PM, CM, FM.cancel_session] FM_REG->>AC: register_internal_tool("ask_user_question", handler)

启动完成后,session_manager._cancel_callbacks 中有五个回调:

序号 回调 注册位置 作用
1 approval_manager.cancel_session managers/__init__.py:13 唤醒所有卡在审批等待的协程
2 tts_client._on_session_cancel tts_client_singleton.py:93 清零该 session 的 TTS 待处理计数,防止已中断会话的残留任务虚占队列深度导致新会话被误降级
3 pm.cancel_group sub_agent/__init__.py:126 向所有子 Agent 子进程发 SIGTERM
4 cm.cancel_session internal_tools/__init__.py:28 向所有 run_command 前台/后台子进程发 SIGTERM
5 fm.cancel_session internal_tools/__init__.py:155 取消该 session 所有 pending 表单 future(ask_user_question 工具),触发 form_cancel 推送通知前端 dismiss

注意:纯 LLM 模式中回调 3、4、5 不存在(因为未注册 register_sub_agent_tool / register_command_tools / register_form_tool)。回调 1 在 Agent 模式下才有意义(LLM 模式没有审批流程)。回调 2 始终存在(TTSClient 在初始化时注册)。

SandboxRegistry 故意不在此链上(详见 SANDBOX.md)。这五个回调对应的都是会话作用域资源(审批、TTS 任务计数、子进程、命令、表单),随 session 结束一起释放是正确的;而 Sandbox 是对话作用域资源,跨多个 session 存活——用户中断当前请求但下一轮期望浏览器登录态还在,所以 cancel 必须不触碰 sandbox。Sandbox 的销毁只发生于:conv_id 轮换 / LRU 驱逐 / 服务器 shutdown。

3.2 正常请求:协程与子进程启动链

以一次涉及子 Agent 的完整请求为例,标注每一层的协程/Task/进程关系:

sequenceDiagram participant FE as 前端 participant WS as websocket_chat()<br/>[协程 - ASGI 驱动] participant BUS as SessionMessageBus<br/>[Task A: bus.run] participant CH as ChatHandler.handle_chat()<br/>[协程 - WS 内 await] participant GEN as AgentClient.chat_stream()<br/>[async generator] participant LLM as OpenAI API participant PM as ProcessManager.spawn_group()<br/>[协程 - generator 内 await] participant MON as _monitor()<br/>[Task C: per sub-process] participant SUB as python -m sub_agent.runner<br/>[子进程, PID=xxx] participant NODE as node playwright-mcp<br/>[孙进程, PID=yyy] FE->>WS: WebSocket 消息 {session_id, messages} Note over WS: session_manager.register(session_id) WS->>BUS: asyncio.create_task(bus.run(websocket)) Note over BUS: Task A:从 bus 队列读取消息,推送到 WebSocket WS->>CH: await handle_text_chat(session_id, messages) CH->>GEN: llm_iterator = agent_client.chat_stream(messages, session_id) Note over CH: 主循环开始 loop ChatHandler 主循环 (每 10ms 一轮) CH->>CH: llm_next_task [Task B] = create_task(llm_iterator.__anext__()) Note over CH: 检查 is_cancelled → False,继续 Note over GEN: generator 被 __anext__() 驱动执行 GEN->>LLM: await stream = client.chat.completions.create(stream=True) LLM-->>GEN: chunk (tool_call: run_sub_agents) GEN-->>CH: yield LLMChunk(tool_call: start) GEN-->>CH: yield LLMChunk(tool_call: approval_required) Note over CH: 等待用户审批... FE->>CH: POST /api/tool-approve {approved: true} GEN-->>CH: yield LLMChunk(tool_call: executing) Note over GEN: 审批通过,执行工具 GEN->>PM: await _execute_tool("run_sub_agents", args) PM->>SUB: asyncio.create_subprocess_exec("python", "-m", "sub_agent.runner", spec) PM->>MON: asyncio.create_task(_monitor(proc)) Note over MON: Task C:逐行读取 sub 进程 stderr Note over SUB: 子进程启动 SUB->>NODE: FastMCP StdioTransport 启动 node playwright-mcp NODE->>NODE: 启动 Chrome loop 子进程 agent_loop SUB->>SUB: 调用 LLM → 执行工具 → 重复 end SUB-->>PM: stdout: 结果 JSON MON-->>PM: _monitor 完成 PM-->>GEN: spawn_group 返回聚合结果 GEN-->>CH: yield LLMChunk(tool_call: result) end

3.3 协程/Task/进程层次总结

websocket_chat() [协程 - FastAPI ASGI 框架驱动,非我们创建的 Task]
│
├── bus.run(websocket) [Task A - asyncio.create_task, 独立消费队列推送 WebSocket]
│
├── ChatHandler.handle_chat() [协程 - websocket_chat 内 await,共享同一调用栈]
│   │
│   ├── llm_next_task [Task B - asyncio.create_task, 包装 generator.__anext__()]
│   │   └── AgentClient.chat_stream() [async generator - 被 Task B 驱动]
│   │       ├── await stream (LLM 流式)
│   │       └── await _execute_tool() → spawn_group()
│   │           ├── _monitor(proc0) [Task C0 - asyncio.create_task]
│   │           ├── _monitor(proc1) [Task C1 - asyncio.create_task]
│   │           └── await asyncio.wait(monitor_tasks)
│   │
│   └── approval_wait_task [Task D - asyncio.create_task, 等待审批 Event]
│
└── 子进程 (OS 级别,不在 asyncio 事件循环内)
    └── python -m sub_agent.runner [子进程]
        └── node @playwright/mcp [孙进程, start_new_session=True]
            └── Chrome [曾孙进程]

Task 与协程的区别:Task(A/B/C/D)是用 asyncio.create_task() 创建的独立调度单元,有自己的调用栈,可以被单独 .cancel()。协程(websocket_chat、handle_chat)是通过 await 串在一起的,共享调用栈,不能单独取消。

关键理解ChatHandler → llm_next_task → generator → _execute_tool → spawn_group → asyncio.wait 是一条连续的 await 链。ChatHandler 的主循环在等 llm_next_task 完成,而 llm_next_task 内部在等 spawn_groupspawn_group 在等子进程。整条链在同一个 asyncio 事件循环中,cancel 一个 Task 会沿链传播 CancelledError。


4. 子 Agent 取消:完整传递链

4.1 全局时序图

sequenceDiagram participant FE as 前端 participant API as POST /api/interrupt participant SM as SessionManager participant AM as ApprovalManager participant PM_CB as ProcessManager<br/>.cancel_group() participant CH as ChatHandler<br/>主循环 participant TASK as llm_next_task<br/>[asyncio.Task] participant GEN as AgentClient<br/>.chat_stream() participant SG as ProcessManager<br/>.spawn_group() participant SUB as 子进程<br/>agent_loop FE->>API: {session_id: "abc"} API->>SM: await cancel("abc") rect rgb(255, 235, 235) Note over SM: 第一步:设标志 SM->>SM: sessions["abc"]["cancelled"] = True end rect rgb(235, 255, 235) Note over SM: 第二步:调用回调链(同步顺序执行)(回调链只是实现优雅退出,不做兜底) SM->>AM: await cancel_session("abc") Note over AM: 所有 pending 审批标记为 rejected<br/>event.set() 唤醒,优雅拒绝审批 SM->>PM_CB: await cancel_group("abc") Note over PM_CB: 遍历该 session 的所有子进程树<br/>整树软终止(POSIX killpg / Windows taskkill /T)<br/>避免 runner 下的 chrome 等孙进程成孤儿 PM_CB->>SUB: SIGTERM(POSIX 整组 / Windows 父子树) Note over SUB: _sigterm_handler:<br/>_cancelled = True SM->>SM: await cm.cancel_session("abc") Note over SM: CommandManager: 前台/后台进程树软终止<br/>+ cancel monitor_task<br/>避免 shell 下的实际命令进程成孤儿 end API-->>FE: 200 OK rect rgb(235, 235, 255) Note over CH: 第三步:ChatHandler 轮询检测(~10ms 后) CH->>SM: await is_cancelled("abc") SM-->>CH: True CH->>TASK: llm_next_task.cancel() Note over TASK: CancelledError 被注入到 Task end rect rgb(255, 255, 220) Note over TASK: 第四步:CancelledError 沿 await 链传播 TASK->>GEN: CancelledError GEN->>SG: CancelledError(如果正在 await spawn_group) Note over SG: except asyncio.CancelledError: SG->>SG: cancel monitor tasks SG->>SUB: _terminate_and_kill()<br/>SIGTERM → 等10s → SIGKILL SG->>TASK: raise CancelledError end TASK-->>CH: CancelledError Note over CH: except CancelledError: pass rect rgb(240, 240, 240) Note over CH: 第五步:ChatHandler 收尾 alt active_tool_info 有值 CH->>FE: bus.push({type: "tool_rejected",<br/>reason 取决于 approval_pending}) end CH->>FE: bus.push({type: "interrupted"}) CH->>CH: break 退出主循环 Note over CH: finally: 清理 approval_wait_task end rect rgb(240, 255, 240) Note over SUB: 第六步:子进程优雅退出 Note over SUB: agent_loop 检查 _cancelled → True<br/>return "cancelled" SUB->>SUB: runner.py: await client.close() Note over SUB: keep_alive=False → disconnect()<br/>→ killpg() → Chrome 关闭 SUB->>SUB: 进程退出 end

4.2 两条路径的关系

取消信号通过两条独立路径到达子进程,形成双保险。子 Agent(ProcessManager)和命令执行(CommandManager)使用完全相同的双路径模式

flowchart TB CANCEL["session_manager.cancel(session_id)"] FLAG["sessions[sid]['cancelled'] = True"] CB1["回调1: approval_manager.cancel_session()"] CB2["回调3: pm.cancel_group()"] CB3["回调4: cm.cancel_session()"] CANCEL --> FLAG CANCEL --> CB1 CANCEL --> CB2 CANCEL --> CB3 CB1 --> WAKE["唤醒所有 pending 审批<br/>event.set(), approved=False"] CB2 --> SIGTERM_A1["路径 A: 整树软终止(子 Agent)<br/>POSIX killpg / Windows taskkill /T"] CB3 --> SIGTERM_A2["路径 A: 整树软终止(run_command 前台+后台)<br/>POSIX killpg / Windows taskkill /T"] FLAG --> POLL["ChatHandler 轮询<br/>is_cancelled() → True"] POLL --> CANCEL_TASK["llm_next_task.cancel()"] CANCEL_TASK --> CE["CancelledError 传播"] CE --> SG_CATCH["spawn_group / run_foreground<br/>except CancelledError"] SG_CATCH --> SIGTERM_B["路径 B: _terminate_and_kill() 整树<br/>POSIX: SIGTERM → 10s → SIGKILL<br/>Windows: 直接 taskkill /F /T → 5s 等待"] SIGTERM_A1 --> SUB_FLAG["子 Agent 进程: _cancelled = True"] SIGTERM_B --> SUB_FLAG SIGTERM_B --> CMD_KILL["run_command 子进程: 被终止"] SIGTERM_A2 --> CMD_KILL SUB_FLAG --> EXIT["子进程优雅退出<br/>close() Chrome"] style SIGTERM_A1 fill:#fdd,stroke:#c00 style SIGTERM_A2 fill:#fdd,stroke:#c00 style SIGTERM_B fill:#fdd,stroke:#c00 style CB1 fill:#dfd,stroke:#0a0 style FLAG fill:#ddf,stroke:#00c
路径 A(回调直接软终止) 路径 B(CancelledError 传播)
触发时机 cancel()立即执行 ChatHandler ~10ms 后轮询到
谁发信号 cancel_group() / cm.cancel_session() _terminate_and_kill()
强杀兜底 有(POSIX 10 秒后 SIGKILL;Windows 直接 taskkill /F)
能打断 LLM 流式 不能(只管子进程) (cancel Task 断开 HTTP 连接)
能打断审批等待 不能(由回调1 负责) 不需要(回调1 已处理)

路径 A 更快,路径 B 更彻底。两条都走整树软终止(POSIX killpg / Windows taskkill /T),幂等,重复无害。

进程树整组杀的必要性:单纯 proc.terminate() 在 POSIX 上只杀 shell(/bin/sh -c),在 Windows 上只杀 cmd.exe,会留下 python/git/chrome 等真正干活的孙进程成孤儿。修复方案是在创建子进程时用 start_new_session=True(POSIX)让子进程成为新进程组组长,杀时用 os.killpg(pgid, signum) 整组杀;Windows 上用 taskkill /T /PID 按父子关系递归杀整棵树。

Windows 软阶段跳过:CLI 进程不响应 taskkill 不带 /F 时发的 WM_CLOSE,软阶段实际等于 no-op。_terminate_and_kill 在 Windows 上直接走 taskkill /F /T 强杀整树 + 5 秒短等待,避免浪费 10 秒。POSIX 保留两阶段 SIGTERM → wait → SIGKILL,给进程优雅退出机会。

CommandManager 与 ProcessManager 的取消对比

ProcessManager(子 Agent) CommandManager(run_command)
路径 A 回调 cancel_group(): 整树软终止 cancel_session(): 前台/后台进程树软终止 + cancel monitor_task
路径 B 捕获点 spawn_group()except CancelledError 前台: run_foreground()except CancelledError
后台: _monitor_output()except CancelledError
路径 B 清理 cancel monitor tasks + _terminate_and_kill 前台: drain reader tasks + _terminate_and_kill
后台: _terminate_and_kill(仅目标 bg.proc)
子进程退出方式 信号处理器设 _cancelled → 优雅退出 被 SIGTERM/SIGKILL 直接终止
后台进程 无(全部前台等待) 有独立的 _bg_procs 字典管理,cancel 时遍历

前台 vs 后台 路径 B 的触发差异:前台的 CancelledError 是从 ChatHandler → llm_next_task → ... → run_foreground 的 await 链传播过来的(同一条 Task 的 cancel);后台 monitor 不在这条 await 链上,它的 CancelledError 来自 cancel_session主动 bg.monitor_task.cancel()(独立 Task 被定向取消)。两者最终都进入 except CancelledError 块走 _terminate_and_kill 兜底强杀,避免后台进程吞 SIGTERM 后变僵尸。


5. CancelledError 的产生与传播

5.1 CancelledError 从哪来?

无论是 LLM 模式还是 Agent 模式,CancelledError 的产生者都是同一个——ChatHandler

# chat_handler.py, handle_chat 主循环 section 0
llm_next_task.cancel()   # ← 这一行产生 CancelledError

asyncio.Task.cancel() 的原理:向 Task 当前挂起的 await 点注入一个 CancelledError 异常。就像在 Task 内部那个 await 的位置凭空 raise CancelledError() 一样。

5.2 CancelledError 注入到哪?

取决于 llm_next_task 此刻卡在哪个 await 上。

LLM 模式下llm_next_task 包装的是 LLMClient.chat_stream().__anext__(),generator 内部只有一个 await

async for chunk in stream   ← CancelledError 注入点(唯一)

Agent 模式下llm_next_task 包装的是 AgentClient.chat_stream().__anext__(),generator 内部可能卡在多个 await

flowchart TD TASK["llm_next_task.cancel()"] CE["CancelledError 被注入"] TASK --> CE CE --> WHERE{"generator 此刻卡在哪个 await?"} WHERE -->|"await stream.__anext__()"| A["场景 A: LLM 流式接收"] WHERE -->|"await _execute_tool()"| B["场景 B: MCP 工具执行"] WHERE -->|"await _execute_tool()<br/>→ spawn_group()"| C["场景 C: 子 Agent 执行"] WHERE -->|"await _execute_tool()<br/>→ run_foreground()"| D["场景 D: run_command 前台执行"] A --> A1["HTTP 连接被关闭<br/>OpenAI 停止生成 → 省 token"] A --> A2["CancelledError 从 generator 向上传播<br/>到 llm_next_task"] B --> B1["客户端停止等待结果<br/>MCP 服务端工具继续执行到完成"] B --> B2["CancelledError 从 generator 向上传播"] C --> C1["spawn_group() 捕获 CancelledError"] C1 --> C2["cancel 所有 monitor tasks"] C1 --> C3["_terminate_and_kill(processes)<br/>SIGTERM → 10s → SIGKILL"] C1 --> C4["raise CancelledError(继续向上传播)"] C4 --> C5["CancelledError 从 generator 向上传播"] D --> D1["run_foreground() 捕获 CancelledError"] D1 --> D2["_terminate_and_kill(proc)<br/>SIGTERM → 10s → SIGKILL"] D1 --> D3["drain reader tasks + flush 残留 bus 输出"] D1 --> D4["返回 '命令已被中断' 结果"] D4 --> D5["CancelledError 从 generator 向上传播"] A2 --> CATCH B2 --> CATCH C5 --> CATCH D5 --> CATCH CATCH["ChatHandler 捕获<br/>except CancelledError: pass"] style A1 fill:#dfd style C3 fill:#fdd style D2 fill:#fdd

5.3 传播链详解(场景 C:子 Agent 执行中)

这是最复杂的场景。关键点:CancelledError 不是直接打到 readline() 上的monitor_tasks 是独立的 Task(Task C),不在 llm_next_task(Task B)的调用栈里。CancelledError 只能沿 Task B 自己的 await 链传播,打中 asyncio.wait(),然后由 spawn_group 手动发起第二波 cancel 给 monitor tasks。

sequenceDiagram participant CH as ChatHandler participant TB as Task B<br/>llm_next_task participant GEN as chat_stream()<br/>[generator] participant SG as spawn_group() participant AW as asyncio.wait() participant TC as Task C0 / C1<br/>_monitor() participant PROC as 子进程 Note over TC: await proc.stderr.readline() Note over AW: 等待 monitor_tasks 完成 rect rgb(255, 235, 235) Note over CH,AW: 第一波:CancelledError 沿 Task B 的 await 链注入 CH->>TB: llm_next_task.cancel() TB->>GEN: CancelledError 注入到<br/>await _execute_tool() (:547) GEN->>SG: 穿过 _execute_tool()<br/>穿过 run_sub_agents_handler() SG->>AW: CancelledError 到达<br/>await asyncio.wait() (:146) AW-->>SG: raise CancelledError end Note over TC: 此时 monitor tasks 还活着!<br/>readline() 还在等数据 rect rgb(235, 235, 255) Note over SG,PROC: 第二波:spawn_group 手动清理(except 块内) Note over SG: except asyncio.CancelledError (:185) SG->>TC: for t in monitor_tasks: t.cancel() Note over TC: CancelledError 注入到<br/>await readline() TC-->>SG: await gather 消费异常 SG->>PROC: _terminate_and_kill()<br/>SIGTERM → 10s → SIGKILL end rect rgb(255, 255, 220) Note over SG,CH: 第三阶段:CancelledError 继续向上传播(raise) SG-->>GEN: raise CancelledError Note over GEN: 不捕获,generator 被终止 GEN-->>TB: CancelledError Note over TB: Task 状态变为 CANCELLED TB-->>CH: CancelledError Note over CH: except CancelledError: pass<br/>最终在这里被消化 (:226-229) end

为什么不是直接打在 readline() 上? 因为 asyncio.Task.cancel() 只能注入到该 Task 自己当前挂起的 await 点。Task B 的 await 链是:

Task B (llm_next_task)
  → __anext__()
    → chat_stream() generator
      → _execute_tool()
        → spawn_group()
          → asyncio.wait()      ← Task B 的最深 await 点,CancelledError 打这里

而 Task C(monitor)有自己的独立调用栈:

Task C0 (_monitor)
  → readline()                  ← Task C0 的 await 点,Task B 的 cancel 打不到这里

spawn_groupexcept CancelledError 块就是中转站——接住 Task B 的取消,翻译成对 Task C 的取消,再 raise 继续往上抛。

5.4 场景 A:LLM 流式接收中(LLM 模式和 Agent 模式通用)

llm_next_task.cancel()
│
▼
LLMClient/AgentClient.chat_stream()
│ async for chunk in stream:            # ← CancelledError 注入点
│                                       # HTTP 连接被关闭
│                                       # OpenAI 服务端检测到断连,停止生成
│
│ CancelledError 向上传播
▼
ChatHandler: except CancelledError: pass

效果:OpenAI 停止生成 token,节省费用。这在 LLM 模式和 Agent 模式中效果完全一致。

5.5 如果此时没有子 Agent 在运行?

spawn_group 不在调用栈中,CancelledError 只需要穿过 async for chunk in streamawait mcp_client.call_tool(),传播链更短,行为更简单。


6. 子进程的取消响应

子进程不参与 asyncio 事件循环,也不会收到 CancelledError。它通过 OS 信号(SIGTERM) 接收取消通知。

6.1 信号处理

# sub_agent/agent_loop.py:45-58
_cancelled = False

def _sigterm_handler(signum, frame):
    global _cancelled
    _cancelled = True   # 只设标志,不立即退出

signal.signal(signal.SIGTERM, _sigterm_handler)

6.2 检查点

# agent_loop.py
for step in range(max_steps):
    if _cancelled:                      # 检查点 1:轮次入口
        return _result("cancelled")

    # ... LLM 调用 ...
    # ... 工具执行(期间不检查)...

    if _cancelled:                      # 检查点 2:工具执行后
        return _result("cancelled")

关键设计:工具执行期间不中断。MCP 工具(如 browser_navigate)可能有副作用,中途打断可能导致状态不一致。等当前工具跑完再退出。

6.3 退出清理

# runner.py:201-208
for client in mcp_clients:
    await client.close()        # keep_alive=False → 触发完整清理
                                # → disconnect() → killpg() → Chrome 关闭
print(f"{label} 完成")

# asyncio.run(main()) 返回,子进程退出

7. 流程图总结

7.1 从简单到复杂:三种中断模式对比

LLM 模式(最简单)
──────────────────
cancel() → 标志 → ChatHandler 轮询 → llm_next_task.cancel()
                                       → CancelledError → HTTP 关闭
只涉及: asyncio 事件循环内的一条 await 链


Agent 模式(无子 Agent)
──────────────────────
cancel() → 标志 + 回调1(审批)
        → ChatHandler 轮询 → llm_next_task.cancel()
                              → CancelledError → HTTP 关闭 / MCP 打断
多了: 审批回调 + 工具执行打断


Agent 模式(有子 Agent)
──────────────────────
cancel() → 标志 + 回调1(审批) + 回调3(PM SIGTERM)  ← 路径 A
        → ChatHandler 轮询 → llm_next_task.cancel()
                              → CancelledError → spawn_group 捕获
                              → _terminate_and_kill (SIGTERM+SIGKILL)  ← 路径 B
                              → 子进程 _cancelled 标志 → 优雅退出
多了: 跨进程 SIGTERM + 双路径保险


Agent 模式(run_command 前台执行中)
──────────────────────────────────
cancel() → 标志 + 回调4(CM SIGTERM)  ← 路径 A
        → ChatHandler 轮询 → llm_next_task.cancel()
                              → CancelledError → run_foreground 捕获
                              → _terminate_and_kill (SIGTERM+SIGKILL)  ← 路径 B
与子 Agent 相同的双路径模式,但子进程无信号处理器,直接被终止

7.2 取消信号传播全景

flowchart LR subgraph 前端 BTN["中断按钮"] end subgraph "主进程 (asyncio 事件循环)" API["POST /api/interrupt"] SM["SessionManager.cancel()"] FLAG["cancelled = True"] CB1["回调1:<br/>ApprovalManager<br/>.cancel_session()"] CB_TTS["回调2:<br/>TTSClient<br/>._on_session_cancel()"] CB2["回调3:<br/>ProcessManager<br/>.cancel_group()"] CB3["回调4:<br/>CommandManager<br/>.cancel_session()"] CH["ChatHandler<br/>is_cancelled 轮询"] CANCEL_TASK["llm_next_task<br/>.cancel()"] CE["CancelledError<br/>沿 await 链传播"] SG["spawn_group() /<br/>run_foreground()<br/>except CancelledError"] TK["_terminate_and_kill()"] end subgraph "子 Agent 进程 (独立 Python 进程)" SH["SIGTERM handler<br/>_cancelled = True"] AL["agent_loop<br/>检查 _cancelled"] CLOSE["client.close()<br/>keep_alive=False"] end subgraph "孙进程" NODE["node playwright-mcp"] CHROME["Chrome"] end subgraph "run_command 子进程" CMD_PROC["shell 命令子进程<br/>(pip install / python 等)"] end BTN -->|POST| API API --> SM SM --> FLAG SM --> CB1 SM --> CB_TTS SM --> CB2 SM --> CB3 CB1 -->|"event.set()"| WAKE["唤醒审批等待"] CB_TTS -->|"pop pending_counts"| TTS_CLEAR["TTS 队列计数归零<br/>防止新会话误降级"] CB2 -->|"整树软终止 (路径A)<br/>killpg / taskkill /T"| SH CB3 -->|"整树软终止 (路径A)<br/>killpg / taskkill /T"| CMD_PROC FLAG -.->|"轮询"| CH CH --> CANCEL_TASK CANCEL_TASK --> CE CE --> SG SG --> TK TK -->|"整树强杀 (路径B)<br/>POSIX: SIGTERM→10s→SIGKILL<br/>Windows: taskkill /F /T"| SH TK -->|"整树强杀 (路径B)<br/>POSIX: SIGTERM→10s→SIGKILL<br/>Windows: taskkill /F /T"| CMD_PROC SH --> AL AL -->|"return cancelled"| CLOSE CLOSE -->|"killpg()"| NODE NODE -->|"进程组终止"| CHROME style FLAG fill:#ddf style CB1 fill:#dfd style CB_TTS fill:#ffe style CB2 fill:#fdd style CB3 fill:#fdd style CE fill:#ffd style SH fill:#fdd

7.3 各层的取消机制对比

层级 取消机制 取消粒度 延迟
SessionManager 布尔标志 cancelled=True + cancel_event.set() 即时设置 0ms
ApprovalManager event.set() 唤醒 即时唤醒 0ms
TTSClient _pending_counts.pop() 清零计数 即时清零 0ms
VoiceTurn (ASR 阶段) cancel_event.wait() 协作式唤醒 → push interrupted → return 即时唤醒 0ms
ProcessManager (路径A) 整树软终止(killpg / taskkill /T fire-and-forget 0ms
CommandManager (路径A) 整树软终止,前台+后台(killpg / taskkill /T fire-and-forget 0ms
ChatHandler 轮询 is_cancelled 每 10ms 一次 0~10ms
LLMClient/AgentClient llm_next_task.cancel() → CancelledError 打断当前 await 0ms
ProcessManager (路径B) _terminate_and_kill() 整树 POSIX: SIGTERM→10s→SIGKILL;Windows: 直接 taskkill /F /T+5s POSIX 010s / Windows 05s
CommandManager (路径B) _terminate_and_kill() 整树(前台 run_foreground + 后台 _monitor_output 都覆盖) POSIX: SIGTERM→10s→SIGKILL;Windows: 直接 taskkill /F /T+5s POSIX 010s / Windows 05s
子 Agent 进程 agent_loop _cancelled 标志 当前工具执行完后 0~30s
Chrome os.killpg() 进程组终止(runner 退出时由 MCPClient 触发) 即时 0ms

7.4 为什么不用 CancelledError 一路到底?

因为 CancelledError 只在 asyncio 事件循环内有效,跨不了进程边界。子进程有自己的事件循环,主进程的 CancelledError 传不进去。所以:

  • 主进程内部:CancelledError(asyncio 原生机制)
  • 跨进程:SIGTERM(OS 原生机制)
  • 子进程内部:布尔标志轮询(最简单可靠)

三种机制,各管一段,拼成完整的取消链路。

run_command 的子进程(如 pip installpython script.py)不像子 Agent 那样有信号处理器,收到 SIGTERM 后直接退出。如果 SIGTERM 无效(进程卡死),路径 B 的 _terminate_and_kill 在 POSIX 上 10 秒后发 SIGKILL 强制终止;Windows 上 CLI 进程对 taskkill 不带 /F 时发的 WM_CLOSE 不响应,软阶段 no-op,因此直接走 taskkill /F /T 整树强杀 + 5 秒短等待。

整树杀的实现:主进程在 spawn 子进程时,POSIX 下用 start_new_session=True 让子进程成为新进程组组长,杀时 os.killpg(pgid, signum) 一次终止整组(含孙、曾孙);Windows 下不需要特殊 flag,taskkill /T /PID 利用系统记录的父子关系递归杀整棵树。这保证了 /bin/sh -c <cmd> 下的实际命令、cmd.exe /c <cmd> 下的 python.exe、runner 下的 node + chrome 都能被一并清理,不留孤儿。


8. 内部工具部分结果缓存(tool_output_cache)

内部工具(run_sub_agentsrun_command)执行时间较长,中断时可能已有部分结果。SessionMessageBus.tool_output_cache 用于实时缓存工具的中间输出,中断时由 ChatHandler 读取并附加到 tool_rejected 消息的 partial_result 字段。

8.1 缓存结构

单值缓存(dictNone),内嵌 _id 字段用于写入校验:

# None:无缓存(初始状态 / executing 入口清空后)
bus.tool_output_cache = None

# run_sub_agents 缓存(预填 pending 占位,监控期间滚动刷新)
bus.tool_output_cache = {
    "_id": group_id,
    "total": N,
    "completed": 0, "cancelled": 0, "failed": 0,
    "results": [
        {"agent_id": "sub_0", "uuid": "...", "status": "pending|running|completed|cancelled|error",
         "result": "<已收集进展或终态结果>", "steps": N},
        ...
    ],
}

# run_command 缓存
bus.tool_output_cache = {"_id": command_id, "output": "..."}

8.2 生命周期

executing 状态入口 → cache = None(清空上一轮残留)
    ↓
工具 handler 启动 → cache = {"_id": xxx, ...}(在首个 await 之前初始化)
    ↓
工具执行中 → 协程实时追加数据(写入前校验 cache._id == my_id)
    ↓
(run_sub_agents 专属)轮边界 step_snapshot → _monitor 同步写入 results[i].result
    (受 _TERMINAL_STATUSES 守卫保护,stdout 已 replace 终态后不回滚;不推送 bus)
    ↓
中断 → ChatHandler 读取 cache,剥离 _id,附加到 tool_rejected.partial_result
    或
正常完成 → 下一个 executing 入口清空 cache

run_sub_agents 的 SIGKILL 兜底:子 Agent 子进程被强杀(POSIX 10s grace 到点 / Windows 直接 taskkill /F)时 stdout 终态 JSON 来不及写出,常规路径下 cache 会停留在 runningresult=""agent_loop 每轮工具批执行完后向 stderr emit step_snapshot 事件(载荷为 _collect_progress(messages),拼接所有 assistant content),父端 _monitor 把这份快照同步写入 cache.results[i].result。强杀发生时,partial_result 拿到的是上一轮工具执行完时的进展,不再为空。详见 SUB_AGENT_DESIGN.md §4.2

8.3 _id 校验机制

asyncio 单线程模型中,同步 dict 操作在两个 await 之间是原子的。但取消后过期协程可能在新工具开始执行后才完成最后的写入。_id 校验防止这种情况:

# 写入侧(ProcessManager / CommandManager)
cache = bus.tool_output_cache
if cache is not None and cache.get("_id") == my_id:
    cache["results"].append(result)  # 校验通过才写入

# 读取侧(ChatHandler._build_tool_rejected_payload,同步函数)
# 同步 staticmethod,保证 cache 读取 → _id 剥离 在同一事件循环帧内完成
if not approval_pending:
    cache = bus.tool_output_cache
    if cache:
        payload["partial_result"] = {k: v for k, v in cache.items() if k != "_id"}

8.4 前端处理

前端收到含 partial_resulttool_rejected 后,在 flushHistorySegments() 中将部分结果拼接在 tool 消息内容前面:

{partial_result JSON}...
工具调用被中断

这样下一轮对话时 LLM 能看到工具的部分执行结果,避免完全丢失已完成的工作。