中断流程完整解析
本文从最简单的 LLM 纯对话模式入手,逐步引入 Agent 工具调用和子 Agent 进程管理,完整解析系统中断(Cancel)的实现原理。
1. LLM 模式中断:最简单的情况
1.1 LLM 模式的流式调用
在 LLM 模式下,系统只有两个核心组件参与:
- ChatHandler:主循环,负责驱动 LLM 流式输出和推送消息
- LLMClient:对 OpenAI API 的薄封装,
chat_stream() 是一个 async generator
请求处理流程很直白:
前端 → WebSocket → ChatHandler.handle_chat()
↓
llm_iterator = llm_client.chat_stream(messages, session_id)
↓
主循环:每 10ms 一轮
├── llm_next_task = create_task(llm_iterator.__anext__())
├── 等待 llm_next_task 完成 → 拿到 LLMChunk
├── 提交 TTS / 推送文本
└── 按序推送结果到前端
整个过程只有一条 await 链,没有子进程,没有工具调用。
1.2 LLM 模式的中断路径
当用户点击中断按钮:
sequenceDiagram
participant FE as 前端
participant API as POST /api/interrupt
participant SM as SessionManager
participant CH as ChatHandler<br/>主循环
participant TASK as llm_next_task<br/>[asyncio.Task]
participant GEN as LLMClient<br/>.chat_stream()
participant LLM as OpenAI API
FE->>API: {session_id: "abc"}
API->>SM: await cancel("abc")
SM->>SM: sessions["abc"]["cancelled"] = True
API-->>FE: 200 OK
Note over CH: ~10ms 后,主循环轮询到
CH->>SM: await is_cancelled("abc")
SM-->>CH: True
CH->>TASK: llm_next_task.cancel()
Note over TASK: CancelledError 被注入到 Task<br/>当前 await 点
TASK->>GEN: CancelledError
GEN->>LLM: HTTP 连接被关闭
Note over LLM: 服务端检测到断连<br/>停止生成 token
TASK-->>CH: CancelledError
Note over CH: except CancelledError: pass
CH->>FE: bus.push({type: "interrupted"})
CH->>CH: break 退出主循环
整个 LLM 模式的中断就三步:
| 步骤 |
动作 |
效果 |
| 1 |
cancel() 设置布尔标志 |
标记会话为已中断 |
| 2 |
ChatHandler 轮询 is_cancelled → llm_next_task.cancel() |
注入 CancelledError 到 generator 的当前 await 点 |
| 3 |
CancelledError 传播到 async for chunk in stream |
HTTP 连接关闭,OpenAI 停止生成,节省 token |
这里有两个关键点需要理解:
CancelledError 从哪来? 不是 SessionManager 产生的。是 ChatHandler 调用 llm_next_task.cancel() 产生的。asyncio.Task.cancel() 的原理是:向 Task 当前挂起的 await 点注入一个 CancelledError,就像在那个 await 位置凭空 raise CancelledError() 一样。
为什么能省 token? LLMClient 的 chat_stream() 内部是 async for chunk in stream,CancelledError 会打断这个 await,底层的 HTTP 连接随之关闭。OpenAI 服务端检测到客户端断连后会停止生成 token,不再计费。
1.3 LLM 模式中断的协程层次
websocket_chat() [协程 - FastAPI ASGI 框架驱动]
├── bus.run(websocket) [Task A - 独立消费队列推送 WebSocket]
└── ChatHandler.handle_chat() [协程 - websocket_chat 内 await]
└── llm_next_task [Task B - 包装 generator.__anext__()]
└── LLMClient.chat_stream() [async generator]
└── async for chunk in stream (← CancelledError 注入点)
└── HTTP 连接 → OpenAI API
中断时 CancelledError 的传播:llm_next_task.cancel() → async for chunk in stream → HTTP 断开 → Task 状态变为 CANCELLED → ChatHandler 捕获 except CancelledError: pass → 推送 interrupted → break。
非常简洁。但当引入工具调用后,事情变得复杂。
2. Agent 模式中断:引入工具调用
2.1 Agent 模式与 LLM 模式的核心区别
Agent 模式使用 AgentClient 替代 LLMClient。两者都暴露 chat_stream() 接口,ChatHandler 不关心内部差异。但 AgentClient.chat_stream() 内部多了:
- 规划轮(第 0 轮):纯文本,判断是否需要工具
- Agent 循环(第 1 轮起):LLM 调用 → 工具执行 → 结果返回 → 再次 LLM 调用 → ...
- 审批等待:工具执行前需要用户确认
- 子 Agent:
run_sub_agents 工具会启动子进程
ChatHandler 的主循环代码完全不变——它只是从 llm_iterator 拿 LLMChunk。复杂度全部在 AgentClient.chat_stream() 这个 generator 内部。
2.2 AgentClient 内部的中断检查点
AgentClient.chat_stream() 在 5 个位置检查 is_cancelled:
# 检查点 1: 规划轮开始前(:338)
if session_id and await session_manager.is_cancelled(session_id):
return
# 检查点 2: 规划轮流式接收中(:221,每个 chunk)
if session_id and await session_manager.is_cancelled(session_id):
return
# 检查点 3: Agent 循环入口(:354,每轮开始)
if session_id and await session_manager.is_cancelled(session_id):
return
# 检查点 4: Agent 循环流式接收中(:402,每个 chunk)
if session_id and await session_manager.is_cancelled(session_id):
return
# 检查点 5: 工具执行前(:494,每个工具调用前)
if session_id and await session_manager.is_cancelled(session_id):
return
这些是协作式检查:generator 主动查询标志然后 return,让 generator 优雅关闭。这与 CancelledError 的被动注入形成互补——如果 generator 正好卡在一个长时间 await(比如等待工具执行),CancelledError 能立即打断它。
2.3 Agent 模式中断的完整时序
sequenceDiagram
participant FE as 前端
participant API as POST /api/interrupt
participant SM as SessionManager
participant AM as ApprovalManager
participant CH as ChatHandler
participant TASK as llm_next_task
participant GEN as AgentClient<br/>.chat_stream()
participant MCP as MCP 工具
FE->>API: {session_id: "abc"}
API->>SM: await cancel("abc")
rect rgb(255, 235, 235)
Note over SM: 设标志 + 回调
SM->>SM: cancelled = True
SM->>AM: cancel_session("abc")
Note over AM: 唤醒审批等待 event.set()
end
API-->>FE: 200 OK
rect rgb(235, 235, 255)
Note over CH: ~10ms 后轮询到
CH->>SM: is_cancelled → True
CH->>TASK: llm_next_task.cancel()
end
alt 场景 A: generator 在等 LLM 流式
TASK->>GEN: CancelledError
GEN->>GEN: async for chunk in stream 被打断
Note over GEN: HTTP 关闭,省 token
else 场景 B: generator 在等 MCP 工具
TASK->>GEN: CancelledError
GEN->>MCP: await call_tool 被打断
Note over MCP: 客户端不再等待结果<br/>MCP 服务端工具继续执行到完成
else 场景 C: generator 在等审批
Note over AM: 回调已设 approved=False
Note over GEN: generator 检查到 is_cancelled → return
end
TASK-->>CH: CancelledError / StopAsyncIteration
CH->>FE: bus.push({type: "interrupted"})关键理解:与 LLM 模式相比,Agent 模式的 generator 可能卡在三种不同的 await 上。CancelledError 注入的位置不同,效果也不同:
| generator 卡在 |
CancelledError 效果 |
async for chunk in stream |
HTTP 关闭,OpenAI 停止生成 |
await mcp_client.call_tool() |
客户端停止等待,MCP 服务端工具继续执行到完成 |
await session_manager.is_cancelled() |
generator 自己 return(协作式退出) |
3. 子 Agent 中断:引入子进程和回调链
3.1 系统启动:回调链注册
在理解子 Agent 取消之前,先看清系统启动时谁把谁连在了一起。
sequenceDiagram
participant Init as managers/__init__.py
participant SM as SessionManager
participant AM as ApprovalManager
participant LS as websocket_server<br/>lifespan()
participant AC as AgentClient
participant REG as sub_agent/__init__.py<br/>register_sub_agent_tool()
participant PM as ProcessManager
Note over Init: 包导入时执行(全局单例)
Init->>SM: SessionManager()
Init->>AM: ApprovalManager()
Init->>SM: on_cancel(approval_manager.cancel_session)
Note over SM: _cancel_callbacks = [AM.cancel_session]
Note over LS: 服务启动时执行
LS->>AC: AgentClient(llm_config, agent_config)
LS->>AC: await connect_mcp_servers()
LS->>REG: register_sub_agent_tool(agent_client, session_manager)
REG->>PM: ProcessManager(session_manager)
REG->>SM: on_cancel(pm.cancel_group)
Note over SM: _cancel_callbacks = [AM.cancel_session, PM.cancel_group]
REG->>AC: register_internal_tool("run_sub_agents", handler)启动完成后,session_manager._cancel_callbacks 中有两个回调:
| 序号 |
回调 |
注册位置 |
作用 |
| 1 |
approval_manager.cancel_session |
managers/__init__.py:13 |
唤醒所有卡在审批等待的协程 |
| 2 |
pm.cancel_group |
sub_agent/__init__.py:126 |
向所有子 Agent 子进程发 SIGTERM |
注意:纯 LLM 模式中回调 2 不存在(因为未注册 register_sub_agent_tool)。回调 1 在 Agent 模式下才有意义(LLM 模式没有审批流程)。
3.2 正常请求:协程与子进程启动链
以一次涉及子 Agent 的完整请求为例,标注每一层的协程/Task/进程关系:
sequenceDiagram
participant FE as 前端
participant WS as websocket_chat()<br/>[协程 - ASGI 驱动]
participant BUS as SessionMessageBus<br/>[Task A: bus.run]
participant CH as ChatHandler.handle_chat()<br/>[协程 - WS 内 await]
participant GEN as AgentClient.chat_stream()<br/>[async generator]
participant LLM as OpenAI API
participant PM as ProcessManager.spawn_group()<br/>[协程 - generator 内 await]
participant MON as _monitor()<br/>[Task C: per sub-process]
participant SUB as python -m sub_agent.runner<br/>[子进程, PID=xxx]
participant NODE as node playwright-mcp<br/>[孙进程, PID=yyy]
FE->>WS: WebSocket 消息 {session_id, messages}
Note over WS: session_manager.register(session_id)
WS->>BUS: asyncio.create_task(bus.run(websocket))
Note over BUS: Task A:从 bus 队列读取消息,推送到 WebSocket
WS->>CH: await handle_text_chat(session_id, messages)
CH->>GEN: llm_iterator = agent_client.chat_stream(messages, session_id)
Note over CH: 主循环开始
loop ChatHandler 主循环 (每 10ms 一轮)
CH->>CH: llm_next_task [Task B] = create_task(llm_iterator.__anext__())
Note over CH: 检查 is_cancelled → False,继续
Note over GEN: generator 被 __anext__() 驱动执行
GEN->>LLM: await stream = client.chat.completions.create(stream=True)
LLM-->>GEN: chunk (tool_call: run_sub_agents)
GEN-->>CH: yield LLMChunk(tool_call: start)
GEN-->>CH: yield LLMChunk(tool_call: approval_required)
Note over CH: 等待用户审批...
FE->>CH: POST /api/tool-approve {approved: true}
GEN-->>CH: yield LLMChunk(tool_call: executing)
Note over GEN: 审批通过,执行工具
GEN->>PM: await _execute_tool("run_sub_agents", args)
PM->>SUB: asyncio.create_subprocess_exec("python", "-m", "sub_agent.runner", spec)
PM->>MON: asyncio.create_task(_monitor(proc))
Note over MON: Task C:逐行读取 sub 进程 stderr
Note over SUB: 子进程启动
SUB->>NODE: FastMCP StdioTransport 启动 node playwright-mcp
NODE->>NODE: 启动 Chrome
loop 子进程 agent_loop
SUB->>SUB: 调用 LLM → 执行工具 → 重复
end
SUB-->>PM: stdout: 结果 JSON
MON-->>PM: _monitor 完成
PM-->>GEN: spawn_group 返回聚合结果
GEN-->>CH: yield LLMChunk(tool_call: result)
end3.3 协程/Task/进程层次总结
websocket_chat() [协程 - FastAPI ASGI 框架驱动,非我们创建的 Task]
│
├── bus.run(websocket) [Task A - asyncio.create_task, 独立消费队列推送 WebSocket]
│
├── ChatHandler.handle_chat() [协程 - websocket_chat 内 await,共享同一调用栈]
│ │
│ ├── llm_next_task [Task B - asyncio.create_task, 包装 generator.__anext__()]
│ │ └── AgentClient.chat_stream() [async generator - 被 Task B 驱动]
│ │ ├── await stream (LLM 流式)
│ │ └── await _execute_tool() → spawn_group()
│ │ ├── _monitor(proc0) [Task C0 - asyncio.create_task]
│ │ ├── _monitor(proc1) [Task C1 - asyncio.create_task]
│ │ └── await asyncio.wait(monitor_tasks)
│ │
│ └── approval_wait_task [Task D - asyncio.create_task, 等待审批 Event]
│
└── 子进程 (OS 级别,不在 asyncio 事件循环内)
└── python -m sub_agent.runner [子进程]
└── node @playwright/mcp [孙进程, start_new_session=True]
└── Chrome [曾孙进程]
Task 与协程的区别:Task(A/B/C/D)是用 asyncio.create_task() 创建的独立调度单元,有自己的调用栈,可以被单独 .cancel()。协程(websocket_chat、handle_chat)是通过 await 串在一起的,共享调用栈,不能单独取消。
关键理解:ChatHandler → llm_next_task → generator → _execute_tool → spawn_group → asyncio.wait 是一条连续的 await 链。ChatHandler 的主循环在等 llm_next_task 完成,而 llm_next_task 内部在等 spawn_group,spawn_group 在等子进程。整条链在同一个 asyncio 事件循环中,cancel 一个 Task 会沿链传播 CancelledError。
4. 子 Agent 取消:完整传递链
4.1 全局时序图
sequenceDiagram
participant FE as 前端
participant API as POST /api/interrupt
participant SM as SessionManager
participant AM as ApprovalManager
participant PM_CB as ProcessManager<br/>.cancel_group()
participant CH as ChatHandler<br/>主循环
participant TASK as llm_next_task<br/>[asyncio.Task]
participant GEN as AgentClient<br/>.chat_stream()
participant SG as ProcessManager<br/>.spawn_group()
participant SUB as 子进程<br/>agent_loop
FE->>API: {session_id: "abc"}
API->>SM: await cancel("abc")
rect rgb(255, 235, 235)
Note over SM: 第一步:设标志
SM->>SM: sessions["abc"]["cancelled"] = True
end
rect rgb(235, 255, 235)
Note over SM: 第二步:调用回调链(同步顺序执行)(回调链只是实现优雅退出,不做兜底)
SM->>AM: await cancel_session("abc")
Note over AM: 所有 pending 审批标记为 rejected<br/>event.set() 唤醒,优雅拒绝审批
SM->>PM_CB: await cancel_group("abc")
Note over PM_CB: 遍历该 session 的所有子进程<br/>proc.terminate() 发 SIGTERM
PM_CB->>SUB: SIGTERM
Note over SUB: _sigterm_handler:<br/>_cancelled = True
end
API-->>FE: 200 OK
rect rgb(235, 235, 255)
Note over CH: 第三步:ChatHandler 轮询检测(~10ms 后)
CH->>SM: await is_cancelled("abc")
SM-->>CH: True
CH->>TASK: llm_next_task.cancel()
Note over TASK: CancelledError 被注入到 Task
end
rect rgb(255, 255, 220)
Note over TASK: 第四步:CancelledError 沿 await 链传播
TASK->>GEN: CancelledError
GEN->>SG: CancelledError(如果正在 await spawn_group)
Note over SG: except asyncio.CancelledError:
SG->>SG: cancel monitor tasks
SG->>SUB: _terminate_and_kill()<br/>SIGTERM → 等10s → SIGKILL
SG->>TASK: raise CancelledError
end
TASK-->>CH: CancelledError
Note over CH: except CancelledError: pass
rect rgb(240, 240, 240)
Note over CH: 第五步:ChatHandler 收尾
CH->>FE: bus.push({type: "interrupted"})
CH->>CH: break 退出主循环
Note over CH: finally: 清理 approval_wait_task
end
rect rgb(240, 255, 240)
Note over SUB: 第六步:子进程优雅退出
Note over SUB: agent_loop 检查 _cancelled → True<br/>return "cancelled"
SUB->>SUB: runner.py: await client.close()
Note over SUB: keep_alive=False → disconnect()<br/>→ killpg() → Chrome 关闭
SUB->>SUB: 进程退出
end4.2 两条路径的关系
取消信号通过两条独立路径到达子进程,形成双保险:
flowchart TB
CANCEL["session_manager.cancel(session_id)"]
FLAG["sessions[sid]['cancelled'] = True"]
CB1["回调1: approval_manager.cancel_session()"]
CB2["回调2: pm.cancel_group()"]
CANCEL --> FLAG
CANCEL --> CB1
CANCEL --> CB2
CB1 --> WAKE["唤醒所有 pending 审批<br/>event.set(), approved=False"]
CB2 --> SIGTERM_A["路径 A: proc.terminate()<br/>即时 SIGTERM"]
FLAG --> POLL["ChatHandler 轮询<br/>is_cancelled() → True"]
POLL --> CANCEL_TASK["llm_next_task.cancel()"]
CANCEL_TASK --> CE["CancelledError 传播"]
CE --> SG_CATCH["spawn_group except CancelledError"]
SG_CATCH --> SIGTERM_B["路径 B: _terminate_and_kill()<br/>SIGTERM → 10s → SIGKILL"]
SIGTERM_A --> SUB_FLAG["子进程: _cancelled = True"]
SIGTERM_B --> SUB_FLAG
SUB_FLAG --> EXIT["子进程优雅退出<br/>close() Chrome"]
style SIGTERM_A fill:#fdd,stroke:#c00
style SIGTERM_B fill:#fdd,stroke:#c00
style CB1 fill:#dfd,stroke:#0a0
style FLAG fill:#ddf,stroke:#00c
|
路径 A(回调直接 SIGTERM) |
路径 B(CancelledError 传播) |
| 触发时机 |
cancel() 内立即执行 |
ChatHandler ~10ms 后轮询到 |
| 谁发 SIGTERM |
cancel_group() |
_terminate_and_kill() |
| SIGKILL 兜底 |
无 |
有(10 秒后) |
| 能打断 LLM 流式 |
不能(只管子进程) |
能(cancel Task 断开 HTTP 连接) |
| 能打断审批等待 |
不能(由回调1 负责) |
不需要(回调1 已处理) |
路径 A 更快,路径 B 更彻底。两条都会发 SIGTERM,proc.terminate() 幂等,重复无害。
5. CancelledError 的产生与传播
5.1 CancelledError 从哪来?
无论是 LLM 模式还是 Agent 模式,CancelledError 的产生者都是同一个——ChatHandler:
# chat_handler.py:226
llm_next_task.cancel() # ← 这一行产生 CancelledError
asyncio.Task.cancel() 的原理:向 Task 当前挂起的 await 点注入一个 CancelledError 异常。就像在 Task 内部那个 await 的位置凭空 raise CancelledError() 一样。
5.2 CancelledError 注入到哪?
取决于 llm_next_task 此刻卡在哪个 await 上。
LLM 模式下,llm_next_task 包装的是 LLMClient.chat_stream().__anext__(),generator 内部只有一个 await:
async for chunk in stream ← CancelledError 注入点(唯一)
Agent 模式下,llm_next_task 包装的是 AgentClient.chat_stream().__anext__(),generator 内部可能卡在多个 await:
flowchart TD
TASK["llm_next_task.cancel()"]
CE["CancelledError 被注入"]
TASK --> CE
CE --> WHERE{"generator 此刻卡在哪个 await?"}
WHERE -->|"await stream.__anext__()"| A["场景 A: LLM 流式接收"]
WHERE -->|"await _execute_tool()"| B["场景 B: MCP 工具执行"]
WHERE -->|"await _execute_tool()<br/>→ spawn_group()"| C["场景 C: 子 Agent 执行"]
A --> A1["HTTP 连接被关闭<br/>OpenAI 停止生成 → 省 token"]
A --> A2["CancelledError 从 generator 向上传播<br/>到 llm_next_task"]
B --> B1["客户端停止等待结果<br/>MCP 服务端工具继续执行到完成"]
B --> B2["CancelledError 从 generator 向上传播"]
C --> C1["spawn_group() 捕获 CancelledError"]
C1 --> C2["cancel 所有 monitor tasks"]
C1 --> C3["_terminate_and_kill(processes)<br/>SIGTERM → 10s → SIGKILL"]
C1 --> C4["raise CancelledError(继续向上传播)"]
C4 --> C5["CancelledError 从 generator 向上传播"]
A2 --> CATCH
B2 --> CATCH
C5 --> CATCH
CATCH["ChatHandler 捕获<br/>except CancelledError: pass"]
style A1 fill:#dfd
style C3 fill:#fdd5.3 传播链详解(场景 C:子 Agent 执行中)
这是最复杂的场景。关键点:CancelledError 不是直接打到 readline() 上的。monitor_tasks 是独立的 Task(Task C),不在 llm_next_task(Task B)的调用栈里。CancelledError 只能沿 Task B 自己的 await 链传播,打中 asyncio.wait(),然后由 spawn_group 手动发起第二波 cancel 给 monitor tasks。
sequenceDiagram
participant CH as ChatHandler
participant TB as Task B<br/>llm_next_task
participant GEN as chat_stream()<br/>[generator]
participant SG as spawn_group()
participant AW as asyncio.wait()
participant TC as Task C0 / C1<br/>_monitor()
participant PROC as 子进程
Note over TC: await proc.stderr.readline()
Note over AW: 等待 monitor_tasks 完成
rect rgb(255, 235, 235)
Note over CH,AW: 第一波:CancelledError 沿 Task B 的 await 链注入
CH->>TB: llm_next_task.cancel()
TB->>GEN: CancelledError 注入到<br/>await _execute_tool() (:547)
GEN->>SG: 穿过 _execute_tool()<br/>穿过 run_sub_agents_handler()
SG->>AW: CancelledError 到达<br/>await asyncio.wait() (:146)
AW-->>SG: raise CancelledError
end
Note over TC: 此时 monitor tasks 还活着!<br/>readline() 还在等数据
rect rgb(235, 235, 255)
Note over SG,PROC: 第二波:spawn_group 手动清理(except 块内)
Note over SG: except asyncio.CancelledError (:185)
SG->>TC: for t in monitor_tasks: t.cancel()
Note over TC: CancelledError 注入到<br/>await readline()
TC-->>SG: await gather 消费异常
SG->>PROC: _terminate_and_kill()<br/>SIGTERM → 10s → SIGKILL
end
rect rgb(255, 255, 220)
Note over SG,CH: 第三阶段:CancelledError 继续向上传播(raise)
SG-->>GEN: raise CancelledError
Note over GEN: 不捕获,generator 被终止
GEN-->>TB: CancelledError
Note over TB: Task 状态变为 CANCELLED
TB-->>CH: CancelledError
Note over CH: except CancelledError: pass<br/>最终在这里被消化 (:226-229)
end为什么不是直接打在 readline() 上? 因为 asyncio.Task.cancel() 只能注入到该 Task 自己当前挂起的 await 点。Task B 的 await 链是:
Task B (llm_next_task)
→ __anext__()
→ chat_stream() generator
→ _execute_tool()
→ spawn_group()
→ asyncio.wait() ← Task B 的最深 await 点,CancelledError 打这里
而 Task C(monitor)有自己的独立调用栈:
Task C0 (_monitor)
→ readline() ← Task C0 的 await 点,Task B 的 cancel 打不到这里
spawn_group 的 except CancelledError 块就是中转站——接住 Task B 的取消,翻译成对 Task C 的取消,再 raise 继续往上抛。
5.4 场景 A:LLM 流式接收中(LLM 模式和 Agent 模式通用)
llm_next_task.cancel()
│
▼
LLMClient/AgentClient.chat_stream()
│ async for chunk in stream: # ← CancelledError 注入点
│ # HTTP 连接被关闭
│ # OpenAI 服务端检测到断连,停止生成
│
│ CancelledError 向上传播
▼
ChatHandler: except CancelledError: pass
效果:OpenAI 停止生成 token,节省费用。这在 LLM 模式和 Agent 模式中效果完全一致。
5.5 如果此时没有子 Agent 在运行?
那 spawn_group 不在调用栈中,CancelledError 只需要穿过 async for chunk in stream 或 await mcp_client.call_tool(),传播链更短,行为更简单。
6. 子进程的取消响应
子进程不参与 asyncio 事件循环,也不会收到 CancelledError。它通过 OS 信号(SIGTERM) 接收取消通知。
6.1 信号处理
# sub_agent/agent_loop.py:45-58
_cancelled = False
def _sigterm_handler(signum, frame):
global _cancelled
_cancelled = True # 只设标志,不立即退出
signal.signal(signal.SIGTERM, _sigterm_handler)
6.2 检查点
# agent_loop.py
for step in range(max_steps):
if _cancelled: # 检查点 1:轮次入口
return _result("cancelled")
# ... LLM 调用 ...
# ... 工具执行(期间不检查)...
if _cancelled: # 检查点 2:工具执行后
return _result("cancelled")
关键设计:工具执行期间不中断。MCP 工具(如 browser_navigate)可能有副作用,中途打断可能导致状态不一致。等当前工具跑完再退出。
6.3 退出清理
# runner.py:201-208
for client in mcp_clients:
await client.close() # keep_alive=False → 触发完整清理
# → disconnect() → killpg() → Chrome 关闭
print(f"{label} 完成")
# asyncio.run(main()) 返回,子进程退出
7. 流程图总结
7.1 从简单到复杂:三种中断模式对比
LLM 模式(最简单)
──────────────────
cancel() → 标志 → ChatHandler 轮询 → llm_next_task.cancel()
→ CancelledError → HTTP 关闭
只涉及: asyncio 事件循环内的一条 await 链
Agent 模式(无子 Agent)
──────────────────────
cancel() → 标志 + 回调1(审批)
→ ChatHandler 轮询 → llm_next_task.cancel()
→ CancelledError → HTTP 关闭 / MCP 打断
多了: 审批回调 + 工具执行打断
Agent 模式(有子 Agent)
──────────────────────
cancel() → 标志 + 回调1(审批) + 回调2(SIGTERM) ← 路径 A
→ ChatHandler 轮询 → llm_next_task.cancel()
→ CancelledError → spawn_group 捕获
→ _terminate_and_kill (SIGTERM+SIGKILL) ← 路径 B
→ 子进程 _cancelled 标志 → 优雅退出
多了: 跨进程 SIGTERM + 双路径保险
7.2 取消信号传播全景
flowchart LR
subgraph 前端
BTN["中断按钮"]
end
subgraph "主进程 (asyncio 事件循环)"
API["POST /api/interrupt"]
SM["SessionManager.cancel()"]
FLAG["cancelled = True"]
CB1["回调1:<br/>ApprovalManager<br/>.cancel_session()"]
CB2["回调2:<br/>ProcessManager<br/>.cancel_group()"]
CH["ChatHandler<br/>is_cancelled 轮询"]
CANCEL_TASK["llm_next_task<br/>.cancel()"]
CE["CancelledError<br/>沿 await 链传播"]
SG["spawn_group()<br/>except CancelledError"]
TK["_terminate_and_kill()"]
end
subgraph "子进程 (独立 Python 进程)"
SH["SIGTERM handler<br/>_cancelled = True"]
AL["agent_loop<br/>检查 _cancelled"]
CLOSE["client.close()<br/>keep_alive=False"]
end
subgraph "孙进程"
NODE["node playwright-mcp"]
CHROME["Chrome"]
end
BTN -->|POST| API
API --> SM
SM --> FLAG
SM --> CB1
SM --> CB2
CB1 -->|"event.set()"| WAKE["唤醒审批等待"]
CB2 -->|"SIGTERM (路径A)"| SH
FLAG -.->|"轮询"| CH
CH --> CANCEL_TASK
CANCEL_TASK --> CE
CE --> SG
SG --> TK
TK -->|"SIGTERM+SIGKILL (路径B)"| SH
SH --> AL
AL -->|"return cancelled"| CLOSE
CLOSE -->|"killpg()"| NODE
NODE -->|"进程组终止"| CHROME
style FLAG fill:#ddf
style CB1 fill:#dfd
style CB2 fill:#fdd
style CE fill:#ffd
style SH fill:#fdd7.3 各层的取消机制对比
| 层级 |
取消机制 |
取消粒度 |
延迟 |
| SessionManager |
布尔标志 cancelled=True |
即时设置 |
0ms |
| ApprovalManager |
event.set() 唤醒 |
即时唤醒 |
0ms |
| ProcessManager (路径A) |
proc.terminate() SIGTERM |
fire-and-forget |
0ms |
| ChatHandler |
轮询 is_cancelled |
每 10ms 一次 |
0~10ms |
| LLMClient/AgentClient |
llm_next_task.cancel() → CancelledError |
打断当前 await |
0ms |
| ProcessManager (路径B) |
_terminate_and_kill() SIGTERM+SIGKILL |
等待 10s 后强杀 |
10s |
| 子进程 agent_loop |
_cancelled 标志 |
当前工具执行完后 |
0~30s |
| Chrome |
os.killpg() 进程组终止 |
即时 |
0ms |
7.4 为什么不用 CancelledError 一路到底?
因为 CancelledError 只在 asyncio 事件循环内有效,跨不了进程边界。子进程有自己的事件循环,主进程的 CancelledError 传不进去。所以:
- 主进程内部:CancelledError(asyncio 原生机制)
- 跨进程:SIGTERM(OS 原生机制)
- 子进程内部:布尔标志轮询(最简单可靠)
评论区