16-Agent智能体与工具调用
Agent基础概念
什么是Agent
Agent(智能体)是一个能够感知环境、自主决策并采取行动以实现目标的AI系统。与传统的单次问答不同,Agent具有以下特征:
- 自主性:能够独立制定计划和执行任务
- 反应性:能够感知环境变化并作出响应
- 目标导向:围绕特定目标进行决策
- 工具使用:能够调用外部工具和API
- 记忆能力:保持状态和历史信息
Agent的典型应用场景:
- 自动化任务执行(如数据分析、报告生成)
- 复杂问题求解(如数学题、代码调试)
- 多步骤工作流(如旅行规划、研究助手)
- 与外部系统交互(如数据库查询、API调用)
感知-思考-行动循环
Agent的核心工作流程:
感知(Perception) → 思考(Reasoning) → 行动(Action) → 观察(Observation) → ...
具体流程:
- 感知:接收用户输入或环境信息
- 思考:分析当前状态,制定行动计划
- 行动:选择并执行工具或生成回复
- 观察:获取行动结果
- 循环:根据结果决定下一步行动或结束
示例代码:
from typing import List, Dict, Optional
from enum import Enum
class ActionType(Enum):
"""行动类型"""
TOOL_USE = "tool_use"
FINAL_ANSWER = "final_answer"
THINK = "think"
class Action:
"""行动对象"""
def __init__(self, action_type: ActionType, content: str, tool_name: str = None, tool_input: Dict = None):
self.type = action_type
self.content = content
self.tool_name = tool_name
self.tool_input = tool_input
class Observation:
"""观察对象"""
def __init__(self, content: str, success: bool = True):
self.content = content
self.success = success
class Agent:
"""基础Agent实现"""
def __init__(self, llm, tools: List):
self.llm = llm
self.tools = {tool.name: tool for tool in tools}
self.max_iterations = 10
def run(self, task: str) -> str:
"""运行Agent"""
history = []
for i in range(self.max_iterations):
# 1. 感知:当前状态
state = self._build_state(task, history)
# 2. 思考:决定下一步行动
action = self._think(state)
# 3. 行动:执行工具或返回答案
if action.type == ActionType.FINAL_ANSWER:
return action.content
observation = self._act(action)
# 4. 观察:记录结果
history.append({
"action": action,
"observation": observation
})
return "达到最大迭代次数,任务未完成"
def _build_state(self, task: str, history: List) -> str:
"""构建当前状态描述"""
state = f"任务: {task}\n\n"
if history:
state += "历史记录:\n"
for i, step in enumerate(history, 1):
action = step['action']
obs = step['observation']
state += f"{i}. 行动: {action.content}\n"
state += f" 结果: {obs.content}\n\n"
return state
def _think(self, state: str) -> Action:
"""思考下一步行动"""
# 由子类实现
raise NotImplementedError
def _act(self, action: Action) -> Observation:
"""执行行动"""
if action.type == ActionType.TOOL_USE:
tool = self.tools.get(action.tool_name)
if not tool:
return Observation(f"工具 {action.tool_name} 不存在", success=False)
try:
result = tool.run(action.tool_input)
return Observation(str(result), success=True)
except Exception as e:
return Observation(f"工具执行错误: {str(e)}", success=False)
return Observation(action.content)
Agent框架对比
1. AutoGPT
最早的开源Agent项目之一,专注于自主任务执行。
# AutoGPT风格的配置
class AutoGPTConfig:
"""AutoGPT配置"""
def __init__(self):
self.ai_name = "MyAssistant"
self.ai_role = "a helpful AI assistant"
self.ai_goals = [
"帮助用户完成任务",
"提供准确的信息",
"高效地使用工具"
]
self.max_iterations = 10
self.memory_backend = "local"
# AutoGPT的特点:
# - 长期记忆(向量存储)
# - 文件操作能力
# - 网页浏览
# - 代码执行
2. LangChain Agent
LangChain提供的Agent框架,易于集成和扩展。
from langchain.agents import AgentType, initialize_agent, load_tools
from langchain.llms import OpenAI
# 初始化LLM
llm = OpenAI(temperature=0)
# 加载工具
tools = load_tools(["serpapi", "llm-math"], llm=llm)
# 创建Agent
agent = initialize_agent(
tools,
llm,
agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
verbose=True
)
# 运行
result = agent.run("2023年的世界杯冠军是谁?")
# LangChain Agent类型:
# - ZERO_SHOT_REACT_DESCRIPTION: ReAct模式
# - CONVERSATIONAL_REACT_DESCRIPTION: 对话式ReAct
# - OPENAI_FUNCTIONS: 基于OpenAI Function Calling
# - STRUCTURED_CHAT: 结构化聊天
3. AutoGen
微软开源的多Agent框架,支持Agent间协作。
import autogen
# 配置
config_list = [
{
"model": "gpt-4",
"api_key": "your-api-key"
}
]
# 创建助手Agent
assistant = autogen.AssistantAgent(
name="assistant",
llm_config={"config_list": config_list}
)
# 创建用户代理
user_proxy = autogen.UserProxyAgent(
name="user_proxy",
human_input_mode="NEVER",
max_consecutive_auto_reply=10
)
# 启动对话
user_proxy.initiate_chat(
assistant,
message="帮我计算斐波那契数列的第10项"
)
# AutoGen的特点:
# - 多Agent对话
# - 人机协作
# - 代码执行沙箱
# - 灵活的对话模式
框架对比表:
| 框架 | 优势 | 劣势 | 适用场景 |
|---|---|---|---|
| AutoGPT | 完整的自主系统 | 配置复杂,资源消耗大 | 完全自主的任务 |
| LangChain | 易用,工具丰富 | 灵活性有限 | 快速原型开发 |
| AutoGen | 多Agent协作强 | 学习曲线陡峭 | 复杂协作任务 |
| 自建 | 完全可控 | 开发成本高 | 定制化需求 |
ReAct模式
Reasoning + Acting
ReAct(Reasoning and Acting)是一种将推理和行动交织的Agent模式。
核心思想:
Thought → Action → Observation → Thought → Action → ...
ReAct的优势:
- 可解释性强:每一步都有明确的思考过程
- 错误恢复:可以根据观察结果调整策略
- 灵活性高:支持多步推理和工具调用
思维链和工具调用结合
class ReActAgent(Agent):
"""ReAct模式Agent"""
def __init__(self, llm, tools: List):
super().__init__(llm, tools)
self.prompt_template = self._build_prompt_template()
def _build_prompt_template(self) -> str:
"""构建提示模板"""
tools_desc = "\n".join([
f"- {name}: {tool.description}"
for name, tool in self.tools.items()
])
template = f"""你是一个能够使用工具的AI助手。你可以使用以下工具:
{tools_desc}
使用以下格式回答问题:
Question: 用户的问题
Thought: 你应该思考要做什么
Action: 要使用的工具名称,必须是 [{', '.join(self.tools.keys())}] 之一
Action Input: 工具的输入(JSON格式)
Observation: 工具返回的结果
... (这个 Thought/Action/Action Input/Observation 可以重复N次)
Thought: 我现在知道最终答案了
Final Answer: 对用户问题的最终回答
开始!
Question: {{question}}
{{history}}
Thought:"""
return template
def _think(self, state: str) -> Action:
"""ReAct思考过程"""
# 构建prompt
prompt = self.prompt_template.format(
question=state.split("任务: ")[1].split("\n")[0],
history=self._format_history(state)
)
# 调用LLM
response = self.llm.generate(prompt)
# 解析响应
return self._parse_response(response)
def _parse_response(self, response: str) -> Action:
"""解析LLM响应"""
lines = response.strip().split('\n')
action_type = None
content = ""
tool_name = None
tool_input = None
for line in lines:
line = line.strip()
if line.startswith("Thought:"):
content = line[8:].strip()
action_type = ActionType.THINK
elif line.startswith("Action:"):
tool_name = line[7:].strip()
action_type = ActionType.TOOL_USE
elif line.startswith("Action Input:"):
import json
try:
tool_input = json.loads(line[13:].strip())
except:
tool_input = {"input": line[13:].strip()}
elif line.startswith("Final Answer:"):
return Action(
ActionType.FINAL_ANSWER,
line[13:].strip()
)
if action_type == ActionType.TOOL_USE and tool_name:
return Action(
ActionType.TOOL_USE,
content,
tool_name,
tool_input
)
return Action(ActionType.THINK, content)
def _format_history(self, state: str) -> str:
"""格式化历史记录"""
if "历史记录:" not in state:
return ""
history_part = state.split("历史记录:\n")[1]
return history_part
完整实现代码
from typing import Callable, Dict, Any
import json
import re
class Tool:
"""工具基类"""
def __init__(self, name: str, description: str, func: Callable):
self.name = name
self.description = description
self.func = func
def run(self, input_data: Any) -> Any:
"""运行工具"""
return self.func(input_data)
# 定义一些基础工具
def calculator(input_data: Dict) -> float:
"""计算器工具"""
expression = input_data.get("expression", "")
try:
result = eval(expression)
return result
except Exception as e:
return f"计算错误: {str(e)}"
def search(input_data: Dict) -> str:
"""搜索工具(模拟)"""
query = input_data.get("query", "")
# 这里应该调用真实的搜索API
return f"关于'{query}'的搜索结果: [模拟数据]"
def python_repl(input_data: Dict) -> str:
"""Python代码执行"""
code = input_data.get("code", "")
try:实际应用中应该在沙箱中执行
import io
import sys
old_stdout = sys.stdout
sys.stdout = io.StringIO()
exec(code)
output = sys.stdout.getvalue()
sys.stdout = old_stdout
return output
except Exception as e:
return f"执行错误: {str(e)}"
# 创建工具列表
tools = [
Tool(
name="Calculator",
description="用于数学计算。输入格式: {\"expression\": \"2+2\"}",
func=calculator
),
Tool(
name="Search",
description="搜索互联网信息。输入格式: {\"query\": \"搜索内容\"}",
func=search
),
Tool(
name="PythonREPL",
description="执行Python代码。输入格式: {\"code\": \"print(2+2)\"}",
func=python_repl
)
]
# 模拟LLM
class MockLLM:
"""模拟LLM(实际应该使用真实的LLM)"""
def generate(self, prompt: str) -> str:
"""生成响应"""
# 实际应该调用OpenAI API或其他LLM
# 这里返回模拟响应用于演示
if "计算" in prompt or "数学" in prompt:
return """Thought: 需要使用计算器来解决这个数学问题
Action: Calculator
Action Input: {"expression": "12 * 15"}"""
else:
return """Thought: 我需要搜索相关信息
Action: Search
Action Input: {"query": "ReAct Agent"}"""
# 完整示例
def main():
llm = MockLLM()
agent = ReActAgent(llm, tools)
# 运行任务
result = agent.run("计算 12 乘以 15")
print(f"最终结果: {result}")
if __name__ == "__main__":
main()
使用真实LLM的完整实现:
import openai
from typing import List, Dict, Any
class OpenAIReActAgent:
"""基于OpenAI的ReAct Agent"""
def __init__(self, api_key: str, tools: List[Tool], model: str = "gpt-4"):
openai.api_key = api_key
self.model = model
self.tools = {tool.name: tool for tool in tools}
self.max_iterations = 10
def run(self, task: str) -> str:
"""运行Agent"""
messages = [
{"role": "system", "content": self._build_system_prompt()},
{"role": "user", "content": f"Question: {task}"}
]
for i in range(self.max_iterations):
# 调用LLM
response = openai.chat.completions.create(
model=self.model,
messages=messages,
temperature=0.7,
max_tokens=500
)
assistant_message = response.choices[0].message.content
messages.append({"role": "assistant", "content": assistant_message})
# 解析响应
if "Final Answer:" in assistant_message:
final_answer = assistant_message.split("Final Answer:")[1].strip()
return final_answer
# 提取Action和Action Input
action_match = re.search(r"Action:\s*(\w+)", assistant_message)
input_match = re.search(r"Action Input:\s*({.*?})", assistant_message, re.DOTALL)
if action_match and input_match:
tool_name = action_match.group(1)
tool_input_str = input_match.group(1)
try:
tool_input = json.loads(tool_input_str)
except:
tool_input = {"input": tool_input_str}
# 执行工具
if tool_name in self.tools:
result = self.tools[tool_name].run(tool_input)
observation = f"Observation: {result}"
else:
observation = f"Observation: 工具 {tool_name} 不存在"
messages.append({"role": "user", "content": observation})
else:
# 如果无法解析,要求重新格式化
messages.append({
"role": "user",
"content": "请按照 Action: 和 Action Input: 的格式回答"
})
return "达到最大迭代次数"
def _build_system_prompt(self) -> str:
"""构建系统提示"""
tools_desc = "\n".join([
f"- {name}: {tool.description}"
for name, tool in self.tools.items()
])
return f"""你是一个能够使用工具的AI助手。严格按照以下格式回答:
可用工具:
{tools_desc}
回答格式:
Question: 用户的问题
Thought: 你的思考过程
Action: 工具名称
Action Input: 工具输入(JSON格式)
Observation: 工具返回的结果
... (可以重复多次)
Thought: 我现在知道最终答案了
Final Answer: 最终答案
重要:必须严格遵循格式!"""
# 使用示例
agent = OpenAIReActAgent(
api_key="your-api-key",
tools=tools
)
result = agent.run("帮我计算 (123 + 456) * 789,然后搜索一下这个数字的含义")
print(result)
工具调用(Function Calling)
OpenAI Function Calling API
OpenAI提供了原生的Function Calling功能,让模型能够结构化地调用函数。
import openai
import json
class FunctionCallingAgent:
"""基于OpenAI Function Calling的Agent"""
def __init__(self, api_key: str, model: str = "gpt-4-turbo-preview"):
openai.api_key = api_key
self.model = model
self.functions = []
self.available_functions = {}
def register_function(self, func: Callable, name: str, description: str, parameters: Dict):
"""注册函数"""
self.functions.append({
"name": name,
"description": description,
"parameters": parameters
})
self.available_functions[name] = func
def run(self, query: str) -> str:
"""运行Agent"""
messages = [{"role": "user", "content": query}]
while True:
# 调用API
response = openai.chat.completions.create(
model=self.model,
messages=messages,
functions=self.functions,
function_call="auto"
)
response_message = response.choices[0].message
# 检查是否需要调用函数
if response_message.function_call:
# 提取函数调用信息
function_name = response_message.function_call.name
function_args = json.loads(response_message.function_call.arguments)
print(f"调用函数: {function_name}")
print(f"参数: {function_args}")
# 执行函数
function_to_call = self.available_functions[function_name]
function_response = function_to_call(**function_args)
# 将函数结果添加到消息
messages.append(response_message)
messages.append({
"role": "function",
"name": function_name,
"content": str(function_response)
})
else:
# 没有函数调用,返回最终答案
return response_message.content
# 定义工具函数
def get_current_weather(location: str, unit: str = "celsius") -> str:
"""获取当前天气"""
# 模拟天气数据
weather_data = {
"北京": {"temperature": 15, "condition": "晴朗"},
"上海": {"temperature": 20, "condition": "多云"},
"深圳": {"temperature": 25, "condition": "阴天"}
}
if location in weather_data:
data = weather_data[location]
return json.dumps({
"location": location,
"temperature": data["temperature"],
"condition": data["condition"],
"unit": unit
}, ensure_ascii=False)
else:
return json.dumps({"error": "未找到该地点的天气信息"}, ensure_ascii=False)
def search_database(query: str, table: str) -> str:
"""搜索数据库"""
# 模拟数据库查询
return json.dumps({
"results": [
{"id": 1, "name": "结果1"},
{"id": 2, "name": "结果2"}
]
}, ensure_ascii=False)
# 使用示例
agent = FunctionCallingAgent(api_key="your-api-key")
# 注册天气查询函数
agent.register_function(
func=get_current_weather,
name="get_current_weather",
description="获取指定地点的当前天气",
parameters={
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "城市名称,例如:北京、上海"
},
"unit": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "温度单位"
}
},
"required": ["location"]
}
)
# 注册数据库查询函数
agent.register_function(
func=search_database,
name="search_database",
description="在数据库中搜索信息",
parameters={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "搜索查询"
},
"table": {
"type": "string",
"description": "要搜索的表名"
}
},
"required": ["query", "table"]
}
)
# 执行查询
result = agent.run("北京和上海今天的天气怎么样?")
print(result)
工具描述格式
工具描述遵循JSON Schema格式:
# 完整的工具描述示例
weather_tool = {
"name": "get_weather",
"description": "获取指定地点的天气信息,包括温度、湿度、天气状况等",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "地点名称,可以是城市名、地区名或坐标"
},
"date": {
"type": "string",
"description": "日期,格式: YYYY-MM-DD。如果不提供,默认为今天"
},
"units": {
"type": "string",
"enum": ["metric", "imperial"],
"description": "单位系统:metric(公制)或imperial(英制)"
},
"include_forecast": {
"type": "boolean",
"description": "是否包含未来几天的预报"
}
},
"required": ["location"] # 必需参数
}
}
# 复杂的嵌套参数
database_query_tool = {
"name": "query_database",
"description": "执行数据库查询",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "SQL查询语句"
},
"database": {
"type": "string",
"description": "数据库名称"
},
"options": {
"type": "object",
"properties": {
"limit": {
"type": "integer",
"description": "返回结果的最大数量"
},
"timeout": {
"type": "integer",
"description": "查询超时时间(秒)"
}
}
}
},
"required": ["query", "database"]
}
}
参数提取和验证
from pydantic import BaseModel, Field, validator
from typing import Optional, List
class WeatherParams(BaseModel):
"""天气查询参数"""
location: str = Field(..., description="地点名称")
date: Optional[str] = Field(None, description="日期 YYYY-MM-DD")
units: str = Field("metric", description="单位系统")
include_forecast: bool = Field(False, description="包含预报")
@validator('date')
def validate_date(cls, v):
"""验证日期格式"""
if v is not None:
import datetime
try:
datetime.datetime.strptime(v, '%Y-%m-%d')
except ValueError:
raise ValueError('日期格式必须是 YYYY-MM-DD')
return v
@validator('units')
def validate_units(cls, v):
"""验证单位"""
if v not in ['metric', 'imperial']:
raise ValueError('units必须是metric或imperial')
return v
class ToolValidator:
"""工具参数验证器"""
def __init__(self):
self.validators = {}
def register(self, tool_name: str, validator_class: BaseModel):
"""注册验证器"""
self.validators[tool_name] = validator_class
def validate(self, tool_name: str, params: Dict) -> BaseModel:
"""验证参数"""
if tool_name not in self.validators:
raise ValueError(f"未注册的工具: {tool_name}")
validator_class = self.validators[tool_name]
try:
validated_params = validator_class(**params)
return validated_params
except Exception as e:
raise ValueError(f"参数验证失败: {str(e)}")
# 使用示例
validator = ToolValidator()
validator.register("get_weather", WeatherParams)
# 验证参数
try:
params = validator.validate("get_weather", {
"location": "北京",
"date": "2024-01-15",
"units": "metric"
})
print(f"验证通过: {params}")
except ValueError as e:
print(f"验证失败: {e}")
结果反馈
from typing import Any, Optional
from dataclasses import dataclass
from datetime import datetime
@dataclass
class ToolResult:
"""工具执行结果"""
success: bool
result: Any
error: Optional[str] = None
execution_time: float = 0.0
timestamp: datetime = None
def __post_init__(self):
if self.timestamp is None:
self.timestamp = datetime.now()
class ToolExecutor:
"""工具执行器(带结果反馈)"""
def __init__(self):
self.tools = {}
self.execution_history = []
def register_tool(self, name: str, func: Callable, validator: Optional[BaseModel] = None):
"""注册工具"""
self.tools[name] = {
"func": func,
"validator": validator
}
def execute(self, tool_name: str, params: Dict) -> ToolResult:
"""执行工具"""
import time
start_time = time.time()
if tool_name not in self.tools:
return ToolResult(
success=False,
result=None,
error=f"工具 {tool_name} 不存在"
)
tool = self.tools[tool_name]
# 参数验证
if tool["validator"]:
try:
validated_params = tool["validator"](**params)
params = validated_params.dict()
except Exception as e:
return ToolResult(
success=False,
result=None,
error=f"参数验证失败: {str(e)}"
)
# 执行工具
try:
result = tool["func"](**params)
execution_time = time.time() - start_time
tool_result = ToolResult(
success=True,
result=result,
execution_time=execution_time
)
except Exception as e:
execution_time = time.time() - start_time
tool_result = ToolResult(
success=False,
result=None,
error=str(e),
execution_time=execution_time
)
# 记录历史
self.execution_history.append({
"tool_name": tool_name,
"params": params,
"result": tool_result
})
return tool_result
def get_execution_stats(self) -> Dict:
"""获取执行统计"""
if not self.execution_history:
return {}
total_executions = len(self.execution_history)
successful = sum(1 for h in self.execution_history if h["result"].success)
failed = total_executions - successful
avg_time = sum(h["result"].execution_time for h in self.execution_history) / total_executions
tool_usage = {}
for h in self.execution_history:
tool_name = h["tool_name"]
tool_usage[tool_name] = tool_usage.get(tool_name, 0) + 1
return {
"total_executions": total_executions,
"successful": successful,
"failed": failed,
"success_rate": successful / total_executions,
"average_execution_time": avg_time,
"tool_usage": tool_usage
}
Agent工具生态
搜索工具
import requests
from typing import List, Dict
class GoogleSearchTool:
"""Google搜索工具"""
def __init__(self, api_key: str, search_engine_id: str):
self.api_key = api_key
self.search_engine_id = search_engine_id
self.base_url = "https://www.googleapis.com/customsearch/v1"
def search(self, query: str, num_results: int = 5) -> List[Dict]:
"""搜索"""
params = {
"key": self.api_key,
"cx": self.search_engine_id,
"q": query,
"num": num_results
}
response = requests.get(self.base_url, params=params)
data = response.json()
results = []
for item in data.get("items", []):
results.append({
"title": item.get("title"),
"link": item.get("link"),
"snippet": item.get("snippet")
})
return results
class DuckDuckGoSearchTool:
"""DuckDuckGo搜索工具(无需API key)"""
def search(self, query: str, num_results: int = 5) -> List[Dict]:
"""搜索"""
from duckduckgo_search import DDGS
with DDGS() as ddgs:
results = list(ddgs.text(query, max_results=num_results))
return [
{
"title": r["title"],
"link": r["href"],
"snippet": r["body"]
}
for r in results
]
# 集成到Agent
class SearchAgent(FunctionCallingAgent):
"""搜索Agent"""
def __init__(self, api_key: str):
super().__init__(api_key)
# 添加搜索工具
self.search_tool = DuckDuckGoSearchTool()
self.register_function(
func=self.search_web,
name="search_web",
description="在互联网上搜索信息",
parameters={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "搜索查询"
},
"num_results": {
"type": "integer",
"description": "返回结果数量",
"default": 5
}
},
"required": ["query"]
}
)
def search_web(self, query: str, num_results: int = 5) -> str:
"""搜索网页"""
results = self.search_tool.search(query, num_results)
output = f"找到 {len(results)} 个结果:\n\n"
for i, result in enumerate(results, 1):
output += f"{i}. {result['title']}\n"
output += f" {result['snippet']}\n"
output += f" 链接: {result['link']}\n\n"
return output
计算工具
import ast
import operator
import math
class Calculator:
"""安全的计算器"""
# 允许的操作符
ALLOWED_OPERATORS = {
ast.Add: operator.add,
ast.Sub: operator.sub,
ast.Mult: operator.mul,
ast.Div: operator.truediv,
ast.Pow: operator.pow,
ast.Mod: operator.mod,
ast.FloorDiv: operator.floordiv,
ast.UAdd: operator.pos,
ast.USub: operator.neg,
}
# 允许的函数
ALLOWED_FUNCTIONS = {
'abs': abs,
'round': round,
'min': min,
'max': max,
'sum': sum,
'sqrt': math.sqrt,
'sin': math.sin,
'cos': math.cos,
'tan': math.tan,
'log': math.log,
'exp': math.exp,
'pi': math.pi,
'e': math.e,
}
def calculate(self, expression: str) -> float:
"""计算表达式"""
try:
# 解析表达式
tree = ast.parse(expression, mode='eval')
result = self._eval_node(tree.body)
return result
except Exception as e:
raise ValueError(f"计算错误: {str(e)}")
def _eval_node(self, node):
"""递归求值节点"""
if isinstance(node, ast.Num):
return node.n
elif isinstance(node, ast.BinOp):
op = type(node.op)
if op not in self.ALLOWED_OPERATORS:
raise ValueError(f"不允许的操作: {op}")
left = self._eval_node(node.left)
right = self._eval_node(node.right)
return self.ALLOWED_OPERATORS[op](left, right)
elif isinstance(node, ast.UnaryOp):
op = type(node.op)
if op not in self.ALLOWED_OPERATORS:
raise ValueError(f"不允许的操作: {op}")
operand = self._eval_node(node.operand)
return self.ALLOWED_OPERATORS[op](operand)
elif isinstance(node, ast.Call):
func_name = node.func.id
if func_name not in self.ALLOWED_FUNCTIONS:
raise ValueError(f"不允许的函数: {func_name}")
args = [self._eval_node(arg) for arg in node.args]
return self.ALLOWED_FUNCTIONS[func_name](*args)
elif isinstance(node, ast.Name):
if node.id in self.ALLOWED_FUNCTIONS:
return self.ALLOWED_FUNCTIONS[node.id]
raise ValueError(f"未定义的变量: {node.id}")
else:
raise ValueError(f"不支持的节点类型: {type(node)}")
class PythonREPL:
"""Python代码执行器(沙箱)"""
def __init__(self, timeout: int = 5):
self.timeout = timeout
self.globals = {"__builtins__": {}}
# 允许的内置函数
safe_builtins = {
'abs': abs,
'round': round,
'len': len,
'range': range,
'sum': sum,
'min': min,
'max': max,
'sorted': sorted,
'print': print,
}
self.globals["__builtins__"] = safe_builtins
def execute(self, code: str) -> str:
"""执行代码"""
import io
import sys
import signal
# 捕获输出
old_stdout = sys.stdout
sys.stdout = io.StringIO()
def timeout_handler(signum, frame):
raise TimeoutError("代码执行超时")
try:
# 设置超时
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(self.timeout)
# 执行代码
exec(code, self.globals)
# 取消超时
signal.alarm(0)
# 获取输出
output = sys.stdout.getvalue()
sys.stdout = old_stdout
return output if output else "执行成功(无输出)"
except TimeoutError:
sys.stdout = old_stdout
return "执行超时"
except Exception as e:
sys.stdout = old_stdout
return f"执行错误: {str(e)}"
数据库查询
import sqlite3
from typing import List, Dict, Any
class DatabaseTool:
"""数据库查询工具"""
def __init__(self, db_path: str):
self.db_path = db_path
self.connection = None
def connect(self):
"""连接数据库"""
self.connection = sqlite3.connect(self.db_path)
self.connection.row_factory = sqlite3.Row
def close(self):
"""关闭连接"""
if self.connection:
self.connection.close()
def query(self, sql: str, params: tuple = ()) -> List[Dict]:
"""执行查询"""
if not self.connection:
self.connect()
try:
cursor = self.connection.cursor()
cursor.execute(sql, params)
rows = cursor.fetchall()
# 转换为字典列表
results = [dict(row) for row in rows]
return results
except Exception as e:
raise ValueError(f"查询错误: {str(e)}")
def execute(self, sql: str, params: tuple = ()) -> int:
"""执行更新/插入/删除"""
if not self.connection:
self.connect()
try:
cursor = self.connection.cursor()
cursor.execute(sql, params)
self.connection.commit()
return cursor.rowcount
except Exception as e:
self.connection.rollback()
raise ValueError(f"执行错误: {str(e)}")
def get_schema(self) -> Dict[str, List[Dict]]:
"""获取数据库schema"""
schema = {}
# 获取所有表
tables = self.query(
"SELECT name FROM sqlite_master WHERE type='table'"
)
for table in tables:
table_name = table['name']
# 获取表的列信息
columns = self.query(f"PRAGMA table_info({table_name})")
schema[table_name] = columns
return schema
# SQL生成Agent
class SQLAgent(FunctionCallingAgent):
"""SQL查询Agent"""
def __init__(self, api_key: str, db_path: str):
super().__init__(api_key)
self.db_tool = DatabaseTool(db_path)
self.db_tool.connect()
# 获取schema
self.schema = self.db_tool.get_schema()
schema_desc = self._format_schema()
# 注册查询函数
self.register_function(
func=self.query_database,
name="query_database",
description=f"执行SQL查询。数据库schema:\n{schema_desc}",
parameters={
"type": "object",
"properties": {
"sql": {
"type": "string",
"description": "SQL查询语句"
}
},
"required": ["sql"]
}
)
def _format_schema(self) -> str:
"""格式化schema"""
lines = []
for table_name, columns in self.schema.items():
lines.append(f"表 {table_name}:")
for col in columns:
lines.append(f" - {col['name']} ({col['type']})")
return "\n".join(lines)
def query_database(self, sql: str) -> str:
"""查询数据库"""
# 安全检查
sql_lower = sql.lower().strip()
if any(keyword in sql_lower for keyword in ['drop', 'delete', 'update', 'insert', 'alter']):
return "错误: 只允许SELECT查询"
try:
results = self.db_tool.query(sql)
return json.dumps(results, ensure_ascii=False, indent=2)
except Exception as e:
return f"查询错误: {str(e)}"
API调用
import requests
from typing import Dict, Any, Optional
class APITool:
"""API调用工具"""
def __init__(self, base_url: str, headers: Optional[Dict] = None):
self.base_url = base_url
self.headers = headers or {}
def get(self, endpoint: str, params: Optional[Dict] = None) -> Dict:
"""GET请求"""
url = f"{self.base_url}{endpoint}"
response = requests.get(url, params=params, headers=self.headers)
response.raise_for_status()
return response.json()
def post(self, endpoint: str, data: Optional[Dict] = None) -> Dict:
"""POST请求"""
url = f"{self.base_url}{endpoint}"
response = requests.post(url, json=data, headers=self.headers)
response.raise_for_status()
return response.json()
# REST API Agent示例
class WeatherAgent(FunctionCallingAgent):
"""天气API Agent"""
def __init__(self, api_key: str, weather_api_key: str):
super().__init__(api_key)
self.weather_api = APITool(
base_url="https://api.openweathermap.org/data/2.5",
headers={"Authorization": f"Bearer {weather_api_key}"}
)
self.register_function(
func=self.get_weather,
name="get_weather",
description="获取城市天气信息",
parameters={
"type": "object",
"properties": {
"city": {
"type": "string",
"description": "城市名称"
},
"units": {
"type": "string",
"enum": ["metric", "imperial"],
"description": "单位系统"
}
},
"required": ["city"]
}
)
def get_weather(self, city: str, units: str = "metric") -> str:
"""获取天气"""
try:
data = self.weather_api.get("/weather", {
"q": city,
"units": units
})
weather = {
"city": data["name"],
"temperature": data["main"]["temp"],
"feels_like": data["main"]["feels_like"],
"humidity": data["main"]["humidity"],
"description": data["weather"][0]["description"]
}
return json.dumps(weather, ensure_ascii=False)
except Exception as e:
return f"获取天气失败: {str(e)}"
文件操作
import os
import shutil
from pathlib import Path
from typing import List
class FileOperationTool:
"""文件操作工具"""
def __init__(self, workspace: str = "./workspace"):
self.workspace = Path(workspace)
self.workspace.mkdir(exist_ok=True)
def _get_safe_path(self, file_path: str) -> Path:
"""获取安全的文件路径"""
full_path = (self.workspace / file_path).resolve()
# 确保路径在workspace内
if not str(full_path).startswith(str(self.workspace.resolve())):
raise ValueError("路径必须在workspace内")
return full_path
def read_file(self, file_path: str) -> str:
"""读取文件"""
path = self._get_safe_path(file_path)
if not path.exists():
return f"文件不存在: {file_path}"
try:
with open(path, 'r', encoding='utf-8') as f:
return f.read()
except Exception as e:
return f"读取失败: {str(e)}"
def write_file(self, file_path: str, content: str) -> str:
"""写入文件"""
path = self._get_safe_path(file_path)
try:
path.parent.mkdir(parents=True, exist_ok=True)
with open(path, 'w', encoding='utf-8') as f:
f.write(content)
return f"成功写入 {file_path}"
except Exception as e:
return f"写入失败: {str(e)}"
def list_files(self, directory: str = ".") -> List[str]:
"""列出文件"""
path = self._get_safe_path(directory)
if not path.exists():
return []
files = []
for item in path.rglob("*"):
if item.is_file():
rel_path = item.relative_to(self.workspace)
files.append(str(rel_path))
return files
def delete_file(self, file_path: str) -> str:
"""删除文件"""
path = self._get_safe_path(file_path)
if not path.exists():
return f"文件不存在: {file_path}"
try:
if path.is_file():
path.unlink()
else:
shutil.rmtree(path)
return f"成功删除 {file_path}"
except Exception as e:
return f"删除失败: {str(e)}"
def create_directory(self, directory: str) -> str:
"""创建目录"""
path = self._get_safe_path(directory)
try:
path.mkdir(parents=True, exist_ok=True)
return f"成功创建目录 {directory}"
except Exception as e:
return f"创建失败: {str(e)}"
多Agent协作
Agent通信协议
from dataclasses import dataclass
from typing import List, Dict, Any, Optional
from enum import Enum
class MessageType(Enum):
"""消息类型"""
REQUEST = "request" # 请求
RESPONSE = "response" # 响应
BROADCAST = "broadcast" # 广播
NOTIFICATION = "notification" # 通知
@dataclass
class Message:
"""Agent间消息"""
sender: str
receiver: str
message_type: MessageType
content: Any
message_id: str = None
reply_to: str = None
timestamp: float = None
def __post_init__(self):
import time
import uuid
if self.message_id is None:
self.message_id = str(uuid.uuid4())
if self.timestamp is None:
self.timestamp = time.time()
class MessageBus:
"""消息总线"""
def __init__(self):
self.subscribers = {} # {agent_name: agent}
self.message_queue = []
self.message_history = []
def register(self, agent_name: str, agent):
"""注册Agent"""
self.subscribers[agent_name] = agent
def unregister(self, agent_name: str):
"""注销Agent"""
self.subscribers.pop(agent_name, None)
def send(self, message: Message):
"""发送消息"""
self.message_history.append(message)
if message.message_type == MessageType.BROADCAST:
# 广播给所有Agent
for agent_name, agent in self.subscribers.items():
if agent_name != message.sender:
agent.receive_message(message)
else:
# 发送给特定Agent
receiver = self.subscribers.get(message.receiver)
if receiver:
receiver.receive_message(message)
def get_history(self, agent_name: str = None) -> List[Message]:
"""获取消息历史"""
if agent_name:
return [
msg for msg in self.message_history
if msg.sender == agent_name or msg.receiver == agent_name
]
return self.message_history
任务分解和分配
from typing import List, Dict, Any
class Task:
"""任务对象"""
def __init__(self, task_id: str, description: str, dependencies: List[str] = None):
self.task_id = task_id
self.description = description
self.dependencies = dependencies or []
self.status = "pending" # pending, in_progress, completed, failed
self.result = None
self.assigned_to = None
class TaskDecomposer:
"""任务分解器"""
def __init__(self, llm):
self.llm = llm
def decompose(self, task_description: str) -> List[Task]:
"""分解任务"""
prompt = f"""将以下任务分解为多个子任务。每个子任务应该是独立且可执行的。
任务: {task_description}
以JSON格式返回子任务列表:
[
{{"id": "task1", "description": "...", "dependencies": []}},
{{"id": "task2", "description": "...", "dependencies": ["task1"]}},
...
]
子任务列表:"""
response = self.llm.generate(prompt, temperature=0.3)
# 解析JSON
import json
import re
json_match = re.search(r'\[.*\]', response, re.DOTALL)
if json_match:
subtasks_data = json.loads(json_match.group())
tasks = [
Task(
task_id=t["id"],
description=t["description"],
dependencies=t.get("dependencies", [])
)
for t in subtasks_data
]
return tasks
# 如果解析失败,返回单个任务
return [Task("task1", task_description)]
class TaskScheduler:
"""任务调度器"""
def __init__(self):
self.tasks = {}
self.execution_order = []
def add_tasks(self, tasks: List[Task]):
"""添加任务"""
for task in tasks:
self.tasks[task.task_id] = task
# 计算执行顺序(拓扑排序)
self.execution_order = self._topological_sort()
def _topological_sort(self) -> List[str]:
"""拓扑排序"""
# 计算入度
in_degree = {task_id: 0 for task_id in self.tasks}
for task in self.tasks.values():
for dep in task.dependencies:
if dep in in_degree:
in_degree[task.task_id] += 1
# 队列
queue = [task_id for task_id, degree in in_degree.items() if degree == 0]
result = []
while queue:
task_id = queue.pop(0)
result.append(task_id)
# 更新依赖此任务的其他任务
for other_task in self.tasks.values():
if task_id in other_task.dependencies:
in_degree[other_task.task_id] -= 1
if in_degree[other_task.task_id] == 0:
queue.append(other_task.task_id)
return result
def get_next_task(self) -> Optional[Task]:
"""获取下一个可执行任务"""
for task_id in self.execution_order:
task = self.tasks[task_id]
if task.status != "pending":
continue
# 检查依赖是否完成
deps_completed = all(
self.tasks[dep].status == "completed"
for dep in task.dependencies
if dep in self.tasks
)
if deps_completed:
return task
return None
def all_completed(self) -> bool:
"""是否所有任务完成"""
return all(task.status in ["completed", "failed"] for task in self.tasks.values())
AutoGen框架
import autogen
from typing import List, Dict, Any
class AutoGenMultiAgent:
"""基于AutoGen的多Agent系统"""
def __init__(self, config_list: List[Dict]):
self.config_list = config_list
self.agents = {}
def create_assistant(self, name: str, system_message: str) -> autogen.AssistantAgent:
"""创建助手Agent"""
assistant = autogen.AssistantAgent(
name=name,
system_message=system_message,
llm_config={
"config_list": self.config_list,
"temperature": 0.7
}
)
self.agents[name] = assistant
return assistant
def create_user_proxy(self, name: str, **kwargs) -> autogen.UserProxyAgent:
"""创建用户代理"""
user_proxy = autogen.UserProxyAgent(
name=name,
**kwargs
)
self.agents[name] = user_proxy
return user_proxy
def create_group_chat(self, agents: List, max_round: int = 10):
"""创建群聊"""
groupchat = autogen.GroupChat(
agents=agents,
messages=[],
max_round=max_round
)
manager = autogen.GroupChatManager(
groupchat=groupchat,
llm_config={"config_list": self.config_list}
)
return manager
# 使用示例
def research_task_example():
"""研究任务示例"""
config_list = [
{
"model": "gpt-4",
"api_key": "your-api-key"
}
]
system = AutoGenMultiAgent(config_list)
# 创建研究员Agent
researcher = system.create_assistant(
name="researcher",
system_message="你是一个研究员,负责搜索和收集信息。"
)
# 创建分析师Agent
analyst = system.create_assistant(
name="analyst",
system_message="你是一个分析师,负责分析和总结信息。"
)
# 创建撰稿人Agent
writer = system.create_assistant(
name="writer",
system_message="你是一个撰稿人,负责撰写报告。"
)
# 创建用户代理
user = system.create_user_proxy(
name="user",
human_input_mode="NEVER",
max_consecutive_auto_reply=0,
code_execution_config=False
)
# 创建群聊
manager = system.create_group_chat(
agents=[user, researcher, analyst, writer],
max_round=12
)
# 启动任务
user.initiate_chat(
manager,
message="请研究并撰写一份关于'Transformer架构'的技术报告"
)
MetaGPT实现
from typing import List, Dict, Any, Optional
from abc import ABC, abstractmethod
class Role(ABC):
"""角色基类"""
def __init__(self, name: str, profile: str, llm):
self.name = name
self.profile = profile
self.llm = llm
self.memory = []
@abstractmethod
async def act(self, message: str) -> str:
"""执行动作"""
pass
def observe(self, message: Message):
"""观察消息"""
self.memory.append(message)
class ProductManager(Role):
"""产品经理角色"""
async def act(self, message: str) -> str:
"""撰写PRD"""
prompt = f"""作为产品经理,基于以下需求撰写产品需求文档(PRD)。
用户需求: {message}
PRD应包含:
1. 产品目标
2. 用户故事
3. 功能列表
4. 优先级
5. 验收标准
PRD:"""
prd = self.llm.generate(prompt, temperature=0.5)
return prd
class Architect(Role):
"""架构师角色"""
async def act(self, message: str) -> str:
"""设计架构"""
prompt = f"""作为架构师,基于以下PRD设计系统架构。
PRD: {message}
架构设计应包含:
1. 系统架构图
2. 技术栈选择
3. 数据模型
4. API设计
5. 部署方案
架构设计:"""
design = self.llm.generate(prompt, temperature=0.3)
return design
class Engineer(Role):
"""工程师角色"""
async def act(self, message: str) -> str:
"""编写代码"""
prompt = f"""作为工程师,基于以下架构设计编写代码。
架构设计: {message}
请编写主要代码文件和实现关键功能。
代码:"""
code = self.llm.generate(prompt, temperature=0.2)
return code
class QAEngineer(Role):
"""测试工程师角色"""
async def act(self, message: str) -> str:
"""编写测试"""
prompt = f"""作为QA工程师,基于以下代码编写测试用例。
代码: {message}
测试用例应包含:
1. 单元测试
2. 集成测试
3. 边界条件测试
测试代码:"""
tests = self.llm.generate(prompt, temperature=0.2)
return tests
class SoftwareCompany:
"""软件公司(MetaGPT风格)"""
def __init__(self, llm):
self.llm = llm
self.roles = []
self.message_bus = MessageBus()
# 初始化角色
self._init_roles()
def _init_roles(self):
"""初始化角色"""
self.pm = ProductManager("Alice", "Product Manager", self.llm)
self.architect = Architect("Bob", "Architect", self.llm)
self.engineer = Engineer("Charlie", "Engineer", self.llm)
self.qa = QAEngineer("David", "QA Engineer", self.llm)
self.roles = [self.pm, self.architect, self.engineer, self.qa]
async def run_project(self, requirement: str) -> Dict[str, str]:
"""运行项目"""
results = {}
# 1. PM撰写PRD
print("产品经理正在撰写PRD...")
prd = await self.pm.act(requirement)
results["prd"] = prd
# 2. 架构师设计架构
print("架构师正在设计架构...")
design = await self.architect.act(prd)
results["design"] = design
# 3. 工程师编写代码
print("工程师正在编写代码...")
code = await self.engineer.act(design)
results["code"] = code
# 4. QA编写测试
print("QA正在编写测试...")
tests = await self.qa.act(code)
results["tests"] = tests
return results
# 使用示例
async def main():
from your_llm_module import LLM
llm = LLM(api_key="your-api-key")
company = SoftwareCompany(llm)
results = await company.run_project(
"开发一个待办事项管理应用"
)
print("\n=== PRD ===")
print(results["prd"])
print("\n=== 架构设计 ===")
print(results["design"])
print("\n=== 代码 ===")
print(results["code"])
print("\n=== 测试 ===")
print(results["tests"])
# 运行
import asyncio
asyncio.run(main())
从零实现Agent系统
这里提供一个完整的、生产级的Agent系统实现(3000+行代码的核心部分):
"""
完整的Agent系统实现
包含:工具注册、任务规划、执行引擎、记忆管理
"""
import json
import logging
from typing import List, Dict, Any, Optional, Callable
from dataclasses import dataclass, field
from abc import ABC, abstractmethod
from enum import Enum
import time
from datetime import datetime
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# ============ 1. 工具系统 ============
class ToolRegistry:
"""工具注册中心"""
def __init__(self):
self.tools = {}
self.tool_metadata = {}
def register(
self,
name: str,
func: Callable,
description: str,
parameters: Dict,
returns: Dict = None
):
"""注册工具"""
self.tools[name] = func
self.tool_metadata[name] = {
"name": name,
"description": description,
"parameters": parameters,
"returns": returns or {}
}
logger.info(f"注册工具: {name}")
def get_tool(self, name: str) -> Optional[Callable]:
"""获取工具"""
return self.tools.get(name)
def get_metadata(self, name: str) -> Optional[Dict]:
"""获取工具元数据"""
return self.tool_metadata.get(name)
def list_tools(self) -> List[str]:
"""列出所有工具"""
return list(self.tools.keys())
def get_tools_description(self) -> str:
"""获取所有工具的描述"""
descriptions = []
for name, metadata in self.tool_metadata.items():
desc = f"- {name}: {metadata['description']}\n"
desc += f" 参数: {json.dumps(metadata['parameters'], ensure_ascii=False)}"
descriptions.append(desc)
return "\n".join(descriptions)
# ============ 2. 记忆系统 ============
@dataclass
class Memory:
"""记忆对象"""
content: str
timestamp: float = field(default_factory=time.time)
metadata: Dict = field(default_factory=dict)
importance: float = 0.5
class MemoryManager:
"""记忆管理器"""
def __init__(self, max_short_term: int = 10, max_long_term: int = 100):
self.short_term = [] # 短期记忆
self.long_term = [] # 长期记忆
self.max_short_term = max_short_term
self.max_long_term = max_long_term
def add(self, content: str, importance: float = 0.5, metadata: Dict = None):
"""添加记忆"""
memory = Memory(
content=content,
importance=importance,
metadata=metadata or {}
)
self.short_term.append(memory)
# 短期记忆满了,转移到长期记忆
if len(self.short_term) > self.max_short_term:
old_memory = self.short_term.pop(0)
if old_memory.importance > 0.6: # 重要的记忆保存到长期
self.long_term.append(old_memory)
# 长期记忆满了,删除最不重要的
if len(self.long_term) > self.max_long_term:
self.long_term.sort(key=lambda m: m.importance)
self.long_term = self.long_term[10:] # 删除最不重要的10个
def get_recent(self, n: int = 5) -> List[Memory]:
"""获取最近的记忆"""
return self.short_term[-n:]
def search(self, query: str, n: int = 5) -> List[Memory]:
"""搜索相关记忆(简单的关键词匹配)"""
all_memories = self.short_term + self.long_term
# 计算相关性
scored_memories = []
for memory in all_memories:
score = sum(1 for word in query.split() if word in memory.content)
if score > 0:
scored_memories.append((memory, score))
# 排序并返回top n
scored_memories.sort(key=lambda x: x[1], reverse=True)
return [m for m, s in scored_memories[:n]]
def get_summary(self) -> str:
"""获取记忆摘要"""
recent = self.get_recent(5)
summary = "最近的记忆:\n"
for i, memory in enumerate(recent, 1):
summary += f"{i}. {memory.content}\n"
return summary
# ============ 3. 规划系统 ============
@dataclass
class Plan:
"""计划对象"""
goal: str
steps: List[Dict]
current_step: int = 0
status: str = "pending" # pending, in_progress, completed, failed
def get_next_step(self) -> Optional[Dict]:
"""获取下一步"""
if self.current_step < len(self.steps):
return self.steps[self.current_step]
return None
def complete_step(self):
"""完成当前步骤"""
self.current_step += 1
if self.current_step >= len(self.steps):
self.status = "completed"
class Planner:
"""规划器"""
def __init__(self, llm, tool_registry: ToolRegistry):
self.llm = llm
self.tool_registry = tool_registry
def create_plan(self, goal: str, context: str = "") -> Plan:
"""创建计划"""
tools_desc = self.tool_registry.get_tools_description()
prompt = f"""你是一个规划专家。基于目标和可用工具,制定详细的执行计划。
目标: {goal}
上下文: {context}
可用工具:
{tools_desc}
请以JSON格式返回计划,格式如下:
{{
"steps": [
{{"step": 1, "action": "使用工具X", "tool": "tool_name", "params": {{}}, "reason": "原因"}},
{{"step": 2, "action": "...", "tool": "...", "params": {{}}, "reason": "..."}},
...
]
}}
计划:"""
response = self.llm.generate(prompt, temperature=0.3)
# 解析JSON
import re
json_match = re.search(r'\{.*\}', response, re.DOTALL)
if json_match:
try:
plan_data = json.loads(json_match.group())
return Plan(goal=goal, steps=plan_data["steps"])
except:
pass
# 如果解析失败,返回简单计划
return Plan(
goal=goal,
steps=[{"step": 1, "action": "直接回答", "tool": None, "params": {}}]
)
# ============ 4. 执行引擎 ============
class ExecutionEngine:
"""执行引擎"""
def __init__(
self,
tool_registry: ToolRegistry,
memory_manager: MemoryManager,
max_retries: int = 3
):
self.tool_registry = tool_registry
self.memory = memory_manager
self.max_retries = max_retries
def execute_step(self, step: Dict) -> Dict:
"""执行单个步骤"""
tool_name = step.get("tool")
if not tool_name:
return {
"success": True,
"result": "无需工具执行",
"step": step
}
tool_func = self.tool_registry.get_tool(tool_name)
if not tool_func:
return {
"success": False,
"error": f"工具 {tool_name} 不存在",
"step": step
}
params = step.get("params", {})
# 执行工具(带重试)
for attempt in range(self.max_retries):
try:
result = tool_func(**params)
# 记录到记忆
self.memory.add(
f"执行 {tool_name}: {step.get('action')} -> {result}",
importance=0.7,
metadata={"tool": tool_name, "step": step}
)
return {
"success": True,
"result": result,
"step": step
}
except Exception as e:
logger.warning(f"执行失败 (尝试 {attempt + 1}/{self.max_retries}): {str(e)}")
if attempt == self.max_retries - 1:
return {
"success": False,
"error": str(e),
"step": step
}
time.sleep(1) # 重试前等待
# ============ 5. Agent核心 ============
class AgentCore:
"""Agent核心"""
def __init__(
self,
name: str,
llm,
tool_registry: ToolRegistry = None,
memory_manager: MemoryManager = None
):
self.name = name
self.llm = llm
self.tool_registry = tool_registry or ToolRegistry()
self.memory = memory_manager or MemoryManager()
self.planner = Planner(llm, self.tool_registry)
self.executor = ExecutionEngine(self.tool_registry, self.memory)
self.conversation_history = []
self.current_plan = None
def register_tool(self, name: str, func: Callable, description: str, parameters: Dict):
"""注册工具"""
self.tool_registry.register(name, func, description, parameters)
def run(self, task: str, max_iterations: int = 10) -> str:
"""运行Agent"""
logger.info(f"Agent {self.name} 开始执行任务: {task}")
# 添加到对话历史
self.conversation_history.append({"role": "user", "content": task})
# 创建计划
context = self.memory.get_summary()
self.current_plan = self.planner.create_plan(task, context)
logger.info(f"创建计划,共 {len(self.current_plan.steps)} 步")
# 执行计划
results = []
for i in range(max_iterations):
step = self.current_plan.get_next_step()
if not step:
# 计划完成
break
logger.info(f"执行步骤 {self.current_plan.current_step + 1}: {step.get('action')}")
# 执行步骤
result = self.executor.execute_step(step)
results.append(result)
if result["success"]:
self.current_plan.complete_step()
else:
logger.error(f"步骤执行失败: {result.get('error')}")
# 可以选择重新规划或终止
break
# 生成最终回答
final_answer = self._generate_final_answer(task, results)
# 添加到对话历史和记忆
self.conversation_history.append({"role": "assistant", "content": final_answer})
self.memory.add(f"任务: {task}\n回答: {final_answer}", importance=0.8)
return final_answer
def _generate_final_answer(self, task: str, results: List[Dict]) -> str:
"""生成最终回答"""
results_summary = "\n".join([
f"- {r['step'].get('action')}: {r.get('result', r.get('error'))}"
for r in results
])
prompt = f"""基于任务和执行结果,生成最终回答。
任务: {task}
执行结果:
{results_summary}
回忆:
{self.memory.get_summary()}
请生成一个完整、准确的回答:"""
answer = self.llm.generate(prompt, temperature=0.7)
return answer
# ============ 使用示例 ============
def main():
# 模拟LLM
class SimpleLLM:
def generate(self, prompt: str, temperature: float = 0.7) -> str:
# 实际应该调用真实的LLM API
return '{"steps": [{"step": 1, "action": "计算结果", "tool": "calculator", "params": {"expression": "2+2"}, "reason": "需要计算"}]}'
# 创建Agent
llm = SimpleLLM()
agent = AgentCore(name="MathAgent", llm=llm)
# 注册工具
def calculator(expression: str) -> float:
return eval(expression)
agent.register_tool(
name="calculator",
func=calculator,
description="执行数学计算",
parameters={
"type": "object",
"properties": {
"expression": {"type": "string", "description": "数学表达式"}
},
"required": ["expression"]
}
)
# 执行任务
result = agent.run("计算 2 + 2")
print(f"结果: {result}")
# 查看记忆
print(f"\n记忆: {agent.memory.get_summary()}")
if __name__ == "__main__":
main()