中断流程完整解析

本文从最简单的 LLM 纯对话模式入手,逐步引入 Agent 工具调用和子 Agent 进程管理,完整解析系统中断(Cancel)的实现原理。


1. LLM 模式中断:最简单的情况

1.1 LLM 模式的流式调用

在 LLM 模式下,系统只有两个核心组件参与:

  • ChatHandler:主循环,负责驱动 LLM 流式输出和推送消息
  • LLMClient:对 OpenAI API 的薄封装,chat_stream() 是一个 async generator

请求处理流程很直白:

前端 → WebSocket → ChatHandler.handle_chat()
                        ↓
                   llm_iterator = llm_client.chat_stream(messages, session_id)
                        ↓
                   主循环:每 10ms 一轮
                   ├── llm_next_task = create_task(llm_iterator.__anext__())
                   ├── 等待 llm_next_task 完成 → 拿到 LLMChunk
                   ├── 提交 TTS / 推送文本
                   └── 按序推送结果到前端

整个过程只有一条 await 链,没有子进程,没有工具调用。

1.2 LLM 模式的中断路径

当用户点击中断按钮:

sequenceDiagram participant FE as 前端 participant API as POST /api/interrupt participant SM as SessionManager participant CH as ChatHandler<br/>主循环 participant TASK as llm_next_task<br/>[asyncio.Task] participant GEN as LLMClient<br/>.chat_stream() participant LLM as OpenAI API FE->>API: {session_id: "abc"} API->>SM: await cancel("abc") SM->>SM: sessions["abc"]["cancelled"] = True API-->>FE: 200 OK Note over CH: ~10ms 后,主循环轮询到 CH->>SM: await is_cancelled("abc") SM-->>CH: True CH->>TASK: llm_next_task.cancel() Note over TASK: CancelledError 被注入到 Task<br/>当前 await 点 TASK->>GEN: CancelledError GEN->>LLM: HTTP 连接被关闭 Note over LLM: 服务端检测到断连<br/>停止生成 token TASK-->>CH: CancelledError Note over CH: except CancelledError: pass CH->>FE: bus.push({type: "interrupted"}) CH->>CH: break 退出主循环

整个 LLM 模式的中断就三步

步骤 动作 效果
1 cancel() 设置布尔标志 标记会话为已中断
2 ChatHandler 轮询 is_cancelledllm_next_task.cancel() 注入 CancelledError 到 generator 的当前 await 点
3 CancelledError 传播到 async for chunk in stream HTTP 连接关闭,OpenAI 停止生成,节省 token

这里有两个关键点需要理解:

CancelledError 从哪来? 不是 SessionManager 产生的。是 ChatHandler 调用 llm_next_task.cancel() 产生的。asyncio.Task.cancel() 的原理是:向 Task 当前挂起的 await 点注入一个 CancelledError,就像在那个 await 位置凭空 raise CancelledError() 一样。

为什么能省 token? LLMClient 的 chat_stream() 内部是 async for chunk in stream,CancelledError 会打断这个 await,底层的 HTTP 连接随之关闭。OpenAI 服务端检测到客户端断连后会停止生成 token,不再计费。

1.3 LLM 模式中断的协程层次

websocket_chat() [协程 - FastAPI ASGI 框架驱动]
├── bus.run(websocket) [Task A - 独立消费队列推送 WebSocket]
└── ChatHandler.handle_chat() [协程 - websocket_chat 内 await]
    └── llm_next_task [Task B - 包装 generator.__anext__()]
        └── LLMClient.chat_stream() [async generator]
            └── async for chunk in stream (← CancelledError 注入点)
                └── HTTP 连接 → OpenAI API

中断时 CancelledError 的传播:llm_next_task.cancel()async for chunk in stream → HTTP 断开 → Task 状态变为 CANCELLED → ChatHandler 捕获 except CancelledError: pass → 推送 interrupted → break。

非常简洁。但当引入工具调用后,事情变得复杂。


2. Agent 模式中断:引入工具调用

2.1 Agent 模式与 LLM 模式的核心区别

Agent 模式使用 AgentClient 替代 LLMClient。两者都暴露 chat_stream() 接口,ChatHandler 不关心内部差异。但 AgentClient.chat_stream() 内部多了:

  1. 规划轮(第 0 轮):纯文本,判断是否需要工具
  2. Agent 循环(第 1 轮起):LLM 调用 → 工具执行 → 结果返回 → 再次 LLM 调用 → ...
  3. 审批等待:工具执行前需要用户确认
  4. 子 Agentrun_sub_agents 工具会启动子进程

ChatHandler 的主循环代码完全不变——它只是从 llm_iteratorLLMChunk。复杂度全部在 AgentClient.chat_stream() 这个 generator 内部。

2.2 AgentClient 内部的中断检查点

AgentClient.chat_stream() 在 5 个位置检查 is_cancelled

# 检查点 1: 规划轮开始前(:338)
if session_id and await session_manager.is_cancelled(session_id):
    return

# 检查点 2: 规划轮流式接收中(:221,每个 chunk)
if session_id and await session_manager.is_cancelled(session_id):
    return

# 检查点 3: Agent 循环入口(:354,每轮开始)
if session_id and await session_manager.is_cancelled(session_id):
    return

# 检查点 4: Agent 循环流式接收中(:402,每个 chunk)
if session_id and await session_manager.is_cancelled(session_id):
    return

# 检查点 5: 工具执行前(:494,每个工具调用前)
if session_id and await session_manager.is_cancelled(session_id):
    return

这些是协作式检查:generator 主动查询标志然后 return,让 generator 优雅关闭。这与 CancelledError 的被动注入形成互补——如果 generator 正好卡在一个长时间 await(比如等待工具执行),CancelledError 能立即打断它。

2.3 Agent 模式中断的完整时序

sequenceDiagram participant FE as 前端 participant API as POST /api/interrupt participant SM as SessionManager participant AM as ApprovalManager participant CH as ChatHandler participant TASK as llm_next_task participant GEN as AgentClient<br/>.chat_stream() participant MCP as MCP 工具 FE->>API: {session_id: "abc"} API->>SM: await cancel("abc") rect rgb(255, 235, 235) Note over SM: 设标志 + 回调 SM->>SM: cancelled = True SM->>AM: cancel_session("abc") Note over AM: 唤醒审批等待 event.set() end API-->>FE: 200 OK rect rgb(235, 235, 255) Note over CH: ~10ms 后轮询到 CH->>SM: is_cancelled → True CH->>TASK: llm_next_task.cancel() end alt 场景 A: generator 在等 LLM 流式 TASK->>GEN: CancelledError GEN->>GEN: async for chunk in stream 被打断 Note over GEN: HTTP 关闭,省 token else 场景 B: generator 在等 MCP 工具 TASK->>GEN: CancelledError GEN->>MCP: await call_tool 被打断 Note over MCP: 客户端不再等待结果<br/>MCP 服务端工具继续执行到完成 else 场景 C: generator 在等审批 Note over AM: 回调已设 approved=False Note over GEN: generator 检查到 is_cancelled → return end TASK-->>CH: CancelledError / StopAsyncIteration CH->>FE: bus.push({type: "interrupted"})

关键理解:与 LLM 模式相比,Agent 模式的 generator 可能卡在三种不同的 await 上。CancelledError 注入的位置不同,效果也不同:

generator 卡在 CancelledError 效果
async for chunk in stream HTTP 关闭,OpenAI 停止生成
await mcp_client.call_tool() 客户端停止等待,MCP 服务端工具继续执行到完成
await session_manager.is_cancelled() generator 自己 return(协作式退出)

3. 子 Agent 中断:引入子进程和回调链

3.1 系统启动:回调链注册

在理解子 Agent 取消之前,先看清系统启动时谁把谁连在了一起。

sequenceDiagram participant Init as managers/__init__.py participant SM as SessionManager participant AM as ApprovalManager participant LS as websocket_server<br/>lifespan() participant AC as AgentClient participant REG as sub_agent/__init__.py<br/>register_sub_agent_tool() participant PM as ProcessManager Note over Init: 包导入时执行(全局单例) Init->>SM: SessionManager() Init->>AM: ApprovalManager() Init->>SM: on_cancel(approval_manager.cancel_session) Note over SM: _cancel_callbacks = [AM.cancel_session] Note over LS: 服务启动时执行 LS->>AC: AgentClient(llm_config, agent_config) LS->>AC: await connect_mcp_servers() LS->>REG: register_sub_agent_tool(agent_client, session_manager) REG->>PM: ProcessManager(session_manager) REG->>SM: on_cancel(pm.cancel_group) Note over SM: _cancel_callbacks = [AM.cancel_session, PM.cancel_group] REG->>AC: register_internal_tool("run_sub_agents", handler)

启动完成后,session_manager._cancel_callbacks 中有两个回调:

序号 回调 注册位置 作用
1 approval_manager.cancel_session managers/__init__.py:13 唤醒所有卡在审批等待的协程
2 pm.cancel_group sub_agent/__init__.py:126 向所有子 Agent 子进程发 SIGTERM

注意:纯 LLM 模式中回调 2 不存在(因为未注册 register_sub_agent_tool)。回调 1 在 Agent 模式下才有意义(LLM 模式没有审批流程)。

3.2 正常请求:协程与子进程启动链

以一次涉及子 Agent 的完整请求为例,标注每一层的协程/Task/进程关系:

sequenceDiagram participant FE as 前端 participant WS as websocket_chat()<br/>[协程 - ASGI 驱动] participant BUS as SessionMessageBus<br/>[Task A: bus.run] participant CH as ChatHandler.handle_chat()<br/>[协程 - WS 内 await] participant GEN as AgentClient.chat_stream()<br/>[async generator] participant LLM as OpenAI API participant PM as ProcessManager.spawn_group()<br/>[协程 - generator 内 await] participant MON as _monitor()<br/>[Task C: per sub-process] participant SUB as python -m sub_agent.runner<br/>[子进程, PID=xxx] participant NODE as node playwright-mcp<br/>[孙进程, PID=yyy] FE->>WS: WebSocket 消息 {session_id, messages} Note over WS: session_manager.register(session_id) WS->>BUS: asyncio.create_task(bus.run(websocket)) Note over BUS: Task A:从 bus 队列读取消息,推送到 WebSocket WS->>CH: await handle_text_chat(session_id, messages) CH->>GEN: llm_iterator = agent_client.chat_stream(messages, session_id) Note over CH: 主循环开始 loop ChatHandler 主循环 (每 10ms 一轮) CH->>CH: llm_next_task [Task B] = create_task(llm_iterator.__anext__()) Note over CH: 检查 is_cancelled → False,继续 Note over GEN: generator 被 __anext__() 驱动执行 GEN->>LLM: await stream = client.chat.completions.create(stream=True) LLM-->>GEN: chunk (tool_call: run_sub_agents) GEN-->>CH: yield LLMChunk(tool_call: start) GEN-->>CH: yield LLMChunk(tool_call: approval_required) Note over CH: 等待用户审批... FE->>CH: POST /api/tool-approve {approved: true} GEN-->>CH: yield LLMChunk(tool_call: executing) Note over GEN: 审批通过,执行工具 GEN->>PM: await _execute_tool("run_sub_agents", args) PM->>SUB: asyncio.create_subprocess_exec("python", "-m", "sub_agent.runner", spec) PM->>MON: asyncio.create_task(_monitor(proc)) Note over MON: Task C:逐行读取 sub 进程 stderr Note over SUB: 子进程启动 SUB->>NODE: FastMCP StdioTransport 启动 node playwright-mcp NODE->>NODE: 启动 Chrome loop 子进程 agent_loop SUB->>SUB: 调用 LLM → 执行工具 → 重复 end SUB-->>PM: stdout: 结果 JSON MON-->>PM: _monitor 完成 PM-->>GEN: spawn_group 返回聚合结果 GEN-->>CH: yield LLMChunk(tool_call: result) end

3.3 协程/Task/进程层次总结

websocket_chat() [协程 - FastAPI ASGI 框架驱动,非我们创建的 Task]
│
├── bus.run(websocket) [Task A - asyncio.create_task, 独立消费队列推送 WebSocket]
│
├── ChatHandler.handle_chat() [协程 - websocket_chat 内 await,共享同一调用栈]
│   │
│   ├── llm_next_task [Task B - asyncio.create_task, 包装 generator.__anext__()]
│   │   └── AgentClient.chat_stream() [async generator - 被 Task B 驱动]
│   │       ├── await stream (LLM 流式)
│   │       └── await _execute_tool() → spawn_group()
│   │           ├── _monitor(proc0) [Task C0 - asyncio.create_task]
│   │           ├── _monitor(proc1) [Task C1 - asyncio.create_task]
│   │           └── await asyncio.wait(monitor_tasks)
│   │
│   └── approval_wait_task [Task D - asyncio.create_task, 等待审批 Event]
│
└── 子进程 (OS 级别,不在 asyncio 事件循环内)
    └── python -m sub_agent.runner [子进程]
        └── node @playwright/mcp [孙进程, start_new_session=True]
            └── Chrome [曾孙进程]

Task 与协程的区别:Task(A/B/C/D)是用 asyncio.create_task() 创建的独立调度单元,有自己的调用栈,可以被单独 .cancel()。协程(websocket_chat、handle_chat)是通过 await 串在一起的,共享调用栈,不能单独取消。

关键理解ChatHandler → llm_next_task → generator → _execute_tool → spawn_group → asyncio.wait 是一条连续的 await 链。ChatHandler 的主循环在等 llm_next_task 完成,而 llm_next_task 内部在等 spawn_groupspawn_group 在等子进程。整条链在同一个 asyncio 事件循环中,cancel 一个 Task 会沿链传播 CancelledError。


4. 子 Agent 取消:完整传递链

4.1 全局时序图

sequenceDiagram participant FE as 前端 participant API as POST /api/interrupt participant SM as SessionManager participant AM as ApprovalManager participant PM_CB as ProcessManager<br/>.cancel_group() participant CH as ChatHandler<br/>主循环 participant TASK as llm_next_task<br/>[asyncio.Task] participant GEN as AgentClient<br/>.chat_stream() participant SG as ProcessManager<br/>.spawn_group() participant SUB as 子进程<br/>agent_loop FE->>API: {session_id: "abc"} API->>SM: await cancel("abc") rect rgb(255, 235, 235) Note over SM: 第一步:设标志 SM->>SM: sessions["abc"]["cancelled"] = True end rect rgb(235, 255, 235) Note over SM: 第二步:调用回调链(同步顺序执行)(回调链只是实现优雅退出,不做兜底) SM->>AM: await cancel_session("abc") Note over AM: 所有 pending 审批标记为 rejected<br/>event.set() 唤醒,优雅拒绝审批 SM->>PM_CB: await cancel_group("abc") Note over PM_CB: 遍历该 session 的所有子进程<br/>proc.terminate() 发 SIGTERM PM_CB->>SUB: SIGTERM Note over SUB: _sigterm_handler:<br/>_cancelled = True end API-->>FE: 200 OK rect rgb(235, 235, 255) Note over CH: 第三步:ChatHandler 轮询检测(~10ms 后) CH->>SM: await is_cancelled("abc") SM-->>CH: True CH->>TASK: llm_next_task.cancel() Note over TASK: CancelledError 被注入到 Task end rect rgb(255, 255, 220) Note over TASK: 第四步:CancelledError 沿 await 链传播 TASK->>GEN: CancelledError GEN->>SG: CancelledError(如果正在 await spawn_group) Note over SG: except asyncio.CancelledError: SG->>SG: cancel monitor tasks SG->>SUB: _terminate_and_kill()<br/>SIGTERM → 等10s → SIGKILL SG->>TASK: raise CancelledError end TASK-->>CH: CancelledError Note over CH: except CancelledError: pass rect rgb(240, 240, 240) Note over CH: 第五步:ChatHandler 收尾 CH->>FE: bus.push({type: "interrupted"}) CH->>CH: break 退出主循环 Note over CH: finally: 清理 approval_wait_task end rect rgb(240, 255, 240) Note over SUB: 第六步:子进程优雅退出 Note over SUB: agent_loop 检查 _cancelled → True<br/>return "cancelled" SUB->>SUB: runner.py: await client.close() Note over SUB: keep_alive=False → disconnect()<br/>→ killpg() → Chrome 关闭 SUB->>SUB: 进程退出 end

4.2 两条路径的关系

取消信号通过两条独立路径到达子进程,形成双保险:

flowchart TB CANCEL["session_manager.cancel(session_id)"] FLAG["sessions[sid]['cancelled'] = True"] CB1["回调1: approval_manager.cancel_session()"] CB2["回调2: pm.cancel_group()"] CANCEL --> FLAG CANCEL --> CB1 CANCEL --> CB2 CB1 --> WAKE["唤醒所有 pending 审批<br/>event.set(), approved=False"] CB2 --> SIGTERM_A["路径 A: proc.terminate()<br/>即时 SIGTERM"] FLAG --> POLL["ChatHandler 轮询<br/>is_cancelled() → True"] POLL --> CANCEL_TASK["llm_next_task.cancel()"] CANCEL_TASK --> CE["CancelledError 传播"] CE --> SG_CATCH["spawn_group except CancelledError"] SG_CATCH --> SIGTERM_B["路径 B: _terminate_and_kill()<br/>SIGTERM → 10s → SIGKILL"] SIGTERM_A --> SUB_FLAG["子进程: _cancelled = True"] SIGTERM_B --> SUB_FLAG SUB_FLAG --> EXIT["子进程优雅退出<br/>close() Chrome"] style SIGTERM_A fill:#fdd,stroke:#c00 style SIGTERM_B fill:#fdd,stroke:#c00 style CB1 fill:#dfd,stroke:#0a0 style FLAG fill:#ddf,stroke:#00c
路径 A(回调直接 SIGTERM) 路径 B(CancelledError 传播)
触发时机 cancel()立即执行 ChatHandler ~10ms 后轮询到
谁发 SIGTERM cancel_group() _terminate_and_kill()
SIGKILL 兜底 有(10 秒后)
能打断 LLM 流式 不能(只管子进程) (cancel Task 断开 HTTP 连接)
能打断审批等待 不能(由回调1 负责) 不需要(回调1 已处理)

路径 A 更快,路径 B 更彻底。两条都会发 SIGTERM,proc.terminate() 幂等,重复无害。


5. CancelledError 的产生与传播

5.1 CancelledError 从哪来?

无论是 LLM 模式还是 Agent 模式,CancelledError 的产生者都是同一个——ChatHandler

# chat_handler.py:226
llm_next_task.cancel()   # ← 这一行产生 CancelledError

asyncio.Task.cancel() 的原理:向 Task 当前挂起的 await 点注入一个 CancelledError 异常。就像在 Task 内部那个 await 的位置凭空 raise CancelledError() 一样。

5.2 CancelledError 注入到哪?

取决于 llm_next_task 此刻卡在哪个 await 上。

LLM 模式下llm_next_task 包装的是 LLMClient.chat_stream().__anext__(),generator 内部只有一个 await

async for chunk in stream   ← CancelledError 注入点(唯一)

Agent 模式下llm_next_task 包装的是 AgentClient.chat_stream().__anext__(),generator 内部可能卡在多个 await

flowchart TD TASK["llm_next_task.cancel()"] CE["CancelledError 被注入"] TASK --> CE CE --> WHERE{"generator 此刻卡在哪个 await?"} WHERE -->|"await stream.__anext__()"| A["场景 A: LLM 流式接收"] WHERE -->|"await _execute_tool()"| B["场景 B: MCP 工具执行"] WHERE -->|"await _execute_tool()<br/>→ spawn_group()"| C["场景 C: 子 Agent 执行"] A --> A1["HTTP 连接被关闭<br/>OpenAI 停止生成 → 省 token"] A --> A2["CancelledError 从 generator 向上传播<br/>到 llm_next_task"] B --> B1["客户端停止等待结果<br/>MCP 服务端工具继续执行到完成"] B --> B2["CancelledError 从 generator 向上传播"] C --> C1["spawn_group() 捕获 CancelledError"] C1 --> C2["cancel 所有 monitor tasks"] C1 --> C3["_terminate_and_kill(processes)<br/>SIGTERM → 10s → SIGKILL"] C1 --> C4["raise CancelledError(继续向上传播)"] C4 --> C5["CancelledError 从 generator 向上传播"] A2 --> CATCH B2 --> CATCH C5 --> CATCH CATCH["ChatHandler 捕获<br/>except CancelledError: pass"] style A1 fill:#dfd style C3 fill:#fdd

5.3 传播链详解(场景 C:子 Agent 执行中)

这是最复杂的场景。关键点:CancelledError 不是直接打到 readline() 上的monitor_tasks 是独立的 Task(Task C),不在 llm_next_task(Task B)的调用栈里。CancelledError 只能沿 Task B 自己的 await 链传播,打中 asyncio.wait(),然后由 spawn_group 手动发起第二波 cancel 给 monitor tasks。

sequenceDiagram participant CH as ChatHandler participant TB as Task B<br/>llm_next_task participant GEN as chat_stream()<br/>[generator] participant SG as spawn_group() participant AW as asyncio.wait() participant TC as Task C0 / C1<br/>_monitor() participant PROC as 子进程 Note over TC: await proc.stderr.readline() Note over AW: 等待 monitor_tasks 完成 rect rgb(255, 235, 235) Note over CH,AW: 第一波:CancelledError 沿 Task B 的 await 链注入 CH->>TB: llm_next_task.cancel() TB->>GEN: CancelledError 注入到<br/>await _execute_tool() (:547) GEN->>SG: 穿过 _execute_tool()<br/>穿过 run_sub_agents_handler() SG->>AW: CancelledError 到达<br/>await asyncio.wait() (:146) AW-->>SG: raise CancelledError end Note over TC: 此时 monitor tasks 还活着!<br/>readline() 还在等数据 rect rgb(235, 235, 255) Note over SG,PROC: 第二波:spawn_group 手动清理(except 块内) Note over SG: except asyncio.CancelledError (:185) SG->>TC: for t in monitor_tasks: t.cancel() Note over TC: CancelledError 注入到<br/>await readline() TC-->>SG: await gather 消费异常 SG->>PROC: _terminate_and_kill()<br/>SIGTERM → 10s → SIGKILL end rect rgb(255, 255, 220) Note over SG,CH: 第三阶段:CancelledError 继续向上传播(raise) SG-->>GEN: raise CancelledError Note over GEN: 不捕获,generator 被终止 GEN-->>TB: CancelledError Note over TB: Task 状态变为 CANCELLED TB-->>CH: CancelledError Note over CH: except CancelledError: pass<br/>最终在这里被消化 (:226-229) end

为什么不是直接打在 readline() 上? 因为 asyncio.Task.cancel() 只能注入到该 Task 自己当前挂起的 await 点。Task B 的 await 链是:

Task B (llm_next_task)
  → __anext__()
    → chat_stream() generator
      → _execute_tool()
        → spawn_group()
          → asyncio.wait()      ← Task B 的最深 await 点,CancelledError 打这里

而 Task C(monitor)有自己的独立调用栈:

Task C0 (_monitor)
  → readline()                  ← Task C0 的 await 点,Task B 的 cancel 打不到这里

spawn_groupexcept CancelledError 块就是中转站——接住 Task B 的取消,翻译成对 Task C 的取消,再 raise 继续往上抛。

5.4 场景 A:LLM 流式接收中(LLM 模式和 Agent 模式通用)

llm_next_task.cancel()
│
▼
LLMClient/AgentClient.chat_stream()
│ async for chunk in stream:            # ← CancelledError 注入点
│                                       # HTTP 连接被关闭
│                                       # OpenAI 服务端检测到断连,停止生成
│
│ CancelledError 向上传播
▼
ChatHandler: except CancelledError: pass

效果:OpenAI 停止生成 token,节省费用。这在 LLM 模式和 Agent 模式中效果完全一致。

5.5 如果此时没有子 Agent 在运行?

spawn_group 不在调用栈中,CancelledError 只需要穿过 async for chunk in streamawait mcp_client.call_tool(),传播链更短,行为更简单。


6. 子进程的取消响应

子进程不参与 asyncio 事件循环,也不会收到 CancelledError。它通过 OS 信号(SIGTERM) 接收取消通知。

6.1 信号处理

# sub_agent/agent_loop.py:45-58
_cancelled = False

def _sigterm_handler(signum, frame):
    global _cancelled
    _cancelled = True   # 只设标志,不立即退出

signal.signal(signal.SIGTERM, _sigterm_handler)

6.2 检查点

# agent_loop.py
for step in range(max_steps):
    if _cancelled:                      # 检查点 1:轮次入口
        return _result("cancelled")

    # ... LLM 调用 ...
    # ... 工具执行(期间不检查)...

    if _cancelled:                      # 检查点 2:工具执行后
        return _result("cancelled")

关键设计:工具执行期间不中断。MCP 工具(如 browser_navigate)可能有副作用,中途打断可能导致状态不一致。等当前工具跑完再退出。

6.3 退出清理

# runner.py:201-208
for client in mcp_clients:
    await client.close()        # keep_alive=False → 触发完整清理
                                # → disconnect() → killpg() → Chrome 关闭
print(f"{label} 完成")

# asyncio.run(main()) 返回,子进程退出

7. 流程图总结

7.1 从简单到复杂:三种中断模式对比

LLM 模式(最简单)
──────────────────
cancel() → 标志 → ChatHandler 轮询 → llm_next_task.cancel()
                                       → CancelledError → HTTP 关闭
只涉及: asyncio 事件循环内的一条 await 链


Agent 模式(无子 Agent)
──────────────────────
cancel() → 标志 + 回调1(审批)
        → ChatHandler 轮询 → llm_next_task.cancel()
                              → CancelledError → HTTP 关闭 / MCP 打断
多了: 审批回调 + 工具执行打断


Agent 模式(有子 Agent)
──────────────────────
cancel() → 标志 + 回调1(审批) + 回调2(SIGTERM)  ← 路径 A
        → ChatHandler 轮询 → llm_next_task.cancel()
                              → CancelledError → spawn_group 捕获
                              → _terminate_and_kill (SIGTERM+SIGKILL)  ← 路径 B
                              → 子进程 _cancelled 标志 → 优雅退出
多了: 跨进程 SIGTERM + 双路径保险

7.2 取消信号传播全景

flowchart LR subgraph 前端 BTN["中断按钮"] end subgraph "主进程 (asyncio 事件循环)" API["POST /api/interrupt"] SM["SessionManager.cancel()"] FLAG["cancelled = True"] CB1["回调1:<br/>ApprovalManager<br/>.cancel_session()"] CB2["回调2:<br/>ProcessManager<br/>.cancel_group()"] CH["ChatHandler<br/>is_cancelled 轮询"] CANCEL_TASK["llm_next_task<br/>.cancel()"] CE["CancelledError<br/>沿 await 链传播"] SG["spawn_group()<br/>except CancelledError"] TK["_terminate_and_kill()"] end subgraph "子进程 (独立 Python 进程)" SH["SIGTERM handler<br/>_cancelled = True"] AL["agent_loop<br/>检查 _cancelled"] CLOSE["client.close()<br/>keep_alive=False"] end subgraph "孙进程" NODE["node playwright-mcp"] CHROME["Chrome"] end BTN -->|POST| API API --> SM SM --> FLAG SM --> CB1 SM --> CB2 CB1 -->|"event.set()"| WAKE["唤醒审批等待"] CB2 -->|"SIGTERM (路径A)"| SH FLAG -.->|"轮询"| CH CH --> CANCEL_TASK CANCEL_TASK --> CE CE --> SG SG --> TK TK -->|"SIGTERM+SIGKILL (路径B)"| SH SH --> AL AL -->|"return cancelled"| CLOSE CLOSE -->|"killpg()"| NODE NODE -->|"进程组终止"| CHROME style FLAG fill:#ddf style CB1 fill:#dfd style CB2 fill:#fdd style CE fill:#ffd style SH fill:#fdd

7.3 各层的取消机制对比

层级 取消机制 取消粒度 延迟
SessionManager 布尔标志 cancelled=True 即时设置 0ms
ApprovalManager event.set() 唤醒 即时唤醒 0ms
ProcessManager (路径A) proc.terminate() SIGTERM fire-and-forget 0ms
ChatHandler 轮询 is_cancelled 每 10ms 一次 0~10ms
LLMClient/AgentClient llm_next_task.cancel() → CancelledError 打断当前 await 0ms
ProcessManager (路径B) _terminate_and_kill() SIGTERM+SIGKILL 等待 10s 后强杀 10s
子进程 agent_loop _cancelled 标志 当前工具执行完后 0~30s
Chrome os.killpg() 进程组终止 即时 0ms

7.4 为什么不用 CancelledError 一路到底?

因为 CancelledError 只在 asyncio 事件循环内有效,跨不了进程边界。子进程有自己的事件循环,主进程的 CancelledError 传不进去。所以:

  • 主进程内部:CancelledError(asyncio 原生机制)
  • 跨进程:SIGTERM(OS 原生机制)
  • 子进程内部:布尔标志轮询(最简单可靠)