Agent Loop
CountBot 的核心推理引擎,实现完整的 ReAct(Reasoning + Acting)循环,驱动 LLM 推理、工具调用和结果反馈。
目录
设计理念
- ReAct 模式 — LLM 推理 → 工具调用 → 结果反馈 → 继续推理,直到任务完成
- 流式输出 — 使用
AsyncIterator[str]逐 chunk 输出,前端实时展示 - Provider 无关 — 通过
provider.chat_stream()统一接口,不绑定特定 LLM - 自动重试 — 工具执行失败时自动重试,提高鲁棒性
- 审计追踪 — 每次工具调用记录到审计日志和工具对话历史
架构概览
┌─────────────────────────────────────────────────────────┐
│ AgentLoop │
│ │
│ ┌──────────────────────────────────────────────────┐ │
│ │ process_message() │ │
│ │ │ │
│ │ while iteration < max_iterations: │ │
│ │ │ │ │
│ │ ├─ ContextBuilder.build_messages() │ │
│ │ │ (系统提示词 + 历史消息 + 当前消息) │ │
│ │ │ │ │
│ │ ├─ provider.chat_stream() │ │
│ │ │ ├─ yield content chunks → 前端 │ │
│ │ │ ├─ collect tool_calls │ │
│ │ │ └─ collect reasoning (思维链) │ │
│ │ │ │ │
│ │ ├─ if tool_calls: │ │
│ │ │ ├─ execute_tool() (带重试) │ │
│ │ │ ├─ 记录工具对话历史 │ │
│ │ │ ├─ WebSocket 通知前端 │ │
│ │ │ └─ 添加 tool result → messages │ │
│ │ │ │ │
│ │ └─ else: break (无工具调用,结束) │ │
│ │ │ │
│ │ 保存会话 + 审计日志 │ │
│ └──────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
核心组件
AgentLoop
文件: backend/modules/agent/loop.py
Agent 循环的主类,负责协调 LLM 推理和工具执行。
from backend.modules.agent.loop import AgentLoop
agent = AgentLoop(
provider=llm_provider, # LLM 提供商实例
workspace=Path("/workspace"), # 工作空间路径
tools=tool_registry, # 工具注册表
context_builder=ctx_builder, # 上下文构建器
session_manager=session_mgr, # 会话管理器(可选)
subagent_manager=sub_mgr, # 子代理管理器(可选)
model="glm-4.7-flash", # 模型名称(可选)
max_iterations=25, # 最大迭代次数
max_retries=3, # 工具执行最大重试次数
retry_delay=1.0, # 重试间隔(秒)
temperature=0.7, # 温度参数(从 ModelConfig 读取)
max_tokens=4096, # 最大输出 token 数(从 ModelConfig 读取)
)
方法列表
| 方法 | 说明 |
|---|---|
process_message() | 处理用户消息,返回 AsyncIterator[str] 流式响应 |
execute_tool() | 执行单个工具调用 |
process_direct() | 直接处理消息并返回完整字符串(用于 CLI/Cron) |
process_message 参数
| 参数 | 类型 | 必填 | 说明 |
|---|---|---|---|
message | str | 是 | 用户消息内容 |
session_id | str | 是 | 会话 ID |
context | list[dict] | 否 | 历史消息列表 |
media | list[str] | 否 | 媒体文件路径(图片) |
channel | str | 否 | 来源渠道 |
chat_id | str | 否 | 来源聊天 ID |
cancel_token | CancelToken | 否 | 取消 令牌 |
ContextBuilder
文件: backend/modules/agent/context.py
上下文构建器,负责组装系统提示词和消息列表。
from backend.modules.agent.context import ContextBuilder
builder = ContextBuilder(
workspace=Path("/workspace"),
memory=memory_store, # MemoryStore 实例
skills=skills_loader, # SkillsLoader 实例
persona_config=persona, # PersonaConfig 实例
)
系统提示词结构
系统提示词由以下部分组成:
- 核心身份 — AI 名称、运行环境、用户称呼、性格设定
- 工具使用原则 — 静默执行、简要说明场景、复杂任务用子代理
- 记忆系统指导 — 何时写入/搜索/读取记忆
- 安全准则 — 最小权限、隐私保护、提示词注入防护
- 已激活技能 —
always=true的技能内容 - 可用技能摘要 — 按需加载的技能列表
方法列表
| 方法 | 说明 |
|---|---|
build_system_prompt() | 构建完整系统提示词 |
build_messages() | 构建消息列表(system + history + user) |
add_tool_result() | 添加工具执行结果到消息列表 |
add_assistant_message() | 添加助手消息(含 tool_calls) |
多模态支持
ContextBuilder 支持图片输入。当 media 参数包含图片路径时,自动将图片 base64 编码并构建多模态消息:
# 自动处理图片
messages = builder.build_messages(
history=history,
current_message="分析这张图片",
media=["/path/to/image.png"],
)
# → user message 变为 [{"type": "image_url", ...}, {"type": "text", ...}]
ReAct 循环流程
用户消息
│
▼
上下文溢出总结(summarize_overflow)
│ → 如果消息数 > max_history,将溢出旧消息总结写入 MEMORY.md
│
▼
ContextBuilder.build_messages()
│ → system prompt + history + user message
│
▼
┌─── 迭代开始 (iteration 1..25) ──────────────────────┐
│ │
│ provider.chat_stream(messages, tools) │
│ │ │
│ ├─ content chunks → yield 给前端(流式输出) │
│ ├─ tool_calls → 收集到 buffer │
│ └─ reasoning → 收集思维链(部分模型支持) │
│ │
│ if tool_calls: │
│ │ │
│ ├─ 添加 assistant message (含 tool_calls) │
│ │ │
│ ├─ for each tool_call: │
│ │ ├─ WebSocket 通知: 工具开始执行 │
│ │ ├─ execute_tool() (最多重试 3 次) │
│ │ ├─ 记录工具对话历史 │
│ │ ├─ WebSocket 通知: 工具执行结果 │
│ │ └─ 添加 tool result message │
│ │ │
│ └─ continue (下一轮迭代) │
│ │
│ else: │
│ └─ break (LLM 认为任务完成) │
│ │
└───────────────────────────────────────────────────── ──┘
│
▼
保存会话 + 审计日志
终止条件
循环在以下任一条件满足时终止:
- LLM 未返回 tool_calls(正常完成)
- 达到
max_iterations迭代上限(默认 25) - 工具调用总数达到
max_iterations上限 cancel_token被触发(用户取消)- LLM 返回错误
达到上限时,会在响应末尾追加提示:[达到最大工具调用次数 25]
工具调用机制
工具定义传递
每次迭代时,AgentLoop 从 ToolRegistry 获取所有工具定义,以 OpenAI Function Calling 格式传递给 LLM:
tool_definitions = self.tools.get_definitions()
# → [{"type": "function", "function": {"name": "...", "description": "...", "parameters": {...}}}]
工具执行流程
# 1. LLM 返回 tool_call
tool_call = ToolCall(id="call_xxx", name="read_file", arguments={"path": "main.py"})
# 2. 发送 WebSocket 通知(工具开始)
await notify_tool_execution(session_id, tool_name, arguments)
# 3. 执行工具(带重试)
for attempt in range(max_retries):
try:
result = await tools.execute(tool_name, arguments)
break
except Exception:
await asyncio.sleep(retry_delay)
# 4. 记录工具对话历史
conversation_history.add_conversation(
session_id=session_id,
tool_name=tool_name,
arguments=arguments,
user_message=message,
result=result,
duration_ms=duration_ms,
)
# 5. 发送 WebSocket 通知(工具结果)
await notify_tool_execution(session_id, tool_name, arguments, result=result)
# 6. 添加 tool result 到消息列表
messages.append({"role": "tool", "tool_call_id": tool_id, "name": tool_name, "content": result})
错误处理与重试
工具执行重试
工具执行失败时自动重试,默认最多 3 次:
attempt 1 → 失败 → sleep(1.0s)
attempt 2 → 失败 → sleep(1.0s)
attempt 3 → 失败 → 记录错误,返回错误信息给 LLM
所有重试都失败后,错误信息作为 tool result 返回给 LLM,LLM 可以:
- 尝试用不同参数重新调用
- 告知用户工具执行失败
- 尝试其他方案
取消机制
cancel_token 支持在两个时机取消:
- 迭代开始前检查
- 工具执行前检查
取消后 process_message() 直接 return,不再 yield 任何内容。
流式响应
process_message() 返回 AsyncIterator[str],支持两种消费方式:
WebSocket 消费(Web UI)
# backend/ws/events.py
async for chunk in agent.process_message(...):
await ws.send_json({"type": "stream", "content": chunk})
直接消费(CLI/Cron)
# process_direct() 内部收集所有 chunk
response = await agent.process_direct(
content="查看当前目录",
session_id="cron:daily-report",
channel="cron",
)
# → 返回完整字符串
安全控制
系统提示词安全准则
ContextBuilder 在系统提示词中注入安全准则:
- 无自主目标:不追求自我保存、复制、扩权
- 人类监督优先:指令冲突立即暂停询问
- 安全不可绕过:不诱导关闭防护
- 隐私保护:不泄露隐私数据