子代理
CountBot 的后台子代理系统和多智能体协作系统,支持创建独立的后台 Agent 处理耗时任务,以及多个智能体协同完成复杂工作流。
目录
子代理系统
设计理念
- 异步执行 — 子代理在后台独立运行,不阻塞主对话
- 独立上下文 — 每个子代理有独立的消息列表和工具注册表
- 进度追踪 — 实时更新任务进度(0-100%)
- 结果通知 — 任务完成后通过 WebSocket 通知前端
- 资源隔离 — 子代理有独立的迭代限制和工具集
架构概览
┌─────────────────────────────────────────────────────────┐
│ 主 Agent Loop │
│ │
│ 用户: "分析项目代码结构并生成报告" │
│ Agent: spawn(task="分析项目代码结构并生成报告") │
│ │ │
│ ▼ │
│ SpawnTool.execute() │
│ │ │
│ ▼ │
│ SubagentManager.create_task() → task_id │
│ SubagentManager.execute_task(task_id) │
│ │ │
│ │ asyncio.create_task (后台运行) │
│ ▼ │
│ ┌───────────── ─────────────────────────────────────┐ │
│ │ SubagentTask │ │
│ │ │ │
│ │ 独立的 Agent Loop: │ │
│ │ ├─ 系统提示词(子代理专用) │ │
│ │ ├─ 独立工具注册表 │ │
│ │ │ (read_file, write_file, edit_file, │ │
│ │ │ list_dir, exec, web_fetch) │ │
│ │ ├─ max_iterations = 15 │ │
│ │ └─ 进度追踪 (0% → 100%) │ │
│ │ │ │
│ │ 完成后: status=COMPLETED, result="..." │ │
│ └──────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ WebSocket 通知前端: 任务完成 │
└─────────────────────────────────────────────────────────┘
核心组件
SubagentManager
文件: backend/modules/agent/subagent.py
子代理管理器,负责任务的创建、执行、查询和清理。
from backend.modules.agent.subagent import SubagentManager
manager = SubagentManager(
provider=llm_provider,
workspace=Path("/workspace"),
model="glm-5",
)
方法列表
| 方法 | 说明 |
|---|---|
create_task(label, message, session_id) | 创建任务,返回 task_id |
execute_task(task_id) | 异步执行任务 |
cancel_task(task_id) | 取消运行中的任务 |
get_task(task_id) | 获取任务信息 |
list_tasks(status, session_id) | 列出任务 |
get_running_tasks() | 获取运行中的任务 |
delete_task(task_id) | 删除任务 |
get_running_count() | 运行中任务数量 |
get_stats() | 统计信息 |
cleanup_old_tasks(max_age_hours) | 清理过期任务 |
register_notification_callback(cb) | 注册通知回调 |
SubagentTask
文件: backend/modules/agent/subagent.py
任务数据类,封装单个子代理任务的状态。
class SubagentTask:
def __init__(
self,
task_id: str, # UUID
label: str, # 显示标签
message: str, # 任务描述
session_id: str, # 关联会话
system_prompt: str, # 自定义系统提示词
event_callback, # 事件回调
enable_skills: bool, # 是否启用技能
):
self.task_id = task_id
self.label = label
self.message = message
self.status = TaskStatus.PENDING
self.progress = 0 # 进度 (0-100)
self.result = None # 执行结果
self.error = None # 错误信息
self.model_override = None # 模型配置覆盖
self.cancel_token = None # 取消令牌
self.tool_call_records = [] # 工具调用记录
TaskStatus
class TaskStatus(Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"
SpawnTool
文件: backend/modules/tools/spawn.py
面向 Agent 的子代理创建工具。
{
"name": "spawn",
"parameters": {
"task": "分析项目代码结构并生成报告",
"label": "代码分析"
}
}
| 参数 | 类型 | 必填 | 说明 |
|---|---|---|---|
task | string | 是 | 任务描述 |
label | string | 否 | 显示标签(默认取 task 前 30 字符) |
返回示例:"子 Agent [代码分析] 已启动 (ID: uuid)。完成后我会通知你。"
任务生命周期
create_task() execute_task() _run_task()
│ │ │
▼ ▼ ▼
PENDING ──────────► RUNNING ──────────► COMPLETED
│ │
│ ├─► FAILED (异常)
│ └─► CANCELLED (取消)
│
└─ cancel_task() ──► CANCELLED
子代理执行流程
1. 构建独立环境
子代理有独立的系统提示词和工具注册表:
# 子代理专用系统提示词
system_prompt = _build_subagent_prompt(task)
# 独立工具注册表(比主 Agent 少)
tools = ToolRegistry()
tools.register(ReadFileTool(workspace))
tools.register(WriteFileTool(workspace))
tools.register(EditFileTool(workspace))
tools.register(ListDirTool(workspace))
tools.register(ExecTool(workspace, timeout=300, allow_dangerous=False))
tools.register(WebSearchTool()) # 可选
tools.register(WebFetchTool()) # 可选
2. 执行 Agent Loop
子代理运行独立的 ReAct 循环:
while iteration < 15:
│
├─ provider.chat_stream(messages, tools)
│ ├─ 收集 content
│ └─ 收集 tool_calls
│
├─ if tool_calls:
│ ├─ 执行工具
│ ├─ 添加 tool result
│ └─ progress += 5 (最大 90%)
│
└─ else: break
3. 完成处理
task.result = "".join(response_chunks)
task.status = TaskStatus.COMPLETED
task.progress = 100
task.completed_at = datetime.now()
工 具权限
子代理可用的工具是主 Agent 的子集:
| 工具 | 主 Agent | 子代理 | 说明 |
|---|---|---|---|
| read_file | 可用 | 可用 | |
| write_file | 可用 | 可用 | |
| edit_file | 可用 | 可用 | |
| list_dir | 可用 | 可用 | |
| exec | 可用 | 可用 | 超时 300s(主 Agent 30s) |
| web_fetch | 可用 | 可用 | 可选 |
| spawn | 可用 | 不可用 | 防止递归创建 |
| send_media | 可用 | 不可用 | |
| screenshot | 可用 | 不可用 | |
| memory | 可用 | 不可用 | 运行时默认是统一记忆工具 |
子代理的 Shell 工具超时设为 300 秒(主 Agent 默认 30 秒),适合执行耗时命令。
通知机制
WebSocket 通知
子代理状态变化时通过 WebSocket 通知前端:
manager.register_notification_callback(notify_callback)
# 通知事件类型
await _notify(task_id, "started") # 任务开始
await _notify(task_id, "completed") # 任务完成
await _notify(task_id, "failed") # 任务失败
await _notify(task_id, "cancelled") # 任务取消
前端展示
前端通过 WebSocket 接收任务状态更 新,在 UI 中展示:
- 任务列表(运行中/已完成/失败)
- 进度条
- 任务结果
API 接口
文件: backend/api/tasks.py
| 端点 | 方法 | 说明 |
|---|---|---|
/api/tasks | GET | 列出任务 |
/api/tasks/{id} | GET | 获取任务详情 |
/api/tasks/{id}/cancel | POST | 取消任务 |
/api/tasks/{id} | DELETE | 删除任务 |
/api/tasks/stats | GET | 任务统计 |
任务列表
GET /api/tasks?session_id=xxx&status=running
响应:
[
{
"task_id": "uuid",
"label": "代码分析",
"message": "分析项目代码结构并生成报告",
"status": "running",
"progress": 45,
"created_at": "2026-02-15T10:00:00",
"completed_at": null
}
]
任务统计
GET /api/tasks/stats
响应:
{
"total": 10,
"running": 1,
"completed": 8,
"failed": 1,
"cancelled": 0
}
使用场景
| 场景 | 示例 |
|---|---|
| 代码分析 | "分析整个项目的代码结构并生成文档" |
| 批量处理 | "将 data/ 目录下所有 CSV 转换为 JSON" |
| 长时间编译 | "编译项目并运行所有测试" |
| 数据采集 | "搜索并整理 Python 最佳实践资料" |
| 报告生成 | "分析日志文件并生成错误报告" |
配置参数
| 参数 | 默认值 | 说明 |
|---|---|---|
max_iterations | 15 | 子代理最大迭代次数 |
| Shell 超时 | 300s | 子代理 Shell 命令超时 |
max_age_hours | 24h | 过期任务清理阈值 |
allow_dangerous | false | 子代理不允许危险命令 |
restrict_to_workspace | true | 限制在工作空间内 |
多智能体协作系统
CountBot 的多智能体工作流系统支持三种协作模式:Pipeline(流水线)、Graph(有向无环图)、Council(议会)。
系统架构
多智能体协作系统采用分层架构:
┌─────────────────────────────────────────────────────────┐
│ 用户层 │
│ Web UI / 飞书 / 钉钉 / QQ / Telegram │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ API 层 │
│ /api/agent-teams (CRUD) │
│ workflow_run 工具 / /api/agent-teams/{id}/config │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ 工作流引擎层 │
│ WorkflowEngine (backend/modules/agent/workflow.py) │
│ ├─ Pipeline 编排器 │
│ ├─ Graph 调度器(DAG + 并行) │
│ └─ Council 协调器(多视角 + 交叉评审) │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ 子代理管理层 │
│ SubagentManager (backend/modules/agent/subagent.py) │
│ ├─ 任务生命周期管理 │
│ ├─ 异步任务调度 │
│ ├─ 工具调用追踪 │
│ └─ 实时进度推送 │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ Agent Loop 层 │
│ 独立的 ReAct 循环 + 工具系统 + LLM 调用 │
└─────────────────────────────────────────────────────────┘
核心技术特性
1. 异步并发执行
使用 asyncio.gather() 实现多代理并行执行:
# Graph 模式:并发执行无依赖节点
ready_agents = [a for a in agents if not a.depends_on]
results = await asyncio.gather(*[
execute_agent(agent) for agent in ready_agents
])
# Council 模式:并发执行所有成员
results = await asyncio.gather(*[
execute_agent(agent) for agent in council_members
])
2. 流式内容传输
支持实时推送子代理的输出内容:
async for chunk in provider.chat_stream(messages, tools):
if chunk.content:
# 实时推送到 WebSocket
await event_callback("workflow_agent_chunk", {
"agent_id": agent.id,
"content": chunk.content
})
3. 工具调用追踪
记录每个子代理的工具调用详情:
tool_call_record = {
"tool": tool_name,
"arguments": arguments,
"result": result,
"success": True,
"duration": elapsed_ms,
"timestamp": datetime.now().isoformat()
}
task.tool_call_records.append(tool_call_record)
4. 执行元数据序列化
将工作流执行过程序列化为 HTML 注释,支持前端状态恢复:
metadata = f"""
<!-- WORKFLOW_EXECUTION_METADATA
mode: {mode}
agents: {len(agents)}
tool_calls: {total_tool_calls}
duration: {duration_ms}ms
-->
"""
5. 技能系统集成
支持为子代理动态注入技能文档:
if enable_skills:
skills_summary = build_skills_summary()
system_prompt += f"\n\n## 可用技能\n{skills_summary}"
协作模式
Pipeline 模式(流水线)
智能体按顺序执行任务,前一个的输出作为下一个的输入。
执行流程:
async def execute_pipeline(agents, goal):
context = [] # 累积上下文
for agent in agents:
# 构建提示词:目标 + 前序输出
prompt = f"目标: {goal}\n\n前序输出:\n{context}"
# 执行当前阶段
result = await execute_agent(agent, prompt)
# 累积到上下文
context.append(f"## {agent.role}\n{result}")
return "\n\n".join(context)
使用场景:
- 文档处理流程(提取 → 分析 → 总结)
- 数据处理管道(采集 → 清洗 → 分析)
- 代码生成流程(设计 → 实现 → 测试)
示例配置:
{
"name": "文档处理流水线",
"mode": "pipeline",
"agents": [
{
"id": "extractor",
"role": "信息提取专家",
"system_prompt": "你是信息提取专家,专注于从文档中提取关键信息",
"task": "提取文档中的关键信息和数据"
},
{
"id": "analyzer",
"role": "数据分析师",
"system_prompt": "你是数据分析师,专注于分析数据模式和趋势",
"task": "分析提取的信息,找出模式和趋势"
},
{
"id": "summarizer",
"role": "报告撰写专家",
"system_prompt": "你是报告撰写专家,专注于生成清晰的总结报告",
"task": "基于分析结果生成最终总结报告"
}
]
}
Graph 模式(有向无环图)
支持复杂的任务依赖关系,智能体可以并行执行。
执行流程:
async def execute_graph(agents, goal):
# 1. 构建依赖映射
dep_map = {a.id: set(a.depends_on) for a in agents}
# 2. 检测循环依赖
if has_cycle(dep_map):
raise ValueError("检测到循环依赖")
# 3. 调度循环
completed = set()
results = {}
while len(completed) < len(agents):
# 找出所有无阻塞的节点
ready = [a for a in agents
if a.id not in completed
and dep_map[a.id].issubset(completed)]
# 并发执行
tasks = [execute_agent(a, goal, results) for a in ready]
agent_results = await asyncio.gather(*tasks)
# 更新结果
for agent, result in zip(ready, agent_results):
results[agent.id] = result
completed.add(agent.id)
return results
依 赖解析:
# 上游失败自动标记下游为失败
if any(dep in failed_agents for dep in agent.depends_on):
agent.status = "failed"
agent.error = "上游任务失败"
continue
使用场景:
- 复杂项目分析(多个模块并行分析)
- 多源数据聚合(并行获取后汇总)
- 并行代码审查(多个文件同时审查)
示例配置:
{
"name": "项目分析图",
"mode": "graph",
"agents": [
{
"id": "frontend",
"role": "前端分析师",
"system_prompt": "你是前端分析师,专注于前端代码结构分析",
"task": "分析前端代码结构和技术栈",
"depends_on": []
},
{
"id": "backend",
"role": "后端分析师",
"system_prompt": "你是后端分析师,专注于后端代码结构分析",
"task": "分析后端代码结构和 API 设计",
"depends_on": []
},
{
"id": "database",
"role": "数据库专家",
"system_prompt": "你是数据库专家,专注于数据库设计分析",
"task": "分析数据库设计和数据模型",
"depends_on": []
},
{
"id": "integrator",
"role": "系统架构师",
"system_prompt": "你是系统架构师,专注于整体架构分析",
"task": "汇总前后端和数据库分析,生成完整的架构报告",
"depends_on": ["frontend", "backend", "database"]
}
]
}
Council 模式(议会)
多个智能体从不同视角分析同一问题,支持交叉审查。
执行流程:
async def execute_council(agents, goal, cross_review=True):
# 第 1 轮:独立分析
round1_tasks = [
execute_agent(a, f"{goal}\n\n请从{a.perspective}角度分析")
for a in agents
]
round1_results = await asyncio.gather(*round1_tasks)
if not cross_review:
return format_council_results(round1_results)
# 第 2 轮:交叉评审
round2_tasks = []
for i, agent in enumerate(agents):
# 收集其他成员的立场
other_views = [
f"## {agents[j].role} ({agents[j].perspective})\n{round1_results[j]}"
for j in range(len(agents)) if j != i
]
prompt = f"""
目标: {goal}
你的初步分析:
{round1_results[i]}
其他成员的立场:
{chr(10).join(other_views)}
请评价其他成员的观点,并完善你的分析。
"""
round2_tasks.append(execute_agent(agent, prompt))
round2_results = await asyncio.gather(*round2_tasks)
return format_council_results(round2_results)
使用场景:
- 决策分析(多角度评估)
- 代码审查(多人审查模式)
- 方案评估(技术、成本、风险多维度)
示例配置:
{
"name": "技术方案评审议会",
"mode": "council",
"cross_review": true,
"agents": [
{
"id": "tech_expert",
"role": "技术专家",
"system_prompt": "你是技术专家,专注于技术可行性和实现难度评估",
"perspective": "技术可行性"
},
{
"id": "business_analyst",
"role": "商业分析师",
"system_prompt": "你是商业分析师,专注于商业价值和市场需求评估",
"perspective": "商业价值"
},
{
"id": "risk_manager",
"role": "风险管理专家",
"system_prompt": "你是风险管理专家,专注于识别和评估潜在风险",
"perspective": "风险评估"
},
{
"id": "cost_controller",
"role": "成本控制专家",
"system_prompt": "你是成本控制专家,专注于成本效益分析",
"perspective": "成本效益"
}
]
}
团队管理 API
新增文件:
backend/api/agent_teams.py- 团队管理 APIbackend/models/agent_team.py- 团队数据模型backend/modules/agent/workflow.py- 工作流引擎
API 端点:
| 端点 | 方法 | 说明 |
|---|---|---|
/api/agent-teams | GET | 列出所有智能体团队 |
/api/agent-teams | POST | 创建智能体团队 |
/api/agent-teams/{id} | GET | 获取团队详情 |
/api/agent-teams/{id} | PUT | 更新团队配置 |
/api/agent-teams/{id} | DELETE | 删除团队 |
workflow_run | Tool | 在对话链路中执行团队工作流 |
/api/agent-teams/{id}/config | GET/PUT/DELETE | 团队模型配置 |
创建团队
POST /api/agent-teams
{
"name": "文档处理团队",
"description": "专门处理文档的智能体团队",
"mode": "pipeline",
"agents": [
{
"name": "提取器",
"role": "从文档中提取关键信息",
"model": "glm-5",
"temperature": 0.3
},
{
"name": "分析器",
"role": "分析提取的信息",
"model": "glm-5",
"temperature": 0.5
}
],
"enabled": true
}
执行团队工作流
通过 `workflow_run(team_name=..., goal=...)` 触发团队执行
{
"task": "分析这份项目文档并生成总结",
"context": {
"file_path": "docs/project.md"
}
}
响应:
{
"execution_id": "uuid",
"status": "running",
"results": []
}
使用方式
1. 通过 API 创建团队
使用上述 API 创建智能体团队,配置协作模式和各个智能体的角色。
2. 在聊天中 @团队
在聊天窗口中直接 @团队名称触发协作执行:
用户: @文档处理团队 分析这份项目文档
系统会自动:
- 识别 @团队名称
- 加载团队配置
- 按照配置的模式执行工作流
- 返回协作结果
3. 在飞书等渠道中使用
在飞书、钉钉等渠道中,同样支持 @团队名称:
@CountBot @代码审查议会 审查这段代码
协作场景
场景 1:文档处理流水线
Pipeline: 提取器 → 分析器 → 总结器
输入: 一份 50 页的技术文档
输出: 结构化的总结报告