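Agent Architecture Design

Overview

An LLM agent is a system that plans autonomously, executes tasks, and interacts with external tools. This chapter covers the agent's core architecture, planning algorithms, tool-calling mechanism, and memory system design.

Agent Architecture Overview

Core Components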

┌─────────────────────────────────────────────────────────────────────────────┐
│                           LLM Agent Architecture                             │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│                              ┌─────────────┐                                │
│                              │    User     │                                │
│                              │   Input     │                                │
│                              └──────┬──────┘                                │
│                                     │                                       │
│                                     ▼                                       │
│  ┌───────────────────────────────────────────────────────────────────────┐ │
│  │                          Agent Core                                    │ │
│  │  ┌─────────────────────────────────────────────────────────────────┐  │ │
│  │  │                        Planner                                   │  │ │
│  │  │                                                                  │  │ │
│  │  │   Task ──▶ [Decompose] ──▶ [Plan] ──▶ [Select Action]           │  │ │
│  │  │                                                                  │  │ │
│  │  └──────────────────────────────┬──────────────────────────────────┘  │ │
│  │                                 │                                      │ │
│  │  ┌──────────────────────────────┴──────────────────────────────────┐  │ │
│  │  │                       Executor                                   │  │ │
│  │  │                                                                  │  │ │
│  │  │   Action ──▶ [Tool Selection] ──▶ [Execution] ──▶ [Observe]     │  │ │
│  │  │                                                                  │  │ │
│  │  └──────────────────────────────┬──────────────────────────────────┘  │ │
│  └─────────────────────────────────┼─────────────────────────────────────┘ │
│                                    │                                       │
│         ┌──────────────────────────┼──────────────────────────────┐       │
│         │                          │                              │       │
│         ▼                          ▼                              ▼       │
│  ┌─────────────┐           ┌─────────────┐               ┌─────────────┐ │
│  │   Memory    │           │    Tools    │               │  Knowledge  │ │
│  │   System    │           │   Library   │               │    Base     │ │
│  │             │           │             │               │             │ │
│  │ - Short-term│           │ - Search    │               │ - RAG       │ │
│  │ - Long-term │           │ - Code Exec │               │ - Vector DB │ │
│  │ - Episodic  │           │ - API Call  │               │ - KG        │ │
│  │ - Semantic  │           │ - Browser   │               │             │ │
│  └─────────────┘           └─────────────┘               └─────────────┘ │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘
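
To make the wiring in the diagram concrete, here is a minimal sketch of how a planner, an executor, a tool table, and a short-term memory could fit together. The names `MiniAgent`, `Action`, and the toy `search` tool are hypothetical and exist only for illustration; a real planner would call an LLM rather than use a fixed rule.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Optional, Tuple

Action = Tuple[str, str]  # (tool name, tool input); hypothetical shape


@dataclass
class MiniAgent:
    """Toy wiring of Planner, Executor, Tools and Memory (illustrative only)."""
    tools: Dict[str, Callable[[str], str]]
    memory: List[str] = field(default_factory=list)  # short-term observations

    def plan(self, task: str) -> Optional[Action]:
        # Planner stand-in: search once, then stop. A real planner would ask an LLM.
        return ("search", task) if not self.memory else None

    def execute(self, action: Action) -> str:
        # Executor: select the tool by name, run it, and return the observation.
        tool_name, tool_input = action
        return self.tools[tool_name](tool_input)

    def run(self, task: str, max_steps: int = 5) -> List[str]:
        for _ in range(max_steps):
            action = self.plan(task)
            if action is None:
                break
            self.memory.append(self.execute(action))  # observe and remember
        return self.memory


agent = MiniAgent(tools={"search": lambda q: f"top result for {q!r}"})
print(agent.run("GPU scheduling"))
```

The step cap in `run` plays the same role as `max_steps` in the implementations later in this chapter: it bounds the loop even if the planner never decides to stop.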

Agent Types Comparison

┌─────────────────────────────────────────────────────────────────────────────┐
│                        Agent Types Comparison                                │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│  1. ReAct Agent (Reasoning + Acting)                                        │
│  ┌───────────────────────────────────────────────────────────────────────┐ │
│  │  Thought ──▶ Action ──▶ Observation ──▶ Thought ──▶ ...              │ │
│  │                                                                       │ │
│  │  Traits: alternates thinking and acting; transparent, explainable    │ │
│  │  Use for: complex reasoning tasks that need interpretability         │ │
│  └───────────────────────────────────────────────────────────────────────┘ │
│                                                                             │
│  2. Plan-and-Execute Agent                                                  │
│  ┌───────────────────────────────────────────────────────────────────────┐ │
│  │  Plan [Step1, Step2, ...] ──▶ Execute Step1 ──▶ Execute Step2 ──▶ ...│ │
│  │                                                                       │ │
│  │  Traits: plan first, then execute; suits long-running tasks          │ │
│  │  Use for: complex multi-step tasks                                   │ │
│  └───────────────────────────────────────────────────────────────────────┘ │
│                                                                             │
│  3. Function Calling Agent                                                  │
│  ┌───────────────────────────────────────────────────────────────────────┐ │
│  │  Query ──▶ LLM (with tools) ──▶ Function Call ──▶ Result ──▶ Answer  │ │
│  │                                                                       │ │
│  │  Traits: relies on the model's native capability; simple to call     │ │
│  │  Use for: models with function calling support (e.g. OpenAI, Claude) │ │
│  └───────────────────────────────────────────────────────────────────────┘ │
│                                                                             │
│  4. Multi-Agent System                                                      │
│  ┌───────────────────────────────────────────────────────────────────────┐ │
│  │       ┌─────────┐      ┌─────────┐      ┌─────────┐                  │ │
│  │       │ Agent A │ ───▶ │ Agent B │ ───▶ │ Agent C │                  │ │
│  │       │(Planner)│      │(Executor)│     │(Reviewer)│                  │ │
│  │       └─────────┘      └─────────┘      └─────────┘                  │ │
│  │                                                                       │ │
│  │  Traits: multiple agents collaborate with specialized roles          │ │
│  │  Use for: complex projects needing several kinds of expertise        │ │
│  └───────────────────────────────────────────────────────────────────────┘ │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘
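
The difference between the first two agent types is mostly one of control flow. The sketch below contrasts them using toy stand-ins: `decide`, `make_plan`, and `act` are hypothetical callables that would be backed by an LLM and real tools in practice.

```python
from typing import Callable, List


def react_loop(
    decide: Callable[[List[str]], str],
    act: Callable[[str], str],
    max_steps: int = 5,
) -> List[str]:
    """ReAct: choose the next action only after seeing every prior observation."""
    observations: List[str] = []
    for _ in range(max_steps):
        action = decide(observations)
        if action == "finish":
            break
        observations.append(act(action))
    return observations


def plan_and_execute(
    make_plan: Callable[[], List[str]],
    act: Callable[[str], str],
) -> List[str]:
    """Plan-and-Execute: fix the whole step list up front, then run it in order."""
    return [act(step) for step in make_plan()]


# Toy stand-ins (assumptions, not a real model or tool)
def act(action: str) -> str:
    return f"done:{action}"


def decide(observations: List[str]) -> str:
    return ["search", "summarize", "finish"][len(observations)]


def make_plan() -> List[str]:
    return ["search", "summarize"]


react_trace = react_loop(decide, act)
plan_trace = plan_and_execute(make_plan, act)
print(react_trace == plan_trace)
```

Both traces end up identical here because the toy task never surprises the agent. The ReAct loop can change course after any observation, while the plan-based variant needs an explicit replanning step to do so.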

Core Component Implementation

Tool System

"""
Tool system implementation
"""
from typing import Any, Dict, List, Callable, Optional, Union
from dataclasses import dataclass, field
from abc import ABC, abstractmethod
import json
import inspect

@dataclass
class ToolParameter:
    """Tool parameter definition."""
    name: str
    type: str
    description: str
    required: bool = True
    default: Any = None
    enum: Optional[List[Any]] = None

@dataclass
class Tool:
    """Tool definition."""
    name: str
    description: str
    parameters: List[ToolParameter]
    function: Callable
    returns: str = "string"
    examples: List[Dict] = field(default_factory=list)

    def to_openai_schema(self) -> Dict:
        """Convert to the OpenAI function calling format."""
        properties = {}
        required = []

        for param in self.parameters:
            prop = {
                "type": param.type,
                "description": param.description
            }
            if param.enum:
                prop["enum"] = param.enum

            properties[param.name] = prop

            if param.required:
                required.append(param.name)

        return {
            "type": "function",
            "function": {
                "name": self.name,
                "description": self.description,
                "parameters": {
                    "type": "object",
                    "properties": properties,
                    "required": required
                }
            }
        }

    def to_anthropic_schema(self) -> Dict:
        """Convert to the Anthropic tool use format."""
        properties = {}
        required = []

        for param in self.parameters:
            prop = {
                "type": param.type,
                "description": param.description
            }
            # Keep enum constraints, matching to_openai_schema
            if param.enum:
                prop["enum"] = param.enum

            properties[param.name] = prop

            if param.required:
                required.append(param.name)

        return {
            "name": self.name,
            "description": self.description,
            "input_schema": {
                "type": "object",
                "properties": properties,
                "required": required
            }
        }

    async def execute(self, **kwargs) -> Any:
        """Execute the tool."""
        # Validate parameters
        self._validate_params(kwargs)

        # Run the function, awaiting it if it is a coroutine function
        if inspect.iscoroutinefunction(self.function):
            result = await self.function(**kwargs)
        else:
            result = self.function(**kwargs)

        return result

    def _validate_params(self, kwargs: Dict):
        """Check that every required parameter is present."""
        for param in self.parameters:
            if param.required and param.name not in kwargs:
                raise ValueError(f"Missing required parameter: {param.name}")


class ToolRegistry:
    """Tool registry."""

    def __init__(self):
        self.tools: Dict[str, Tool] = {}

    def register(self, tool: Tool):
        """Register a tool."""
        self.tools[tool.name] = tool

    def register_function(
        self,
        name: Optional[str] = None,
        description: Optional[str] = None
    ):
        """Register a tool with a decorator."""
        def decorator(func: Callable):
            tool_name = name or func.__name__
            tool_desc = description or func.__doc__ or ""

            # Extract parameters from the function signature
            sig = inspect.signature(func)
            params = []

            for param_name, param in sig.parameters.items():
                param_type = "string"
                if param.annotation != inspect.Parameter.empty:
                    if param.annotation == int:
                        param_type = "integer"
                    elif param.annotation == float:
                        param_type = "number"
                    elif param.annotation == bool:
                        param_type = "boolean"
                    elif param.annotation == list:
                        param_type = "array"

                params.append(ToolParameter(
                    name=param_name,
                    type=param_type,
                    description="",
                    required=param.default is inspect.Parameter.empty
                ))

            tool = Tool(
                name=tool_name,
                description=tool_desc,
                parameters=params,
                function=func
            )

            self.register(tool)
            return func

        return decorator

    def get(self, name: str) -> Optional[Tool]:
        return self.tools.get(name)

    def list_tools(self) -> List[Tool]:
        return list(self.tools.values())

    def get_schemas(self, format: str = "openai") -> List[Dict]:
        """Return the schemas of all registered tools."""
        schemas = []
        for tool in self.tools.values():
            if format == "openai":
                schemas.append(tool.to_openai_schema())
            elif format == "anthropic":
                schemas.append(tool.to_anthropic_schema())
        return schemas


# Built-in tool implementations
class BuiltinTools:
    """Built-in tool set."""

    @staticmethod
    def create_search_tool(search_engine) -> Tool:
        """Create the search tool."""
        async def search(query: str, num_results: int = 5) -> str:
            results = await search_engine.search(query, num_results)
            return json.dumps(results, ensure_ascii=False)

        return Tool(
            name="web_search",
            description="Search the web for information. Use this when you need current information or facts.",
            parameters=[
                ToolParameter("query", "string", "The search query"),
                ToolParameter("num_results", "integer", "Number of results to return", required=False)
            ],
            function=search
        )

    @staticmethod
    def create_calculator_tool() -> Tool:
        """Create the calculator tool."""
        import math

        def calculate(expression: str) -> str:
            try:
                # Restricted evaluation of a math expression. Emptying __builtins__
                # narrows what eval can reach, but it is not a real sandbox, so do
                # not expose this to untrusted input as-is.
                allowed_names = {
                    'abs': abs, 'round': round, 'min': min, 'max': max,
                    'sum': sum, 'pow': pow, 'sqrt': math.sqrt,
                    'sin': math.sin, 'cos': math.cos,
                    'tan': math.tan, 'log': math.log,
                    'pi': math.pi, 'e': math.e
                }
                result = eval(expression, {"__builtins__": {}}, allowed_names)
                return str(result)
            except Exception as e:
                return f"Error: {str(e)}"

        return Tool(
            name="calculator",
            description="Perform mathematical calculations. Input should be a valid mathematical expression.",
            parameters=[
                ToolParameter("expression", "string", "Mathematical expression to evaluate")
            ],
            function=calculate
        )

    @staticmethod
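    def create_code_executor_tool(sandbox_config: Optional[Dict] = None) -> Tool:
        """Create the code execution tool (sandbox_config is not used by this simplified version)."""
        async def execute_code(code: str, language: str = "python") -> str:
            # NOTE: this runs the code in a plain subprocess with a timeout. It is
            # not a sandbox; production use needs real isolation (container, VM).
            # subprocess.run also blocks the event loop while the code runs.
            import subprocess
            import sys
            import tempfile
            import os

            if language == "python":
                with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
                    f.write(code)
                    temp_file = f.name

                try:
                    result = subprocess.run(
                        [sys.executable, temp_file],  # same interpreter as the agent
                        capture_output=True,
                        text=True,
                        timeout=30
                    )
                    output = result.stdout if result.returncode == 0 else result.stderr
                    return output[:2000]  # Cap the output length
                except subprocess.TimeoutExpired:
                    return "Error: execution timed out after 30 seconds"
                finally:
                    os.unlink(temp_file)
            else:
                return f"Unsupported language: {language}"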

        return Tool(
            name="code_executor",
            description="Execute code and return the output. Use for calculations, data processing, etc.",
            parameters=[
                ToolParameter("code", "string", "The code to execute"),
                ToolParameter("language", "string", "Programming language (default: python)", required=False)
            ],
            function=execute_code
        )
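
The calculator tool above relies on `eval` with an emptied `__builtins__`, which can be bypassed through attribute access on literals. One alternative is to walk the expression's syntax tree and allow only arithmetic nodes. The following is a sketch under that assumption, not part of the original tool system; `safe_eval` and its whitelist tables are hypothetical names.

```python
import ast
import math
import operator
from typing import Union

Number = Union[int, float]

# Whitelists: only these operators, functions and constants are evaluated.
_BIN_OPS = {
    ast.Add: operator.add, ast.Sub: operator.sub,
    ast.Mult: operator.mul, ast.Div: operator.truediv,
    ast.Mod: operator.mod, ast.Pow: operator.pow,
}
_UNARY_OPS = {ast.UAdd: operator.pos, ast.USub: operator.neg}
_FUNCS = {"abs": abs, "round": round, "sqrt": math.sqrt, "sin": math.sin,
          "cos": math.cos, "tan": math.tan, "log": math.log}
_CONSTS = {"pi": math.pi, "e": math.e}


def safe_eval(expression: str) -> Number:
    """Evaluate an arithmetic expression by walking its AST."""
    def visit(node: ast.AST) -> Number:
        # Numeric literals only (bool and str constants are rejected)
        if isinstance(node, ast.Constant) and type(node.value) in (int, float):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in _BIN_OPS:
            return _BIN_OPS[type(node.op)](visit(node.left), visit(node.right))
        if isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY_OPS:
            return _UNARY_OPS[type(node.op)](visit(node.operand))
        if isinstance(node, ast.Name) and node.id in _CONSTS:
            return _CONSTS[node.id]
        if (isinstance(node, ast.Call) and isinstance(node.func, ast.Name)
                and node.func.id in _FUNCS and not node.keywords):
            return _FUNCS[node.func.id](*[visit(arg) for arg in node.args])
        # Attribute access, subscripts, lambdas, etc. all end up here
        raise ValueError(f"Unsupported expression element: {type(node).__name__}")

    return visit(ast.parse(expression, mode="eval").body)


print(safe_eval("2 * pi"))
```

This rejects anything outside the whitelist, including attribute chains such as `().__class__`. It does not bound computation cost, so very large exponents can still be slow; a deployed tool would likely add limits on operand size.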

ReAct Agent Implementation

"""
ReAct agent implementation
"""
from typing import List, Dict, Optional, Tuple
from dataclasses import dataclass
from enum import Enum
import json
import re

class AgentState(Enum):
    THINKING = "thinking"
    ACTING = "acting"
    OBSERVING = "observing"
    FINISHED = "finished"
    ERROR = "error"

@dataclass
class AgentStep:
    """A single agent execution step."""
    step_num: int
    thought: str
    action: Optional[str] = None
    action_input: Optional[Dict] = None
    observation: Optional[str] = None
    state: AgentState = AgentState.THINKING

@dataclass
class AgentResult:
    """Agent execution result."""
    success: bool
    output: str
    steps: List[AgentStep]
    total_tokens: int = 0
    error: Optional[str] = None


class ReActAgent:
    """ReAct Agent"""

    def __init__(
        self,
        llm,
        tools: ToolRegistry,
        max_steps: int = 10,
        verbose: bool = True
    ):
        self.llm = llm
        self.tools = tools
        self.max_steps = max_steps
        self.verbose = verbose

    async def run(self, task: str) -> AgentResult:
        """Run the task."""
        steps = []
        scratchpad = ""

        system_prompt = self._build_system_prompt()

        for step_num in range(1, self.max_steps + 1):
            # Build the prompt
            user_prompt = self._build_user_prompt(task, scratchpad)

            # Call the LLM
            response = await self.llm.chat([
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt}
            ])

            # Parse the response
            thought, action, action_input = self._parse_response(response)

            step = AgentStep(
                step_num=step_num,
                thought=thought,
                action=action,
                action_input=action_input,
                state=AgentState.THINKING
            )

            if self.verbose:
                print(f"\n--- Step {step_num} ---")
                print(f"Thought: {thought}")

            # Check whether the agent is done
            if action == "Final Answer":
                step.state = AgentState.FINISHED
                steps.append(step)

                return AgentResult(
                    success=True,
                    # action_input is None if the model omitted the JSON payload
                    output=(action_input or {}).get("answer", ""),
                    steps=steps
                )

            # Execute the action
            if action:
                step.state = AgentState.ACTING

                if self.verbose:
                    print(f"Action: {action}")
                    print(f"Action Input: {action_input}")

                tool = self.tools.get(action)
                if tool:
                    try:
                        observation = await tool.execute(**(action_input or {}))
                        step.observation = observation
                        step.state = AgentState.OBSERVING

                        if self.verbose:
                            # Tools may return non-strings, so convert before slicing
                            print(f"Observation: {str(observation)[:500]}...")
                    except Exception as e:
                        observation = f"Error executing tool: {str(e)}"
                        step.observation = observation
                        step.state = AgentState.ERROR
                else:
                    observation = f"Unknown tool: {action}"
                    step.observation = observation
                    step.state = AgentState.ERROR

                # Update the scratchpad
                scratchpad += f"\nThought: {thought}\nAction: {action}\nAction Input: {json.dumps(action_input, ensure_ascii=False)}\nObservation: {observation}\n"
            else:
                # No action was parsed; record it so the next prompt is not identical
                scratchpad += f"\nThought: {thought}\nObservation: No Action found; reply in the ReAct format.\n"

            steps.append(step)

        # Maximum number of steps reached
        return AgentResult(
            success=False,
            output="Max steps reached without finding an answer",
            steps=steps,
            error="Max steps exceeded"
        )

    def _build_system_prompt(self) -> str:
        """Build the system prompt."""
        tool_descriptions = []
        for tool in self.tools.list_tools():
            params = ", ".join([f"{p.name}: {p.type}" for p in tool.parameters])
            tool_descriptions.append(f"- {tool.name}({params}): {tool.description}")

        tools_text = "\n".join(tool_descriptions)

        return f"""You are a helpful AI assistant that can use tools to accomplish tasks.

Available Tools:
{tools_text}

You should follow the ReAct format:
Thought: [Your reasoning about what to do next]
Action: [The tool to use, or "Final Answer" if you have the answer]
Action Input: {{"param1": "value1", "param2": "value2"}}

When you have gathered enough information, use:
Action: Final Answer
Action Input: {{"answer": "Your final answer here"}}

Important:
- Always think step by step
- Use tools when needed to gather information
- Be concise in your thoughts
- Provide a final answer when you have sufficient information"""

    def _build_user_prompt(self, task: str, scratchpad: str) -> str:
        """Build the user prompt."""
        prompt = f"Task: {task}\n"

        if scratchpad:
            prompt += f"\nPrevious steps:\n{scratchpad}\n"

        prompt += "\nWhat should you do next?"

        return prompt

    def _parse_response(self, response: str) -> Tuple[str, Optional[str], Optional[Dict]]:
        """Parse the LLM response."""
        thought = ""
        action = None
        action_input = None

        # Extract Thought
        thought_match = re.search(r'Thought:\s*(.+?)(?=Action:|$)', response, re.DOTALL)
        if thought_match:
            thought = thought_match.group(1).strip()

        # Extract Action
        action_match = re.search(r'Action:\s*(.+?)(?=Action Input:|$)', response, re.DOTALL)
        if action_match:
            action = action_match.group(1).strip()

        # Extract Action Input. raw_decode reads one complete JSON value, so nested
        # objects survive (a non-greedy \{.+?\} would stop at the first closing brace).
        input_match = re.search(r'Action Input:\s*(?=\{)', response)
        if input_match:
            raw = response[input_match.end():]
            try:
                action_input, _ = json.JSONDecoder().raw_decode(raw)
            except json.JSONDecodeError:
                action_input = {"raw": raw.strip()}

        return thought, action, action_input


class PlanAndExecuteAgent:
    """Plan-and-Execute Agent"""

    def __init__(
        self,
        llm,
        tools: ToolRegistry,
        max_replans: int = 3
    ):
        self.llm = llm
        self.tools = tools
        self.max_replans = max_replans
        self.executor = ReActAgent(llm, tools, max_steps=5)

    async def run(self, task: str) -> AgentResult:
        """Run the task."""
        all_steps = []

        # 1. Generate the plan
        plan = await self._create_plan(task)
        print(f"Generated Plan:\n{self._format_plan(plan)}")

        # 2. Execute the plan
        completed_steps = []
        results = []
        replans = 0
        i = 0

        # A while loop (not enumerate) so that a revised plan actually replaces
        # the steps still to be run
        while i < len(plan):
            step = plan[i]
            print(f"\n=== Executing Step {len(completed_steps) + 1}: {step} ===")

            # Execute a single step
            step_result = await self.executor.run(
                f"Complete this step: {step}\n\nContext from previous steps:\n{self._format_results(results)}"
            )

            all_steps.extend(step_result.steps)

            if step_result.success:
                completed_steps.append(step)
                results.append({
                    "step": step,
                    "result": step_result.output
                })
                i += 1
                continue

            # The step failed: replan the remaining work, at most max_replans times
            new_plan = None
            if replans < self.max_replans:
                replans += 1
                print("Step failed, attempting to replan...")
                new_plan = await self._replan(
                    task,
                    completed_steps,
                    step,
                    step_result.error
                )

            if not new_plan:
                return AgentResult(
                    success=False,
                    output=f"Failed at step: {step}",
                    steps=all_steps,
                    error=step_result.error
                )

            # The revised plan covers only the remaining work, so start at its first step
            plan = new_plan
            i = 0

        # 3. Synthesize the results
        final_answer = await self._synthesize(task, results)

        return AgentResult(
            success=True,
            output=final_answer,
            steps=all_steps
        )

    async def _create_plan(self, task: str) -> List[str]:
        """Create the execution plan."""
        tool_names = [t.name for t in self.tools.list_tools()]

        prompt = f"""Create a step-by-step plan to accomplish the following task.
Each step should be a clear, actionable item.

Available tools: {', '.join(tool_names)}

Task: {task}

Output the plan as a numbered list:
1. [First step]
2. [Second step]
...

Keep the plan concise (3-7 steps)."""

        response = await self.llm.chat([{"role": "user", "content": prompt}])

        # Parse the plan
        steps = []
        for line in response.strip().split('\n'):
            line = line.strip()
            if line and line[0].isdigit():
                # Strip the numeric prefix
                step = re.sub(r'^\d+[\.\)]\s*', '', line)
                if step:
                    steps.append(step)

        return steps

    async def _replan(
        self,
        task: str,
        completed_steps: List[str],
        failed_step: str,
        error: str
    ) -> Optional[List[str]]:
        """Replan after a failed step."""
        prompt = f"""The original plan failed. Create a new plan to complete the task.

Task: {task}

Completed steps:
{chr(10).join(f'- {s}' for s in completed_steps)}

Failed step: {failed_step}
Error: {error}

Create a revised plan to complete the remaining work, as a numbered list:"""

        response = await self.llm.chat([{"role": "user", "content": prompt}])

        # Parse the new plan
        steps = []
        for line in response.strip().split('\n'):
            line = line.strip()
            if line and line[0].isdigit():
                step = re.sub(r'^\d+[\.\)]\s*', '', line)
                if step:
                    steps.append(step)

        return steps if steps else None

    async def _synthesize(self, task: str, results: List[Dict]) -> str:
        """Synthesize the final answer from the step results."""
        results_text = self._format_results(results)

        prompt = f"""Based on the following completed steps and their results, provide a final answer to the task.

Task: {task}

Completed Steps and Results:
{results_text}

Final Answer:"""

        response = await self.llm.chat([{"role": "user", "content": prompt}])
        return response.strip()

    def _format_plan(self, plan: List[str]) -> str:
        return "\n".join(f"{i+1}. {step}" for i, step in enumerate(plan))

    def _format_results(self, results: List[Dict]) -> str:
        if not results:
            return "No previous results."

        text = []
        for i, r in enumerate(results):
            text.append(f"Step {i+1}: {r['step']}")
            text.append(f"Result: {r['result']}\n")
        return "\n".join(text)
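
Both `_create_plan` and `_replan` parse a numbered list with the same loop, and that loop accepts any line that merely starts with a digit. A shared helper that requires the `1.` or `1)` delimiter could tighten this. The sketch below is one possible version; `parse_numbered_plan` is a hypothetical name, not part of the classes above.

```python
import re
from typing import List

# A step line is: optional indent, digits, "." or ")", then the step text.
_STEP_PATTERN = re.compile(r"^\s*\d+[.)]\s*(.+)$")


def parse_numbered_plan(text: str) -> List[str]:
    """Return the step texts of a numbered list, ignoring all other lines."""
    steps: List[str] = []
    for line in text.splitlines():
        match = _STEP_PATTERN.match(line)
        if match:
            steps.append(match.group(1).strip())
    return steps


sample = """Here is the plan:
1. Search the docs
2) Summarize the findings
2024 was a busy year
- not a step
3. Write the answer"""

print(parse_numbered_plan(sample))
```

Lines such as `2024 was a busy year` are skipped because no delimiter follows the digits. A line that begins with a decimal number like `3.14` would still be read as a step, so the pattern is a heuristic rather than a strict grammar.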

Memory System
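
The long-term memory in this section re-ranks vector search hits by blending similarity with an exponential recency decay, `exp(-hours_ago / 168)`. As a sketch of that scoring in isolation (the function names here are hypothetical), note that 168 hours is the time constant of the decay, where the score falls to 1/e; the half-life is shorter, at `168 * ln 2` hours.

```python
import math


def recency_score(hours_ago: float, time_constant_hours: float = 168.0) -> float:
    """Exponential decay: 1.0 for a new memory, 1/e after one time constant."""
    return math.exp(-hours_ago / time_constant_hours)


def blended_score(similarity: float, hours_ago: float, recency_weight: float = 0.2) -> float:
    """Weighted mix of vector similarity and recency, as in the recall step."""
    return (1 - recency_weight) * similarity + recency_weight * recency_score(hours_ago)


# Age at which the recency score drops to exactly one half
half_life_hours = 168.0 * math.log(2)

fresh = blended_score(similarity=0.8, hours_ago=0.0)
stale = blended_score(similarity=0.9, hours_ago=336.0)  # two weeks old
print(fresh > stale)
```

With the default weight of 0.2, a fresh memory with slightly lower similarity can outrank a two-week-old memory with higher similarity. Raising `recency_weight` strengthens that effect.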

"""
Agent memory system
"""
from typing import List, Dict, Optional, Any
from dataclasses import dataclass, field
from datetime import datetime
from abc import ABC, abstractmethod
import json
import numpy as np

@dataclass
class Memory:
    """A memory item."""
    id: str
    content: str
    type: str  # "conversation", "fact", "task", "reflection"
    timestamp: datetime
    metadata: Dict = field(default_factory=dict)
    embedding: Optional[List[float]] = None
    importance: float = 0.5

@dataclass
class ConversationTurn:
    """A conversation turn."""
    role: str
    content: str
    timestamp: datetime
    metadata: Dict = field(default_factory=dict)


class MemoryStore(ABC):
    """Base class for memory stores."""

    @abstractmethod
    def add(self, memory: Memory) -> str:
        pass

    @abstractmethod
    def get(self, memory_id: str) -> Optional[Memory]:
        pass

    @abstractmethod
    def search(self, query: str, top_k: int = 10) -> List[Memory]:
        pass

    @abstractmethod
    def delete(self, memory_id: str) -> bool:
        pass


class ShortTermMemory:
    """Short-term memory: the conversation context."""

    def __init__(self, max_turns: int = 20, max_tokens: int = 4000):
        self.max_turns = max_turns
        self.max_tokens = max_tokens
        self.conversation: List[ConversationTurn] = []

    def add_turn(self, role: str, content: str, metadata: Optional[Dict] = None):
        """Add a conversation turn."""
        turn = ConversationTurn(
            role=role,
            content=content,
            timestamp=datetime.now(),
            metadata=metadata or {}
        )
        self.conversation.append(turn)

        # Stay within the limits
        self._trim()

    def get_context(self, last_n: Optional[int] = None) -> List[Dict]:
        """Return the conversation context, optionally only the last_n turns."""
        turns = self.conversation[-last_n:] if last_n else self.conversation
        return [
            {"role": t.role, "content": t.content}
            for t in turns
        ]

    def get_summary(self, llm=None) -> str:
        """Return a summary of the conversation."""
        if not self.conversation:
            return ""

        if llm:
            # Use the LLM to generate the summary
            context = self.get_context()
            prompt = f"""Summarize the following conversation in 2-3 sentences:

{json.dumps(context, indent=2, ensure_ascii=False)}

Summary:"""
            return llm.complete(prompt)
        else:
            # Fallback summary without an LLM
            return f"Conversation with {len(self.conversation)} turns."

    def clear(self):
        """Clear the memory."""
        self.conversation = []

    def _trim(self):
        """Trim the memory to the configured limits."""
        # Trim by turn count
        if len(self.conversation) > self.max_turns:
            self.conversation = self.conversation[-self.max_turns:]

        # Trim by tokens, using a rough estimate of about 4 characters per token
        # (reasonable for English text; other languages can differ considerably)
        total_chars = sum(len(t.content) for t in self.conversation)
        while total_chars > self.max_tokens * 4 and len(self.conversation) > 1:
            self.conversation.pop(0)
            total_chars = sum(len(t.content) for t in self.conversation)


class LongTermMemory:
    """Long-term memory backed by a vector store."""

    def __init__(
        self,
        vector_store,
        embedding_model,
        importance_threshold: float = 0.3
    ):
        self.vector_store = vector_store
        self.embedding_model = embedding_model
        self.importance_threshold = importance_threshold

    def store(self, content: str, type: str, metadata: Optional[Dict] = None) -> Optional[str]:
        """Store a memory; return its id, or None if it was judged unimportant."""
        # Compute importance
        importance = self._compute_importance(content)

        if importance < self.importance_threshold:
            return None  # Not important enough to store

        # Compute the embedding
        embedding = self.embedding_model.encode(content)

        memory = Memory(
            id=self._generate_id(),
            content=content,
            type=type,
            timestamp=datetime.now(),
            metadata=metadata or {},
            embedding=embedding.tolist(),
            importance=importance
        )

        # Write to the vector store
        self.vector_store.add(
            ids=[memory.id],
            embeddings=[memory.embedding],
            contents=[memory.content],
            metadatas=[{
                "type": memory.type,
                "timestamp": memory.timestamp.isoformat(),
                "importance": memory.importance,
                **memory.metadata
            }]
        )

        return memory.id

    def recall(
        self,
        query: str,
        top_k: int = 5,
        type_filter: str = None,
        recency_weight: float = 0.2
    ) -> List[Memory]:
        """回忆相关记忆"""
        # 向量检索
        query_embedding = self.embedding_model.encode(query)

        filters = {}
        if type_filter:
            filters["type"] = type_filter

        results = self.vector_store.search(
            query_embedding.tolist(),
            top_k=top_k * 2,  # 检索更多,后续过滤
            filters=filters
        )

        # 结合时间衰减重新排序
        scored_results = []
        now = datetime.now()

        for result in results:
            timestamp = datetime.fromisoformat(result.metadata.get("timestamp", now.isoformat()))
            hours_ago = (now - timestamp).total_seconds() / 3600

            # Time decay
            recency_score = np.exp(-hours_ago / 168)  # 168 h time constant: drops to 1/e (~0.37) after one week, not 0.5

            # Combined score
            final_score = (1 - recency_weight) * result.score + recency_weight * recency_score

            scored_results.append((result, final_score))

        # Sort by final score, highest first
        scored_results.sort(key=lambda x: x[1], reverse=True)

        # Convert to Memory objects
        memories = []
        for result, score in scored_results[:top_k]:
            memories.append(Memory(
                id=result.chunk_id,
                content=result.content,
                type=result.metadata.get("type", "unknown"),
                # Fall back to now when the timestamp is missing, as in the scoring loop above
                timestamp=datetime.fromisoformat(result.metadata.get("timestamp", now.isoformat())),
                metadata=result.metadata,
                importance=result.metadata.get("importance", 0.5)
            ))

        return memories

    def _compute_importance(self, content: str) -> float:
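        """Estimate how important a memory is."""
        # Simple heuristic rules
        importance = 0.5

        # Length: longer content scores higher
        if len(content) > 200:
            importance += 0.1
        if len(content) > 500:
            importance += 0.1

        # Keywords: a single bonus if any keyword appears
        important_keywords = ["important", "remember", "key", "critical", "决定", "重要", "记住"]
        for kw in important_keywords:
            if kw.lower() in content.lower():
                importance += 0.15
                break

        # Questions and tasks (matches both ASCII and full-width question marks)
        if "?" in content or "?" in content or "任务" in content or "目标" in content:
            importance += 0.1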

        return min(importance, 1.0)

    def _generate_id(self) -> str:
        import uuid
        return str(uuid.uuid4())


class EpisodicMemory:
    """Episodic memory: task execution history."""

    def __init__(self, max_episodes: int = 100):
        self.episodes: List[Dict] = []
        self.max_episodes = max_episodes

    def record_episode(
        self,
        task: str,
        steps: List[AgentStep],
        result: str,
        success: bool
    ):
        """Record one task execution."""
        episode = {
            "id": self._generate_id(),
            "task": task,
            "steps": [
                {
                    "thought": s.thought,
                    "action": s.action,
                    "observation": s.observation[:200] if s.observation else None
                }
                for s in steps
            ],
            "result": result,
            "success": success,
            "timestamp": datetime.now().isoformat(),
            "num_steps": len(steps)
        }

        self.episodes.append(episode)

        # Cap the number of stored episodes, keeping the most recent
        if len(self.episodes) > self.max_episodes:
            self.episodes = self.episodes[-self.max_episodes:]

    def find_similar_tasks(self, task: str, top_k: int = 3) -> List[Dict]:
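        """Find previously recorded tasks similar to the given one."""
        # Simple keyword matching: Jaccard similarity over whitespace-separated tokens.
        # Text without spaces (e.g. Chinese) collapses into a single token.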
        task_words = set(task.lower().split())

        scored = []
        for ep in self.episodes:
            ep_words = set(ep["task"].lower().split())
            overlap = len(task_words & ep_words) / max(len(task_words | ep_words), 1)
            scored.append((ep, overlap))

        scored.sort(key=lambda x: x[1], reverse=True)

        return [ep for ep, _ in scored[:top_k]]

    def get_success_rate(self, task_type: str = None) -> float:
        """Return the success rate, optionally restricted to tasks whose text contains task_type."""
        if not self.episodes:
            return 0.0

        relevant = self.episodes
        if task_type:
            relevant = [ep for ep in self.episodes if task_type.lower() in ep["task"].lower()]

        if not relevant:
            return 0.0

        return sum(1 for ep in relevant if ep["success"]) / len(relevant)

    def _generate_id(self) -> str:
        import uuid
        return str(uuid.uuid4())


class AgentMemory:
    """Combined memory system for an agent."""

    def __init__(
        self,
        vector_store,
        embedding_model,
        llm=None
    ):
        self.short_term = ShortTermMemory()
        self.long_term = LongTermMemory(vector_store, embedding_model)
        self.episodic = EpisodicMemory()
        self.llm = llm

    def add_interaction(self, role: str, content: str):
        """Record one conversation turn."""
        self.short_term.add_turn(role, content)

        # Longer assistant replies are also offered to long-term memory
        if role == "assistant" and len(content) > 100:
            self.long_term.store(content, type="response")

    def get_context(self, query: str = None) -> Dict:
        """Assemble the context relevant to the query."""
        context = {
            "conversation": self.short_term.get_context(),
            "relevant_memories": [],
            "similar_tasks": []
        }

        if query:
            # Retrieve from long-term memory
            memories = self.long_term.recall(query, top_k=3)
            context["relevant_memories"] = [
                {"content": m.content, "type": m.type}
                for m in memories
            ]

            # Look up similar tasks in episodic memory
            similar = self.episodic.find_similar_tasks(query, top_k=2)
            context["similar_tasks"] = similar

        return context

    def reflect(self):
        """Reflect on the recent conversation and extract insights."""
        if not self.llm:
            return

        # Fetch the most recent turns
        recent_context = self.short_term.get_context(last_n=10)

        if len(recent_context) < 4:
            return

        prompt = f"""Based on the following recent conversation, extract key insights or facts that should be remembered:

{json.dumps(recent_context, indent=2, ensure_ascii=False)}

List 1-3 key insights (each on a new line):"""

        response = self.llm.complete(prompt)

        # Store each insight
        for line in response.strip().split('\n'):
            line = line.strip()
            if line and len(line) > 10:
                self.long_term.store(line, type="reflection")
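
The re-ranking step in `recall` can be tried out in isolation. The sketch below is a minimal standalone version, assuming similarity scores lie in [0, 1] with higher meaning closer (a store that returns distances would need converting first). The names `recency_score`, `half_life_score`, `blend` and the sample candidates are illustrative and not part of the code above. It also contrasts the `exp(-t/168)` decay used in `recall` with a true one-week half-life.

```python
import math

WEEK_HOURS = 168.0


def recency_score(hours_ago: float, tau_hours: float = WEEK_HOURS) -> float:
    """Exponential decay with time constant tau: 1.0 now, 1/e after tau hours."""
    return math.exp(-max(hours_ago, 0.0) / tau_hours)


def half_life_score(hours_ago: float, half_life_hours: float = WEEK_HOURS) -> float:
    """Alternative decay that halves every half_life_hours."""
    return 0.5 ** (max(hours_ago, 0.0) / half_life_hours)


def blend(similarity: float, hours_ago: float, recency_weight: float = 0.2) -> float:
    """Weighted mix of similarity and recency, as in recall()."""
    return (1 - recency_weight) * similarity + recency_weight * recency_score(hours_ago)


# Hypothetical candidates: (label, similarity, age in hours)
candidates = [
    ("old but very similar", 0.90, 24 * 30),
    ("fresh, fairly similar", 0.85, 1),
]

# With the default recency_weight the fresh memory wins despite lower similarity
ranked = sorted(candidates, key=lambda c: blend(c[1], c[2]), reverse=True)

# With recency_weight=0 the ranking falls back to pure similarity
ranked_no_recency = sorted(
    candidates, key=lambda c: blend(c[1], c[2], recency_weight=0.0), reverse=True
)

if __name__ == "__main__":
    for label, sim, age in ranked:
        print(f"{label}: {blend(sim, age):.3f}")
```

Raising `recency_weight` favours fresh memories more strongly. Swapping `recency_score` for `half_life_score` inside `blend` gives a decay that is exactly 0.5 after one week, if that is the intended behaviour.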

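Summary

This chapter covered the core architecture of an LLM Agent:

  1. Tool system: tool definitions, the registry, and schema conversion
  2. ReAct Agent: the think-act-observe loop
  3. Plan-and-Execute: planning first, then executing
  4. Memory system: short-term, long-term, and episodic memory

The next chapter turns to multi-agent collaboration and explains how to build systems in which multiple agents work together.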