Agent 架构设计
概述
LLM Agent 是能够自主规划、执行任务并与外部工具交互的智能系统。本章深入讲解 Agent 的核心架构、规划算法、工具调用机制和记忆系统设计。
Agent 架构概览
核心组件
┌─────────────────────────────────────────────────────────────────────────────┐
│ LLM Agent Architecture │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ │
│ │ User │ │
│ │ Input │ │
│ └──────┬──────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────────────────────────────────────────────┐ │
│ │ Agent Core │ │
│ │ ┌─────────────────────────────────────────────────────────────────┐ │ │
│ │ │ Planner │ │ │
│ │ │ │ │ │
│ │ │ Task ──▶ [Decompose] ──▶ [Plan] ──▶ [Select Action] │ │ │
│ │ │ │ │ │
│ │ └──────────────────────────────┬──────────────────────────────────┘ │ │
│ │ │ │ │
│ │ ┌──────────────────────────────┴──────────────────────────────────┐ │ │
│ │ │ Executor │ │ │
│ │ │ │ │ │
│ │ │ Action ──▶ [Tool Selection] ──▶ [Execution] ──▶ [Observe] │ │ │
│ │ │ │ │ │
│ │ └──────────────────────────────┬──────────────────────────────────┘ │ │
│ └─────────────────────────────────┼─────────────────────────────────────┘ │
│ │ │
│ ┌──────────────────────────┼──────────────────────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Memory │ │ Tools │ │ Knowledge │ │
│ │ System │ │ Library │ │ Base │ │
│ │ │ │ │ │ │ │
│ │ - Short-term│ │ - Search │ │ - RAG │ │
│ │ - Long-term │ │ - Code Exec │ │ - Vector DB │ │
│ │ - Episodic │ │ - API Call │ │ - KG │ │
│ │ - Semantic │ │ - Browser │ │ │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Agent 类型对比
┌─────────────────────────────────────────────────────────────────────────────┐
│ Agent Types Comparison │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ 1. ReAct Agent (Reasoning + Acting) │
│ ┌───────────────────────────────────────────────────────────────────────┐ │
│ │ Thought ──▶ Action ──▶ Observation ──▶ Thought ──▶ ... │ │
│ │ │ │
│ │ 特点: 思考与行动交替,透明可解释 │ │
│ │ 适用: 复杂推理任务,需要可解释性 │ │
│ └───────────────────────────────────────────────────────────────────────┘ │
│ │
│ 2. Plan-and-Execute Agent │
│ ┌───────────────────────────────────────────────────────────────────────┐ │
│ │ Plan [Step1, Step2, ...] ──▶ Execute Step1 ──▶ Execute Step2 ──▶ ...│ │
│ │ │ │
│ │ 特点: 先规划后执行,适合长任务 │ │
│ │ 适用: 多步骤复杂任务 │ │
│ └───────────────────────────────────────────────────────────────────────┘ │
│ │
│ 3. Function Calling Agent │
│ ┌───────────────────────────────────────────────────────────────────────┐ │
│ │ Query ──▶ LLM (with tools) ──▶ Function Call ──▶ Result ──▶ Answer │ │
│ │ │ │
│ │ 特点: 依赖模型原生能力,调用简洁 │ │
│ │ 适用: OpenAI/Claude 等支持 Function Calling 的模型 │ │
│ └───────────────────────────────────────────────────────────────────────┘ │
│ │
│ 4. Multi-Agent System │
│ ┌───────────────────────────────────────────────────────────────────────┐ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ Agent A │ ───▶ │ Agent B │ ───▶ │ Agent C │ │ │
│ │ │(Planner)│ │(Executor)│ │(Reviewer)│ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ │ │
│ │ │ │
│ │ 特点: 多Agent协作,专业化分工 │ │
│ │ 适用: 复杂项目,需要多种专业能力 │ │
│ └───────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
核心组件实现
工具系统
"""
工具系统实现
"""
from typing import Any, Dict, List, Callable, Optional, Union
from dataclasses import dataclass, field
from abc import ABC, abstractmethod
import json
import inspect
@dataclass
class ToolParameter:
"""工具参数定义"""
name: str
type: str
description: str
required: bool = True
default: Any = None
enum: List[Any] = None
@dataclass
class Tool:
"""工具定义"""
name: str
description: str
parameters: List[ToolParameter]
function: Callable
returns: str = "string"
examples: List[Dict] = field(default_factory=list)
def to_openai_schema(self) -> Dict:
"""转换为 OpenAI Function Calling 格式"""
properties = {}
required = []
for param in self.parameters:
prop = {
"type": param.type,
"description": param.description
}
if param.enum:
prop["enum"] = param.enum
properties[param.name] = prop
if param.required:
required.append(param.name)
return {
"type": "function",
"function": {
"name": self.name,
"description": self.description,
"parameters": {
"type": "object",
"properties": properties,
"required": required
}
}
}
def to_anthropic_schema(self) -> Dict:
"""转换为 Anthropic Tool Use 格式"""
properties = {}
required = []
for param in self.parameters:
properties[param.name] = {
"type": param.type,
"description": param.description
}
if param.required:
required.append(param.name)
return {
"name": self.name,
"description": self.description,
"input_schema": {
"type": "object",
"properties": properties,
"required": required
}
}
async def execute(self, **kwargs) -> Any:
"""执行工具"""
# 验证参数
self._validate_params(kwargs)
# 执行
if inspect.iscoroutinefunction(self.function):
result = await self.function(**kwargs)
else:
result = self.function(**kwargs)
return result
def _validate_params(self, kwargs: Dict):
"""验证参数"""
for param in self.parameters:
if param.required and param.name not in kwargs:
raise ValueError(f"Missing required parameter: {param.name}")
class ToolRegistry:
"""工具注册表"""
def __init__(self):
self.tools: Dict[str, Tool] = {}
def register(self, tool: Tool):
"""注册工具"""
self.tools[tool.name] = tool
def register_function(
self,
name: str = None,
description: str = None
):
"""装饰器方式注册工具"""
def decorator(func: Callable):
tool_name = name or func.__name__
tool_desc = description or func.__doc__ or ""
# 从函数签名提取参数
sig = inspect.signature(func)
params = []
for param_name, param in sig.parameters.items():
param_type = "string"
if param.annotation != inspect.Parameter.empty:
if param.annotation == int:
param_type = "integer"
elif param.annotation == float:
param_type = "number"
elif param.annotation == bool:
param_type = "boolean"
elif param.annotation == list:
param_type = "array"
params.append(ToolParameter(
name=param_name,
type=param_type,
description="",
required=param.default == inspect.Parameter.empty
))
tool = Tool(
name=tool_name,
description=tool_desc,
parameters=params,
function=func
)
self.register(tool)
return func
return decorator
def get(self, name: str) -> Optional[Tool]:
return self.tools.get(name)
def list_tools(self) -> List[Tool]:
return list(self.tools.values())
def get_schemas(self, format: str = "openai") -> List[Dict]:
"""获取所有工具的 schema"""
schemas = []
for tool in self.tools.values():
if format == "openai":
schemas.append(tool.to_openai_schema())
elif format == "anthropic":
schemas.append(tool.to_anthropic_schema())
return schemas
# 内置工具实现
class BuiltinTools:
"""内置工具集"""
@staticmethod
def create_search_tool(search_engine) -> Tool:
"""创建搜索工具"""
async def search(query: str, num_results: int = 5) -> str:
results = await search_engine.search(query, num_results)
return json.dumps(results, ensure_ascii=False)
return Tool(
name="web_search",
description="Search the web for information. Use this when you need current information or facts.",
parameters=[
ToolParameter("query", "string", "The search query"),
ToolParameter("num_results", "integer", "Number of results to return", required=False)
],
function=search
)
@staticmethod
def create_calculator_tool() -> Tool:
"""创建计算器工具"""
def calculate(expression: str) -> str:
try:
# 安全执行数学表达式
allowed_names = {
'abs': abs, 'round': round, 'min': min, 'max': max,
'sum': sum, 'pow': pow, 'sqrt': __import__('math').sqrt,
'sin': __import__('math').sin, 'cos': __import__('math').cos,
'tan': __import__('math').tan, 'log': __import__('math').log,
'pi': __import__('math').pi, 'e': __import__('math').e
}
result = eval(expression, {"__builtins__": {}}, allowed_names)
return str(result)
except Exception as e:
return f"Error: {str(e)}"
return Tool(
name="calculator",
description="Perform mathematical calculations. Input should be a valid mathematical expression.",
parameters=[
ToolParameter("expression", "string", "Mathematical expression to evaluate")
],
function=calculate
)
@staticmethod
def create_code_executor_tool(sandbox_config: Dict = None) -> Tool:
"""创建代码执行工具"""
async def execute_code(code: str, language: str = "python") -> str:
# 使用沙箱执行代码
import subprocess
import tempfile
import os
if language == "python":
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
f.write(code)
temp_file = f.name
try:
result = subprocess.run(
['python', temp_file],
capture_output=True,
text=True,
timeout=30
)
output = result.stdout if result.returncode == 0 else result.stderr
return output[:2000] # 限制输出长度
finally:
os.unlink(temp_file)
else:
return f"Unsupported language: {language}"
return Tool(
name="code_executor",
description="Execute code and return the output. Use for calculations, data processing, etc.",
parameters=[
ToolParameter("code", "string", "The code to execute"),
ToolParameter("language", "string", "Programming language (default: python)", required=False)
],
function=execute_code
)
ReAct Agent 实现
"""
ReAct Agent 实现
"""
from typing import List, Dict, Optional, Tuple
from dataclasses import dataclass
from enum import Enum
import re
class AgentState(Enum):
THINKING = "thinking"
ACTING = "acting"
OBSERVING = "observing"
FINISHED = "finished"
ERROR = "error"
@dataclass
class AgentStep:
"""Agent 执行步骤"""
step_num: int
thought: str
action: Optional[str] = None
action_input: Optional[Dict] = None
observation: Optional[str] = None
state: AgentState = AgentState.THINKING
@dataclass
class AgentResult:
"""Agent 执行结果"""
success: bool
output: str
steps: List[AgentStep]
total_tokens: int = 0
error: Optional[str] = None
class ReActAgent:
"""ReAct Agent"""
def __init__(
self,
llm,
tools: ToolRegistry,
max_steps: int = 10,
verbose: bool = True
):
self.llm = llm
self.tools = tools
self.max_steps = max_steps
self.verbose = verbose
async def run(self, task: str) -> AgentResult:
"""执行任务"""
steps = []
scratchpad = ""
system_prompt = self._build_system_prompt()
for step_num in range(1, self.max_steps + 1):
# 构建提示
user_prompt = self._build_user_prompt(task, scratchpad)
# 调用 LLM
response = await self.llm.chat([
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
])
# 解析响应
thought, action, action_input = self._parse_response(response)
step = AgentStep(
step_num=step_num,
thought=thought,
action=action,
action_input=action_input,
state=AgentState.THINKING
)
if self.verbose:
print(f"\n--- Step {step_num} ---")
print(f"Thought: {thought}")
# 检查是否完成
if action == "Final Answer":
step.state = AgentState.FINISHED
steps.append(step)
return AgentResult(
success=True,
output=action_input.get("answer", ""),
steps=steps
)
# 执行动作
if action:
step.state = AgentState.ACTING
if self.verbose:
print(f"Action: {action}")
print(f"Action Input: {action_input}")
tool = self.tools.get(action)
if tool:
try:
observation = await tool.execute(**action_input)
step.observation = observation
step.state = AgentState.OBSERVING
if self.verbose:
print(f"Observation: {observation[:500]}...")
except Exception as e:
observation = f"Error executing tool: {str(e)}"
step.observation = observation
step.state = AgentState.ERROR
else:
observation = f"Unknown tool: {action}"
step.observation = observation
step.state = AgentState.ERROR
# 更新 scratchpad
scratchpad += f"\nThought: {thought}\nAction: {action}\nAction Input: {json.dumps(action_input)}\nObservation: {observation}\n"
steps.append(step)
# 达到最大步数
return AgentResult(
success=False,
output="Max steps reached without finding an answer",
steps=steps,
error="Max steps exceeded"
)
def _build_system_prompt(self) -> str:
"""构建系统提示"""
tool_descriptions = []
for tool in self.tools.list_tools():
params = ", ".join([f"{p.name}: {p.type}" for p in tool.parameters])
tool_descriptions.append(f"- {tool.name}({params}): {tool.description}")
tools_text = "\n".join(tool_descriptions)
return f"""You are a helpful AI assistant that can use tools to accomplish tasks.
Available Tools:
{tools_text}
You should follow the ReAct format:
Thought: [Your reasoning about what to do next]
Action: [The tool to use, or "Final Answer" if you have the answer]
Action Input: {{"param1": "value1", "param2": "value2"}}
When you have gathered enough information, use:
Action: Final Answer
Action Input: {{"answer": "Your final answer here"}}
Important:
- Always think step by step
- Use tools when needed to gather information
- Be concise in your thoughts
- Provide a final answer when you have sufficient information"""
def _build_user_prompt(self, task: str, scratchpad: str) -> str:
"""构建用户提示"""
prompt = f"Task: {task}\n"
if scratchpad:
prompt += f"\nPrevious steps:\n{scratchpad}\n"
prompt += "\nWhat should you do next?"
return prompt
def _parse_response(self, response: str) -> Tuple[str, Optional[str], Optional[Dict]]:
"""解析 LLM 响应"""
thought = ""
action = None
action_input = None
# 提取 Thought
thought_match = re.search(r'Thought:\s*(.+?)(?=Action:|$)', response, re.DOTALL)
if thought_match:
thought = thought_match.group(1).strip()
# 提取 Action
action_match = re.search(r'Action:\s*(.+?)(?=Action Input:|$)', response, re.DOTALL)
if action_match:
action = action_match.group(1).strip()
# 提取 Action Input
input_match = re.search(r'Action Input:\s*(\{.+?\})', response, re.DOTALL)
if input_match:
try:
action_input = json.loads(input_match.group(1))
except json.JSONDecodeError:
action_input = {"raw": input_match.group(1)}
return thought, action, action_input
class PlanAndExecuteAgent:
"""Plan-and-Execute Agent"""
def __init__(
self,
llm,
tools: ToolRegistry,
max_replans: int = 3
):
self.llm = llm
self.tools = tools
self.max_replans = max_replans
self.executor = ReActAgent(llm, tools, max_steps=5)
async def run(self, task: str) -> AgentResult:
"""执行任务"""
all_steps = []
# 1. 生成计划
plan = await self._create_plan(task)
print(f"Generated Plan:\n{self._format_plan(plan)}")
# 2. 执行计划
completed_steps = []
results = []
for i, step in enumerate(plan):
print(f"\n=== Executing Step {i+1}: {step} ===")
# 执行单步
step_result = await self.executor.run(
f"Complete this step: {step}\n\nContext from previous steps:\n{self._format_results(results)}"
)
all_steps.extend(step_result.steps)
if step_result.success:
completed_steps.append(step)
results.append({
"step": step,
"result": step_result.output
})
else:
# 尝试重新规划
print(f"Step failed, attempting to replan...")
new_plan = await self._replan(
task,
completed_steps,
step,
step_result.error
)
if new_plan:
plan = new_plan
continue
else:
return AgentResult(
success=False,
output=f"Failed at step: {step}",
steps=all_steps,
error=step_result.error
)
# 3. 综合结果
final_answer = await self._synthesize(task, results)
return AgentResult(
success=True,
output=final_answer,
steps=all_steps
)
async def _create_plan(self, task: str) -> List[str]:
"""创建执行计划"""
tool_names = [t.name for t in self.tools.list_tools()]
prompt = f"""Create a step-by-step plan to accomplish the following task.
Each step should be a clear, actionable item.
Available tools: {', '.join(tool_names)}
Task: {task}
Output the plan as a numbered list:
1. [First step]
2. [Second step]
...
Keep the plan concise (3-7 steps)."""
response = await self.llm.chat([{"role": "user", "content": prompt}])
# 解析计划
steps = []
for line in response.strip().split('\n'):
line = line.strip()
if line and line[0].isdigit():
# 移除数字前缀
step = re.sub(r'^\d+[\.\)]\s*', '', line)
if step:
steps.append(step)
return steps
async def _replan(
self,
task: str,
completed_steps: List[str],
failed_step: str,
error: str
) -> Optional[List[str]]:
"""重新规划"""
prompt = f"""The original plan failed. Create a new plan to complete the task.
Task: {task}
Completed steps:
{chr(10).join(f'- {s}' for s in completed_steps)}
Failed step: {failed_step}
Error: {error}
Create a revised plan to complete the remaining work:"""
response = await self.llm.chat([{"role": "user", "content": prompt}])
# 解析新计划
steps = []
for line in response.strip().split('\n'):
line = line.strip()
if line and line[0].isdigit():
step = re.sub(r'^\d+[\.\)]\s*', '', line)
if step:
steps.append(step)
return steps if steps else None
async def _synthesize(self, task: str, results: List[Dict]) -> str:
"""综合结果"""
results_text = self._format_results(results)
prompt = f"""Based on the following completed steps and their results, provide a final answer to the task.
Task: {task}
Completed Steps and Results:
{results_text}
Final Answer:"""
response = await self.llm.chat([{"role": "user", "content": prompt}])
return response.strip()
def _format_plan(self, plan: List[str]) -> str:
return "\n".join(f"{i+1}. {step}" for i, step in enumerate(plan))
def _format_results(self, results: List[Dict]) -> str:
if not results:
return "No previous results."
text = []
for i, r in enumerate(results):
text.append(f"Step {i+1}: {r['step']}")
text.append(f"Result: {r['result']}\n")
return "\n".join(text)
记忆系统
"""
Agent 记忆系统
"""
from typing import List, Dict, Optional, Any
from dataclasses import dataclass, field
from datetime import datetime
from abc import ABC, abstractmethod
import json
import numpy as np
@dataclass
class Memory:
"""记忆项"""
id: str
content: str
type: str # "conversation", "fact", "task", "reflection"
timestamp: datetime
metadata: Dict = field(default_factory=dict)
embedding: Optional[List[float]] = None
importance: float = 0.5
@dataclass
class ConversationTurn:
"""对话轮次"""
role: str
content: str
timestamp: datetime
metadata: Dict = field(default_factory=dict)
class MemoryStore(ABC):
"""记忆存储基类"""
@abstractmethod
def add(self, memory: Memory) -> str:
pass
@abstractmethod
def get(self, memory_id: str) -> Optional[Memory]:
pass
@abstractmethod
def search(self, query: str, top_k: int = 10) -> List[Memory]:
pass
@abstractmethod
def delete(self, memory_id: str) -> bool:
pass
class ShortTermMemory:
"""短期记忆 - 对话上下文"""
def __init__(self, max_turns: int = 20, max_tokens: int = 4000):
self.max_turns = max_turns
self.max_tokens = max_tokens
self.conversation: List[ConversationTurn] = []
def add_turn(self, role: str, content: str, metadata: Dict = None):
"""添加对话轮次"""
turn = ConversationTurn(
role=role,
content=content,
timestamp=datetime.now(),
metadata=metadata or {}
)
self.conversation.append(turn)
# 保持在限制内
self._trim()
def get_context(self, last_n: int = None) -> List[Dict]:
"""获取对话上下文"""
turns = self.conversation[-last_n:] if last_n else self.conversation
return [
{"role": t.role, "content": t.content}
for t in turns
]
def get_summary(self, llm=None) -> str:
"""获取对话摘要"""
if not self.conversation:
return ""
if llm:
# 使用 LLM 生成摘要
context = self.get_context()
prompt = f"""Summarize the following conversation in 2-3 sentences:
{json.dumps(context, indent=2)}
Summary:"""
return llm.complete(prompt)
else:
# 简单摘要
return f"Conversation with {len(self.conversation)} turns."
def clear(self):
"""清空记忆"""
self.conversation = []
def _trim(self):
"""裁剪记忆"""
# 按轮次裁剪
if len(self.conversation) > self.max_turns:
self.conversation = self.conversation[-self.max_turns:]
# 按 token 裁剪(简化估算)
total_chars = sum(len(t.content) for t in self.conversation)
while total_chars > self.max_tokens * 4 and len(self.conversation) > 1:
self.conversation.pop(0)
total_chars = sum(len(t.content) for t in self.conversation)
class LongTermMemory:
"""长期记忆 - 向量存储"""
def __init__(
self,
vector_store,
embedding_model,
importance_threshold: float = 0.3
):
self.vector_store = vector_store
self.embedding_model = embedding_model
self.importance_threshold = importance_threshold
def store(self, content: str, type: str, metadata: Dict = None) -> str:
"""存储记忆"""
# 计算重要性
importance = self._compute_importance(content)
if importance < self.importance_threshold:
return None # 不够重要,不存储
# 计算嵌入
embedding = self.embedding_model.encode(content)
memory = Memory(
id=self._generate_id(),
content=content,
type=type,
timestamp=datetime.now(),
metadata=metadata or {},
embedding=embedding.tolist(),
importance=importance
)
# 存储到向量库
self.vector_store.add(
ids=[memory.id],
embeddings=[memory.embedding],
contents=[memory.content],
metadatas=[{
"type": memory.type,
"timestamp": memory.timestamp.isoformat(),
"importance": memory.importance,
**memory.metadata
}]
)
return memory.id
def recall(
self,
query: str,
top_k: int = 5,
type_filter: str = None,
recency_weight: float = 0.2
) -> List[Memory]:
"""回忆相关记忆"""
# 向量检索
query_embedding = self.embedding_model.encode(query)
filters = {}
if type_filter:
filters["type"] = type_filter
results = self.vector_store.search(
query_embedding.tolist(),
top_k=top_k * 2, # 检索更多,后续过滤
filters=filters
)
# 结合时间衰减重新排序
scored_results = []
now = datetime.now()
for result in results:
timestamp = datetime.fromisoformat(result.metadata.get("timestamp", now.isoformat()))
hours_ago = (now - timestamp).total_seconds() / 3600
# 时间衰减
recency_score = np.exp(-hours_ago / 168) # 一周半衰期
# 综合得分
final_score = (1 - recency_weight) * result.score + recency_weight * recency_score
scored_results.append((result, final_score))
# 排序
scored_results.sort(key=lambda x: x[1], reverse=True)
# 转换为 Memory 对象
memories = []
for result, score in scored_results[:top_k]:
memories.append(Memory(
id=result.chunk_id,
content=result.content,
type=result.metadata.get("type", "unknown"),
timestamp=datetime.fromisoformat(result.metadata.get("timestamp")),
metadata=result.metadata,
importance=result.metadata.get("importance", 0.5)
))
return memories
def _compute_importance(self, content: str) -> float:
"""计算记忆重要性"""
# 简单启发式规则
importance = 0.5
# 长度因素
if len(content) > 200:
importance += 0.1
if len(content) > 500:
importance += 0.1
# 关键词因素
important_keywords = ["important", "remember", "key", "critical", "决定", "重要", "记住"]
for kw in important_keywords:
if kw.lower() in content.lower():
importance += 0.15
break
# 问题/任务因素
if "?" in content or "任务" in content or "目标" in content:
importance += 0.1
return min(importance, 1.0)
def _generate_id(self) -> str:
import uuid
return str(uuid.uuid4())
class EpisodicMemory:
"""情景记忆 - 任务执行历史"""
def __init__(self, max_episodes: int = 100):
self.episodes: List[Dict] = []
self.max_episodes = max_episodes
def record_episode(
self,
task: str,
steps: List[AgentStep],
result: str,
success: bool
):
"""记录任务执行"""
episode = {
"id": self._generate_id(),
"task": task,
"steps": [
{
"thought": s.thought,
"action": s.action,
"observation": s.observation[:200] if s.observation else None
}
for s in steps
],
"result": result,
"success": success,
"timestamp": datetime.now().isoformat(),
"num_steps": len(steps)
}
self.episodes.append(episode)
# 限制数量
if len(self.episodes) > self.max_episodes:
self.episodes = self.episodes[-self.max_episodes:]
def find_similar_tasks(self, task: str, top_k: int = 3) -> List[Dict]:
"""查找相似任务"""
# 简单关键词匹配
task_words = set(task.lower().split())
scored = []
for ep in self.episodes:
ep_words = set(ep["task"].lower().split())
overlap = len(task_words & ep_words) / max(len(task_words | ep_words), 1)
scored.append((ep, overlap))
scored.sort(key=lambda x: x[1], reverse=True)
return [ep for ep, _ in scored[:top_k]]
def get_success_rate(self, task_type: str = None) -> float:
"""获取成功率"""
if not self.episodes:
return 0.0
relevant = self.episodes
if task_type:
relevant = [ep for ep in self.episodes if task_type.lower() in ep["task"].lower()]
if not relevant:
return 0.0
return sum(1 for ep in relevant if ep["success"]) / len(relevant)
def _generate_id(self) -> str:
import uuid
return str(uuid.uuid4())
class AgentMemory:
"""Agent 综合记忆系统"""
def __init__(
self,
vector_store,
embedding_model,
llm=None
):
self.short_term = ShortTermMemory()
self.long_term = LongTermMemory(vector_store, embedding_model)
self.episodic = EpisodicMemory()
self.llm = llm
def add_interaction(self, role: str, content: str):
"""添加交互"""
self.short_term.add_turn(role, content)
# 重要内容存入长期记忆
if role == "assistant" and len(content) > 100:
self.long_term.store(content, type="response")
def get_context(self, query: str = None) -> Dict:
"""获取相关上下文"""
context = {
"conversation": self.short_term.get_context(),
"relevant_memories": [],
"similar_tasks": []
}
if query:
# 从长期记忆中检索
memories = self.long_term.recall(query, top_k=3)
context["relevant_memories"] = [
{"content": m.content, "type": m.type}
for m in memories
]
# 从情景记忆中查找
similar = self.episodic.find_similar_tasks(query, top_k=2)
context["similar_tasks"] = similar
return context
def reflect(self):
"""反思并提取洞察"""
if not self.llm:
return
# 获取最近对话
recent_context = self.short_term.get_context(last_n=10)
if len(recent_context) < 4:
return
prompt = f"""Based on the following recent conversation, extract key insights or facts that should be remembered:
{json.dumps(recent_context, indent=2)}
List 1-3 key insights (each on a new line):"""
response = self.llm.complete(prompt)
# 存储洞察
for line in response.strip().split('\n'):
line = line.strip()
if line and len(line) > 10:
self.long_term.store(line, type="reflection")
小结
本章深入讲解了 LLM Agent 的核心架构:
- 工具系统:工具定义、注册表、Schema 转换
- ReAct Agent:思考-行动-观察循环
- Plan-and-Execute:先规划后执行的模式
- 记忆系统:短期、长期、情景记忆
下一章我们将探讨 多 Agent 协作,讲解如何构建多 Agent 协同工作的系统。