HiHuo
首页
博客
手册
工具
关于
首页
博客
手册
工具
关于

Agent应用开发实战:从Demo到生产环境的鸿沟

一、从Demo到生产的挑战

1.1 项目背景

我们开发的企业级AI Agent系统需要:

  • 自动化任务:数据查询、报表生成、工单处理、代码审查
  • 日调用量:50000+ 次Agent执行
  • 成功率要求:> 95%
  • 响应时间:P99 < 30s
  • 成本控制:单次执行成本 < $0.05

1.2 Demo vs 生产环境

维度Demo原型生产环境鸿沟
错误处理无/简单try-catch分层重试、降级、回滚⭐⭐⭐⭐⭐
超时控制无限制多级超时、断路器⭐⭐⭐⭐
成本控制不考虑Token计数、模型分级⭐⭐⭐⭐⭐
可观测性print调试结构化日志、链路追踪⭐⭐⭐⭐
并发控制单线程限流、熔断、队列⭐⭐⭐⭐
工具管理硬编码动态注册、权限控制⭐⭐⭐

1.3 Agent架构演进

二、ReAct模式实现

2.1 ReAct原理

Reasoning + Acting:推理与行动交替进行

Thought: 我需要查询用户的订单信息
Action: query_database
Action Input: {"user_id": "12345", "table": "orders"}
Observation: 查询成功,用户有3个订单

Thought: 我需要计算订单总金额
Action: calculate_sum
Action Input: {"values": [99.9, 199.0, 299.5]}
Observation: 总金额为598.4元

Thought: 我已经得到了答案
Final Answer: 用户12345共有3个订单,总金额598.4元

2.2 生产级ReAct实现

from typing import List, Dict, Any, Optional, Callable
from dataclasses import dataclass
from enum import Enum
import time
import logging

logger = logging.getLogger(__name__)

class AgentStep(Enum):
    """Agent执行步骤"""
    THOUGHT = "thought"
    ACTION = "action"
    OBSERVATION = "observation"
    FINAL_ANSWER = "final_answer"
    ERROR = "error"

@dataclass
class AgentAction:
    """Agent动作"""
    tool_name: str
    tool_input: Dict[str, Any]
    log: str  # 推理日志

@dataclass
class AgentObservation:
    """观察结果"""
    output: Any
    error: Optional[str] = None
    execution_time: float = 0.0

@dataclass
class AgentFinish:
    """最终结果"""
    output: str
    log: str

class Tool:
    """工具基类"""
    def __init__(
        self,
        name: str,
        description: str,
        func: Callable,
        timeout: int = 30,
        retry: int = 3
    ):
        self.name = name
        self.description = description
        self.func = func
        self.timeout = timeout
        self.retry = retry

    def run(self, **kwargs) -> Any:
        """执行工具"""
        return self.func(**kwargs)

class ReActAgent:
    """生产级ReAct Agent"""

    def __init__(
        self,
        llm,
        tools: List[Tool],
        max_iterations: int = 10,
        max_execution_time: int = 300,
        enable_tracing: bool = True
    ):
        self.llm = llm
        self.tools = {tool.name: tool for tool in tools}
        self.max_iterations = max_iterations
        self.max_execution_time = max_execution_time
        self.enable_tracing = enable_tracing

        # 监控指标
        self.total_tokens = 0
        self.total_cost = 0.0
        self.iteration_count = 0

    def run(self, query: str) -> Dict[str, Any]:
        """执行Agent"""
        start_time = time.time()
        trace = []  # 执行追踪

        try:
            # 构建初始提示词
            prompt = self._build_initial_prompt(query)
            intermediate_steps = []

            for i in range(self.max_iterations):
                self.iteration_count = i + 1

                # 检查总体超时
                if time.time() - start_time > self.max_execution_time:
                    raise TimeoutError(
                        f"Agent执行超时: {self.max_execution_time}s"
                    )

                # LLM推理
                thought_action = self._get_next_action(
                    prompt,
                    intermediate_steps
                )

                # 追踪
                if self.enable_tracing:
                    trace.append({
                        "iteration": i + 1,
                        "thought": thought_action.get("thought", ""),
                        "action": thought_action.get("action", ""),
                        "timestamp": time.time()
                    })

                # 判断是否结束
                if "final_answer" in thought_action:
                    return self._build_result(
                        output=thought_action["final_answer"],
                        trace=trace,
                        execution_time=time.time() - start_time,
                        success=True
                    )

                # 执行动作
                action = AgentAction(
                    tool_name=thought_action["action"],
                    tool_input=thought_action["action_input"],
                    log=thought_action.get("thought", "")
                )

                observation = self._execute_action(action)

                # 记录中间步骤
                intermediate_steps.append((action, observation))

                # 追踪观察结果
                if self.enable_tracing:
                    trace[-1]["observation"] = str(observation.output)
                    trace[-1]["execution_time"] = observation.execution_time

                # 如果执行出错且无法恢复
                if observation.error and not self._is_recoverable(observation):
                    raise RuntimeError(f"工具执行失败: {observation.error}")

            # 达到最大迭代次数
            raise RuntimeError(
                f"达到最大迭代次数({self.max_iterations}),未能完成任务"
            )

        except Exception as e:
            logger.error(f"Agent执行失败: {str(e)}", exc_info=True)
            return self._build_result(
                output=f"执行失败: {str(e)}",
                trace=trace,
                execution_time=time.time() - start_time,
                success=False,
                error=str(e)
            )

    def _get_next_action(
        self,
        initial_prompt: str,
        intermediate_steps: List
    ) -> Dict[str, Any]:
        """获取下一步动作"""

        # 构建包含历史的提示词
        prompt = self._build_prompt_with_history(
            initial_prompt,
            intermediate_steps
        )

        # 调用LLM
        response = self.llm.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.1,
            max_tokens=1000
        )

        # 统计token
        self.total_tokens += response.usage.total_tokens
        self.total_cost += self._calculate_cost(response.usage)

        # 解析输出
        return self._parse_llm_output(response.choices[0].message.content)

    def _execute_action(self, action: AgentAction) -> AgentObservation:
        """执行工具调用(带重试和超时)"""

        tool = self.tools.get(action.tool_name)
        if not tool:
            return AgentObservation(
                output=None,
                error=f"工具不存在: {action.tool_name}"
            )

        # 执行工具(带超时和重试)
        for attempt in range(tool.retry):
            try:
                start_time = time.time()

                # 带超时执行
                import signal

                def timeout_handler(signum, frame):
                    raise TimeoutError("工具执行超时")

                signal.signal(signal.SIGALRM, timeout_handler)
                signal.alarm(tool.timeout)

                try:
                    result = tool.run(**action.tool_input)
                finally:
                    signal.alarm(0)  # 取消超时

                execution_time = time.time() - start_time

                return AgentObservation(
                    output=result,
                    execution_time=execution_time
                )

            except TimeoutError as e:
                logger.warning(
                    f"工具{tool.name}执行超时 (尝试{attempt+1}/{tool.retry})"
                )
                if attempt == tool.retry - 1:
                    return AgentObservation(
                        output=None,
                        error=f"执行超时({tool.timeout}s)"
                    )

            except Exception as e:
                logger.warning(
                    f"工具{tool.name}执行失败 (尝试{attempt+1}/{tool.retry}): {str(e)}"
                )
                if attempt == tool.retry - 1:
                    return AgentObservation(
                        output=None,
                        error=str(e)
                    )

                # 指数退避
                time.sleep(2 ** attempt)

    def _build_initial_prompt(self, query: str) -> str:
        """构建初始提示词"""

        tools_desc = "\n".join([
            f"- {name}: {tool.description}"
            for name, tool in self.tools.items()
        ])

        prompt = f"""你是一个智能助手,可以使用以下工具来完成任务:

{tools_desc}

请使用以下格式:

Thought: 你的思考过程
Action: 工具名称
Action Input: {{"参数名": "参数值"}}
Observation: 工具执行结果

... (重复Thought/Action/Observation多次)

Thought: 我已经得到了答案
Final Answer: 最终答案

重要规则:
1. 每次只执行一个Action
2. Action Input必须是有效的JSON格式
3. 如果工具执行失败,考虑使用其他工具或方法
4. 如果无法完成任务,明确说明原因

用户问题:{query}

开始!
"""
        return prompt

    def _build_prompt_with_history(
        self,
        initial_prompt: str,
        intermediate_steps: List
    ) -> str:
        """构建包含历史的提示词"""

        history = []
        for action, observation in intermediate_steps:
            history.append(f"Thought: {action.log}")
            history.append(f"Action: {action.tool_name}")
            history.append(f"Action Input: {action.tool_input}")
            history.append(f"Observation: {observation.output}")

        if history:
            return initial_prompt + "\n\n" + "\n".join(history) + "\n\n"
        return initial_prompt

    def _parse_llm_output(self, output: str) -> Dict[str, Any]:
        """解析LLM输出"""
        import re
        import json

        result = {}

        # 提取Thought
        thought_match = re.search(r'Thought:\s*(.+?)(?=\n|$)', output, re.DOTALL)
        if thought_match:
            result["thought"] = thought_match.group(1).strip()

        # 检查是否是最终答案
        final_match = re.search(r'Final Answer:\s*(.+?)$', output, re.DOTALL)
        if final_match:
            result["final_answer"] = final_match.group(1).strip()
            return result

        # 提取Action
        action_match = re.search(r'Action:\s*(\w+)', output)
        if action_match:
            result["action"] = action_match.group(1).strip()

        # 提取Action Input
        input_match = re.search(r'Action Input:\s*({.+?})', output, re.DOTALL)
        if input_match:
            try:
                result["action_input"] = json.loads(input_match.group(1))
            except json.JSONDecodeError:
                result["action_input"] = {}

        return result

    def _is_recoverable(self, observation: AgentObservation) -> bool:
        """判断错误是否可恢复"""
        if not observation.error:
            return True

        # 可恢复的错误类型
        recoverable_errors = [
            "timeout",
            "rate limit",
            "temporary",
            "retry"
        ]

        return any(
            err.lower() in observation.error.lower()
            for err in recoverable_errors
        )

    def _calculate_cost(self, usage) -> float:
        """计算成本"""
        # GPT-4定价(示例)
        input_cost = usage.prompt_tokens * 0.00003  # $0.03/1K tokens
        output_cost = usage.completion_tokens * 0.00006  # $0.06/1K tokens
        return input_cost + output_cost

    def _build_result(
        self,
        output: str,
        trace: List[Dict],
        execution_time: float,
        success: bool,
        error: Optional[str] = None
    ) -> Dict[str, Any]:
        """构建结果"""
        return {
            "output": output,
            "success": success,
            "error": error,
            "metadata": {
                "execution_time": execution_time,
                "iterations": self.iteration_count,
                "total_tokens": self.total_tokens,
                "total_cost": self.total_cost,
                "trace": trace if self.enable_tracing else []
            }
        }

2.3 工具定义示例

# 定义具体工具
import requests
import sqlite3

def search_web(query: str, max_results: int = 5) -> str:
    """搜索网页"""
    try:
        # 调用搜索API(示例)
        response = requests.get(
            "https://api.search.com/search",
            params={"q": query, "limit": max_results},
            timeout=10
        )
        response.raise_for_status()
        results = response.json()["results"]

        return "\n".join([
            f"{i+1}. {r['title']}: {r['snippet']}"
            for i, r in enumerate(results)
        ])
    except Exception as e:
        raise RuntimeError(f"搜索失败: {str(e)}")

def query_database(sql: str, database: str = "production") -> str:
    """查询数据库"""
    try:
        conn = sqlite3.connect(f"{database}.db")
        cursor = conn.cursor()

        # SQL注入防护
        if any(word in sql.upper() for word in ["DROP", "DELETE", "UPDATE", "INSERT"]):
            raise ValueError("只允许SELECT查询")

        cursor.execute(sql)
        results = cursor.fetchall()
        conn.close()

        if not results:
            return "查询结果为空"

        # 格式化结果
        return "\n".join([str(row) for row in results[:100]])  # 限制100行

    except Exception as e:
        raise RuntimeError(f"数据库查询失败: {str(e)}")

def calculate(expression: str) -> float:
    """计算数学表达式"""
    try:
        # 安全的eval(只允许数学运算)
        allowed_chars = set("0123456789+-*/() .")
        if not all(c in allowed_chars for c in expression):
            raise ValueError("表达式包含非法字符")

        result = eval(expression, {"__builtins__": {}}, {})
        return round(result, 2)

    except Exception as e:
        raise RuntimeError(f"计算失败: {str(e)}")

# 注册工具
tools = [
    Tool(
        name="search_web",
        description="搜索网页获取最新信息。输入:查询字符串",
        func=search_web,
        timeout=15,
        retry=3
    ),
    Tool(
        name="query_database",
        description="查询数据库。输入:SQL语句(仅支持SELECT)",
        func=query_database,
        timeout=30,
        retry=2
    ),
    Tool(
        name="calculate",
        description="计算数学表达式。输入:数学表达式字符串",
        func=calculate,
        timeout=5,
        retry=1
    )
]

# 创建Agent
agent = ReActAgent(
    llm=openai,
    tools=tools,
    max_iterations=10,
    max_execution_time=300,
    enable_tracing=True
)

# 执行
result = agent.run("查询上个月销售额最高的前3个产品,并计算总销售额")
print(result)

三、Function Calling模式

3.1 OpenAI Function Calling

from typing import List, Dict, Any
import json
import openai

class FunctionCallingAgent:
    """基于OpenAI Function Calling的Agent"""

    def __init__(
        self,
        tools: List[Dict[str, Any]],
        model: str = "gpt-4-1106-preview"
    ):
        self.tools = tools
        self.model = model
        self.function_map = {}  # 函数名到实际函数的映射

    def register_function(self, name: str, func: callable):
        """注册函数实现"""
        self.function_map[name] = func

    def run(self, query: str, max_iterations: int = 5) -> Dict[str, Any]:
        """执行Agent"""

        messages = [
            {"role": "user", "content": query}
        ]

        for i in range(max_iterations):
            # 调用LLM
            response = openai.chat.completions.create(
                model=self.model,
                messages=messages,
                tools=self.tools,
                tool_choice="auto"
            )

            message = response.choices[0].message

            # 如果没有工具调用,返回结果
            if not message.tool_calls:
                return {
                    "output": message.content,
                    "success": True,
                    "iterations": i + 1
                }

            # 添加LLM的响应
            messages.append(message)

            # 执行工具调用
            for tool_call in message.tool_calls:
                function_name = tool_call.function.name
                function_args = json.loads(tool_call.function.arguments)

                # 执行函数
                try:
                    function_response = self.function_map[function_name](
                        **function_args
                    )
                    success = True
                    error = None
                except Exception as e:
                    function_response = None
                    success = False
                    error = str(e)

                # 添加函数结果到消息
                messages.append({
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "content": json.dumps({
                        "success": success,
                        "result": function_response,
                        "error": error
                    }, ensure_ascii=False)
                })

        return {
            "output": "达到最大迭代次数",
            "success": False,
            "iterations": max_iterations
        }

# 定义工具schema
tools = [
    {
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "获取指定城市的天气信息",
            "parameters": {
                "type": "object",
                "properties": {
                    "city": {
                        "type": "string",
                        "description": "城市名称,例如:北京、上海"
                    },
                    "unit": {
                        "type": "string",
                        "enum": ["celsius", "fahrenheit"],
                        "description": "温度单位"
                    }
                },
                "required": ["city"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "search_products",
            "description": "搜索商品信息",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "搜索关键词"
                    },
                    "category": {
                        "type": "string",
                        "description": "商品分类"
                    },
                    "max_results": {
                        "type": "integer",
                        "description": "最大返回结果数",
                        "default": 10
                    }
                },
                "required": ["query"]
            }
        }
    }
]

# 实现函数
def get_weather(city: str, unit: str = "celsius") -> Dict:
    """获取天气(模拟)"""
    # 调用天气API
    return {
        "city": city,
        "temperature": 25,
        "unit": unit,
        "condition": "晴"
    }

def search_products(
    query: str,
    category: str = None,
    max_results: int = 10
) -> List[Dict]:
    """搜索商品(模拟)"""
    # 调用商品搜索API
    return [
        {"name": f"商品{i}", "price": 99.9 * i}
        for i in range(1, max_results + 1)
    ]

# 创建Agent
agent = FunctionCallingAgent(tools=tools)
agent.register_function("get_weather", get_weather)
agent.register_function("search_products", search_products)

# 执行
result = agent.run("北京今天天气怎么样?另外帮我搜索一下电子产品")

四、错误处理与容错

4.1 分层错误处理

from enum import Enum
from typing import Optional
import traceback

class ErrorSeverity(Enum):
    """错误严重程度"""
    LOW = "low"          # 可忽略
    MEDIUM = "medium"    # 需要重试
    HIGH = "high"        # 需要降级
    CRITICAL = "critical"  # 需要终止

class AgentError(Exception):
    """Agent错误基类"""
    def __init__(
        self,
        message: str,
        severity: ErrorSeverity = ErrorSeverity.MEDIUM,
        recoverable: bool = True,
        original_error: Optional[Exception] = None
    ):
        self.message = message
        self.severity = severity
        self.recoverable = recoverable
        self.original_error = original_error
        super().__init__(message)

class ToolExecutionError(AgentError):
    """工具执行错误"""
    pass

class LLMError(AgentError):
    """LLM调用错误"""
    pass

class TimeoutError(AgentError):
    """超时错误"""
    def __init__(self, message: str):
        super().__init__(
            message,
            severity=ErrorSeverity.HIGH,
            recoverable=False
        )

class ErrorHandler:
    """错误处理器"""

    @staticmethod
    def handle_error(
        error: Exception,
        context: Dict[str, Any]
    ) -> Dict[str, Any]:
        """统一错误处理"""

        # 包装为AgentError
        if not isinstance(error, AgentError):
            agent_error = AgentError(
                message=str(error),
                severity=ErrorSeverity.MEDIUM,
                recoverable=True,
                original_error=error
            )
        else:
            agent_error = error

        # 记录错误
        logger.error(
            f"Agent错误: {agent_error.message}",
            extra={
                "severity": agent_error.severity.value,
                "recoverable": agent_error.recoverable,
                "context": context,
                "traceback": traceback.format_exc()
            }
        )

        # 根据严重程度决定处理策略
        if agent_error.severity == ErrorSeverity.CRITICAL:
            return {
                "action": "terminate",
                "message": "遇到严重错误,终止执行"
            }

        elif agent_error.severity == ErrorSeverity.HIGH:
            return {
                "action": "fallback",
                "message": "尝试降级处理"
            }

        elif agent_error.severity == ErrorSeverity.MEDIUM:
            if agent_error.recoverable:
                return {
                    "action": "retry",
                    "message": "尝试重试"
                }
            else:
                return {
                    "action": "skip",
                    "message": "跳过当前步骤"
                }

        else:  # LOW
            return {
                "action": "ignore",
                "message": "忽略错误,继续执行"
            }

# 集成到Agent
class RobustAgent(ReActAgent):
    """容错Agent"""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.error_handler = ErrorHandler()
        self.fallback_response = "抱歉,我遇到了一些问题,无法完成您的请求。"

    def run(self, query: str) -> Dict[str, Any]:
        """执行(带容错)"""
        try:
            return super().run(query)

        except AgentError as e:
            # 错误处理
            handling = self.error_handler.handle_error(e, {"query": query})

            if handling["action"] == "fallback":
                # 降级处理:使用简单的单轮对话
                return self._fallback_response(query)

            elif handling["action"] == "retry":
                # 重试(限制次数)
                return self._retry_with_backoff(query, max_retries=2)

            else:
                # 返回友好的错误信息
                return {
                    "output": self.fallback_response,
                    "success": False,
                    "error": e.message,
                    "metadata": {"error_handling": handling}
                }

        except Exception as e:
            # 未知错误
            logger.exception("未知错误")
            return {
                "output": self.fallback_response,
                "success": False,
                "error": str(e)
            }

    def _fallback_response(self, query: str) -> Dict[str, Any]:
        """降级响应:不使用工具,直接LLM回答"""
        try:
            response = self.llm.chat.completions.create(
                model="gpt-3.5-turbo",  # 使用更便宜的模型
                messages=[{"role": "user", "content": query}],
                max_tokens=500
            )

            return {
                "output": response.choices[0].message.content,
                "success": True,
                "metadata": {"mode": "fallback"}
            }
        except:
            return {
                "output": self.fallback_response,
                "success": False
            }

    def _retry_with_backoff(
        self,
        query: str,
        max_retries: int = 2
    ) -> Dict[str, Any]:
        """指数退避重试"""
        import time

        for attempt in range(max_retries):
            try:
                return super().run(query)
            except Exception as e:
                if attempt == max_retries - 1:
                    raise
                wait_time = 2 ** attempt
                logger.info(f"重试 {attempt + 1}/{max_retries},等待{wait_time}秒")
                time.sleep(wait_time)

五、超时与熔断控制

5.1 多级超时控制

import asyncio
from contextlib import asynccontextmanager
from typing import AsyncGenerator

class TimeoutController:
    """超时控制器"""

    def __init__(
        self,
        tool_timeout: int = 30,      # 单个工具超时
        iteration_timeout: int = 60,  # 单次迭代超时
        total_timeout: int = 300      # 总体超时
    ):
        self.tool_timeout = tool_timeout
        self.iteration_timeout = iteration_timeout
        self.total_timeout = total_timeout

    @asynccontextmanager
    async def timeout_context(
        self,
        timeout: int,
        error_message: str
    ) -> AsyncGenerator:
        """超时上下文管理器"""
        try:
            async with asyncio.timeout(timeout):
                yield
        except asyncio.TimeoutError:
            raise TimeoutError(error_message)

    async def execute_with_timeout(
        self,
        func: callable,
        timeout: int,
        *args,
        **kwargs
    ):
        """带超时执行函数"""
        async with self.timeout_context(
            timeout,
            f"执行超时({timeout}s)"
        ):
            if asyncio.iscoroutinefunction(func):
                return await func(*args, **kwargs)
            else:
                # 同步函数转异步
                loop = asyncio.get_event_loop()
                return await loop.run_in_executor(
                    None,
                    lambda: func(*args, **kwargs)
                )

# 异步Agent实现
class AsyncAgent:
    """异步Agent(支持超时控制)"""

    def __init__(
        self,
        llm,
        tools: List[Tool],
        timeout_controller: TimeoutController
    ):
        self.llm = llm
        self.tools = {tool.name: tool for tool in tools}
        self.timeout_ctrl = timeout_controller

    async def run(self, query: str) -> Dict[str, Any]:
        """异步执行"""
        try:
            # 总体超时控制
            async with self.timeout_ctrl.timeout_context(
                self.timeout_ctrl.total_timeout,
                f"Agent总执行时间超时({self.timeout_ctrl.total_timeout}s)"
            ):
                return await self._execute(query)

        except TimeoutError as e:
            return {
                "output": "执行超时",
                "success": False,
                "error": str(e)
            }

    async def _execute(self, query: str) -> Dict[str, Any]:
        """实际执行逻辑"""
        # ... Agent逻辑 ...

        # 迭代超时控制
        async with self.timeout_ctrl.timeout_context(
            self.timeout_ctrl.iteration_timeout,
            "单次迭代超时"
        ):
            action = await self._get_next_action(query)

        # 工具执行超时控制
        observation = await self.timeout_ctrl.execute_with_timeout(
            self._execute_tool,
            self.timeout_ctrl.tool_timeout,
            action
        )

        return {"output": "结果", "success": True}

5.2 熔断器模式

from datetime import datetime, timedelta
from collections import deque

class CircuitBreaker:
    """熔断器"""

    def __init__(
        self,
        failure_threshold: int = 5,    # 失败阈值
        timeout: int = 60,              # 熔断超时(秒)
        expected_exception: type = Exception
    ):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.expected_exception = expected_exception

        self.failure_count = 0
        self.last_failure_time = None
        self.state = "closed"  # closed, open, half_open

        # 滑动窗口记录
        self.recent_failures = deque(maxlen=failure_threshold * 2)

    def call(self, func: callable, *args, **kwargs):
        """通过熔断器调用函数"""

        if self.state == "open":
            # 检查是否可以转为half_open
            if self._should_attempt_reset():
                self.state = "half_open"
            else:
                raise Exception("熔断器开启,拒绝请求")

        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result

        except self.expected_exception as e:
            self._on_failure()
            raise

    def _on_success(self):
        """成功回调"""
        if self.state == "half_open":
            self.state = "closed"
            self.failure_count = 0
            self.recent_failures.clear()

    def _on_failure(self):
        """失败回调"""
        self.failure_count += 1
        self.last_failure_time = datetime.now()
        self.recent_failures.append(datetime.now())

        # 检查是否需要开启熔断
        if self.failure_count >= self.failure_threshold:
            self.state = "open"
            logger.warning(f"熔断器开启,失败次数: {self.failure_count}")

    def _should_attempt_reset(self) -> bool:
        """是否应该尝试重置"""
        return (
            self.last_failure_time and
            datetime.now() - self.last_failure_time > timedelta(seconds=self.timeout)
        )

# 为工具添加熔断器
class ProtectedTool(Tool):
    """带熔断器的工具"""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.circuit_breaker = CircuitBreaker(
            failure_threshold=5,
            timeout=60
        )

    def run(self, **kwargs):
        """执行(带熔断)"""
        return self.circuit_breaker.call(self.func, **kwargs)

六、成本优化策略

6.1 Token使用优化

class TokenOptimizer:
    """Token优化器"""

    def __init__(
        self,
        max_tokens_per_request: int = 4000,
        max_cost_per_execution: float = 0.05
    ):
        self.max_tokens_per_request = max_tokens_per_request
        self.max_cost_per_execution = max_cost_per_execution
        self.current_cost = 0.0

    def optimize_prompt(
        self,
        prompt: str,
        tokenizer
    ) -> str:
        """优化提示词长度"""
        tokens = tokenizer.encode(prompt)

        if len(tokens) <= self.max_tokens_per_request:
            return prompt

        # 截断策略:保留开头和结尾
        half = self.max_tokens_per_request // 2
        truncated_tokens = tokens[:half] + tokens[-half:]

        return tokenizer.decode(truncated_tokens)

    def should_use_expensive_model(
        self,
        complexity: str
    ) -> bool:
        """是否使用昂贵模型"""
        # 简单任务使用便宜模型
        if complexity == "simple":
            return False

        # 检查成本限制
        if self.current_cost >= self.max_cost_per_execution:
            logger.warning("达到成本上限,使用便宜模型")
            return False

        return True

    def select_model(self, task_complexity: str) -> str:
        """选择模型"""
        if self.should_use_expensive_model(task_complexity):
            return "gpt-4"
        else:
            return "gpt-3.5-turbo"

# 模型分级调用
class TieredLLMCaller:
    """分级LLM调用器"""

    def __init__(self):
        self.models = {
            "simple": {
                "model": "gpt-3.5-turbo",
                "cost_per_1k": 0.002,
                "max_tokens": 4096
            },
            "medium": {
                "model": "gpt-4",
                "cost_per_1k": 0.03,
                "max_tokens": 8192
            },
            "complex": {
                "model": "gpt-4-32k",
                "cost_per_1k": 0.06,
                "max_tokens": 32768
            }
        }

    def call(
        self,
        prompt: str,
        complexity: str = "simple"
    ):
        """分级调用"""
        model_config = self.models.get(complexity, self.models["simple"])

        response = openai.chat.completions.create(
            model=model_config["model"],
            messages=[{"role": "user", "content": prompt}],
            max_tokens=min(1000, model_config["max_tokens"])
        )

        # 计算实际成本
        actual_cost = (
            response.usage.total_tokens / 1000 * model_config["cost_per_1k"]
        )

        return {
            "response": response,
            "cost": actual_cost,
            "model": model_config["model"]
        }

6.2 缓存策略

import hashlib
import redis
import json

class AgentCache:
    """Agent结果缓存"""

    def __init__(
        self,
        redis_client: redis.Redis,
        ttl: int = 3600  # 缓存1小时
    ):
        self.redis = redis_client
        self.ttl = ttl

    def _generate_key(self, query: str, context: Dict = None) -> str:
        """生成缓存key"""
        cache_input = {
            "query": query,
            "context": context or {}
        }
        hash_input = json.dumps(cache_input, sort_keys=True)
        return f"agent:cache:{hashlib.md5(hash_input.encode()).hexdigest()}"

    def get(self, query: str, context: Dict = None) -> Optional[Dict]:
        """获取缓存"""
        key = self._generate_key(query, context)
        cached = self.redis.get(key)

        if cached:
            logger.info(f"命中缓存: {key}")
            return json.loads(cached)

        return None

    def set(
        self,
        query: str,
        result: Dict,
        context: Dict = None
    ):
        """设置缓存"""
        key = self._generate_key(query, context)
        self.redis.setex(
            key,
            self.ttl,
            json.dumps(result, ensure_ascii=False)
        )

# 集成缓存
class CachedAgent:
    """带缓存的Agent"""

    def __init__(
        self,
        agent: ReActAgent,
        cache: AgentCache
    ):
        self.agent = agent
        self.cache = cache

    def run(self, query: str) -> Dict[str, Any]:
        """执行(先查缓存)"""

        # 查缓存
        cached_result = self.cache.get(query)
        if cached_result:
            return {
                **cached_result,
                "from_cache": True
            }

        # 执行Agent
        result = self.agent.run(query)

        # 只缓存成功的结果
        if result["success"]:
            self.cache.set(query, result)

        return {
            **result,
            "from_cache": False
        }

七、可观测性与监控

7.1 结构化日志

import logging
import json
from datetime import datetime

class StructuredLogger:
    """结构化日志"""

    def __init__(self, logger_name: str):
        self.logger = logging.getLogger(logger_name)

    def log_agent_execution(
        self,
        query: str,
        result: Dict,
        trace: List[Dict]
    ):
        """记录Agent执行"""
        log_entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "event": "agent_execution",
            "query": query,
            "success": result["success"],
            "execution_time": result["metadata"]["execution_time"],
            "iterations": result["metadata"]["iterations"],
            "total_tokens": result["metadata"]["total_tokens"],
            "total_cost": result["metadata"]["total_cost"],
            "trace": trace
        }

        self.logger.info(json.dumps(log_entry, ensure_ascii=False))

    def log_tool_execution(
        self,
        tool_name: str,
        inputs: Dict,
        output: Any,
        execution_time: float,
        success: bool,
        error: Optional[str] = None
    ):
        """记录工具执行"""
        log_entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "event": "tool_execution",
            "tool_name": tool_name,
            "inputs": inputs,
            "output": str(output)[:500],  # 限制长度
            "execution_time": execution_time,
            "success": success,
            "error": error
        }

        level = logging.INFO if success else logging.ERROR
        self.logger.log(level, json.dumps(log_entry, ensure_ascii=False))

7.2 Metrics收集

from prometheus_client import Counter, Histogram, Gauge

# Agent执行指标
agent_requests_total = Counter(
    'agent_requests_total',
    'Total agent requests',
    ['status', 'complexity']
)

agent_execution_time = Histogram(
    'agent_execution_time_seconds',
    'Agent execution time',
    ['complexity'],
    buckets=[1, 5, 10, 30, 60, 120, 300]
)

agent_iterations = Histogram(
    'agent_iterations',
    'Number of iterations per execution',
    buckets=[1, 2, 3, 5, 10, 15, 20]
)

agent_cost = Histogram(
    'agent_cost_usd',
    'Agent execution cost in USD',
    buckets=[0.001, 0.005, 0.01, 0.05, 0.1, 0.5]
)

# 工具执行指标
tool_calls_total = Counter(
    'tool_calls_total',
    'Total tool calls',
    ['tool_name', 'status']
)

tool_execution_time = Histogram(
    'tool_execution_time_seconds',
    'Tool execution time',
    ['tool_name'],
    buckets=[0.1, 0.5, 1, 5, 10, 30]
)

# 使用示例
class MonitoredAgent:
    """带监控的Agent"""

    def run(self, query: str, complexity: str = "medium") -> Dict:
        agent_requests_total.labels(
            status='started',
            complexity=complexity
        ).inc()

        start_time = time.time()

        try:
            result = super().run(query)

            # 记录成功指标
            agent_requests_total.labels(
                status='success',
                complexity=complexity
            ).inc()

            execution_time = time.time() - start_time
            agent_execution_time.labels(complexity=complexity).observe(execution_time)
            agent_iterations.observe(result["metadata"]["iterations"])
            agent_cost.observe(result["metadata"]["total_cost"])

            return result

        except Exception as e:
            agent_requests_total.labels(
                status='error',
                complexity=complexity
            ).inc()
            raise

八、生产级最佳实践总结

8.1 架构设计清单

  • [ ] 错误处理:分层错误处理、优雅降级
  • [ ] 超时控制:工具/迭代/总体三级超时
  • [ ] 熔断机制:防止级联失败
  • [ ] 重试策略:指数退避、限制次数
  • [ ] 成本控制:模型分级、Token优化、缓存
  • [ ] 可观测性:结构化日志、Metrics、链路追踪
  • [ ] 限流保护:并发控制、队列管理
  • [ ] 安全防护:输入验证、权限控制、SQL注入防护

8.2 关键指标对比

指标Demo生产环境提升
成功率60%96%+60%
平均成本$0.15$0.03-80%
P99延迟180s28s-84%
可用性N/A99.5%-
错误恢复0%85%-

8.3 技术选型建议

场景推荐方案理由
简单任务Function Calling性能好、成本低
复杂推理ReAct灵活性高
多步骤工作流LangGraph状态管理强
高并发异步Agent + 队列吞吐量高
低成本缓存 + 模型分级成本降低80%+

九、参考资源

  • LangChain官方文档
  • OpenAI Function Calling
  • ReAct论文
  • LangGraph