Agent 工具调用状态机
本文档描述 Agent 模式下,从 LLM 流式输出到工具执行再到前端展示的完整状态流转。
1. 整体 Agent 循环
Agent 采用「规划轮 + 意图识别 + 多轮工具循环」架构。
第 0 轮(规划轮):不传 tools,强制 LLM 先用语言回应用户。planning_prompt 作为临时 system 消息插入末尾,指导 LLM 判断是否需要工具。LLM 必须在回复末尾输出意图标签二选一:
<no_tool>(不需要工具,闲聊/问答):后端检测到后进入续写轮。<need_tool>(需要工具):后端检测到后进入工具循环。
无论哪个标签,规划轮结束后都会追加 assistant 回复 + [continue] 过渡消息,并清理 full_messages 中所有 assistant 消息的意图标签,避免后续轮次的 LLM 模仿标签模式。
意图标签保留在 raw_text 中(与 <voice> 标签同理),由 stream_parser.strip_tags() 统一从 text 中剥离。
续写轮(<no_tool> 路径):不传 tools,插入 no_tool_continuation_prompt 临时提示词,让 LLM 完整回答用户问题。这保证 <no_tool> 和 <need_tool> 路径统一为两轮结构,避免模型因历史中的单轮 <no_tool> 回复学到"说一半就停"的模式。
第 1 轮起(<need_tool> 路径):正常 Agent 循环,每轮向 LLM 发起流式请求(带 tools),LLM 可能返回文本或工具调用;如果有工具调用则先等待用户审批(ACK),审批通过后执行工具、将结果追加到上下文后进入下一轮,直到 LLM 纯文本输出(无工具调用)为止。例外:注册时声明 skip_approval=True 的工具(如 ask_user_question / show_card)直接跳过审批阶段进入执行——前者的表单本身就是用户交互界面,后者是静默展示不改变状态,走审批等于让用户多点一次。
2. 规划轮意图标签检测
意图标签在规划轮输出末尾,流结束后在完整文本中检测。无需状态机,逻辑简洁。
标签采用字典驱动:intent_tags = {"<no_tool>": True, "<need_tool>": False},key 为标签字符串,value 为 skip_agent_loop 的值。
检测流程
- 流式阶段:正常分句输出,不做标签检测
- 流结束后:拼接
accumulated_text + buffer为完整文本,在其中查找意图标签 - 结果:匹配到
<no_tool>→skip_agent_loop=True(进入续写轮);匹配到<need_tool>→skip_agent_loop=False(进入工具循环)
标签处理方式
意图标签与 <voice> 标签采用统一处理模式(在 stream_parser 中):
raw_text:保留标签原文(进入对话历史,帮助 LLM 学习标签用法)text:由strip_tags()统一剥离所有控制标签(<voice>、<no_tool>、<need_tool>)voice标志:has_voice_tag()会跳过意图标签后检测<voice>,正确处理<voice>...<need_tool>的情况
上下文清理
规划轮结束后(无论 <no_tool> 还是 <need_tool>),只要有输出,都会追加 assistant 回复 + [continue] 过渡消息,并遍历 full_messages 清理所有 assistant 消息中的意图标签。这防止后续轮次的 LLM 看到标签模式后模仿输出。
3. 单轮流式输出中的状态机
在每一轮 LLM 流式输出中,对工具调用事件维护一个 4 状态的有限状态机(正则文法),在不同时机 yield 不同的 LLMChunk 通知下游。
状态定义
| 状态 | 触发时机 | ToolCallInfo 字段 |
含义 |
|---|---|---|---|
| start | 流式 delta 中首次收到 function.name |
name 有值, arguments=None, result=None |
LLM 决定调用工具,名称已知 |
| approval_required | 流结束,arguments 拼接完整,等待用户审批 |
name 有值, arguments 有值, approval_required=True |
参数就绪,等待用户批准/拒绝。注册时 skip_approval=True 的工具跳过此状态(见下方状态图) |
| executing | 用户批准(或 skip_approval 工具直接进入),工具开始执行 | name 有值, arguments 有值, result=None |
工具正在执行。ChatHandler 在此状态入口清空 bus.tool_output_cache = None,为内部工具的部分结果缓存做好准备 |
| result | _execute_tool() 返回 |
name 有值, arguments 有值, result 有值 |
工具执行完毕 |
状态转换图
4. 审批(ACK)机制
架构
审批通过 HTTP 接口 + asyncio.Event 实现异步阻塞/唤醒,与 WebSocket 通道解耦。ChatHandler 通过 SessionMessageBus 推送消息,不直接操作 WebSocket。
AgentClient yield approval_required
→ ChatHandler: approval_manager.register() → bus.push(前端) → create_task(approval_manager.wait())
→ 前端显示审批 UI(或 auto-approve 命中时立即 POST)
→ POST /api/tool-approve {session_id, task_id, approved}
→ ApprovalManager.respond() → Event.set()
→ task.done() → ChatHandler 继续或中断
关键组件
| 组件 | 位置 | 职责 |
|---|---|---|
| ApprovalManager | managers/approval_manager.py |
管理 pending Event,提供 register/wait/respond/cancel_session |
| SessionManager | managers/session_manager.py |
会话生命周期,cancel 时联动唤醒 ApprovalManager |
| POST /api/tool-approve | websocket_server.py |
HTTP 审批接口,reject 时同时触发 session cancel |
| 前端 auto-approve 列表 | frontend/src/js/ (localStorage) |
工具白名单,命中时自动 POST approve |
skip_approval 免审通道
AgentClient.register_internal_tool(skip_approval=True) 会把工具名加入 self.no_approval_tools 集合。yield approval_required 的位置(agent_client.py:562 附近)前查询此集合,命中则直接进入 executing 状态,不走 approval_manager.register/wait 流程。目前启用的两个工具:
| 工具 | 免审理由 |
|---|---|
ask_user_question |
表单本身就是用户交互界面;走审批等于让用户点两次(先 approve 再填表) |
show_card |
静默推送视觉卡片,不改变后端/外部状态,也不消耗用户交互;立即 return,不阻塞 LLM |
扩展新的免审工具时,优先检查:该工具是否"交互本身即用户确认"或"静默展示不改状态"。不满足任一条件的工具(执行 shell、读写文件、消费 token 预算等)不应跳过审批。
ask_user_question 在 executing 状态额外触发一条 form_request 副消息(携带 form_id + questions)给前端渲染表单 UI。两条消息必须在 bus FIFO 上严格相邻,实现方式是在工具注册时通过同步 on_post_executing 钩子返回副消息 dict;ChatHandler 在 chunk 到达时同步调用钩子(保证 fm.prepare 早于 agent 恢复),把返回 dict 暂存,紧跟 tool_call[executing] 推送之后入 bus。对具体工具名保持无知。详见 MESSAGE_BUS_DESIGN.md §4.2。
竞态保护
- 先 register 再推送:
approval_manager.register()在 WebSocket push 之前调用,保证 Event 在前端回复前已存在。register()是同步方法,紧跟 chunk 消费、与 cancel 点之间零 await 切换,保证"register 未完成却被 cancel_session 扫过"的不变式 - cancel 联动:
session_manager.cancel()通过on_cancel回调调用approval_manager.cancel_session(),唤醒所有 pending Event。实现上同时pop出 dict 条目(不仅event.set()),避免"register 完成但 wait_task 尚未创建"窗口内被 cancel 时 entry 永久泄漏。与wait()finally 的pop(key, None)幂等兼容:wait 正在运行时靠局部引用响应 event;wait 尚未开始时get返回 None 立即 return False - 超时保护:
wait()使用asyncio.wait_for(timeout=300),超时视为拒绝 - 非阻塞等待:
ChatHandler使用asyncio.create_task()启动等待任务,主循环继续收集和推送 TTS 结果,避免阻塞音频推送 - finally 清理:
handle_chat的finally块中取消未完成的 approval_wait_task 并强制 respond(False) 清理残留 Event - cache 快照同步原子:构造
tool_rejected.partial_result的_build_tool_rejected_payload是同步 staticmethod(非 async),保证bus.tool_output_cache的读取与_id剥离在同一事件循环帧内完成,不会被内部工具监控协程(ProcessManager._monitor/run_command读流协程)的写入穿插 - active_tool_info 协程私有:该变量仅在
handle_chat主协程内读写,不暴露给 HTTP 审批接口或 cancel 回调等其他协程,避免跨协程读写需要的锁
拒绝/中断后的上下文处理
后端通过 active_tool_info 追踪当前活跃的工具调用(approval_required 和 executing 阶段设值,result 阶段清空)。无论是用户拒绝审批还是中断正在执行的工具,只要 active_tool_info 有值,后端都会先推送 tool_rejected 消息(含工具名、参数、原因),再推送 interrupted。
对于执行阶段的中断,ChatHandler 还会读取 bus.tool_output_cache(内部工具实时写入的部分结果缓存),将其作为 partial_result 附加到 tool_rejected 消息中,让前端和对话历史保留工具的部分执行结果:
| 中断时机 | tool_rejected.reason |
partial_result |
|---|---|---|
审批阶段(approval_pending=True) |
"用户拒绝了此工具调用" |
无(审批阶段工具未执行) |
执行阶段(approval_pending=False) |
"工具调用被中断" |
从 bus.tool_output_cache 读取(剥离 _id 字段后附加) |
| 无活跃工具 | 不推送 tool_rejected,直接 interrupted |
— |
前端消息过滤器对 tool_rejected 和 interrupted 放行(即使会话已被标记为 cancelledSessionId),确保中断按钮场景下这两条收尾消息不被丢弃。但放行后各处理器仍有 session guard:tool_rejected 检查 ctx.currentSessionId !== data.session_id,若新会话已开始则丢弃;interrupted 的 guard 条件为 ctx.currentSessionId && !ctx.interrupting && ctx.currentSessionId !== data.session_id——额外的 !ctx.interrupting 保证中断等待期间(按钮禁用,不可能有新会话)的 interrupted 事件不被误跳过。
前端收到 tool_rejected 后写入 historySegments(含 partial_result),由后续 interrupted 处理器中的 resetSession() → flushHistorySegments() 统一按序写入 conversationHistory:
// tool_rejected → 写入 historySegments
historySegments.push({ type: 'tool_rejected', name, arguments, reason, partial_result });
// interrupted → resetSession() → flushHistorySegments() 统一处理,写入 conversationHistory:
// assistant 消息(记录 LLM 的意图)
{ role: "assistant", content: textBuffer || null, tool_calls: [{ id, type: "function", function: { name, arguments } }] }
// tool 结果消息(记录拒绝/中断原因)
{ role: "tool", tool_call_id: id, content: "用户拒绝了此工具调用" }
// 或有部分结果时(拼接部分结果以及中断原因):
{ role: "tool", tool_call_id: id, content: "{partial_result JSON}...\n工具调用被中断" }
这确保下一轮对话时 LLM 能看到拒绝/中断记录,避免重复尝试相同的工具调用。
前端系统消息:三种中断场景各显示不同提示,让用户了解发生了什么:
| 触发场景 | 系统消息 | 显示位置 |
|---|---|---|
| 用户点击「中断」按钮 | 已中断对话 |
interrupted 处理器(userInitiatedInterrupt 时) |
| 用户点击「拒绝」按钮 | 已中断会话 |
approveTool(false) |
| 审批超时(300s) | 工具审批超时,已自动拒绝调用并中断会话 |
interrupted 处理器(!userInitiatedInterrupt 时) |
用户主动操作(中断/拒绝)时前端先设 cancelledSessionId,后续 interrupted 经过滤器放行后 userInitiatedInterrupt=true,不重复显示。
前端中断按钮两阶段状态:用户点击「中断」后,按钮进入「中断中…」中间状态(灰红色,context.interrupting=true,按钮禁用),阻止用户在中断完成前发送新消息(否则会丢弃缓存的部分结果)。直到后端返回 interrupted 事件触发 resetSession()(其中重置 interrupting=false)后,按钮才恢复为「发送」可用状态。设有 15 秒超时兜底,防止后端异常时前端卡死。若 /api/interrupt 响应 was_active=false(后端 handler 已结束、仅前端残留音频),前端短路收尾不等 interrupted 事件。
5. 数据流全链路
从 agent_client yield 到前端 WebSocket 展示的完整数据流:
6. 前端 UI 状态对应
工具调用事件通过 audioQueue 按序显示,与文本和音频消息共享同一个队列,确保显示顺序与后端推送顺序一致。
工具卡片终止态 (updateToolCallTerminated)
统一处理已拒绝/已失效两种终止态的 UI 展示:
- 有参数(approval_required 阶段及之后):用
<details>可展开查看参数 - 无参数(准备中阶段被中断):不可展开,仅显示文本
三种触发场景:
- 用户拒绝 →
approveTool(false)→updateToolCallTerminated(div, '已拒绝')+ 显示系统消息已中断会话+ 设cancelledSessionId - 用户中断 / 后端推 interrupted →
interruptSession()或interrupted处理器 →updateToolCallTerminated(div, '已失效') - 后端重启后审批丢失 →
sendToolApproval收到success:false→invalidateAllPendingToolCalls()→updateToolCallTerminated(div, '已失效')
前端审批队列行为
- approval_required 状态设置
approvalPendingTaskId,阻塞playNextAudio()继续消费队列 - auto-approve:前端维护
autoApproveTools列表(持久化到 localStorage),命中时立刻 POST approve 并显示执行中样式,用户无感 - 手动审批:显示批准/拒绝按钮 + "自动批准此工具" checkbox
- 批准后:清除
approvalPendingTaskId,恢复队列消费 - 中断清理:
interruptSession()调用updateToolCallTerminated()标记所有 pending 卡片为"已失效",重置approvalPendingTaskId - 拒绝清理:
approveTool(false)设置cancelledSessionId并显示系统消息已中断会话,后续interrupted经过滤器放行后userInitiatedInterrupt=true,不重复显示 - interrupted 清理:后端推送
interrupted时,处理器先做 session guard 检查(新会话已开始则跳过),通过后清理残留卡片并调用resetSession()→flushHistorySegments()。若为非用户主动中断(审批超时),显示工具审批超时,已自动拒绝调用并中断会话 - 后端重启保护:
POST /api/tool-approve返回success:false(审批任务不存在)时,invalidateAllPendingToolCalls()将所有 pending 卡片标记为"已失效"并恢复前端可操作状态
7. 关键设计决策
为什么用 4 状态而不是 3 状态?
原始 3 状态(start → executing → result)中工具自动执行,用户无法控制。新增 approval_required 状态在 executing 前插入审批环节,让用户决定是否允许工具执行。auto-approve 列表保证常用工具不影响交互流畅度。
为什么审批用 HTTP 接口而不是 WebSocket?
- 与中断接口
/api/interrupt风格统一 - cancel 联动更自然:拒绝时直接调
session_manager.cancel(),通过on_cancel回调唤醒 pending Event - 前端实现简单:auto-approve 只需一个
fetch调用 - 后续可扩展:审批记录可独立持久化
为什么 start 不等 arguments 完整?
OpenAI streaming 中 function.name 在一个 delta 中完整到达,但 arguments 是逐 token 拼接的。如果等 arguments 完整才通知前端,那整个 LLM 流式输出阶段前端都是黑屏的,失去了实时反馈的意义。
为什么工具调用要走 audioQueue?
工具调用事件和文本/音频消息共享同一个 audioQueue 队列,通过 playNextAudio() 按序消费。这保证了显示顺序与后端推送顺序严格一致——不会出现工具调用卡片跳到前面语音内容之前的问题。工具调用项在 playNextAudio 中被即时处理(不占播放时间),然后立即继续下一个队列项。审批状态是例外——它会阻塞队列直到用户响应。
为什么需要规划轮?
规划轮同时承担两个职责:
- 体验保障:LLM 在 Agent 模式下可能直接发起工具调用而不先输出文本,导致用户看到工具在执行但不知道为什么。规划轮通过不传
tools参数强制 LLM 先用语言回应用户。 - 意图识别:通过
<no_tool>/<need_tool>双标签机制,LLM 在规划轮末尾判断是否需要工具。双标签比单标签更可靠——LLM 必须明确二选一,而不是靠"有没有输出标签"来判断。标签放在末尾而非开头,让 LLM 先完成自然语言回复再做意图判断,避免开头检测时缓冲延迟影响首句输出速度。
planning_prompt 作为临时 system 消息插入 full_messages 末尾,仅包含意图判断指令,不包含工具描述——这样 LLM 不会在规划轮中"预演"工具调用细节(否则 Round 1 可能误以为工具已调用而跳过真正执行)。规划轮结束后 planning_prompt 被移除,后续轮次看到的是干净的消息历史。
为什么 <no_tool> 需要续写轮而不是直接返回?
早期设计中 <no_tool> 路径只有规划轮一轮就直接返回,而 <need_tool> 路径是规划轮 + 工具循环(两轮+)。这导致对话历史中:
<need_tool>场景:两条 assistant 消息(规划轮简短意图 + 工具循环详细回复)<no_tool>场景:一条 assistant 消息(规划轮简短回复)
模型从历史中学到了"先简短说意图,再详细展开"的两轮模式。但 <no_tool> 只有一轮,模型就会"说一半就停"——它输出了简短意图后就期待第二轮,但第二轮不存在。
解决方案:<no_tool> 也统一为两轮。续写轮(_run_no_tool_continuation)在规划轮之后追加一次纯文本 LLM 调用(不带 tools),插入 no_tool_continuation_prompt 指导模型完整回答。这样两条路径在对话历史中的结构一致,消除了模型行为不对称的问题。
同名工具多次调用如何处理?
通过 id="tool-call-${toolName}" 定位 DOM 元素进行原地更新。当 result 状态完成后立即 removeAttribute('id'),这样下次同名工具调用会创建新的卡片,不会覆盖已完成的卡片。
轮次边界(round_end)与前端历史分段
每次 LLM API 调用的最后一个 chunk 带 round_end=true 标记。ChatHandler 检测到后单独推送 {type: "round_end"} 消息。前端收到后在 historySegments 中插入 {type: 'separator'}。
flushHistorySegments() 的合并逻辑:
- 连续的 text 段累积到
textBuffer - 遇到 separator → 将
textBufferflush 为独立 assistant 消息(分隔不同 LLM 调用轮次的文本) - 遇到 tool 段 → 将
textBuffer合并为同一个 assistant 消息的content,并附带tool_calls - 遇到 hook 段 → 先 flush
textBuffer,再写入{role, content, _hook}消息(静默存入,不显示 UI) - 尾部剩余 text → flush 为独立 assistant 消息
这样前端不需要知道哪一轮是规划轮——规划轮和 agent 循环的文本由 separator 自然分隔,而同一轮内的文本和工具调用自然合并。
输出格式示例:
assistant: {content: "规划轮文字"} // separator 触发 flush
assistant: {content: "好的让我查一下", tool_calls: [{...}]} // tool 段合并
tool: {content: "result"} // 工具结果
assistant: {content: "最终回复"} // 尾部 flush
中断时的行为:round_end 通过 audioQueue 按序消费。中断时 audioQueue 被清空,未消费的 round_end 随之丢弃。这是正确的——未播放/未显示的内容不应进入对话历史,因此也不需要 separator 来分隔它们。
MCP 客户端架构
MCP 相关代码位于 mcp_engine/ 包中,支持 SSE 和 stdio 两种传输方式:
| 文件 | 职责 |
|---|---|
mcp_engine/mcp_config.py |
MCPServerConfig 数据类(SSE: host/port, stdio: command/args, isolation)+ DEFAULT_MCP_SERVERS 列表 |
mcp_engine/mcp_client.py |
MCPClient(connect/call_tool/close)+ ToolDefinition,connect() 按 transport 字段分支 |
mcp_engine/__init__.py |
统一导出 |
AgentClient 通过 from mcp_engine import MCPClient, ToolDefinition 使用。config.py 的 AgentConfig 通过 DEFAULT_MCP_SERVERS 获取默认服务器列表。
图片类工具返回(如 Playwright 截图的 ImageContent)在 MCPClient.call_tool() 中被替换为文本占位符 [图片已截取, mimeType=...],避免 base64 数据撑爆 LLM 上下文。
隔离级别:shared vs per_conversation
MCPServerConfig.isolation 字段决定该 server 是所有请求共享一个实例,还是每个对话(conversation_id)独立 subprocess:
| 值 | 适用场景 | 启动时行为 | 运行期行为 |
|---|---|---|---|
"shared"(默认) |
无状态或全局状态工具(如 lite 的 run_python) | 立即连接,常驻 agent_client.mcp_clients,其工具注册到 shared_tools_map |
LLM 发起工具调用 → 从 shared_tools_map[tool_name] 路由到常驻 MCPClient |
"per_conversation" |
有跨 session 持久状态的工具(如 Playwright 登录态、浏览历史) | 启动一次探测实例扫 schema → 立即关闭 → 探测目录删除。工具 schema 注册到 tools,server_config 引用存入 per_conv_tool_sources |
LLM 发起工具调用 → 从 sandbox.ensure_mcp_client(config) 按 conv_id 懒启动独立 subprocess,首次后缓存 |
调用路由顺序(AgentClient._execute_tool):
internal_tools → shared_tools_map → per_conv_tool_sources (需 sandbox) → 未知工具
sandbox 参数由 ChatHandler 从 conversation_id → sandbox_registry.acquire(conv_id) 获取,透传给 chat_stream 再到 _execute_tool。完整的 sandbox 生命周期、LRU 驱逐、进程清理、与 cancel 回调链的关系详见。
sub_agent 的借用路径
tool_server_configs({tool_name: config_dict, disabled_tools=None})无论 shared 还是 per_conversation 都会填充(包括被主 Agent disabled_tools 屏蔽但服务器上实际存在的工具)。sub_agent 启动时按 allowed_tools 过滤后创建自己的 MCPClient 实例(见 sub_agent/mcp_config.py:_isolate_browser_profile),对 Playwright 注入独立的临时 user-data-dir——sub_agent 的浏览器状态与主 Agent 的 per-conv Sandbox 互不共享,这是刻意设计:sub_agent 短生命周期任务,不应继承用户的登录态。
Skills 技能系统
Skills 相关代码位于 skills_engine/ 包中,提供开放 skill 生态兼容(Codex 模式:注入描述 + Hook 提示词注入)。
| 文件 | 职责 |
|---|---|
skills_engine/skills_config.py |
SkillInfo 数据类 + scan_skills() 扫描 skills/ 目录读取 SKILL.md frontmatter |
skills_engine/skills_context_manager.py |
SkillsContextManager:按 enabled_skills 过滤,聚合描述文本生成 system 块 |
skills_engine/hooks_config.py |
HookInfo 数据类 + scan_hooks() 扫描 hooks/ 目录加载 hooks.json + .md |
skills_engine/hook_manager.py |
HookManager:在 Agent 流程检查点 inject/remove 提示词。同一 timing 的 hooks 按 (role, persistent) 分组合并为单条消息注入,详见 HOOK_SYSTEM.md |
skills_engine/__init__.py |
统一导出 |
websocket_server.py 在 Agent 模式下从请求中读取 enabled_skills,创建 SkillsContextManager,将 get_session_prompt() 返回的文本作为独立 system 块插入 messages[1](紧跟主 system prompt 之后)。
前端通过 GET /api/skills 获取可用 skill 列表,用户选择持久化到 localStorage,每次请求通过 enabled_skills 字段传递。
上下文截断白名单
前端 buildMessagesForSend() 对历史消息中的工具调用参数和结果进行截断(TOOL_CONTEXT_MAX_LEN = 300)。NO_TRUNCATE_TOOLS 白名单中的工具(如 get_skill)不做截断,保证 LLM 能看到完整内容。截断时通过 tool_call_id 回溯匹配工具名称。
评论区