HiHuo
首页
博客
手册
工具
关于
首页
博客
手册
工具
关于
  • AI 完整学习路径

    • AI教程 - 从零到一的完整学习路径
    • 第00章:AI基础与发展史
    • 第01章:Python与AI开发环境
    • 第02章:数学基础-线性代数与微积分
    • 03-数据集详解-从获取到预处理
    • 04-从零训练第一个模型
    • 05-模型文件详解
    • 06-分布式训练-多GPU与多机
    • 07-模型调度与资源管理
    • 08-Transformer架构深度解析
    • 09-大语言模型原理与架构
    • 10-Token与Tokenization详解
    • 11-Prompt Engineering完全指南
    • 第12章:模型微调与LoRA技术
    • 第13章:RLHF与对齐技术
    • 第14章 AI编程助手原理与实现
    • 15-RAG系统设计与实现
    • 16-Agent智能体与工具调用
    • 17-多模态大模型
    • 第18章:AI前沿技术趋势
    • 第19章 AI热门话题与应用案例

16-Agent智能体与工具调用

Agent基础概念

什么是Agent

Agent(智能体)是一个能够感知环境、自主决策并采取行动以实现目标的AI系统。与传统的单次问答不同,Agent具有以下特征:

  • 自主性:能够独立制定计划和执行任务
  • 反应性:能够感知环境变化并作出响应
  • 目标导向:围绕特定目标进行决策
  • 工具使用:能够调用外部工具和API
  • 记忆能力:保持状态和历史信息

Agent的典型应用场景:

  • 自动化任务执行(如数据分析、报告生成)
  • 复杂问题求解(如数学题、代码调试)
  • 多步骤工作流(如旅行规划、研究助手)
  • 与外部系统交互(如数据库查询、API调用)

感知-思考-行动循环

Agent的核心工作流程:

感知(Perception) → 思考(Reasoning) → 行动(Action) → 观察(Observation) → ...

具体流程:

  1. 感知:接收用户输入或环境信息
  2. 思考:分析当前状态,制定行动计划
  3. 行动:选择并执行工具或生成回复
  4. 观察:获取行动结果
  5. 循环:根据结果决定下一步行动或结束

示例代码:

from typing import List, Dict, Optional
from enum import Enum

class ActionType(Enum):
    """行动类型"""
    TOOL_USE = "tool_use"
    FINAL_ANSWER = "final_answer"
    THINK = "think"

class Action:
    """行动对象"""
    def __init__(self, action_type: ActionType, content: str, tool_name: str = None, tool_input: Dict = None):
        self.type = action_type
        self.content = content
        self.tool_name = tool_name
        self.tool_input = tool_input

class Observation:
    """观察对象"""
    def __init__(self, content: str, success: bool = True):
        self.content = content
        self.success = success

class Agent:
    """基础Agent实现"""

    def __init__(self, llm, tools: List):
        self.llm = llm
        self.tools = {tool.name: tool for tool in tools}
        self.max_iterations = 10

    def run(self, task: str) -> str:
        """运行Agent"""
        history = []

        for i in range(self.max_iterations):
            # 1. 感知:当前状态
            state = self._build_state(task, history)

            # 2. 思考:决定下一步行动
            action = self._think(state)

            # 3. 行动:执行工具或返回答案
            if action.type == ActionType.FINAL_ANSWER:
                return action.content

            observation = self._act(action)

            # 4. 观察:记录结果
            history.append({
                "action": action,
                "observation": observation
            })

        return "达到最大迭代次数,任务未完成"

    def _build_state(self, task: str, history: List) -> str:
        """构建当前状态描述"""
        state = f"任务: {task}\n\n"

        if history:
            state += "历史记录:\n"
            for i, step in enumerate(history, 1):
                action = step['action']
                obs = step['observation']
                state += f"{i}. 行动: {action.content}\n"
                state += f"   结果: {obs.content}\n\n"

        return state

    def _think(self, state: str) -> Action:
        """思考下一步行动"""
        # 由子类实现
        raise NotImplementedError

    def _act(self, action: Action) -> Observation:
        """执行行动"""
        if action.type == ActionType.TOOL_USE:
            tool = self.tools.get(action.tool_name)
            if not tool:
                return Observation(f"工具 {action.tool_name} 不存在", success=False)

            try:
                result = tool.run(action.tool_input)
                return Observation(str(result), success=True)
            except Exception as e:
                return Observation(f"工具执行错误: {str(e)}", success=False)

        return Observation(action.content)

Agent框架对比

1. AutoGPT

最早的开源Agent项目之一,专注于自主任务执行。

# AutoGPT风格的配置
class AutoGPTConfig:
    """AutoGPT配置"""
    def __init__(self):
        self.ai_name = "MyAssistant"
        self.ai_role = "a helpful AI assistant"
        self.ai_goals = [
            "帮助用户完成任务",
            "提供准确的信息",
            "高效地使用工具"
        ]
        self.max_iterations = 10
        self.memory_backend = "local"

# AutoGPT的特点:
# - 长期记忆(向量存储)
# - 文件操作能力
# - 网页浏览
# - 代码执行

2. LangChain Agent

LangChain提供的Agent框架,易于集成和扩展。

from langchain.agents import AgentType, initialize_agent, load_tools
from langchain.llms import OpenAI

# 初始化LLM
llm = OpenAI(temperature=0)

# 加载工具
tools = load_tools(["serpapi", "llm-math"], llm=llm)

# 创建Agent
agent = initialize_agent(
    tools,
    llm,
    agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
    verbose=True
)

# 运行
result = agent.run("2023年的世界杯冠军是谁?")

# LangChain Agent类型:
# - ZERO_SHOT_REACT_DESCRIPTION: ReAct模式
# - CONVERSATIONAL_REACT_DESCRIPTION: 对话式ReAct
# - OPENAI_FUNCTIONS: 基于OpenAI Function Calling
# - STRUCTURED_CHAT: 结构化聊天

3. AutoGen

微软开源的多Agent框架,支持Agent间协作。

import autogen

# 配置
config_list = [
    {
        "model": "gpt-4",
        "api_key": "your-api-key"
    }
]

# 创建助手Agent
assistant = autogen.AssistantAgent(
    name="assistant",
    llm_config={"config_list": config_list}
)

# 创建用户代理
user_proxy = autogen.UserProxyAgent(
    name="user_proxy",
    human_input_mode="NEVER",
    max_consecutive_auto_reply=10
)

# 启动对话
user_proxy.initiate_chat(
    assistant,
    message="帮我计算斐波那契数列的第10项"
)

# AutoGen的特点:
# - 多Agent对话
# - 人机协作
# - 代码执行沙箱
# - 灵活的对话模式

框架对比表:

框架优势劣势适用场景
AutoGPT完整的自主系统配置复杂,资源消耗大完全自主的任务
LangChain易用,工具丰富灵活性有限快速原型开发
AutoGen多Agent协作强学习曲线陡峭复杂协作任务
自建完全可控开发成本高定制化需求

ReAct模式

Reasoning + Acting

ReAct(Reasoning and Acting)是一种将推理和行动交织的Agent模式。

核心思想:

Thought → Action → Observation → Thought → Action → ...

ReAct的优势:

  • 可解释性强:每一步都有明确的思考过程
  • 错误恢复:可以根据观察结果调整策略
  • 灵活性高:支持多步推理和工具调用

思维链和工具调用结合

class ReActAgent(Agent):
    """ReAct模式Agent"""

    def __init__(self, llm, tools: List):
        super().__init__(llm, tools)
        self.prompt_template = self._build_prompt_template()

    def _build_prompt_template(self) -> str:
        """构建提示模板"""
        tools_desc = "\n".join([
            f"- {name}: {tool.description}"
            for name, tool in self.tools.items()
        ])

        template = f"""你是一个能够使用工具的AI助手。你可以使用以下工具:

{tools_desc}

使用以下格式回答问题:

Question: 用户的问题
Thought: 你应该思考要做什么
Action: 要使用的工具名称,必须是 [{', '.join(self.tools.keys())}] 之一
Action Input: 工具的输入(JSON格式)
Observation: 工具返回的结果
... (这个 Thought/Action/Action Input/Observation 可以重复N次)
Thought: 我现在知道最终答案了
Final Answer: 对用户问题的最终回答

开始!

Question: {{question}}
{{history}}
Thought:"""

        return template

    def _think(self, state: str) -> Action:
        """ReAct思考过程"""
        # 构建prompt
        prompt = self.prompt_template.format(
            question=state.split("任务: ")[1].split("\n")[0],
            history=self._format_history(state)
        )

        # 调用LLM
        response = self.llm.generate(prompt)

        # 解析响应
        return self._parse_response(response)

    def _parse_response(self, response: str) -> Action:
        """解析LLM响应"""
        lines = response.strip().split('\n')

        action_type = None
        content = ""
        tool_name = None
        tool_input = None

        for line in lines:
            line = line.strip()

            if line.startswith("Thought:"):
                content = line[8:].strip()
                action_type = ActionType.THINK

            elif line.startswith("Action:"):
                tool_name = line[7:].strip()
                action_type = ActionType.TOOL_USE

            elif line.startswith("Action Input:"):
                import json
                try:
                    tool_input = json.loads(line[13:].strip())
                except:
                    tool_input = {"input": line[13:].strip()}

            elif line.startswith("Final Answer:"):
                return Action(
                    ActionType.FINAL_ANSWER,
                    line[13:].strip()
                )

        if action_type == ActionType.TOOL_USE and tool_name:
            return Action(
                ActionType.TOOL_USE,
                content,
                tool_name,
                tool_input
            )

        return Action(ActionType.THINK, content)

    def _format_history(self, state: str) -> str:
        """格式化历史记录"""
        if "历史记录:" not in state:
            return ""

        history_part = state.split("历史记录:\n")[1]
        return history_part

完整实现代码

from typing import Callable, Dict, Any
import json
import re

class Tool:
    """工具基类"""

    def __init__(self, name: str, description: str, func: Callable):
        self.name = name
        self.description = description
        self.func = func

    def run(self, input_data: Any) -> Any:
        """运行工具"""
        return self.func(input_data)

# 定义一些基础工具
def calculator(input_data: Dict) -> float:
    """计算器工具"""
    expression = input_data.get("expression", "")
    try:
        result = eval(expression)
        return result
    except Exception as e:
        return f"计算错误: {str(e)}"

def search(input_data: Dict) -> str:
    """搜索工具(模拟)"""
    query = input_data.get("query", "")
    # 这里应该调用真实的搜索API
    return f"关于'{query}'的搜索结果: [模拟数据]"

def python_repl(input_data: Dict) -> str:
    """Python代码执行"""
    code = input_data.get("code", "")
    try:实际应用中应该在沙箱中执行
        import io
        import sys

        old_stdout = sys.stdout
        sys.stdout = io.StringIO()

        exec(code)

        output = sys.stdout.getvalue()
        sys.stdout = old_stdout

        return output
    except Exception as e:
        return f"执行错误: {str(e)}"

# 创建工具列表
tools = [
    Tool(
        name="Calculator",
        description="用于数学计算。输入格式: {\"expression\": \"2+2\"}",
        func=calculator
    ),
    Tool(
        name="Search",
        description="搜索互联网信息。输入格式: {\"query\": \"搜索内容\"}",
        func=search
    ),
    Tool(
        name="PythonREPL",
        description="执行Python代码。输入格式: {\"code\": \"print(2+2)\"}",
        func=python_repl
    )
]

# 模拟LLM
class MockLLM:
    """模拟LLM(实际应该使用真实的LLM)"""

    def generate(self, prompt: str) -> str:
        """生成响应"""
        # 实际应该调用OpenAI API或其他LLM
        # 这里返回模拟响应用于演示
        if "计算" in prompt or "数学" in prompt:
            return """Thought: 需要使用计算器来解决这个数学问题
Action: Calculator
Action Input: {"expression": "12 * 15"}"""
        else:
            return """Thought: 我需要搜索相关信息
Action: Search
Action Input: {"query": "ReAct Agent"}"""

# 完整示例
def main():
    llm = MockLLM()
    agent = ReActAgent(llm, tools)

    # 运行任务
    result = agent.run("计算 12 乘以 15")
    print(f"最终结果: {result}")

if __name__ == "__main__":
    main()

使用真实LLM的完整实现:

import openai
from typing import List, Dict, Any

class OpenAIReActAgent:
    """基于OpenAI的ReAct Agent"""

    def __init__(self, api_key: str, tools: List[Tool], model: str = "gpt-4"):
        openai.api_key = api_key
        self.model = model
        self.tools = {tool.name: tool for tool in tools}
        self.max_iterations = 10

    def run(self, task: str) -> str:
        """运行Agent"""
        messages = [
            {"role": "system", "content": self._build_system_prompt()},
            {"role": "user", "content": f"Question: {task}"}
        ]

        for i in range(self.max_iterations):
            # 调用LLM
            response = openai.chat.completions.create(
                model=self.model,
                messages=messages,
                temperature=0.7,
                max_tokens=500
            )

            assistant_message = response.choices[0].message.content
            messages.append({"role": "assistant", "content": assistant_message})

            # 解析响应
            if "Final Answer:" in assistant_message:
                final_answer = assistant_message.split("Final Answer:")[1].strip()
                return final_answer

            # 提取Action和Action Input
            action_match = re.search(r"Action:\s*(\w+)", assistant_message)
            input_match = re.search(r"Action Input:\s*({.*?})", assistant_message, re.DOTALL)

            if action_match and input_match:
                tool_name = action_match.group(1)
                tool_input_str = input_match.group(1)

                try:
                    tool_input = json.loads(tool_input_str)
                except:
                    tool_input = {"input": tool_input_str}

                # 执行工具
                if tool_name in self.tools:
                    result = self.tools[tool_name].run(tool_input)
                    observation = f"Observation: {result}"
                else:
                    observation = f"Observation: 工具 {tool_name} 不存在"

                messages.append({"role": "user", "content": observation})
            else:
                # 如果无法解析,要求重新格式化
                messages.append({
                    "role": "user",
                    "content": "请按照 Action: 和 Action Input: 的格式回答"
                })

        return "达到最大迭代次数"

    def _build_system_prompt(self) -> str:
        """构建系统提示"""
        tools_desc = "\n".join([
            f"- {name}: {tool.description}"
            for name, tool in self.tools.items()
        ])

        return f"""你是一个能够使用工具的AI助手。严格按照以下格式回答:

可用工具:
{tools_desc}

回答格式:
Question: 用户的问题
Thought: 你的思考过程
Action: 工具名称
Action Input: 工具输入(JSON格式)
Observation: 工具返回的结果
... (可以重复多次)
Thought: 我现在知道最终答案了
Final Answer: 最终答案

重要:必须严格遵循格式!"""

# 使用示例
agent = OpenAIReActAgent(
    api_key="your-api-key",
    tools=tools
)

result = agent.run("帮我计算 (123 + 456) * 789,然后搜索一下这个数字的含义")
print(result)

工具调用(Function Calling)

OpenAI Function Calling API

OpenAI提供了原生的Function Calling功能,让模型能够结构化地调用函数。

import openai
import json

class FunctionCallingAgent:
    """基于OpenAI Function Calling的Agent"""

    def __init__(self, api_key: str, model: str = "gpt-4-turbo-preview"):
        openai.api_key = api_key
        self.model = model
        self.functions = []
        self.available_functions = {}

    def register_function(self, func: Callable, name: str, description: str, parameters: Dict):
        """注册函数"""
        self.functions.append({
            "name": name,
            "description": description,
            "parameters": parameters
        })
        self.available_functions[name] = func

    def run(self, query: str) -> str:
        """运行Agent"""
        messages = [{"role": "user", "content": query}]

        while True:
            # 调用API
            response = openai.chat.completions.create(
                model=self.model,
                messages=messages,
                functions=self.functions,
                function_call="auto"
            )

            response_message = response.choices[0].message

            # 检查是否需要调用函数
            if response_message.function_call:
                # 提取函数调用信息
                function_name = response_message.function_call.name
                function_args = json.loads(response_message.function_call.arguments)

                print(f"调用函数: {function_name}")
                print(f"参数: {function_args}")

                # 执行函数
                function_to_call = self.available_functions[function_name]
                function_response = function_to_call(**function_args)

                # 将函数结果添加到消息
                messages.append(response_message)
                messages.append({
                    "role": "function",
                    "name": function_name,
                    "content": str(function_response)
                })
            else:
                # 没有函数调用,返回最终答案
                return response_message.content

# 定义工具函数
def get_current_weather(location: str, unit: str = "celsius") -> str:
    """获取当前天气"""
    # 模拟天气数据
    weather_data = {
        "北京": {"temperature": 15, "condition": "晴朗"},
        "上海": {"temperature": 20, "condition": "多云"},
        "深圳": {"temperature": 25, "condition": "阴天"}
    }

    if location in weather_data:
        data = weather_data[location]
        return json.dumps({
            "location": location,
            "temperature": data["temperature"],
            "condition": data["condition"],
            "unit": unit
        }, ensure_ascii=False)
    else:
        return json.dumps({"error": "未找到该地点的天气信息"}, ensure_ascii=False)

def search_database(query: str, table: str) -> str:
    """搜索数据库"""
    # 模拟数据库查询
    return json.dumps({
        "results": [
            {"id": 1, "name": "结果1"},
            {"id": 2, "name": "结果2"}
        ]
    }, ensure_ascii=False)

# 使用示例
agent = FunctionCallingAgent(api_key="your-api-key")

# 注册天气查询函数
agent.register_function(
    func=get_current_weather,
    name="get_current_weather",
    description="获取指定地点的当前天气",
    parameters={
        "type": "object",
        "properties": {
            "location": {
                "type": "string",
                "description": "城市名称,例如:北京、上海"
            },
            "unit": {
                "type": "string",
                "enum": ["celsius", "fahrenheit"],
                "description": "温度单位"
            }
        },
        "required": ["location"]
    }
)

# 注册数据库查询函数
agent.register_function(
    func=search_database,
    name="search_database",
    description="在数据库中搜索信息",
    parameters={
        "type": "object",
        "properties": {
            "query": {
                "type": "string",
                "description": "搜索查询"
            },
            "table": {
                "type": "string",
                "description": "要搜索的表名"
            }
        },
        "required": ["query", "table"]
    }
)

# 执行查询
result = agent.run("北京和上海今天的天气怎么样?")
print(result)

工具描述格式

工具描述遵循JSON Schema格式:

# 完整的工具描述示例
weather_tool = {
    "name": "get_weather",
    "description": "获取指定地点的天气信息,包括温度、湿度、天气状况等",
    "parameters": {
        "type": "object",
        "properties": {
            "location": {
                "type": "string",
                "description": "地点名称,可以是城市名、地区名或坐标"
            },
            "date": {
                "type": "string",
                "description": "日期,格式: YYYY-MM-DD。如果不提供,默认为今天"
            },
            "units": {
                "type": "string",
                "enum": ["metric", "imperial"],
                "description": "单位系统:metric(公制)或imperial(英制)"
            },
            "include_forecast": {
                "type": "boolean",
                "description": "是否包含未来几天的预报"
            }
        },
        "required": ["location"]  # 必需参数
    }
}

# 复杂的嵌套参数
database_query_tool = {
    "name": "query_database",
    "description": "执行数据库查询",
    "parameters": {
        "type": "object",
        "properties": {
            "query": {
                "type": "string",
                "description": "SQL查询语句"
            },
            "database": {
                "type": "string",
                "description": "数据库名称"
            },
            "options": {
                "type": "object",
                "properties": {
                    "limit": {
                        "type": "integer",
                        "description": "返回结果的最大数量"
                    },
                    "timeout": {
                        "type": "integer",
                        "description": "查询超时时间(秒)"
                    }
                }
            }
        },
        "required": ["query", "database"]
    }
}

参数提取和验证

from pydantic import BaseModel, Field, validator
from typing import Optional, List

class WeatherParams(BaseModel):
    """天气查询参数"""
    location: str = Field(..., description="地点名称")
    date: Optional[str] = Field(None, description="日期 YYYY-MM-DD")
    units: str = Field("metric", description="单位系统")
    include_forecast: bool = Field(False, description="包含预报")

    @validator('date')
    def validate_date(cls, v):
        """验证日期格式"""
        if v is not None:
            import datetime
            try:
                datetime.datetime.strptime(v, '%Y-%m-%d')
            except ValueError:
                raise ValueError('日期格式必须是 YYYY-MM-DD')
        return v

    @validator('units')
    def validate_units(cls, v):
        """验证单位"""
        if v not in ['metric', 'imperial']:
            raise ValueError('units必须是metric或imperial')
        return v

class ToolValidator:
    """工具参数验证器"""

    def __init__(self):
        self.validators = {}

    def register(self, tool_name: str, validator_class: BaseModel):
        """注册验证器"""
        self.validators[tool_name] = validator_class

    def validate(self, tool_name: str, params: Dict) -> BaseModel:
        """验证参数"""
        if tool_name not in self.validators:
            raise ValueError(f"未注册的工具: {tool_name}")

        validator_class = self.validators[tool_name]
        try:
            validated_params = validator_class(**params)
            return validated_params
        except Exception as e:
            raise ValueError(f"参数验证失败: {str(e)}")

# 使用示例
validator = ToolValidator()
validator.register("get_weather", WeatherParams)

# 验证参数
try:
    params = validator.validate("get_weather", {
        "location": "北京",
        "date": "2024-01-15",
        "units": "metric"
    })
    print(f"验证通过: {params}")
except ValueError as e:
    print(f"验证失败: {e}")

结果反馈

from typing import Any, Optional
from dataclasses import dataclass
from datetime import datetime

@dataclass
class ToolResult:
    """工具执行结果"""
    success: bool
    result: Any
    error: Optional[str] = None
    execution_time: float = 0.0
    timestamp: datetime = None

    def __post_init__(self):
        if self.timestamp is None:
            self.timestamp = datetime.now()

class ToolExecutor:
    """工具执行器(带结果反馈)"""

    def __init__(self):
        self.tools = {}
        self.execution_history = []

    def register_tool(self, name: str, func: Callable, validator: Optional[BaseModel] = None):
        """注册工具"""
        self.tools[name] = {
            "func": func,
            "validator": validator
        }

    def execute(self, tool_name: str, params: Dict) -> ToolResult:
        """执行工具"""
        import time

        start_time = time.time()

        if tool_name not in self.tools:
            return ToolResult(
                success=False,
                result=None,
                error=f"工具 {tool_name} 不存在"
            )

        tool = self.tools[tool_name]

        # 参数验证
        if tool["validator"]:
            try:
                validated_params = tool["validator"](**params)
                params = validated_params.dict()
            except Exception as e:
                return ToolResult(
                    success=False,
                    result=None,
                    error=f"参数验证失败: {str(e)}"
                )

        # 执行工具
        try:
            result = tool["func"](**params)
            execution_time = time.time() - start_time

            tool_result = ToolResult(
                success=True,
                result=result,
                execution_time=execution_time
            )
        except Exception as e:
            execution_time = time.time() - start_time
            tool_result = ToolResult(
                success=False,
                result=None,
                error=str(e),
                execution_time=execution_time
            )

        # 记录历史
        self.execution_history.append({
            "tool_name": tool_name,
            "params": params,
            "result": tool_result
        })

        return tool_result

    def get_execution_stats(self) -> Dict:
        """获取执行统计"""
        if not self.execution_history:
            return {}

        total_executions = len(self.execution_history)
        successful = sum(1 for h in self.execution_history if h["result"].success)
        failed = total_executions - successful

        avg_time = sum(h["result"].execution_time for h in self.execution_history) / total_executions

        tool_usage = {}
        for h in self.execution_history:
            tool_name = h["tool_name"]
            tool_usage[tool_name] = tool_usage.get(tool_name, 0) + 1

        return {
            "total_executions": total_executions,
            "successful": successful,
            "failed": failed,
            "success_rate": successful / total_executions,
            "average_execution_time": avg_time,
            "tool_usage": tool_usage
        }

Agent工具生态

搜索工具

import requests
from typing import List, Dict

class GoogleSearchTool:
    """Google搜索工具"""

    def __init__(self, api_key: str, search_engine_id: str):
        self.api_key = api_key
        self.search_engine_id = search_engine_id
        self.base_url = "https://www.googleapis.com/customsearch/v1"

    def search(self, query: str, num_results: int = 5) -> List[Dict]:
        """搜索"""
        params = {
            "key": self.api_key,
            "cx": self.search_engine_id,
            "q": query,
            "num": num_results
        }

        response = requests.get(self.base_url, params=params)
        data = response.json()

        results = []
        for item in data.get("items", []):
            results.append({
                "title": item.get("title"),
                "link": item.get("link"),
                "snippet": item.get("snippet")
            })

        return results

class DuckDuckGoSearchTool:
    """DuckDuckGo搜索工具(无需API key)"""

    def search(self, query: str, num_results: int = 5) -> List[Dict]:
        """搜索"""
        from duckduckgo_search import DDGS

        with DDGS() as ddgs:
            results = list(ddgs.text(query, max_results=num_results))

        return [
            {
                "title": r["title"],
                "link": r["href"],
                "snippet": r["body"]
            }
            for r in results
        ]

# 集成到Agent
class SearchAgent(FunctionCallingAgent):
    """搜索Agent"""

    def __init__(self, api_key: str):
        super().__init__(api_key)

        # 添加搜索工具
        self.search_tool = DuckDuckGoSearchTool()

        self.register_function(
            func=self.search_web,
            name="search_web",
            description="在互联网上搜索信息",
            parameters={
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "搜索查询"
                    },
                    "num_results": {
                        "type": "integer",
                        "description": "返回结果数量",
                        "default": 5
                    }
                },
                "required": ["query"]
            }
        )

    def search_web(self, query: str, num_results: int = 5) -> str:
        """搜索网页"""
        results = self.search_tool.search(query, num_results)

        output = f"找到 {len(results)} 个结果:\n\n"
        for i, result in enumerate(results, 1):
            output += f"{i}. {result['title']}\n"
            output += f"   {result['snippet']}\n"
            output += f"   链接: {result['link']}\n\n"

        return output

计算工具

import ast
import operator
import math

class Calculator:
    """安全的计算器"""

    # 允许的操作符
    ALLOWED_OPERATORS = {
        ast.Add: operator.add,
        ast.Sub: operator.sub,
        ast.Mult: operator.mul,
        ast.Div: operator.truediv,
        ast.Pow: operator.pow,
        ast.Mod: operator.mod,
        ast.FloorDiv: operator.floordiv,
        ast.UAdd: operator.pos,
        ast.USub: operator.neg,
    }

    # 允许的函数
    ALLOWED_FUNCTIONS = {
        'abs': abs,
        'round': round,
        'min': min,
        'max': max,
        'sum': sum,
        'sqrt': math.sqrt,
        'sin': math.sin,
        'cos': math.cos,
        'tan': math.tan,
        'log': math.log,
        'exp': math.exp,
        'pi': math.pi,
        'e': math.e,
    }

    def calculate(self, expression: str) -> float:
        """计算表达式"""
        try:
            # 解析表达式
            tree = ast.parse(expression, mode='eval')
            result = self._eval_node(tree.body)
            return result
        except Exception as e:
            raise ValueError(f"计算错误: {str(e)}")

    def _eval_node(self, node):
        """递归求值节点"""
        if isinstance(node, ast.Num):
            return node.n
        elif isinstance(node, ast.BinOp):
            op = type(node.op)
            if op not in self.ALLOWED_OPERATORS:
                raise ValueError(f"不允许的操作: {op}")
            left = self._eval_node(node.left)
            right = self._eval_node(node.right)
            return self.ALLOWED_OPERATORS[op](left, right)
        elif isinstance(node, ast.UnaryOp):
            op = type(node.op)
            if op not in self.ALLOWED_OPERATORS:
                raise ValueError(f"不允许的操作: {op}")
            operand = self._eval_node(node.operand)
            return self.ALLOWED_OPERATORS[op](operand)
        elif isinstance(node, ast.Call):
            func_name = node.func.id
            if func_name not in self.ALLOWED_FUNCTIONS:
                raise ValueError(f"不允许的函数: {func_name}")
            args = [self._eval_node(arg) for arg in node.args]
            return self.ALLOWED_FUNCTIONS[func_name](*args)
        elif isinstance(node, ast.Name):
            if node.id in self.ALLOWED_FUNCTIONS:
                return self.ALLOWED_FUNCTIONS[node.id]
            raise ValueError(f"未定义的变量: {node.id}")
        else:
            raise ValueError(f"不支持的节点类型: {type(node)}")

class PythonREPL:
    """Python代码执行器(沙箱)"""

    def __init__(self, timeout: int = 5):
        self.timeout = timeout
        self.globals = {"__builtins__": {}}

        # 允许的内置函数
        safe_builtins = {
            'abs': abs,
            'round': round,
            'len': len,
            'range': range,
            'sum': sum,
            'min': min,
            'max': max,
            'sorted': sorted,
            'print': print,
        }
        self.globals["__builtins__"] = safe_builtins

    def execute(self, code: str) -> str:
        """执行代码"""
        import io
        import sys
        import signal

        # 捕获输出
        old_stdout = sys.stdout
        sys.stdout = io.StringIO()

        def timeout_handler(signum, frame):
            raise TimeoutError("代码执行超时")

        try:
            # 设置超时
            signal.signal(signal.SIGALRM, timeout_handler)
            signal.alarm(self.timeout)

            # 执行代码
            exec(code, self.globals)

            # 取消超时
            signal.alarm(0)

            # 获取输出
            output = sys.stdout.getvalue()
            sys.stdout = old_stdout

            return output if output else "执行成功(无输出)"

        except TimeoutError:
            sys.stdout = old_stdout
            return "执行超时"
        except Exception as e:
            sys.stdout = old_stdout
            return f"执行错误: {str(e)}"

数据库查询

import sqlite3
from typing import List, Dict, Any

class DatabaseTool:
    """数据库查询工具"""

    def __init__(self, db_path: str):
        self.db_path = db_path
        self.connection = None

    def connect(self):
        """连接数据库"""
        self.connection = sqlite3.connect(self.db_path)
        self.connection.row_factory = sqlite3.Row

    def close(self):
        """关闭连接"""
        if self.connection:
            self.connection.close()

    def query(self, sql: str, params: tuple = ()) -> List[Dict]:
        """执行查询"""
        if not self.connection:
            self.connect()

        try:
            cursor = self.connection.cursor()
            cursor.execute(sql, params)
            rows = cursor.fetchall()

            # 转换为字典列表
            results = [dict(row) for row in rows]
            return results
        except Exception as e:
            raise ValueError(f"查询错误: {str(e)}")

    def execute(self, sql: str, params: tuple = ()) -> int:
        """执行更新/插入/删除"""
        if not self.connection:
            self.connect()

        try:
            cursor = self.connection.cursor()
            cursor.execute(sql, params)
            self.connection.commit()
            return cursor.rowcount
        except Exception as e:
            self.connection.rollback()
            raise ValueError(f"执行错误: {str(e)}")

    def get_schema(self) -> Dict[str, List[Dict]]:
        """获取数据库schema"""
        schema = {}

        # 获取所有表
        tables = self.query(
            "SELECT name FROM sqlite_master WHERE type='table'"
        )

        for table in tables:
            table_name = table['name']

            # 获取表的列信息
            columns = self.query(f"PRAGMA table_info({table_name})")
            schema[table_name] = columns

        return schema

# SQL生成Agent
class SQLAgent(FunctionCallingAgent):
    """SQL查询Agent"""

    def __init__(self, api_key: str, db_path: str):
        super().__init__(api_key)

        self.db_tool = DatabaseTool(db_path)
        self.db_tool.connect()

        # 获取schema
        self.schema = self.db_tool.get_schema()
        schema_desc = self._format_schema()

        # 注册查询函数
        self.register_function(
            func=self.query_database,
            name="query_database",
            description=f"执行SQL查询。数据库schema:\n{schema_desc}",
            parameters={
                "type": "object",
                "properties": {
                    "sql": {
                        "type": "string",
                        "description": "SQL查询语句"
                    }
                },
                "required": ["sql"]
            }
        )

    def _format_schema(self) -> str:
        """格式化schema"""
        lines = []
        for table_name, columns in self.schema.items():
            lines.append(f"表 {table_name}:")
            for col in columns:
                lines.append(f"  - {col['name']} ({col['type']})")
        return "\n".join(lines)

    def query_database(self, sql: str) -> str:
        """查询数据库"""
        # 安全检查
        sql_lower = sql.lower().strip()
        if any(keyword in sql_lower for keyword in ['drop', 'delete', 'update', 'insert', 'alter']):
            return "错误: 只允许SELECT查询"

        try:
            results = self.db_tool.query(sql)
            return json.dumps(results, ensure_ascii=False, indent=2)
        except Exception as e:
            return f"查询错误: {str(e)}"

API调用

import requests
from typing import Dict, Any, Optional

class APITool:
    """API调用工具"""

    def __init__(self, base_url: str, headers: Optional[Dict] = None):
        self.base_url = base_url
        self.headers = headers or {}

    def get(self, endpoint: str, params: Optional[Dict] = None) -> Dict:
        """GET请求"""
        url = f"{self.base_url}{endpoint}"
        response = requests.get(url, params=params, headers=self.headers)
        response.raise_for_status()
        return response.json()

    def post(self, endpoint: str, data: Optional[Dict] = None) -> Dict:
        """POST请求"""
        url = f"{self.base_url}{endpoint}"
        response = requests.post(url, json=data, headers=self.headers)
        response.raise_for_status()
        return response.json()

# REST API Agent示例
class WeatherAgent(FunctionCallingAgent):
    """天气API Agent"""

    def __init__(self, api_key: str, weather_api_key: str):
        super().__init__(api_key)

        self.weather_api = APITool(
            base_url="https://api.openweathermap.org/data/2.5",
            headers={"Authorization": f"Bearer {weather_api_key}"}
        )

        self.register_function(
            func=self.get_weather,
            name="get_weather",
            description="获取城市天气信息",
            parameters={
                "type": "object",
                "properties": {
                    "city": {
                        "type": "string",
                        "description": "城市名称"
                    },
                    "units": {
                        "type": "string",
                        "enum": ["metric", "imperial"],
                        "description": "单位系统"
                    }
                },
                "required": ["city"]
            }
        )

    def get_weather(self, city: str, units: str = "metric") -> str:
        """获取天气"""
        try:
            data = self.weather_api.get("/weather", {
                "q": city,
                "units": units
            })

            weather = {
                "city": data["name"],
                "temperature": data["main"]["temp"],
                "feels_like": data["main"]["feels_like"],
                "humidity": data["main"]["humidity"],
                "description": data["weather"][0]["description"]
            }

            return json.dumps(weather, ensure_ascii=False)
        except Exception as e:
            return f"获取天气失败: {str(e)}"

文件操作

import os
import shutil
from pathlib import Path
from typing import List

class FileOperationTool:
    """文件操作工具"""

    def __init__(self, workspace: str = "./workspace"):
        self.workspace = Path(workspace)
        self.workspace.mkdir(exist_ok=True)

    def _get_safe_path(self, file_path: str) -> Path:
        """获取安全的文件路径"""
        full_path = (self.workspace / file_path).resolve()

        # 确保路径在workspace内
        if not str(full_path).startswith(str(self.workspace.resolve())):
            raise ValueError("路径必须在workspace内")

        return full_path

    def read_file(self, file_path: str) -> str:
        """读取文件"""
        path = self._get_safe_path(file_path)

        if not path.exists():
            return f"文件不存在: {file_path}"

        try:
            with open(path, 'r', encoding='utf-8') as f:
                return f.read()
        except Exception as e:
            return f"读取失败: {str(e)}"

    def write_file(self, file_path: str, content: str) -> str:
        """写入文件"""
        path = self._get_safe_path(file_path)

        try:
            path.parent.mkdir(parents=True, exist_ok=True)
            with open(path, 'w', encoding='utf-8') as f:
                f.write(content)
            return f"成功写入 {file_path}"
        except Exception as e:
            return f"写入失败: {str(e)}"

    def list_files(self, directory: str = ".") -> List[str]:
        """列出文件"""
        path = self._get_safe_path(directory)

        if not path.exists():
            return []

        files = []
        for item in path.rglob("*"):
            if item.is_file():
                rel_path = item.relative_to(self.workspace)
                files.append(str(rel_path))

        return files

    def delete_file(self, file_path: str) -> str:
        """删除文件"""
        path = self._get_safe_path(file_path)

        if not path.exists():
            return f"文件不存在: {file_path}"

        try:
            if path.is_file():
                path.unlink()
            else:
                shutil.rmtree(path)
            return f"成功删除 {file_path}"
        except Exception as e:
            return f"删除失败: {str(e)}"

    def create_directory(self, directory: str) -> str:
        """创建目录"""
        path = self._get_safe_path(directory)

        try:
            path.mkdir(parents=True, exist_ok=True)
            return f"成功创建目录 {directory}"
        except Exception as e:
            return f"创建失败: {str(e)}"

多Agent协作

Agent通信协议

from dataclasses import dataclass
from typing import List, Dict, Any, Optional
from enum import Enum

class MessageType(Enum):
    """消息类型"""
    REQUEST = "request"  # 请求
    RESPONSE = "response"  # 响应
    BROADCAST = "broadcast"  # 广播
    NOTIFICATION = "notification"  # 通知

@dataclass
class Message:
    """Agent间消息"""
    sender: str
    receiver: str
    message_type: MessageType
    content: Any
    message_id: str = None
    reply_to: str = None
    timestamp: float = None

    def __post_init__(self):
        import time
        import uuid

        if self.message_id is None:
            self.message_id = str(uuid.uuid4())
        if self.timestamp is None:
            self.timestamp = time.time()

class MessageBus:
    """消息总线"""

    def __init__(self):
        self.subscribers = {}  # {agent_name: agent}
        self.message_queue = []
        self.message_history = []

    def register(self, agent_name: str, agent):
        """注册Agent"""
        self.subscribers[agent_name] = agent

    def unregister(self, agent_name: str):
        """注销Agent"""
        self.subscribers.pop(agent_name, None)

    def send(self, message: Message):
        """发送消息"""
        self.message_history.append(message)

        if message.message_type == MessageType.BROADCAST:
            # 广播给所有Agent
            for agent_name, agent in self.subscribers.items():
                if agent_name != message.sender:
                    agent.receive_message(message)
        else:
            # 发送给特定Agent
            receiver = self.subscribers.get(message.receiver)
            if receiver:
                receiver.receive_message(message)

    def get_history(self, agent_name: str = None) -> List[Message]:
        """获取消息历史"""
        if agent_name:
            return [
                msg for msg in self.message_history
                if msg.sender == agent_name or msg.receiver == agent_name
            ]
        return self.message_history

任务分解和分配

from typing import List, Dict, Any

class Task:
    """任务对象"""

    def __init__(self, task_id: str, description: str, dependencies: List[str] = None):
        self.task_id = task_id
        self.description = description
        self.dependencies = dependencies or []
        self.status = "pending"  # pending, in_progress, completed, failed
        self.result = None
        self.assigned_to = None

class TaskDecomposer:
    """任务分解器"""

    def __init__(self, llm):
        self.llm = llm

    def decompose(self, task_description: str) -> List[Task]:
        """分解任务"""
        prompt = f"""将以下任务分解为多个子任务。每个子任务应该是独立且可执行的。

任务: {task_description}

以JSON格式返回子任务列表:
[
  {{"id": "task1", "description": "...", "dependencies": []}},
  {{"id": "task2", "description": "...", "dependencies": ["task1"]}},
  ...
]

子任务列表:"""

        response = self.llm.generate(prompt, temperature=0.3)

        # 解析JSON
        import json
        import re

        json_match = re.search(r'\[.*\]', response, re.DOTALL)
        if json_match:
            subtasks_data = json.loads(json_match.group())
            tasks = [
                Task(
                    task_id=t["id"],
                    description=t["description"],
                    dependencies=t.get("dependencies", [])
                )
                for t in subtasks_data
            ]
            return tasks

        # 如果解析失败,返回单个任务
        return [Task("task1", task_description)]

class TaskScheduler:
    """任务调度器"""

    def __init__(self):
        self.tasks = {}
        self.execution_order = []

    def add_tasks(self, tasks: List[Task]):
        """添加任务"""
        for task in tasks:
            self.tasks[task.task_id] = task

        # 计算执行顺序(拓扑排序)
        self.execution_order = self._topological_sort()

    def _topological_sort(self) -> List[str]:
        """拓扑排序"""
        # 计算入度
        in_degree = {task_id: 0 for task_id in self.tasks}
        for task in self.tasks.values():
            for dep in task.dependencies:
                if dep in in_degree:
                    in_degree[task.task_id] += 1

        # 队列
        queue = [task_id for task_id, degree in in_degree.items() if degree == 0]
        result = []

        while queue:
            task_id = queue.pop(0)
            result.append(task_id)

            # 更新依赖此任务的其他任务
            for other_task in self.tasks.values():
                if task_id in other_task.dependencies:
                    in_degree[other_task.task_id] -= 1
                    if in_degree[other_task.task_id] == 0:
                        queue.append(other_task.task_id)

        return result

    def get_next_task(self) -> Optional[Task]:
        """获取下一个可执行任务"""
        for task_id in self.execution_order:
            task = self.tasks[task_id]

            if task.status != "pending":
                continue

            # 检查依赖是否完成
            deps_completed = all(
                self.tasks[dep].status == "completed"
                for dep in task.dependencies
                if dep in self.tasks
            )

            if deps_completed:
                return task

        return None

    def all_completed(self) -> bool:
        """是否所有任务完成"""
        return all(task.status in ["completed", "failed"] for task in self.tasks.values())

AutoGen框架

import autogen
from typing import List, Dict, Any

class AutoGenMultiAgent:
    """基于AutoGen的多Agent系统"""

    def __init__(self, config_list: List[Dict]):
        self.config_list = config_list
        self.agents = {}

    def create_assistant(self, name: str, system_message: str) -> autogen.AssistantAgent:
        """创建助手Agent"""
        assistant = autogen.AssistantAgent(
            name=name,
            system_message=system_message,
            llm_config={
                "config_list": self.config_list,
                "temperature": 0.7
            }
        )
        self.agents[name] = assistant
        return assistant

    def create_user_proxy(self, name: str, **kwargs) -> autogen.UserProxyAgent:
        """创建用户代理"""
        user_proxy = autogen.UserProxyAgent(
            name=name,
            **kwargs
        )
        self.agents[name] = user_proxy
        return user_proxy

    def create_group_chat(self, agents: List, max_round: int = 10):
        """创建群聊"""
        groupchat = autogen.GroupChat(
            agents=agents,
            messages=[],
            max_round=max_round
        )

        manager = autogen.GroupChatManager(
            groupchat=groupchat,
            llm_config={"config_list": self.config_list}
        )

        return manager

# 使用示例
def research_task_example():
    """研究任务示例"""
    config_list = [
        {
            "model": "gpt-4",
            "api_key": "your-api-key"
        }
    ]

    system = AutoGenMultiAgent(config_list)

    # 创建研究员Agent
    researcher = system.create_assistant(
        name="researcher",
        system_message="你是一个研究员,负责搜索和收集信息。"
    )

    # 创建分析师Agent
    analyst = system.create_assistant(
        name="analyst",
        system_message="你是一个分析师,负责分析和总结信息。"
    )

    # 创建撰稿人Agent
    writer = system.create_assistant(
        name="writer",
        system_message="你是一个撰稿人,负责撰写报告。"
    )

    # 创建用户代理
    user = system.create_user_proxy(
        name="user",
        human_input_mode="NEVER",
        max_consecutive_auto_reply=0,
        code_execution_config=False
    )

    # 创建群聊
    manager = system.create_group_chat(
        agents=[user, researcher, analyst, writer],
        max_round=12
    )

    # 启动任务
    user.initiate_chat(
        manager,
        message="请研究并撰写一份关于'Transformer架构'的技术报告"
    )

MetaGPT实现

from typing import List, Dict, Any, Optional
from abc import ABC, abstractmethod

class Role(ABC):
    """角色基类"""

    def __init__(self, name: str, profile: str, llm):
        self.name = name
        self.profile = profile
        self.llm = llm
        self.memory = []

    @abstractmethod
    async def act(self, message: str) -> str:
        """执行动作"""
        pass

    def observe(self, message: Message):
        """观察消息"""
        self.memory.append(message)

class ProductManager(Role):
    """产品经理角色"""

    async def act(self, message: str) -> str:
        """撰写PRD"""
        prompt = f"""作为产品经理,基于以下需求撰写产品需求文档(PRD)。

用户需求: {message}

PRD应包含:
1. 产品目标
2. 用户故事
3. 功能列表
4. 优先级
5. 验收标准

PRD:"""

        prd = self.llm.generate(prompt, temperature=0.5)
        return prd

class Architect(Role):
    """架构师角色"""

    async def act(self, message: str) -> str:
        """设计架构"""
        prompt = f"""作为架构师,基于以下PRD设计系统架构。

PRD: {message}

架构设计应包含:
1. 系统架构图
2. 技术栈选择
3. 数据模型
4. API设计
5. 部署方案

架构设计:"""

        design = self.llm.generate(prompt, temperature=0.3)
        return design

class Engineer(Role):
    """工程师角色"""

    async def act(self, message: str) -> str:
        """编写代码"""
        prompt = f"""作为工程师,基于以下架构设计编写代码。

架构设计: {message}

请编写主要代码文件和实现关键功能。

代码:"""

        code = self.llm.generate(prompt, temperature=0.2)
        return code

class QAEngineer(Role):
    """测试工程师角色"""

    async def act(self, message: str) -> str:
        """编写测试"""
        prompt = f"""作为QA工程师,基于以下代码编写测试用例。

代码: {message}

测试用例应包含:
1. 单元测试
2. 集成测试
3. 边界条件测试

测试代码:"""

        tests = self.llm.generate(prompt, temperature=0.2)
        return tests

class SoftwareCompany:
    """软件公司(MetaGPT风格)"""

    def __init__(self, llm):
        self.llm = llm
        self.roles = []
        self.message_bus = MessageBus()

        # 初始化角色
        self._init_roles()

    def _init_roles(self):
        """初始化角色"""
        self.pm = ProductManager("Alice", "Product Manager", self.llm)
        self.architect = Architect("Bob", "Architect", self.llm)
        self.engineer = Engineer("Charlie", "Engineer", self.llm)
        self.qa = QAEngineer("David", "QA Engineer", self.llm)

        self.roles = [self.pm, self.architect, self.engineer, self.qa]

    async def run_project(self, requirement: str) -> Dict[str, str]:
        """运行项目"""
        results = {}

        # 1. PM撰写PRD
        print("产品经理正在撰写PRD...")
        prd = await self.pm.act(requirement)
        results["prd"] = prd

        # 2. 架构师设计架构
        print("架构师正在设计架构...")
        design = await self.architect.act(prd)
        results["design"] = design

        # 3. 工程师编写代码
        print("工程师正在编写代码...")
        code = await self.engineer.act(design)
        results["code"] = code

        # 4. QA编写测试
        print("QA正在编写测试...")
        tests = await self.qa.act(code)
        results["tests"] = tests

        return results

# 使用示例
async def main():
    from your_llm_module import LLM

    llm = LLM(api_key="your-api-key")
    company = SoftwareCompany(llm)

    results = await company.run_project(
        "开发一个待办事项管理应用"
    )

    print("\n=== PRD ===")
    print(results["prd"])
    print("\n=== 架构设计 ===")
    print(results["design"])
    print("\n=== 代码 ===")
    print(results["code"])
    print("\n=== 测试 ===")
    print(results["tests"])

# 运行
import asyncio
asyncio.run(main())

从零实现Agent系统

这里提供一个完整的、生产级的Agent系统实现(3000+行代码的核心部分):

"""
完整的Agent系统实现
包含:工具注册、任务规划、执行引擎、记忆管理
"""

import json
import logging
from typing import List, Dict, Any, Optional, Callable
from dataclasses import dataclass, field
from abc import ABC, abstractmethod
from enum import Enum
import time
from datetime import datetime

# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# ============ 1. 工具系统 ============

class ToolRegistry:
    """工具注册中心"""

    def __init__(self):
        self.tools = {}
        self.tool_metadata = {}

    def register(
        self,
        name: str,
        func: Callable,
        description: str,
        parameters: Dict,
        returns: Dict = None
    ):
        """注册工具"""
        self.tools[name] = func
        self.tool_metadata[name] = {
            "name": name,
            "description": description,
            "parameters": parameters,
            "returns": returns or {}
        }
        logger.info(f"注册工具: {name}")

    def get_tool(self, name: str) -> Optional[Callable]:
        """获取工具"""
        return self.tools.get(name)

    def get_metadata(self, name: str) -> Optional[Dict]:
        """获取工具元数据"""
        return self.tool_metadata.get(name)

    def list_tools(self) -> List[str]:
        """列出所有工具"""
        return list(self.tools.keys())

    def get_tools_description(self) -> str:
        """获取所有工具的描述"""
        descriptions = []
        for name, metadata in self.tool_metadata.items():
            desc = f"- {name}: {metadata['description']}\n"
            desc += f"  参数: {json.dumps(metadata['parameters'], ensure_ascii=False)}"
            descriptions.append(desc)
        return "\n".join(descriptions)

# ============ 2. 记忆系统 ============

@dataclass
class Memory:
    """记忆对象"""
    content: str
    timestamp: float = field(default_factory=time.time)
    metadata: Dict = field(default_factory=dict)
    importance: float = 0.5

class MemoryManager:
    """记忆管理器"""

    def __init__(self, max_short_term: int = 10, max_long_term: int = 100):
        self.short_term = []  # 短期记忆
        self.long_term = []  # 长期记忆
        self.max_short_term = max_short_term
        self.max_long_term = max_long_term

    def add(self, content: str, importance: float = 0.5, metadata: Dict = None):
        """添加记忆"""
        memory = Memory(
            content=content,
            importance=importance,
            metadata=metadata or {}
        )

        self.short_term.append(memory)

        # 短期记忆满了,转移到长期记忆
        if len(self.short_term) > self.max_short_term:
            old_memory = self.short_term.pop(0)
            if old_memory.importance > 0.6:  # 重要的记忆保存到长期
                self.long_term.append(old_memory)

        # 长期记忆满了,删除最不重要的
        if len(self.long_term) > self.max_long_term:
            self.long_term.sort(key=lambda m: m.importance)
            self.long_term = self.long_term[10:]  # 删除最不重要的10个

    def get_recent(self, n: int = 5) -> List[Memory]:
        """获取最近的记忆"""
        return self.short_term[-n:]

    def search(self, query: str, n: int = 5) -> List[Memory]:
        """搜索相关记忆(简单的关键词匹配)"""
        all_memories = self.short_term + self.long_term

        # 计算相关性
        scored_memories = []
        for memory in all_memories:
            score = sum(1 for word in query.split() if word in memory.content)
            if score > 0:
                scored_memories.append((memory, score))

        # 排序并返回top n
        scored_memories.sort(key=lambda x: x[1], reverse=True)
        return [m for m, s in scored_memories[:n]]

    def get_summary(self) -> str:
        """获取记忆摘要"""
        recent = self.get_recent(5)
        summary = "最近的记忆:\n"
        for i, memory in enumerate(recent, 1):
            summary += f"{i}. {memory.content}\n"
        return summary

# ============ 3. 规划系统 ============

@dataclass
class Plan:
    """计划对象"""
    goal: str
    steps: List[Dict]
    current_step: int = 0
    status: str = "pending"  # pending, in_progress, completed, failed

    def get_next_step(self) -> Optional[Dict]:
        """获取下一步"""
        if self.current_step < len(self.steps):
            return self.steps[self.current_step]
        return None

    def complete_step(self):
        """完成当前步骤"""
        self.current_step += 1
        if self.current_step >= len(self.steps):
            self.status = "completed"

class Planner:
    """规划器"""

    def __init__(self, llm, tool_registry: ToolRegistry):
        self.llm = llm
        self.tool_registry = tool_registry

    def create_plan(self, goal: str, context: str = "") -> Plan:
        """创建计划"""
        tools_desc = self.tool_registry.get_tools_description()

        prompt = f"""你是一个规划专家。基于目标和可用工具,制定详细的执行计划。

目标: {goal}

上下文: {context}

可用工具:
{tools_desc}

请以JSON格式返回计划,格式如下:
{{
  "steps": [
    {{"step": 1, "action": "使用工具X", "tool": "tool_name", "params": {{}}, "reason": "原因"}},
    {{"step": 2, "action": "...", "tool": "...", "params": {{}}, "reason": "..."}},
    ...
  ]
}}

计划:"""

        response = self.llm.generate(prompt, temperature=0.3)

        # 解析JSON
        import re
        json_match = re.search(r'\{.*\}', response, re.DOTALL)
        if json_match:
            try:
                plan_data = json.loads(json_match.group())
                return Plan(goal=goal, steps=plan_data["steps"])
            except:
                pass

        # 如果解析失败,返回简单计划
        return Plan(
            goal=goal,
            steps=[{"step": 1, "action": "直接回答", "tool": None, "params": {}}]
        )

# ============ 4. 执行引擎 ============

class ExecutionEngine:
    """执行引擎"""

    def __init__(
        self,
        tool_registry: ToolRegistry,
        memory_manager: MemoryManager,
        max_retries: int = 3
    ):
        self.tool_registry = tool_registry
        self.memory = memory_manager
        self.max_retries = max_retries

    def execute_step(self, step: Dict) -> Dict:
        """执行单个步骤"""
        tool_name = step.get("tool")

        if not tool_name:
            return {
                "success": True,
                "result": "无需工具执行",
                "step": step
            }

        tool_func = self.tool_registry.get_tool(tool_name)
        if not tool_func:
            return {
                "success": False,
                "error": f"工具 {tool_name} 不存在",
                "step": step
            }

        params = step.get("params", {})

        # 执行工具(带重试)
        for attempt in range(self.max_retries):
            try:
                result = tool_func(**params)

                # 记录到记忆
                self.memory.add(
                    f"执行 {tool_name}: {step.get('action')} -> {result}",
                    importance=0.7,
                    metadata={"tool": tool_name, "step": step}
                )

                return {
                    "success": True,
                    "result": result,
                    "step": step
                }
            except Exception as e:
                logger.warning(f"执行失败 (尝试 {attempt + 1}/{self.max_retries}): {str(e)}")
                if attempt == self.max_retries - 1:
                    return {
                        "success": False,
                        "error": str(e),
                        "step": step
                    }
                time.sleep(1)  # 重试前等待

# ============ 5. Agent核心 ============

class AgentCore:
    """Agent核心"""

    def __init__(
        self,
        name: str,
        llm,
        tool_registry: ToolRegistry = None,
        memory_manager: MemoryManager = None
    ):
        self.name = name
        self.llm = llm

        self.tool_registry = tool_registry or ToolRegistry()
        self.memory = memory_manager or MemoryManager()
        self.planner = Planner(llm, self.tool_registry)
        self.executor = ExecutionEngine(self.tool_registry, self.memory)

        self.conversation_history = []
        self.current_plan = None

    def register_tool(self, name: str, func: Callable, description: str, parameters: Dict):
        """注册工具"""
        self.tool_registry.register(name, func, description, parameters)

    def run(self, task: str, max_iterations: int = 10) -> str:
        """运行Agent"""
        logger.info(f"Agent {self.name} 开始执行任务: {task}")

        # 添加到对话历史
        self.conversation_history.append({"role": "user", "content": task})

        # 创建计划
        context = self.memory.get_summary()
        self.current_plan = self.planner.create_plan(task, context)

        logger.info(f"创建计划,共 {len(self.current_plan.steps)} 步")

        # 执行计划
        results = []
        for i in range(max_iterations):
            step = self.current_plan.get_next_step()

            if not step:
                # 计划完成
                break

            logger.info(f"执行步骤 {self.current_plan.current_step + 1}: {step.get('action')}")

            # 执行步骤
            result = self.executor.execute_step(step)
            results.append(result)

            if result["success"]:
                self.current_plan.complete_step()
            else:
                logger.error(f"步骤执行失败: {result.get('error')}")
                # 可以选择重新规划或终止
                break

        # 生成最终回答
        final_answer = self._generate_final_answer(task, results)

        # 添加到对话历史和记忆
        self.conversation_history.append({"role": "assistant", "content": final_answer})
        self.memory.add(f"任务: {task}\n回答: {final_answer}", importance=0.8)

        return final_answer

    def _generate_final_answer(self, task: str, results: List[Dict]) -> str:
        """生成最终回答"""
        results_summary = "\n".join([
            f"- {r['step'].get('action')}: {r.get('result', r.get('error'))}"
            for r in results
        ])

        prompt = f"""基于任务和执行结果,生成最终回答。

任务: {task}

执行结果:
{results_summary}

回忆:
{self.memory.get_summary()}

请生成一个完整、准确的回答:"""

        answer = self.llm.generate(prompt, temperature=0.7)
        return answer

# ============ 使用示例 ============

def main():
    # 模拟LLM
    class SimpleLLM:
        def generate(self, prompt: str, temperature: float = 0.7) -> str:
            # 实际应该调用真实的LLM API
            return '{"steps": [{"step": 1, "action": "计算结果", "tool": "calculator", "params": {"expression": "2+2"}, "reason": "需要计算"}]}'

    # 创建Agent
    llm = SimpleLLM()
    agent = AgentCore(name="MathAgent", llm=llm)

    # 注册工具
    def calculator(expression: str) -> float:
        return eval(expression)

    agent.register_tool(
        name="calculator",
        func=calculator,
        description="执行数学计算",
        parameters={
            "type": "object",
            "properties": {
                "expression": {"type": "string", "description": "数学表达式"}
            },
            "required": ["expression"]
        }
    )

    # 执行任务
    result = agent.run("计算 2 + 2")
    print(f"结果: {result}")

    # 查看记忆
    print(f"\n记忆: {agent.memory.get_summary()}")

if __name__ == "__main__":
    main()
Prev
15-RAG系统设计与实现
Next
17-多模态大模型