跳到主要内容

HTTP API 深度解析

本章节深入剖析 CountBot 的 HTTP API 机制,通过真实场景展示完整的请求-响应流程。每篇文章聚焦一个核心场景,包含完整的 HTTP 请求示例、后端处理流程和返回数据格式。

目录

  1. 基础对话流程 - 用户发送"你好"
  2. 工具调用流程 - 查询天气信息
  3. 技能系统深度解析 - 使用技能执行任务
  4. 子代理调度流程 - 创建后台任务
  5. 文件操作流程 - 读写文件
  6. Shell 命令执行 - 执行系统命令
  7. WebSocket 实时通信 - 流式响应和通知
  8. 多轮对话和上下文管理 - 连续对话

1. 基础对话流程

场景描述

用户通过 Web 界面发送一条简单的问候消息"你好",系统返回 AI 的回复。这是最基础的对话场景,展示了 CountBot 的核心消息处理流程。

HTTP 请求示例

curl -X POST http://localhost:8000/api/chat/send \
-H "Content-Type: application/json" \
-d '{
"session_id": "550e8400-e29b-41d4-a716-446655440000",
"message": "你好",
"attachments": null
}'

请求参数详解

参数类型必填说明
session_idstring会话唯一标识符(UUID 格式)
messagestring用户消息内容(最少 1 个字符)
attachmentsarray附件路径列表(用于图片、文件等)

后端处理流程

1. FastAPI 接收请求 (POST /api/chat/send)

2. 验证会话是否存在 (SessionManager.get_session)

3. 保存用户消息到数据库 (SessionManager.add_message)

4. 获取会话历史 (SessionManager.get_history_with_summary)
- 包含会话总结(如果有)
- 限制历史消息条数(max_history_messages)

5. 构建 LLM 上下文 (ContextBuilder.build_messages)
- 系统提示词(persona 配置)
- 会话总结(压缩的历史信息)
- 最近的对话历史
- 当前用户消息

6. Agent Loop 处理 (AgentLoop.process_message)
- 调用 LLM 生成响应
- 流式返回内容块

7. 保存 AI 响应到数据库

8. 返回 SSE 流式响应

SSE 流式响应格式

CountBot 使用 Server-Sent Events (SSE) 实现流式响应,客户端可以实时接收 AI 生成的内容。

开始事件

event: start
data: {"message_id": "12345"}

内容块事件(多次)

event: message
data: {"content": "你"}

event: message
data: {"content": "好"}

event: message
data: {"content": "!"}

event: message
data: {"content": "我"}

event: message
data: {"content": "是"}

event: message
data: {"content": " CountBot"}

完成事件

event: done
data: {"message_id": "12346"}

错误事件(如果发生错误)

event: error
data: {"error": "Internal server error", "type": "Exception"}

完整响应示例

HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
X-Accel-Buffering: no

event: start
data: {"message_id": "12345"}

event: message
data: {"content": "你好!我是 CountBot,"}

event: message
data: {"content": "一个智能助手。"}

event: message
data: {"content": "有什么我可以帮助你的吗?"}

event: done
data: {"message_id": "12346"}

关键代码片段

1. 消息保存(backend/api/chat.py)

# 保存用户消息到数据库
user_message = await session_manager.add_message(
session_id=request.session_id,
role="user",
content=request.message,
)

2. 获取会话历史(包含总结)

# 获取带总结的会话历史
context = await session_manager.get_history_with_summary(
session_id=request.session_id,
limit=None if max_history == -1 else max_history
)

3. Agent Loop 流式处理

# 处理消息并流式输出
async for chunk in agent_loop.process_message(
message=request.message,
session_id=request.session_id,
context=context,
media=request.attachments,
):
assistant_content += chunk
# 发送内容块
yield f"event: message\ndata: {json.dumps({'content': chunk})}\n\n"

CountBot 特色功能

  1. 会话总结机制:自动压缩历史对话,节省 token 消耗
  2. 滚动窗口溢出总结:当历史消息超过限制时,自动将旧消息总结到记忆系统
  3. 自动记忆触发:当会话达到 30 条消息或 15000 字符时,自动保存到长期记忆
  4. 流式响应:使用 SSE 实现实时内容推送,提升用户体验
  5. 异步处理:完全异步架构,支持高并发

2. 工具调用流程

场景描述

用户发送"帮我查询北京天气",AI 识别需要调用工具,执行 Shell 命令查询天气,并返回结果。

HTTP 请求示例

curl -X POST http://localhost:8000/api/chat/send \
-H "Content-Type: application/json" \
-d '{
"session_id": "550e8400-e29b-41d4-a716-446655440000",
"message": "帮我查询北京天气"
}'

后端处理流程

1. 接收用户消息

2. Agent Loop 调用 LLM

3. LLM 返回工具调用请求
{
"tool_calls": [{
"id": "call_abc123",
"type": "function",
"function": {
"name": "exec",
"arguments": "{\"command\": \"curl wttr.in/Beijing?format=3\"}"
}
}]
}

4. Agent Loop 识别工具调用

5. 发送 WebSocket 通知(工具调用开始)

6. 执行工具 (ToolRegistry.execute)
- 验证参数
- 安全检查(危险命令检测)
- 执行命令
- 记录审计日志

7. 工具返回结果
"Beijing: Clear, +15°C"

8. 发送 WebSocket 通知(工具调用完成)

9. 将工具结果添加到上下文

10. LLM 生成最终响应
"北京今天天气晴朗,温度 15°C。"

11. 流式返回给用户

工具注册和发现机制

CountBot 使用 ToolRegistry 管理所有工具:

# backend/modules/tools/setup.py
def register_all_tools(workspace, ...):
tools = ToolRegistry()

# 1. 文件系统工具
tools.register(ReadFileTool(workspace))
tools.register(WriteFileTool(workspace))
tools.register(EditFileTool(workspace))
tools.register(ListDirTool(workspace))

# 2. Shell 工具
tools.register(ExecTool(
workspace=workspace,
timeout=command_timeout,
allow_dangerous=allow_dangerous,
))

# 3. Web 工具
tools.register(WebFetchTool())

# 4. 子代理工具
tools.register(SpawnTool(subagent_manager))

# 5. 其他工具...

return tools

工具定义格式

工具定义遵循 OpenAI Function Calling 格式:

{
"type": "function",
"function": {
"name": "exec",
"description": "执行 Shell 命令",
"parameters": {
"type": "object",
"properties": {
"command": {
"type": "string",
"description": "要执行的命令"
}
},
"required": ["command"]
}
}
}

工具执行过程

1. 参数验证

# backend/modules/tools/registry.py
errors = tool.validate_params(arguments)
if errors:
return f"Error: Invalid parameters: {'; '.join(errors)}"

2. 安全检查(Shell 工具)

# backend/modules/tools/shell.py
DANGEROUS_PATTERNS = [
r'\brm\s+-rf\s+/',
r'\b(sudo|su)\b',
r'>\s*/dev/sd[a-z]',
r'\bdd\b.*if=/dev/zero',
# ... 更多危险模式
]

def _is_dangerous(self, command: str) -> bool:
for pattern in self.deny_patterns:
if re.search(pattern, command, re.IGNORECASE):
return True
return False

3. 执行命令

# 使用 asyncio.subprocess 执行
process = await asyncio.create_subprocess_shell(
command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=str(self.workspace),
)

# 设置超时
stdout, stderr = await asyncio.wait_for(
process.communicate(),
timeout=self.timeout
)

4. 记录审计日志

# backend/modules/tools/file_audit_logger.py
file_audit_logger.record_call(
call_id=call_id,
tool_name=tool_name,
arguments=arguments,
session_id=session_id
)

# 执行完成后更新结果
file_audit_logger.update_result(
call_id,
result,
"success",
duration_ms=duration_ms
)

WebSocket 工具调用通知

工具调用开始通知

{
"type": "tool_call",
"tool": "exec",
"arguments": {
"command": "curl wttr.in/Beijing?format=3"
},
"message_id": 12345
}

工具调用完成通知

{
"type": "tool_result",
"tool": "exec",
"result": "Beijing: Clear, +15°C",
"message_id": 12345,
"duration": 1250
}

完整 SSE 响应示例

event: start
data: {"message_id": "12345"}

event: message
data: {"content": "好的,"}

event: message
data: {"content": "让我查询一下北京的天气。"}

event: message
data: {"content": "\n\n"}

event: message
data: {"content": "北京今天天气晴朗,"}

event: message
data: {"content": "温度 15°C。"}

event: done
data: {"message_id": "12346"}

工具调用记录查询

可以通过 API 查询工具调用历史:

curl http://localhost:8000/api/chat/sessions/{session_id}/messages

响应包含工具调用记录:

{
"id": 12346,
"session_id": "550e8400-e29b-41d4-a716-446655440000",
"role": "assistant",
"content": "北京今天天气晴朗,温度 15°C。",
"created_at": "2024-01-15T10:30:00Z",
"tool_calls": [
{
"id": "tc_001",
"name": "exec",
"arguments": {
"command": "curl wttr.in/Beijing?format=3"
},
"result": "Beijing: Clear, +15°C",
"status": "success",
"duration": 1250
}
]
}

CountBot 特色功能

  1. 智能工具选择:LLM 自动识别需要的工具,无需用户指定
  2. 安全沙箱:危险命令检测、工作空间隔离、超时控制
  3. 实时通知:WebSocket 推送工具调用状态,用户可见执行过程
  4. 审计日志:完整记录所有工具调用,支持追溯和调试
  5. 重试机制:工具执行失败自动重试(最多 3 次)
  6. 并发控制:支持并行工具调用,提升执行效率

3. 技能系统深度解析

场景描述

用户请求"生成一张猫的图片",AI 识别需要使用 image-gen 技能,读取技能文档,执行其中的 Python 脚本生成图片。

技能系统架构

skills/                          # 技能目录
├── image-gen/ # 图片生成技能
│ ├── SKILL.md # 技能文档(必需)
│ ├── config.json # 配置文件(可选)
│ └── scripts/
│ └── generate.py # 执行脚本
├── weather/ # 天气查询技能
│ ├── SKILL.md
│ └── scripts/
│ └── query.py
└── ...

技能文档格式(SKILL.md)

---
title: 图片生成
description: 使用 AI 模型生成图片
always: false
metadata: {"CountBot": {"requires": {"bins": ["python3"], "env": ["OPENAI_API_KEY"]}}}
---

# 图片生成技能

## 功能说明

使用 DALL-E 3 模型生成图片。

## 使用方法

### 生成图片

\`\`\`bash
python skills/image-gen/scripts/generate.py --prompt "a cute cat" --output output.png
\`\`\`

### 参数说明

- `--prompt`: 图片描述(必需)
- `--output`: 输出文件路径(可选,默认 output.png)
- `--size`: 图片尺寸(可选,1024x1024 / 1792x1024 / 1024x1792)

## 示例

生成一只可爱的猫:

\`\`\`bash
python skills/image-gen/scripts/generate.py --prompt "a cute cat playing with yarn"
\`\`\`

HTTP 请求示例

curl -X POST http://localhost:8000/api/chat/send \
-H "Content-Type: application/json" \
-d '{
"session_id": "550e8400-e29b-41d4-a716-446655440000",
"message": "生成一张猫的图片"
}'

后端处理流程

1. 用户发送消息

2. ContextBuilder 构建上下文
- 加载技能摘要(build_skills_summary)
- 注入到系统提示词

3. LLM 识别需要使用技能

4. LLM 调用 read_file 工具
{
"tool": "read_file",
"arguments": {
"path": "skills/image-gen/SKILL.md"
}
}

5. 读取技能文档内容

6. LLM 分析文档中的命令示例

7. LLM 调用 exec 工具执行命令
{
"tool": "exec",
"arguments": {
"command": "python skills/image-gen/scripts/generate.py --prompt 'a cute cat' --output temp/cat.png"
}
}

8. 执行 Python 脚本

9. 返回执行结果
"图片已生成:temp/cat.png"

10. LLM 生成最终响应
"我已经为你生成了一张可爱的猫咪图片,保存在 temp/cat.png"

技能加载机制

1. 技能扫描(SkillsLoader)

# backend/modules/agent/skills.py
class SkillsLoader:
def __init__(self, skills_dir: Path):
self.workspace_skills = skills_dir # 工作空间技能
self.builtin_skills = BUILTIN_SKILLS_DIR # 内置技能
self.skills: dict[str, Skill] = {}
self._load_all_skills()

2. 技能元数据解析

def _parse_metadata(self) -> dict[str, Any]:
"""解析 YAML frontmatter"""
metadata = {
"title": self.name,
"description": "",
"always": False, # 是否自动加载
"requires": {}, # 依赖检查
}

# 解析 YAML frontmatter
if self.content.startswith("---"):
# 提取 title, description, always, requires 等字段
...

return metadata

3. 依赖检查

def check_requirements(self) -> bool:
"""检查技能依赖是否满足"""
requires = self.metadata.get("requires", {})

# 检查二进制依赖
for binary in requires.get("bins", []):
if not shutil.which(binary):
return False

# 检查环境变量
for env_var in requires.get("env", []):
if not os.environ.get(env_var):
return False

return True

4. 技能摘要生成

def build_skills_summary(self) -> str:
"""构建技能摘要(极简版)"""
lines = []
for name, skill in sorted(self.skills.items()):
if not skill.enabled:
continue

if not skill.check_requirements():
continue

desc = skill.metadata.get("description", "")
title = skill.metadata.get("title", name)

lines.append(f"- {name} ({title}): {desc}")

return "\n".join(lines)

技能注入到系统提示词

# backend/modules/agent/context.py
def build_system_prompt(self) -> str:
prompt = f"""你是 {self.ai_name},一个智能助手。

## 可用技能(Skills)

**重要**: 技能不是工具!技能是包含命令行调用示例的文档。

以下技能已启用:

{self.skills.build_skills_summary()}

**正确使用流程**:
1. 用户提到某个功能(如"生成图片")
2. 使用 read_file 读取技能文档: read_file(path='skills/image-gen/SKILL.md')
3. 阅读文档中的命令行示例
4. 使用 exec 工具执行命令

**错误示例**:
错误示例:image_gen(prompt="...") # image-gen 不是工具

**正确示例**:
正确示例:read_file(path='skills/image-gen/SKILL.md') # 先读取文档
正确示例:exec(command='python skills/image-gen/scripts/generate.py ...') # 再执行命令
"""
return prompt

技能管理 API

列出所有技能

curl http://localhost:8000/api/skills

响应:

[
{
"name": "image-gen",
"enabled": true,
"source": "builtin",
"path": "/app/skills/image-gen/SKILL.md",
"metadata": {
"title": "图片生成",
"description": "使用 AI 模型生成图片",
"always": false,
"requirements": ["python3", "OPENAI_API_KEY"]
}
},
{
"name": "weather",
"enabled": true,
"source": "workspace",
"path": "/workspace/skills/weather/SKILL.md",
"metadata": {
"title": "天气查询",
"description": "查询城市天气信息",
"always": false,
"requirements": []
}
}
]

启用/禁用技能

# 禁用技能
curl -X PUT http://localhost:8000/api/skills/image-gen/disable

# 启用技能
curl -X PUT http://localhost:8000/api/skills/image-gen/enable

读取技能内容

curl http://localhost:8000/api/skills/image-gen

响应:

{
"name": "image-gen",
"content": "---\ntitle: 图片生成\n...",
"enabled": true,
"metadata": {
"title": "图片生成",
"description": "使用 AI 模型生成图片"
}
}

技能执行示例

完整的工具调用序列

[
{
"tool": "read_file",
"arguments": {
"path": "skills/image-gen/SKILL.md"
},
"result": "---\ntitle: 图片生成\n...\n```bash\npython skills/image-gen/scripts/generate.py --prompt \"...\" --output output.png\n```"
},
{
"tool": "exec",
"arguments": {
"command": "python skills/image-gen/scripts/generate.py --prompt 'a cute cat' --output temp/cat.png"
},
"result": "图片已生成:temp/cat.png\n尺寸:1024x1024\n模型:dall-e-3"
}
]

标准输入输出处理

技能脚本通过标准输入输出与 CountBot 交互:

# skills/image-gen/scripts/generate.py
import sys
import json

def main():
# 解析命令行参数
parser = argparse.ArgumentParser()
parser.add_argument('--prompt', required=True)
parser.add_argument('--output', default='output.png')
args = parser.parse_args()

# 执行任务
result = generate_image(args.prompt, args.output)

# 输出结果(JSON 格式)
print(json.dumps({
"success": True,
"output": args.output,
"size": "1024x1024"
}))

if __name__ == '__main__':
main()

错误处理机制

1. 依赖缺失

{
"error": "技能 'image-gen' 依赖缺失",
"missing": ["python3", "OPENAI_API_KEY"],
"suggestion": "请安装 Python 3 并设置 OPENAI_API_KEY 环境变量"
}

2. 脚本执行失败

{
"tool": "exec",
"error": "Command failed with exit code 1",
"stderr": "ModuleNotFoundError: No module named 'openai'",
"suggestion": "请运行 pip install openai"
}

3. 技能文档不存在

{
"tool": "read_file",
"error": "File not found: skills/image-gen/SKILL.md",
"suggestion": "请检查技能是否已安装"
}

CountBot 特色功能

  1. 渐进式加载:只在系统提示词中包含技能摘要,需要时才读取完整文档
  2. 依赖检查:自动检测二进制和环境变量依赖,避免执行失败
  3. 工作空间优先:工作空间技能覆盖内置技能,支持自定义
  4. 自动加载always: true 的技能自动注入到上下文
  5. 安全隔离:技能脚本在工作空间沙箱中执行
  6. 标准化接口:统一的 SKILL.md 格式,易于创建和分享

4. 子代理调度流程

场景描述

用户请求"帮我后台分析这个日志文件,完成后通知我",系统创建一个子代理(Subagent)在后台执行任务,主对话可以继续进行。

子代理架构

Main Agent (主代理)
↓ spawn 工具
Subagent Manager (子代理管理器)
↓ 创建任务
Subagent Task (子代理任务)
↓ 独立执行
- 独立的 LLM 调用
- 独立的工具注册表
- 独立的上下文
↓ 完成后
- 更新任务状态
- 通知主代理
- 保存到数据库

HTTP 请求示例

curl -X POST http://localhost:8000/api/chat/send \
-H "Content-Type: application/json" \
-d '{
"session_id": "550e8400-e29b-41d4-a716-446655440000",
"message": "帮我后台分析 logs/app.log 文件,找出所有错误"
}'

后端处理流程

1. 用户发送消息

2. LLM 识别需要后台任务

3. LLM 调用 spawn 工具
{
"tool": "spawn",
"arguments": {
"label": "日志分析",
"message": "分析 logs/app.log 文件,找出所有错误并统计"
}
}

4. SpawnTool 创建子代理任务
- 生成唯一 task_id
- 创建 SubagentTask 对象
- 注册到 SubagentManager

5. 立即返回任务 ID
"子 Agent [日志分析] 已启动 (ID: abc-123)。我会继续监控进度。"

6. 异步执行子代理任务
- 创建独立的 ToolRegistry
- 构建子代理专用系统提示词
- 调用 LLM 处理任务
- 执行工具调用
- 更新任务进度

7. 任务完成
- 更新状态为 COMPLETED
- 保存结果到数据库
- 发送 WebSocket 通知

8. 主代理可以查询任务状态

子代理创建 API

直接创建子代理(不通过对话)

curl -X POST http://localhost:8000/api/subagents \
-H "Content-Type: application/json" \
-d '{
"label": "日志分析",
"message": "分析 logs/app.log 文件,找出所有错误",
"session_id": "550e8400-e29b-41d4-a716-446655440000"
}'

响应:

{
"task_id": "abc-123-def-456",
"label": "日志分析",
"status": "pending",
"created_at": "2024-01-15T10:30:00Z"
}

子代理状态管理

任务状态枚举

class TaskStatus(Enum):
PENDING = "pending" # 等待执行
RUNNING = "running" # 执行中
COMPLETED = "completed" # 已完成
FAILED = "failed" # 失败
CANCELLED = "cancelled" # 已取消

查询任务状态

curl http://localhost:8000/api/subagents/abc-123-def-456

响应:

{
"task_id": "abc-123-def-456",
"label": "日志分析",
"message": "分析 logs/app.log 文件,找出所有错误",
"session_id": "550e8400-e29b-41d4-a716-446655440000",
"status": "running",
"progress": 45,
"result": null,
"error": null,
"created_at": "2024-01-15T10:30:00Z",
"started_at": "2024-01-15T10:30:01Z",
"completed_at": null,
"tool_call_records": [
{
"tool_call_id": "call_001",
"name": "read_file",
"arguments": {"path": "logs/app.log"},
"result": "2024-01-15 10:00:00 ERROR ...",
"status": "success",
"duration_ms": 150
}
]
}

列出所有任务

curl http://localhost:8000/api/subagents?session_id=550e8400-e29b-41d4-a716-446655440000

响应:

[
{
"task_id": "abc-123",
"label": "日志分析",
"status": "completed",
"progress": 100,
"created_at": "2024-01-15T10:30:00Z"
},
{
"task_id": "def-456",
"label": "数据备份",
"status": "running",
"progress": 60,
"created_at": "2024-01-15T10:35:00Z"
}
]

独立会话隔离

每个子代理拥有独立的执行环境:

# backend/modules/agent/subagent.py
async def _run_task(self, task: SubagentTask) -> None:
# 1. 创建独立的工具注册表
tools = ToolRegistry()
tools.register(ReadFileTool(self.workspace))
tools.register(WriteFileTool(self.workspace))
tools.register(ExecTool(self.workspace))
# 注意:子代理没有 spawn 工具(避免无限递归)

# 2. 构建子代理专用系统提示词
system_prompt = self._build_subagent_prompt(task.message)

# 3. 独立的消息上下文
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": task.message},
]

# 4. 独立的 LLM 调用
async for chunk in self.provider.chat_stream(
messages=messages,
tools=tools.get_definitions(),
model=self.model,
):
# 处理响应...

子代理专用系统提示词

def _build_subagent_prompt(self, task: str) -> str:
return f"""# 子代理 (Subagent)

你是主代理创建的子代理,专门负责完成特定任务。

## 你的任务
{task}

## 工作规则
1. **专注任务**: 只完成分配的任务,不做其他事情
2. **简洁高效**: 最终响应会报告给主代理,保持简洁但信息完整
3. **不要闲聊**: 不要发起对话或承担额外任务
4. **彻底完成**: 确保任务完整完成,提供清晰的结果总结

## 可用能力
- 读写工作空间文件
- 执行 Shell 命令
- 网络搜索和抓取网页

## 限制
- 不能直接向用户发送消息(无 message 工具)
- 不能创建其他子代理(无 spawn 工具)
- 无法访问主代理的对话历史

## 完成标准
任务完成后,提供清晰的总结:
- 完成了什么
- 发现了什么
- 遇到的问题(如果有)
- 建议的后续步骤(如果需要)
"""

异步执行机制

# backend/modules/agent/subagent.py
async def execute_task(self, task_id: str) -> None:
"""立即返回,后台执行"""
task = self.tasks.get(task_id)
task.status = TaskStatus.RUNNING
task.started_at = datetime.now()

# 创建异步任务
async_task = asyncio.create_task(self._run_task(task))
self.running_tasks[task_id] = async_task

# 立即返回,不等待完成

结果查询和通知

WebSocket 实时通知

{
"type": "subagent_status",
"task_id": "abc-123",
"status": "running",
"progress": 45,
"message": "正在分析日志文件..."
}
{
"type": "subagent_complete",
"task_id": "abc-123",
"status": "completed",
"result": "分析完成。共发现 15 个错误:\n1. NullPointerException (8次)\n2. IOException (5次)\n3. TimeoutException (2次)"
}

轮询查询

# 每 5 秒查询一次
while true; do
curl http://localhost:8000/api/subagents/abc-123
sleep 5
done

多智能体协作

主代理可以创建多个子代理并行执行:

# 用户: "帮我同时分析 3 个日志文件"

# LLM 调用 3 次 spawn 工具
spawn(label="分析 app.log", message="分析 logs/app.log")
spawn(label="分析 error.log", message="分析 logs/error.log")
spawn(label="分析 access.log", message="分析 logs/access.log")

# 3 个子代理并行执行
# 主代理可以继续对话
# 完成后分别通知

任务取消

curl -X POST http://localhost:8000/api/subagents/abc-123/cancel

响应:

{
"success": true,
"task_id": "abc-123",
"status": "cancelled"
}

数据库持久化

子代理任务自动保存到数据库:

CREATE TABLE tasks (
id VARCHAR(36) PRIMARY KEY,
label VARCHAR(255) NOT NULL,
message TEXT NOT NULL,
session_id VARCHAR(36),
status VARCHAR(20) NOT NULL,
progress INTEGER DEFAULT 0,
result TEXT,
error TEXT,
created_at TIMESTAMP NOT NULL,
started_at TIMESTAMP,
completed_at TIMESTAMP,
tool_call_records TEXT -- JSON 格式
);

CountBot 特色功能

  1. 真正的异步执行:子代理在后台运行,主对话不阻塞
  2. 独立执行环境:每个子代理有独立的工具和上下文
  3. 实时进度通知:WebSocket 推送任务状态和进度
  4. 持久化存储:任务状态保存到数据库,重启后可恢复
  5. 工具调用记录:完整记录子代理的所有工具调用
  6. 并行执行:支持多个子代理同时运行
  7. 安全隔离:子代理无法创建新的子代理,避免无限递归

5. 文件操作流程

场景描述

用户请求"读取 config.json 文件并修改端口号为 8080",AI 使用文件工具读取、修改并保存文件。

文件工具概览

CountBot 提供 4 个核心文件操作工具:

工具功能安全特性
read_file读取文件内容工作空间隔离、路径验证
write_file写入文件内容工作空间隔离、路径验证、审计日志
edit_file编辑文件(查找替换)工作空间隔离、原子操作
list_dir列出目录内容工作空间隔离、递归限制

HTTP 请求示例

curl -X POST http://localhost:8000/api/chat/send \
-H "Content-Type: application/json" \
-d '{
"session_id": "550e8400-e29b-41d4-a716-446655440000",
"message": "读取 config.json 文件并修改端口号为 8080"
}'

后端处理流程

1. 用户发送消息

2. LLM 分析任务

3. 调用 read_file 工具
{
"tool": "read_file",
"arguments": {
"path": "config.json"
}
}

4. 读取文件内容
{
"port": 3000,
"host": "localhost"
}

5. LLM 分析内容

6. 调用 edit_file 工具
{
"tool": "edit_file",
"arguments": {
"path": "config.json",
"old_text": "\"port\": 3000",
"new_text": "\"port\": 8080"
}
}

7. 执行文件编辑
- 验证路径安全性
- 读取原文件
- 查找并替换
- 原子写入
- 记录审计日志

8. 返回结果
"文件已成功修改"

9. LLM 生成响应
"我已经将 config.json 中的端口号修改为 8080。"

工具定义

read_file 工具

{
"type": "function",
"function": {
"name": "read_file",
"description": "读取文件内容",
"parameters": {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "文件路径(相对于工作空间)"
}
},
"required": ["path"]
}
}
}

write_file 工具

{
"type": "function",
"function": {
"name": "write_file",
"description": "写入文件内容(会覆盖现有文件)",
"parameters": {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "文件路径(相对于工作空间)"
},
"content": {
"type": "string",
"description": "文件内容"
}
},
"required": ["path", "content"]
}
}
}

edit_file 工具

{
"type": "function",
"function": {
"name": "edit_file",
"description": "编辑文件(查找并替换文本)",
"parameters": {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "文件路径"
},
"old_text": {
"type": "string",
"description": "要替换的文本"
},
"new_text": {
"type": "string",
"description": "新文本"
}
},
"required": ["path", "old_text", "new_text"]
}
}
}

工作空间隔离验证

所有文件操作都限制在工作空间内:

# backend/modules/tools/filesystem.py
def _validate_path(self, path: str) -> Path:
"""验证路径安全性"""
# 解析为绝对路径
abs_path = (self.workspace / path).resolve()

# 检查是否在工作空间内
if not str(abs_path).startswith(str(self.workspace.resolve())):
raise ValueError(
f"Path '{path}' is outside workspace. "
f"All file operations must be within the workspace directory."
)

return abs_path

安全示例

# 允许的路径
read_file(path="config.json")
read_file(path="data/users.json")
read_file(path="logs/app.log")

# 拒绝的路径
read_file(path="/etc/passwd") # 绝对路径
read_file(path="../../../etc/passwd") # 路径遍历
read_file(path="~/secrets.txt") # 用户目录

路径安全检查

# backend/modules/tools/filesystem.py
class ReadFileTool(Tool):
async def execute(self, path: str) -> str:
# 1. 验证路径
abs_path = self._validate_path(path)

# 2. 检查文件是否存在
if not abs_path.exists():
return f"Error: File '{path}' not found"

# 3. 检查是否为文件
if not abs_path.is_file():
return f"Error: '{path}' is not a file"

# 4. 读取内容
try:
content = abs_path.read_text(encoding='utf-8')
return content
except UnicodeDecodeError:
return f"Error: File '{path}' is not a text file"

文件审计日志

所有文件写入操作都会记录到审计日志:

# backend/modules/tools/file_audit_logger.py
class FileAuditLogger:
def record_file_operation(
self,
operation: str, # "write" / "edit" / "delete"
path: str,
session_id: str,
content_preview: str = None,
success: bool = True,
):
log_entry = {
"timestamp": datetime.now().isoformat(),
"operation": operation,
"path": path,
"session_id": session_id,
"content_preview": content_preview[:100] if content_preview else None,
"success": success,
}

# 写入审计日志文件
self._append_to_log(log_entry)

审计日志格式

{
"timestamp": "2024-01-15T10:30:00Z",
"operation": "write",
"path": "config.json",
"session_id": "550e8400-e29b-41d4-a716-446655440000",
"content_preview": "{\"port\": 8080, \"host\": \"localhost\"}",
"success": true
}

完整工具调用示例

1. 读取文件

请求:

{
"tool": "read_file",
"arguments": {
"path": "config.json"
}
}

响应:

{
"result": "{\n \"port\": 3000,\n \"host\": \"localhost\",\n \"debug\": false\n}"
}

2. 编辑文件

请求:

{
"tool": "edit_file",
"arguments": {
"path": "config.json",
"old_text": "\"port\": 3000",
"new_text": "\"port\": 8080"
}
}

响应:

{
"result": "文件已成功修改。替换了 1 处匹配项。"
}

3. 写入新文件

请求:

{
"tool": "write_file",
"arguments": {
"path": "output/result.txt",
"content": "处理完成\n共处理 100 条记录"
}
}

响应:

{
"result": "文件已成功写入:output/result.txt (23 bytes)"
}

4. 列出目录

请求:

{
"tool": "list_dir",
"arguments": {
"path": "logs",
"recursive": false
}
}

响应:

{
"result": "logs/\n├── app.log (1.2 MB)\n├── error.log (45 KB)\n└── access.log (3.5 MB)\n\n总计: 3 个文件"
}

原子操作保证

文件编辑使用原子操作,避免数据损坏:

# backend/modules/tools/filesystem.py
async def execute(self, path: str, old_text: str, new_text: str) -> str:
abs_path = self._validate_path(path)

# 1. 读取原文件
original_content = abs_path.read_text(encoding='utf-8')

# 2. 执行替换
if old_text not in original_content:
return f"Error: Text not found in file"

new_content = original_content.replace(old_text, new_text)

# 3. 原子写入(先写临时文件,再重命名)
temp_path = abs_path.with_suffix('.tmp')
temp_path.write_text(new_content, encoding='utf-8')
temp_path.replace(abs_path)

return "文件已成功修改"

错误处理

文件不存在

{
"tool": "read_file",
"error": "Error: File 'config.json' not found",
"suggestion": "请检查文件路径是否正确"
}

路径越界

{
"tool": "read_file",
"error": "Path '/etc/passwd' is outside workspace",
"suggestion": "所有文件操作必须在工作空间内"
}

权限不足

{
"tool": "write_file",
"error": "Permission denied: config.json",
"suggestion": "请检查文件权限"
}

编码错误

{
"tool": "read_file",
"error": "File 'image.png' is not a text file",
"suggestion": "此工具只能读取文本文件"
}

CountBot 特色功能

  1. 工作空间隔离:所有文件操作限制在工作空间内,防止越界访问
  2. 路径验证:自动解析和验证路径,防止路径遍历攻击
  3. 审计日志:完整记录所有文件写入操作,支持追溯
  4. 原子操作:文件编辑使用原子操作,避免数据损坏
  5. 编码检测:自动检测文件编码,支持 UTF-8
  6. 错误友好:详细的错误信息和建议

6. Shell 命令执行

场景描述

用户请求"查看系统磁盘使用情况",AI 使用 exec 工具执行 df -h 命令并返回结果。

HTTP 请求示例

curl -X POST http://localhost:8000/api/chat/send \
-H "Content-Type: application/json" \
-d '{
"session_id": "550e8400-e29b-41d4-a716-446655440000",
"message": "查看系统磁盘使用情况"
}'

后端处理流程

1. 用户发送消息

2. LLM 识别需要执行命令

3. 调用 exec 工具
{
"tool": "exec",
"arguments": {
"command": "df -h"
}
}

4. 安全检查
- 危险命令检测
- 白名单验证(如果启用)
- 工作空间限制

5. 执行命令
- 使用 asyncio.subprocess
- 设置超时(默认 30 秒)
- 捕获 stdout 和 stderr

6. 处理输出
- 截断过长输出(默认 10000 字符)
- 合并 stdout 和 stderr

7. 记录审计日志

8. 返回结果

exec 工具定义

{
"type": "function",
"function": {
"name": "exec",
"description": "执行 Shell 命令",
"parameters": {
"type": "object",
"properties": {
"command": {
"type": "string",
"description": "要执行的命令"
}
},
"required": ["command"]
}
}
}

沙箱机制

1. 工作目录隔离

# backend/modules/tools/shell.py
process = await asyncio.create_subprocess_shell(
command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=str(self.workspace), # 限制在工作空间
)

2. 环境变量清理

# 只保留必要的环境变量
safe_env = {
'PATH': os.environ.get('PATH', ''),
'HOME': str(self.workspace),
'USER': 'countbot',
}

process = await asyncio.create_subprocess_shell(
command,
env=safe_env,
...
)

危险命令检测

CountBot 内置危险命令模式库:

# backend/modules/tools/shell.py
DANGEROUS_PATTERNS = [
# 文件系统破坏
r'\brm\s+-rf\s+/',
r'\brm\s+-rf\s+\*',
r'\bdd\b.*if=/dev/zero',
r'>\s*/dev/sd[a-z]',

# 权限提升
r'\b(sudo|su)\b',
r'\bchmod\s+777',

# 网络攻击
r'\b(nmap|masscan|hping3)\b',
r'\bcurl\b.*-X\s+(DELETE|PUT)',

# 系统修改
r'\b(shutdown|reboot|halt)\b',
r'\b(mkfs|fdisk|parted)\b',

# 进程操作
r'\bkill\s+-9\s+1\b', # 杀死 init 进程
r'\bkillall\b',

# 数据泄露
r'/etc/(passwd|shadow)',
r'~/.ssh/',
]

检测逻辑

def _is_dangerous(self, command: str) -> bool:
"""检测命令是否危险"""
for pattern in self.deny_patterns:
if re.search(pattern, command, re.IGNORECASE):
return True
return False

async def execute(self, command: str) -> str:
# 危险命令检测
if not self.allow_dangerous and self._is_dangerous(command):
return (
f"Error: Command blocked for security reasons.\n"
f"Command: {command}\n"
f"Reason: Matches dangerous pattern"
)

# 执行命令...

超时控制

# backend/modules/tools/shell.py
try:
stdout, stderr = await asyncio.wait_for(
process.communicate(),
timeout=self.timeout # 默认 30 秒
)
except asyncio.TimeoutError:
process.kill()
return f"Error: Command timed out after {self.timeout} seconds"

输出截断

# 防止输出过长
output = stdout.decode('utf-8', errors='replace')
if len(output) > self.max_output_length:
output = output[:self.max_output_length]
output += f"\n\n[输出已截断,超过 {self.max_output_length} 字符]"

命令白名单

可以配置命令白名单,只允许特定命令:

# config.yaml
security:
command_whitelist_enabled: true
custom_allow_patterns:
- '^ls\b'
- '^cat\b'
- '^grep\b'
- '^df\b'
- '^du\b'
- '^ps\b'
- '^top\b'
# backend/modules/tools/shell.py
def _is_allowed(self, command: str) -> bool:
"""检查命令是否在白名单中"""
if not self.allow_patterns:
return True # 未启用白名单

for pattern in self.allow_patterns:
if re.match(pattern, command):
return True

return False

完整执行示例

成功执行

请求:

{
"tool": "exec",
"arguments": {
"command": "df -h"
}
}

响应:

{
"result": "Filesystem Size Used Avail Use% Mounted on\n/dev/sda1 100G 45G 50G 48% /\ntmpfs 8.0G 1.2G 6.8G 15% /tmp"
}

危险命令被阻止

请求:

{
"tool": "exec",
"arguments": {
"command": "rm -rf /"
}
}

响应:

{
"error": "Error: Command blocked for security reasons.\nCommand: rm -rf /\nReason: Matches dangerous pattern"
}

命令超时

请求:

{
"tool": "exec",
"arguments": {
"command": "sleep 60"
}
}

响应(30 秒后):

{
"error": "Error: Command timed out after 30 seconds"
}

命令执行失败

请求:

{
"tool": "exec",
"arguments": {
"command": "cat nonexistent.txt"
}
}

响应:

{
"result": "cat: nonexistent.txt: No such file or directory\n[Exit code: 1]"
}

审计日志

所有命令执行都会记录:

{
"timestamp": "2024-01-15T10:30:00Z",
"tool": "exec",
"session_id": "550e8400-e29b-41d4-a716-446655440000",
"command": "df -h",
"exit_code": 0,
"duration_ms": 150,
"output_length": 256,
"success": true
}

安全配置

# config.yaml
security:
# 是否阻止危险命令
dangerous_commands_blocked: true

# 命令超时(秒)
command_timeout: 30

# 最大输出长度(字符)
max_output_length: 10000

# 是否限制在工作空间
restrict_to_workspace: true

# 自定义拒绝模式
custom_deny_patterns:
- 'curl.*evil\.com'
- 'wget.*malware'

# 是否启用命令白名单
command_whitelist_enabled: false

# 自定义允许模式
custom_allow_patterns:
- '^git\b'
- '^npm\b'
- '^python\b'

# 是否启用审计日志
audit_log_enabled: true

CountBot 特色功能

  1. 多层安全防护:危险命令检测 + 白名单 + 工作空间隔离
  2. 智能超时:可配置超时时间,防止命令卡死
  3. 输出管理:自动截断过长输出,避免内存溢出
  4. 完整审计:记录所有命令执行,支持安全审查
  5. 错误友好:详细的错误信息和退出码
  6. 异步执行:使用 asyncio.subprocess,不阻塞主线程

7. WebSocket 实时通信

场景描述

客户端通过 WebSocket 连接到 CountBot,实时接收流式响应和工具调用通知,无需轮询。

WebSocket 连接建立

连接 URL

ws://localhost:8000/ws/chat

认证(远程访问)

当前 WebSocket 连接不接受 query token。浏览器端通常应先通过 /api/auth/login 获取认证 Cookie,再建立连接;如客户端库支持自定义握手头,也可以传 Authorization: Bearer <token>

// 浏览器端通常直接复用同源认证 Cookie
const ws = new WebSocket('ws://server:8000/ws/chat');

如果客户端库支持自定义握手头,也可以使用:

Authorization: Bearer your_token_here

连接示例(JavaScript)

const ws = new WebSocket('ws://localhost:8000/ws/chat');

ws.onopen = () => {
console.log('WebSocket 已连接');
};

ws.onmessage = (event) => {
const data = JSON.parse(event.data);
console.log('收到消息:', data);
};

ws.onerror = (error) => {
console.error('WebSocket 错误:', error);
};

ws.onclose = () => {
console.log('WebSocket 已断开');
};

会话绑定机制

客户端连接后,需要绑定到特定会话:

// 发送消息(自动绑定会话)
ws.send(JSON.stringify({
type: 'message',
sessionId: '550e8400-e29b-41d4-a716-446655440000',
content: '你好'
}));

后端处理:

# backend/ws/connection.py
async def bind_session(connection_id: str, session_id: str):
"""绑定连接到会话"""
if session_id not in self._session_connections:
self._session_connections[session_id] = set()

self._session_connections[session_id].add(connection_id)

# 增加连接计数
increment_connection_count(session_id)

# 取消延迟取消任务(如果有)
cancel_delayed_cancellation(session_id)

流式 Chunk 推送

服务器推送格式

{
"type": "message_chunk",
"content": "你好"
}

客户端接收

ws.onmessage = (event) => {
const data = JSON.parse(event.data);

if (data.type === 'message_chunk') {
// 追加内容到 UI
appendToChat(data.content);
}
};

完整流式响应示例

{"type": "message_chunk", "content": "你"}
{"type": "message_chunk", "content": "好"}
{"type": "message_chunk", "content": "!"}
{"type": "message_chunk", "content": "我"}
{"type": "message_chunk", "content": "是"}
{"type": "message_chunk", "content": " CountBot"}
{"type": "message_complete", "messageId": "12346"}

工具调用通知

工具调用开始

{
"type": "tool_call",
"tool": "exec",
"arguments": {
"command": "df -h"
},
"messageId": 12345
}

工具调用完成

{
"type": "tool_result",
"tool": "exec",
"result": "Filesystem Size Used Avail Use%\n/dev/sda1 100G 45G 50G 48%",
"messageId": 12345,
"duration": 150
}

客户端处理

ws.onmessage = (event) => {
const data = JSON.parse(event.data);

switch (data.type) {
case 'tool_call':
showToolCallIndicator(data.tool, data.arguments);
break;

case 'tool_result':
hideToolCallIndicator();
showToolResult(data.tool, data.result, data.duration);
break;
}
};

子代理状态通知

子代理启动

{
"type": "subagent_status",
"taskId": "abc-123",
"label": "日志分析",
"status": "running",
"progress": 0,
"message": "子代理已启动"
}

进度更新

{
"type": "subagent_status",
"taskId": "abc-123",
"status": "running",
"progress": 45,
"message": "正在分析日志文件..."
}

子代理完成

{
"type": "subagent_complete",
"taskId": "abc-123",
"status": "completed",
"result": "分析完成。共发现 15 个错误。"
}

子代理失败

{
"type": "subagent_failed",
"taskId": "abc-123",
"status": "failed",
"error": "文件不存在: logs/app.log"
}

错误处理

服务器错误

{
"type": "error",
"message": "Internal server error",
"code": "INTERNAL_ERROR"
}

客户端处理

ws.onmessage = (event) => {
const data = JSON.parse(event.data);

if (data.type === 'error') {
showError(data.message, data.code);
}
};

连接管理

心跳机制

// 客户端每 30 秒发送 ping
setInterval(() => {
if (ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify({
type: 'ping',
sessionId: currentSessionId
}));
}
}, 30000);

自动重连

let reconnectAttempts = 0;
const maxReconnectAttempts = 5;

function connect() {
const ws = new WebSocket('ws://localhost:8000/ws/chat');

ws.onclose = () => {
if (reconnectAttempts < maxReconnectAttempts) {
reconnectAttempts++;
const delay = Math.min(1000 * Math.pow(2, reconnectAttempts), 30000);
console.log(`重连中... (${reconnectAttempts}/${maxReconnectAttempts})`);
setTimeout(connect, delay);
}
};

ws.onopen = () => {
reconnectAttempts = 0;
console.log('WebSocket 已连接');
};
}

connect();

连接生命周期

1. 客户端发起连接

2. 服务器接受连接
- 生成 connection_id
- 注册到 ConnectionManager

3. 发送连接成功消息
{"type": "connected"}

4. 客户端发送消息(绑定会话)
{"type": "message", "sessionId": "...", "content": "..."}

5. 服务器绑定会话
- 将 connection_id 添加到会话连接集合
- 增加连接计数

6. 处理消息并推送响应
- 流式内容块
- 工具调用通知
- 子代理状态

7. 客户端断开连接
- 减少连接计数
- 如果连接数为 0,延迟 5 秒后取消任务

8. 清理连接
- 从 ConnectionManager 移除
- 从会话连接集合移除

延迟取消机制

当会话的所有 WebSocket 连接断开时,不会立即取消任务,而是延迟 5 秒:

# backend/ws/connection.py
async def schedule_delayed_cancellation(session_id: str, delay_seconds: int = 5):
"""延迟取消会话任务"""
await asyncio.sleep(delay_seconds)

# 检查是否有新连接
current = _session_tasks.get(session_id)
if current and current.connection_count == 0:
# 执行取消
cancel_session(session_id)

这样可以避免短暂断线导致任务被取消。

完整示例:React 客户端

import { useEffect, useState, useRef } from 'react';

interface Message {
role: 'user' | 'assistant';
content: string;
}

export function useWebSocket(sessionId: string) {
const [messages, setMessages] = useState<Message[]>([]);
const [isConnected, setIsConnected] = useState(false);
const wsRef = useRef<WebSocket | null>(null);
const currentMessageRef = useRef<string>('');

useEffect(() => {
const ws = new WebSocket('ws://localhost:8000/ws/chat');

ws.onopen = () => {
setIsConnected(true);
console.log('WebSocket connected');
};

ws.onmessage = (event) => {
const data = JSON.parse(event.data);

switch (data.type) {
case 'message_chunk':
// 追加内容到当前消息
currentMessageRef.current += data.content;
setMessages((prev) => {
const newMessages = [...prev];
if (newMessages[newMessages.length - 1]?.role === 'assistant') {
newMessages[newMessages.length - 1].content = currentMessageRef.current;
} else {
newMessages.push({
role: 'assistant',
content: currentMessageRef.current,
});
}
return newMessages;
});
break;

case 'message_complete':
// 消息完成,重置缓冲区
currentMessageRef.current = '';
break;

case 'tool_call':
console.log('Tool call:', data.tool, data.arguments);
break;

case 'tool_result':
console.log('Tool result:', data.tool, data.result);
break;

case 'error':
console.error('Error:', data.message);
break;
}
};

ws.onclose = () => {
setIsConnected(false);
console.log('WebSocket disconnected');
};

wsRef.current = ws;

return () => {
ws.close();
};
}, [sessionId]);

const sendMessage = (content: string) => {
if (wsRef.current && isConnected) {
// 添加用户消息到 UI
setMessages((prev) => [...prev, { role: 'user', content }]);

// 发送到服务器
wsRef.current.send(
JSON.stringify({
type: 'message',
sessionId,
content,
})
);
}
};

return { messages, isConnected, sendMessage };
}

CountBot 特色功能

  1. 真正的实时通信:WebSocket 双向通信,无需轮询
  2. 流式推送:内容逐字推送,用户体验流畅
  3. 工具可见性:实时显示工具调用状态,透明化 AI 操作
  4. 子代理通知:后台任务进度实时推送
  5. 智能重连:支持自动重连和延迟取消
  6. 会话隔离:每个会话独立管理连接
  7. 连接计数:引用计数机制,避免误取消

8. 多轮对话和上下文管理

场景描述

用户与 AI 进行多轮对话,系统自动管理对话历史、会话总结和长期记忆。

会话管理

创建会话

curl -X POST http://localhost:8000/api/chat/sessions \
-H "Content-Type: application/json" \
-d '{
"name": "项目讨论"
}'

响应:

{
"id": "550e8400-e29b-41d4-a716-446655440000",
"name": "项目讨论",
"created_at": "2024-01-15T10:00:00Z",
"updated_at": "2024-01-15T10:00:00Z",
"summary": null,
"summary_updated_at": null
}

列出所有会话

curl http://localhost:8000/api/chat/sessions?limit=10&offset=0

响应:

[
{
"id": "550e8400-e29b-41d4-a716-446655440000",
"name": "项目讨论",
"created_at": "2024-01-15T10:00:00Z",
"updated_at": "2024-01-15T10:30:00Z",
"summary": "讨论了项目架构和技术选型",
"summary_updated_at": "2024-01-15T10:25:00Z"
},
{
"id": "660e8400-e29b-41d4-a716-446655440001",
"name": "日常闲聊",
"created_at": "2024-01-14T15:00:00Z",
"updated_at": "2024-01-14T15:30:00Z",
"summary": null,
"summary_updated_at": null
}
]

对话历史存储

数据库模型

-- 会话表
CREATE TABLE sessions (
id VARCHAR(36) PRIMARY KEY,
name VARCHAR(255) NOT NULL,
summary TEXT,
summary_updated_at TIMESTAMP,
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP NOT NULL
);

-- 消息表
CREATE TABLE messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id VARCHAR(36) NOT NULL,
role VARCHAR(20) NOT NULL, -- 'user' / 'assistant'
content TEXT NOT NULL,
created_at TIMESTAMP NOT NULL,
FOREIGN KEY (session_id) REFERENCES sessions(id)
);

-- 工具调用记录表
CREATE TABLE tool_conversations (
id VARCHAR(36) PRIMARY KEY,
session_id VARCHAR(36) NOT NULL,
message_id INTEGER,
tool_name VARCHAR(100) NOT NULL,
arguments TEXT NOT NULL, -- JSON
result TEXT,
error TEXT,
duration_ms INTEGER,
timestamp TIMESTAMP NOT NULL,
FOREIGN KEY (session_id) REFERENCES sessions(id),
FOREIGN KEY (message_id) REFERENCES messages(id)
);

获取消息历史

curl http://localhost:8000/api/chat/sessions/550e8400-e29b-41d4-a716-446655440000/messages

响应:

[
{
"id": 1,
"session_id": "550e8400-e29b-41d4-a716-446655440000",
"role": "user",
"content": "你好",
"created_at": "2024-01-15T10:00:00Z",
"tool_calls": []
},
{
"id": 2,
"session_id": "550e8400-e29b-41d4-a716-446655440000",
"role": "assistant",
"content": "你好!有什么我可以帮助你的吗?",
"created_at": "2024-01-15T10:00:05Z",
"tool_calls": []
},
{
"id": 3,
"session_id": "550e8400-e29b-41d4-a716-446655440000",
"role": "user",
"content": "查询北京天气",
"created_at": "2024-01-15T10:01:00Z",
"tool_calls": []
},
{
"id": 4,
"session_id": "550e8400-e29b-41d4-a716-446655440000",
"role": "assistant",
"content": "北京今天天气晴朗,温度 15°C。",
"created_at": "2024-01-15T10:01:10Z",
"tool_calls": [
{
"id": "tc_001",
"name": "exec",
"arguments": {"command": "curl wttr.in/Beijing?format=3"},
"result": "Beijing: Clear, +15°C",
"status": "success",
"duration": 1250
}
]
}
]

上下文窗口管理

滚动窗口机制

# backend/modules/config/schema.py
class PersonaConfig(BaseModel):
max_history_messages: int = 20 # 最大历史消息条数
# -1 表示不限制

获取带限制的历史

# backend/modules/session/manager.py
async def get_history_with_summary(
self,
session_id: str,
limit: int | None = None
) -> list[dict]:
"""获取会话历史(包含总结)"""
# 1. 获取会话总结
session = await self.get_session(session_id)
summary = session.summary if session else None

# 2. 获取最近的消息
messages = await self.get_messages(
session_id=session_id,
limit=limit
)

# 3. 构建上下文
context = []

# 如果有总结,添加到开头
if summary:
context.append({
"role": "system",
"content": f"[会话总结] {summary}"
})

# 添加消息历史
for msg in messages:
context.append({
"role": msg.role,
"content": msg.content
})

return context

会话总结机制

手动总结

curl -X POST http://localhost:8000/api/chat/sessions/550e8400-e29b-41d4-a716-446655440000/summarize

响应:

{
"success": true,
"summary": "用户询问了北京天气,AI 通过执行命令查询并返回了天气信息(晴朗,15°C)。",
"message": "已保存到记忆第 42 行(共 42 条)"
}

自动总结触发条件

# backend/api/chat.py
_AUTO_SUMMARIZE_MESSAGE_THRESHOLD = 30 # 消息数阈值
_AUTO_SUMMARIZE_CHAR_THRESHOLD = 15000 # 字符数阈值

async def _maybe_auto_summarize(
session_id: str,
session_manager: SessionManager,
agent_loop: AgentLoop,
) -> None:
"""检查是否需要自动总结"""
messages = await session_manager.get_messages(session_id=session_id)

msg_count = len(messages)
total_chars = sum(len(msg.content or "") for msg in messages)

# 检查是否达到阈值
if msg_count >= 30 or total_chars >= 15000:
# 触发自动总结
...

滚动窗口溢出总结

当历史消息超过限制时,自动将旧消息总结到记忆系统:

# backend/modules/session/manager.py
async def summarize_overflow(
self,
session_id: str,
max_history: int,
provider,
model: str,
memory_store,
) -> None:
"""总结溢出的历史消息"""
messages = await self.get_messages(session_id=session_id)

if len(messages) <= max_history:
return # 未溢出

# 获取溢出的消息
overflow_messages = messages[:-max_history]

# 使用 LLM 总结
summary = await self._summarize_messages(
overflow_messages,
provider,
model
)

# 保存到记忆系统
memory_store.append_entry(
source="web-chat",
content=summary
)

# 删除溢出的消息
for msg in overflow_messages:
await self.delete_message(msg.id)

记忆系统集成

记忆文件格式(MEMORY.md)

# 长期记忆

## 2024-01-15

### [web-chat] 10:30
用户询问了北京天气,AI 通过执行命令查询并返回了天气信息(晴朗,15°C)。

### [dingtalk] 14:20
用户请求分析日志文件,AI 创建了子代理后台执行任务,发现了 15 个错误。

## 2024-01-14

### [web-chat] 16:45
讨论了项目架构,决定使用 FastAPI + React 技术栈。

写入记忆

# backend/modules/agent/memory.py
class MemoryStore:
def append_entry(self, source: str, content: str) -> int:
"""追加记忆条目"""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M")

entry = f"\n### [{source}] {timestamp}\n{content}\n"

with open(self.memory_file, 'a', encoding='utf-8') as f:
f.write(entry)

return self.get_line_count()

读取记忆

def read_recent(self, limit: int = 10) -> str:
"""读取最近的记忆条目"""
if not self.memory_file.exists():
return ""

content = self.memory_file.read_text(encoding='utf-8')

# 按条目分割
entries = re.split(r'\n### ', content)

# 返回最近的 N 条
recent = entries[-limit:]

return '\n### '.join(recent)

上下文构建流程

# backend/modules/agent/context.py
class ContextBuilder:
def build_messages(
self,
history: list[dict],
current_message: str,
media: list[str] | None = None,
channel: str | None = None,
chat_id: str | None = None,
) -> list[dict]:
"""构建完整的 LLM 上下文"""
messages = []

# 1. 系统提示词
system_prompt = self.build_system_prompt()
messages.append({
"role": "system",
"content": system_prompt
})

# 2. 长期记忆(最近 10 条)
recent_memory = self.memory.read_recent(limit=10)
if recent_memory:
messages.append({
"role": "system",
"content": f"[长期记忆]\n{recent_memory}"
})

# 3. 会话历史(包含总结)
messages.extend(history)

# 4. 当前消息
messages.append({
"role": "user",
"content": current_message
})

return messages

完整对话示例

第 1 轮

用户:

你好

上下文:

[
{"role": "system", "content": "你是 CountBot..."},
{"role": "user", "content": "你好"}
]

AI:

你好!有什么我可以帮助你的吗?

第 2 轮

用户:

查询北京天气

上下文:

[
{"role": "system", "content": "你是 CountBot..."},
{"role": "user", "content": "你好"},
{"role": "assistant", "content": "你好!有什么我可以帮助你的吗?"},
{"role": "user", "content": "查询北京天气"}
]

AI(调用工具):

北京今天天气晴朗,温度 15°C。

第 3 轮(30 轮后,有会话总结)

用户:

还记得我们之前讨论的天气吗?

上下文:

[
{"role": "system", "content": "你是 CountBot..."},
{"role": "system", "content": "[会话总结] 用户询问了北京天气..."},
{"role": "user", "content": "还记得我们之前讨论的天气吗?"}
]

AI:

记得的!你之前询问了北京的天气,当时是晴朗的,温度 15°C。

CountBot 特色功能

  1. 智能上下文管理:自动限制历史消息,避免 token 浪费
  2. 会话总结:压缩历史对话,保留关键信息
  3. 滚动窗口溢出总结:旧消息自动总结到长期记忆
  4. 自动记忆触发:达到阈值自动保存到记忆系统
  5. 多层记忆:会话总结 + 长期记忆 + 最近历史
  6. 工具调用记录:完整保存工具调用历史,支持追溯
  7. 灵活配置:可配置历史消息条数、总结阈值等

总结

CountBot 的 HTTP API 机制具有以下核心特点:

1. 架构设计

  • FastAPI 框架:现代化的异步 Web 框架
  • SSE 流式响应:实时推送 AI 生成内容
  • WebSocket 双向通信:工具调用和子代理通知
  • 数据库持久化:SQLite 存储会话和消息

2. 核心功能

  • Agent Loop:智能循环处理,自动调用工具
  • 工具系统:可扩展的工具注册表,支持文件、Shell、Web 等
  • 技能系统:渐进式加载,支持自定义技能
  • 子代理:真正的异步后台任务,独立执行环境

3. 安全特性

  • 工作空间隔离:所有文件操作限制在工作空间内
  • 危险命令检测:内置危险模式库,可配置白名单
  • 审计日志:完整记录所有工具调用和文件操作
  • 超时控制:防止命令卡死和资源耗尽

4. 用户体验

  • 流式响应:逐字推送,体验流畅
  • 实时通知:WebSocket 推送工具调用和子代理状态
  • 智能上下文:自动管理历史消息和会话总结
  • 错误友好:详细的错误信息和建议

5. 可扩展性

  • 插件化工具:易于添加新工具
  • 技能系统:支持自定义技能和脚本
  • 多渠道支持:Web、钉钉、Telegram 等
  • 多智能体协作:支持并行子代理

CountBot 的 HTTP API 设计充分体现了现代 AI Agent 系统的最佳实践,为开发者提供了强大而灵活的接口。