Agent应用开发实战:从Demo到生产环境的鸿沟
一、从Demo到生产的挑战
1.1 项目背景
我们开发的企业级AI Agent系统需要:
- 自动化任务:数据查询、报表生成、工单处理、代码审查
- 日调用量:50000+ 次Agent执行
- 成功率要求:> 95%
- 响应时间:P99 < 30s
- 成本控制:单次执行成本 < $0.05
1.2 Demo vs 生产环境
| 维度 | Demo原型 | 生产环境 | 鸿沟 |
|---|---|---|---|
| 错误处理 | 无/简单try-catch | 分层重试、降级、回滚 | ⭐⭐⭐⭐⭐ |
| 超时控制 | 无限制 | 多级超时、断路器 | ⭐⭐⭐⭐ |
| 成本控制 | 不考虑 | Token计数、模型分级 | ⭐⭐⭐⭐⭐ |
| 可观测性 | print调试 | 结构化日志、链路追踪 | ⭐⭐⭐⭐ |
| 并发控制 | 单线程 | 限流、熔断、队列 | ⭐⭐⭐⭐ |
| 工具管理 | 硬编码 | 动态注册、权限控制 | ⭐⭐⭐ |
1.3 Agent架构演进
二、ReAct模式实现
2.1 ReAct原理
Reasoning + Acting:推理与行动交替进行
Thought: 我需要查询用户的订单信息
Action: query_database
Action Input: {"user_id": "12345", "table": "orders"}
Observation: 查询成功,用户有3个订单
Thought: 我需要计算订单总金额
Action: calculate_sum
Action Input: {"values": [99.9, 199.0, 299.5]}
Observation: 总金额为598.4元
Thought: 我已经得到了答案
Final Answer: 用户12345共有3个订单,总金额598.4元
2.2 生产级ReAct实现
from typing import List, Dict, Any, Optional, Callable
from dataclasses import dataclass
from enum import Enum
import time
import logging
# Module-level logger used by the agent classes below; records go through the
# standard ``logging`` hierarchy under this module's name.
logger = logging.getLogger(__name__)
class AgentStep(Enum):
    """Kinds of step an agent execution can produce.

    NOTE(review): not referenced by the visible ReActAgent code, which tracks
    steps through plain dict keys ("thought", "action", ...) instead.
    """
    THOUGHT = "thought"
    ACTION = "action"
    OBSERVATION = "observation"
    FINAL_ANSWER = "final_answer"
    ERROR = "error"
@dataclass
class AgentAction:
    """A single tool invocation chosen by the LLM."""
    tool_name: str  # key into ReActAgent.tools
    tool_input: Dict[str, Any]  # keyword arguments passed to the tool's run()
    log: str  # the "Thought" text that preceded this action (reasoning log)
@dataclass
class AgentObservation:
    """Result of executing one tool call."""
    output: Any  # tool return value; None when the call failed
    error: Optional[str] = None  # failure message, or None on success
    execution_time: float = 0.0  # seconds spent in the successful attempt
@dataclass
class AgentFinish:
    """Final result of an agent run.

    NOTE(review): not constructed anywhere in the visible code; ReActAgent.run
    returns the plain dict built by ``_build_result`` instead.
    """
    output: str  # the final answer text
    log: str  # presumably the reasoning text behind the answer — confirm
class Tool:
    """A named callable that an agent may invoke.

    The tool itself only forwards keyword arguments to ``func``. ``timeout``
    (seconds) and ``retry`` (attempt count) are stored here as execution
    policy and are enforced by the agent that runs the tool.
    """

    def __init__(
        self,
        name: str,
        description: str,
        func: Callable,
        timeout: int = 30,
        retry: int = 3
    ):
        # Identity presented to the LLM in the tool list.
        self.name, self.description = name, description
        # Implementation plus its execution policy.
        self.func = func
        self.timeout, self.retry = timeout, retry

    def run(self, **kwargs) -> Any:
        """Call the wrapped function with the given keyword arguments."""
        target = self.func
        return target(**kwargs)
class ReActAgent:
    """Production-oriented ReAct agent.

    Alternates LLM reasoning ("Thought" / "Action") with tool execution
    ("Observation") until the LLM emits "Final Answer", the iteration budget
    is used up, or the overall time budget is exceeded.

    Args:
        llm: OpenAI-style client exposing ``chat.completions.create``.
        tools: tools the agent may call, looked up by ``Tool.name``.
        max_iterations: maximum reason/act rounds per ``run``.
        max_execution_time: overall budget per ``run`` in seconds. It is
            checked between iterations, so an in-flight LLM or tool call can
            overrun it.
        enable_tracing: include a per-iteration trace in the result metadata.
    """

    def __init__(
        self,
        llm,
        tools: List[Tool],
        max_iterations: int = 10,
        max_execution_time: int = 300,
        enable_tracing: bool = True
    ):
        self.llm = llm
        self.tools = {tool.name: tool for tool in tools}
        self.max_iterations = max_iterations
        self.max_execution_time = max_execution_time
        self.enable_tracing = enable_tracing
        # Metrics of the most recent run (reset at the start of every run).
        self.total_tokens = 0
        self.total_cost = 0.0
        self.iteration_count = 0

    def run(self, query: str) -> Dict[str, Any]:
        """Execute the agent for ``query``.

        Never raises: any failure is reported as ``success=False`` with an
        ``error`` message in the dict built by ``_build_result``.
        """
        start_time = time.time()
        trace = []  # per-iteration execution trace
        # Reset per-run metrics; previously a reused agent reported the
        # cumulative tokens/cost of all earlier runs in every result.
        self.total_tokens = 0
        self.total_cost = 0.0
        self.iteration_count = 0
        try:
            prompt = self._build_initial_prompt(query)
            intermediate_steps = []
            for i in range(self.max_iterations):
                self.iteration_count = i + 1
                # Overall budget, checked once per iteration.
                if time.time() - start_time > self.max_execution_time:
                    raise TimeoutError(
                        f"Agent执行超时: {self.max_execution_time}s"
                    )
                thought_action = self._get_next_action(
                    prompt,
                    intermediate_steps
                )
                if self.enable_tracing:
                    trace.append({
                        "iteration": i + 1,
                        "thought": thought_action.get("thought", ""),
                        "action": thought_action.get("action", ""),
                        "timestamp": time.time()
                    })
                if "final_answer" in thought_action:
                    return self._build_result(
                        output=thought_action["final_answer"],
                        trace=trace,
                        execution_time=time.time() - start_time,
                        success=True
                    )
                # Neither a final answer nor a parsable action: fail with a
                # clear message instead of a bare KeyError('action').
                if "action" not in thought_action:
                    raise ValueError(
                        "LLM输出格式错误: 既没有Action也没有Final Answer"
                    )
                action = AgentAction(
                    tool_name=thought_action["action"],
                    # A missing "Action Input" means no arguments, matching
                    # how _parse_llm_output treats malformed JSON ({}).
                    tool_input=thought_action.get("action_input", {}),
                    log=thought_action.get("thought", "")
                )
                observation = self._execute_action(action)
                intermediate_steps.append((action, observation))
                if self.enable_tracing:
                    trace[-1]["observation"] = str(observation.output)
                    trace[-1]["execution_time"] = observation.execution_time
                    trace[-1]["error"] = observation.error
                # Unrecoverable tool errors abort the run; recoverable ones
                # are shown to the LLM in the next prompt.
                if observation.error and not self._is_recoverable(observation):
                    raise RuntimeError(f"工具执行失败: {observation.error}")
            raise RuntimeError(
                f"达到最大迭代次数({self.max_iterations}),未能完成任务"
            )
        except Exception as e:
            logger.error(f"Agent执行失败: {str(e)}", exc_info=True)
            return self._build_result(
                output=f"执行失败: {str(e)}",
                trace=trace,
                execution_time=time.time() - start_time,
                success=False,
                error=str(e)
            )

    def _get_next_action(
        self,
        initial_prompt: str,
        intermediate_steps: List
    ) -> Dict[str, Any]:
        """Ask the LLM for the next Thought/Action (or Final Answer).

        Adds the call's token usage and cost to the per-run metrics.
        """
        prompt = self._build_prompt_with_history(
            initial_prompt,
            intermediate_steps
        )
        response = self.llm.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.1,
            max_tokens=1000,
            # Stop before the model writes its own "Observation:". Otherwise
            # it can invent a tool result plus a "Final Answer" in a single
            # completion, and because the parser checks "Final Answer" first
            # the agent would return that answer without running the tool.
            stop=["Observation:"]
        )
        self.total_tokens += response.usage.total_tokens
        self.total_cost += self._calculate_cost(response.usage)
        # The API may return content=None; parse an empty string instead.
        return self._parse_llm_output(response.choices[0].message.content or "")

    def _execute_action(self, action: AgentAction) -> AgentObservation:
        """Run a tool with a per-attempt timeout and exponential-backoff retries.

        Failures never raise; they are returned in ``AgentObservation.error``.

        The timeout waits on a worker thread instead of using
        ``signal.alarm``: SIGALRM is Unix-only, and installing the handler
        raises ValueError outside the main thread, so every tool call failed
        when the agent ran inside a threaded server. Caveat: Python threads
        cannot be killed, so a timed-out tool keeps running in the background
        (and can delay interpreter shutdown) until it returns; its result is
        discarded.
        """
        import concurrent.futures

        tool = self.tools.get(action.tool_name)
        if not tool:
            return AgentObservation(
                output=None,
                error=f"工具不存在: {action.tool_name}"
            )
        # retry <= 0 still means one attempt; the old loop then fell through
        # and implicitly returned None.
        attempts = max(1, tool.retry)
        for attempt in range(attempts):
            start_time = time.time()
            executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
            try:
                future = executor.submit(tool.run, **action.tool_input)
                result = future.result(timeout=tool.timeout)
                return AgentObservation(
                    output=result,
                    execution_time=time.time() - start_time
                )
            except concurrent.futures.TimeoutError:
                logger.warning(
                    f"工具{tool.name}执行超时 (尝试{attempt+1}/{attempts})"
                )
                if attempt == attempts - 1:
                    return AgentObservation(
                        output=None,
                        error=f"执行超时({tool.timeout}s)"
                    )
            except Exception as e:
                logger.warning(
                    f"工具{tool.name}执行失败 (尝试{attempt+1}/{attempts}): {str(e)}"
                )
                if attempt == attempts - 1:
                    return AgentObservation(
                        output=None,
                        error=str(e)
                    )
            finally:
                # Never block on a timed-out worker (see caveat above).
                executor.shutdown(wait=False)
            # Exponential backoff: 1s, 2s, 4s, ...
            time.sleep(2 ** attempt)

    def _build_initial_prompt(self, query: str) -> str:
        """Build the ReAct instruction prompt listing the available tools."""
        tools_desc = "\n".join([
            f"- {name}: {tool.description}"
            for name, tool in self.tools.items()
        ])
        prompt = f"""你是一个智能助手,可以使用以下工具来完成任务:
{tools_desc}
请使用以下格式:
Thought: 你的思考过程
Action: 工具名称
Action Input: {{"参数名": "参数值"}}
Observation: 工具执行结果
... (重复Thought/Action/Observation多次)
Thought: 我已经得到了答案
Final Answer: 最终答案
重要规则:
1. 每次只执行一个Action
2. Action Input必须是有效的JSON格式
3. 如果工具执行失败,考虑使用其他工具或方法
4. 如果无法完成任务,明确说明原因
用户问题:{query}
开始!
"""
        return prompt

    def _build_prompt_with_history(
        self,
        initial_prompt: str,
        intermediate_steps: List
    ) -> str:
        """Append the Thought/Action/Observation transcript to the initial prompt.

        Action Input is rendered as JSON, the format the prompt demands. A
        failed step shows its error, so the LLM can react to it as rule 3 of
        the prompt asks; previously it only saw "Observation: None".
        """
        import json

        history = []
        for action, observation in intermediate_steps:
            if observation.error:
                observed = f"错误: {observation.error}"
            else:
                observed = observation.output
            action_input = json.dumps(
                action.tool_input, ensure_ascii=False, default=str
            )
            history.append(f"Thought: {action.log}")
            history.append(f"Action: {action.tool_name}")
            history.append(f"Action Input: {action_input}")
            history.append(f"Observation: {observed}")
        if history:
            return initial_prompt + "\n\n" + "\n".join(history) + "\n\n"
        return initial_prompt

    def _parse_llm_output(self, output: str) -> Dict[str, Any]:
        """Parse one ReAct-formatted completion.

        Returns a dict with the optional keys "thought", "final_answer"
        (parsing stops there when present), "action" and "action_input"
        ({} when the JSON is malformed).
        """
        import re
        import json

        result = {}
        thought_match = re.search(r'Thought:\s*(.+?)(?=\n|$)', output, re.DOTALL)
        if thought_match:
            result["thought"] = thought_match.group(1).strip()
        final_match = re.search(r'Final Answer:\s*(.+?)$', output, re.DOTALL)
        if final_match:
            result["final_answer"] = final_match.group(1).strip()
            return result
        action_match = re.search(r'Action:\s*(\w+)', output)
        if action_match:
            result["action"] = action_match.group(1).strip()
        # Decode the object with a real JSON decoder. The old lazy regex
        # `{.+?}` stopped at the first "}", so nested input such as
        # {"filter": {"id": 1}} became invalid JSON and was replaced by {}.
        input_match = re.search(r'Action Input:\s*(?=\{)', output)
        if input_match:
            try:
                value, _ = json.JSONDecoder().raw_decode(output, input_match.end())
                result["action_input"] = value
            except json.JSONDecodeError:
                result["action_input"] = {}
        return result

    def _is_recoverable(self, observation: AgentObservation) -> bool:
        """Return True if the agent may continue after this observation's error."""
        if not observation.error:
            return True
        # Case-insensitive substrings marking a recoverable error. The
        # Chinese entries match the messages _execute_action itself produces
        # ("执行超时(...)", "工具不存在: ..."), which the English keywords
        # never matched, so a timeout used to abort the whole run.
        recoverable_errors = [
            "timeout",
            "rate limit",
            "temporary",
            "retry",
            "超时",
            "工具不存在"
        ]
        return any(
            err.lower() in observation.error.lower()
            for err in recoverable_errors
        )

    def _calculate_cost(self, usage) -> float:
        """Estimate the USD cost of one call from its token usage."""
        # Example GPT-4 pricing.
        input_cost = usage.prompt_tokens * 0.00003  # $0.03/1K tokens
        output_cost = usage.completion_tokens * 0.00006  # $0.06/1K tokens
        return input_cost + output_cost

    def _build_result(
        self,
        output: str,
        trace: List[Dict],
        execution_time: float,
        success: bool,
        error: Optional[str] = None
    ) -> Dict[str, Any]:
        """Assemble the result dict returned by ``run``."""
        return {
            "output": output,
            "success": success,
            "error": error,
            "metadata": {
                "execution_time": execution_time,
                "iterations": self.iteration_count,
                "total_tokens": self.total_tokens,
                "total_cost": self.total_cost,
                "trace": trace if self.enable_tracing else []
            }
        }
2.3 工具定义示例
# 定义具体工具
import requests
import sqlite3
def search_web(query: str, max_results: int = 5) -> str:
    """Search the web and return one numbered "title: snippet" line per hit.

    Raises:
        RuntimeError: wrapping any HTTP, network or response-format failure.
    """
    try:
        # Example endpoint; replace with the real search provider.
        resp = requests.get(
            "https://api.search.com/search",
            params={"q": query, "limit": max_results},
            timeout=10,
        )
        resp.raise_for_status()
        hits = resp.json()["results"]
        lines = []
        for rank, hit in enumerate(hits, start=1):
            lines.append(f"{rank}. {hit['title']}: {hit['snippet']}")
        return "\n".join(lines)
    except Exception as e:
        raise RuntimeError(f"搜索失败: {str(e)}")
def query_database(sql: str, database: str = "production") -> str:
    """Run a read-only SQL query against the SQLite file ``{database}.db``.

    Returns at most 100 rows, one ``str(row)`` per line, or "查询结果为空".

    Security: ``sql`` is produced by the LLM and is therefore untrusted. The
    previous keyword blacklist ("DROP"/"DELETE"/...) missed statements such
    as ALTER, CREATE, ATTACH and PRAGMA. It also rejected harmless queries
    whose identifiers merely contain a keyword (e.g. ``updated_at``). Now the
    statement must start with SELECT/WITH, and the database is opened in
    SQLite read-only mode, so SQLite itself rejects any write.
    NOTE(review): ``database`` is LLM-controlled too and becomes a file path;
    consider restricting it to an allow-list.

    Raises:
        RuntimeError: on validation or database errors, including a missing
            database file (read-only mode never creates one).
    """
    import re
    from pathlib import Path

    try:
        if not re.match(r"\s*(SELECT|WITH)\b", sql, re.IGNORECASE):
            raise ValueError("只允许SELECT查询")
        # mode=ro: every write is refused and missing files are not created.
        uri = Path(f"{database}.db").resolve().as_uri() + "?mode=ro"
        conn = sqlite3.connect(uri, uri=True)
        try:
            rows = conn.execute(sql).fetchmany(100)  # cap output at 100 rows
        finally:
            # Close even when execute() fails; the old code leaked the connection.
            conn.close()
        if not rows:
            return "查询结果为空"
        return "\n".join(str(row) for row in rows)
    except Exception as e:
        raise RuntimeError(f"数据库查询失败: {str(e)}") from e
def calculate(expression: str) -> float:
    """Evaluate a basic arithmetic expression, rounded to 2 decimals.

    Supports numbers, parentheses, unary +/- and the binary operators
    + - * / //. The expression is evaluated by walking its AST instead of
    calling ``eval``. The old character whitelist still admitted ``**``, so
    LLM-supplied input such as ``9**9**9**9`` would tie up the process
    computing an astronomically large integer, and a timeout cannot stop
    that computation once it has started. Exponentiation is therefore
    rejected.

    Raises:
        RuntimeError: for illegal characters, unsupported syntax, division by
            zero, or any other evaluation error.
    """
    import ast
    import operator

    binary_ops = {
        ast.Add: operator.add,
        ast.Sub: operator.sub,
        ast.Mult: operator.mul,
        ast.Div: operator.truediv,
        ast.FloorDiv: operator.floordiv,
    }
    unary_ops = {ast.UAdd: operator.pos, ast.USub: operator.neg}

    def _evaluate(node):
        """Recursively evaluate a whitelisted arithmetic AST node."""
        if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in binary_ops:
            return binary_ops[type(node.op)](_evaluate(node.left), _evaluate(node.right))
        if isinstance(node, ast.UnaryOp) and type(node.op) in unary_ops:
            return unary_ops[type(node.op)](_evaluate(node.operand))
        raise ValueError("表达式包含不支持的运算")

    try:
        allowed_chars = set("0123456789+-*/() .")
        if not all(c in allowed_chars for c in expression):
            raise ValueError("表达式包含非法字符")
        # eval() tolerated surrounding spaces; ast.parse would raise
        # IndentationError on a leading one.
        tree = ast.parse(expression.strip(), mode="eval")
        return round(_evaluate(tree.body), 2)
    except Exception as e:
        raise RuntimeError(f"计算失败: {str(e)}") from e
# The agent calls ``llm.chat.completions.create``; the module-level client of
# the ``openai`` package (>= 1.0) provides exactly that. It was never imported
# in this snippet, so ``llm=openai`` below raised NameError.
import openai

# Register the tools with their per-tool timeout (seconds) and retry count.
tools = [
    Tool(
        name="search_web",
        description="搜索网页获取最新信息。输入:查询字符串",
        func=search_web,
        timeout=15,
        retry=3
    ),
    Tool(
        name="query_database",
        description="查询数据库。输入:SQL语句(仅支持SELECT)",
        func=query_database,
        timeout=30,
        retry=2
    ),
    Tool(
        name="calculate",
        description="计算数学表达式。输入:数学表达式字符串",
        func=calculate,
        timeout=5,
        retry=1
    )
]
# Create the agent.
agent = ReActAgent(
    llm=openai,
    tools=tools,
    max_iterations=10,
    max_execution_time=300,
    enable_tracing=True
)
# Run it.
result = agent.run("查询上个月销售额最高的前3个产品,并计算总销售额")
print(result)
三、Function Calling模式
3.1 OpenAI Function Calling
from typing import List, Dict, Any
import json
import openai
class FunctionCallingAgent:
    """Agent built on OpenAI function calling (``tools`` API).

    Args:
        tools: tool schemas in the OpenAI ``tools`` format.
        model: chat model name.
    """

    def __init__(
        self,
        tools: List[Dict[str, Any]],
        model: str = "gpt-4-1106-preview"
    ):
        self.tools = tools
        self.model = model
        self.function_map = {}  # function name -> Python implementation

    def register_function(self, name: str, func: callable):
        """Bind a Python implementation to the schema named ``name``."""
        self.function_map[name] = func

    def run(self, query: str, max_iterations: int = 5) -> Dict[str, Any]:
        """Loop LLM call -> tool calls until the model answers without tools.

        Returns a dict with "output", "success" and "iterations".
        """
        messages = [
            {"role": "user", "content": query}
        ]
        for i in range(max_iterations):
            response = openai.chat.completions.create(
                model=self.model,
                messages=messages,
                tools=self.tools,
                tool_choice="auto"
            )
            message = response.choices[0].message
            # No tool calls: the model has produced its final answer.
            if not message.tool_calls:
                return {
                    "output": message.content,
                    "success": True,
                    "iterations": i + 1
                }
            messages.append(message)
            # Every tool call gets a matching "tool" reply message.
            for tool_call in message.tool_calls:
                messages.append(self._execute_tool_call(tool_call))
        return {
            "output": "达到最大迭代次数",
            "success": False,
            "iterations": max_iterations
        }

    def _execute_tool_call(self, tool_call) -> Dict[str, Any]:
        """Execute one tool call and build its ``role="tool"`` reply message.

        Malformed JSON arguments, unknown function names and exceptions
        raised by the function are reported to the model in the message
        content. Previously a JSON decode error escaped ``run`` and aborted
        the whole conversation.
        """
        function_name = tool_call.function.name
        try:
            function_args = json.loads(tool_call.function.arguments or "{}")
            func = self.function_map.get(function_name)
            if func is None:
                raise ValueError(f"未注册的函数: {function_name}")
            function_response = func(**function_args)
            success, error = True, None
        except Exception as e:
            function_response, success, error = None, False, str(e)
        return {
            "role": "tool",
            "tool_call_id": tool_call.id,
            # default=str: results may contain non-JSON types (dates,
            # Decimals, ...); send them as text rather than crash the run.
            "content": json.dumps({
                "success": success,
                "result": function_response,
                "error": error
            }, ensure_ascii=False, default=str)
        }
# Tool schemas in the OpenAI function-calling format. Each entry wraps a
# JSON-Schema "object" that describes the function's keyword arguments.
def _function_tool(name, description, properties, required):
    """Wrap a JSON-Schema parameter spec in the OpenAI ``tools`` envelope."""
    return {
        "type": "function",
        "function": {
            "name": name,
            "description": description,
            "parameters": {
                "type": "object",
                "properties": properties,
                "required": required,
            },
        },
    }


tools = [
    _function_tool(
        "get_weather",
        "获取指定城市的天气信息",
        {
            "city": {
                "type": "string",
                "description": "城市名称,例如:北京、上海",
            },
            "unit": {
                "type": "string",
                "enum": ["celsius", "fahrenheit"],
                "description": "温度单位",
            },
        },
        ["city"],
    ),
    _function_tool(
        "search_products",
        "搜索商品信息",
        {
            "query": {"type": "string", "description": "搜索关键词"},
            "category": {"type": "string", "description": "商品分类"},
            "max_results": {
                "type": "integer",
                "description": "最大返回结果数",
                "default": 10,
            },
        },
        ["query"],
    ),
]
# Function implementations (mocks standing in for real services).
def get_weather(city: str, unit: str = "celsius") -> Dict:
    """Return mock weather data for ``city``.

    A real implementation would query a weather API here.
    """
    report = dict(city=city, temperature=25, unit=unit, condition="晴")
    return report
def search_products(
    query: str,
    category: str = None,
    max_results: int = 10
) -> List[Dict]:
    """Return ``max_results`` mock products named 商品1..商品N.

    ``query`` and ``category`` are accepted for schema compatibility but this
    mock ignores them; a real implementation would call a search API.
    """
    products = []
    for idx in range(1, max_results + 1):
        products.append({"name": f"商品{idx}", "price": 99.9 * idx})
    return products
# Build the agent and bind each schema name to its implementation.
agent = FunctionCallingAgent(tools=tools)
agent.register_function(name="get_weather", func=get_weather)
agent.register_function(name="search_products", func=search_products)
# Run a query that needs both tools.
result = agent.run("北京今天天气怎么样?另外帮我搜索一下电子产品")
四、错误处理与容错
4.1 分层错误处理
from enum import Enum
from typing import Optional
import traceback
class ErrorSeverity(Enum):
    """Severity of an agent error; ErrorHandler maps each level to a strategy."""
    LOW = "low"  # ignore and continue
    MEDIUM = "medium"  # retry (or skip when not recoverable)
    HIGH = "high"  # fall back to a degraded mode
    CRITICAL = "critical"  # terminate the execution
class AgentError(Exception):
    """Base class for agent errors, carrying hints for ErrorHandler.

    Args:
        message: human-readable description (also the exception's ``str``).
        severity: how ErrorHandler should react (default MEDIUM).
        recoverable: whether retrying may succeed (default True).
        original_error: the underlying exception being wrapped, if any.
    """

    def __init__(
        self,
        message: str,
        severity: ErrorSeverity = ErrorSeverity.MEDIUM,
        recoverable: bool = True,
        original_error: Optional[Exception] = None
    ):
        super().__init__(message)
        self.message = message
        self.severity, self.recoverable = severity, recoverable
        self.original_error = original_error
class ToolExecutionError(AgentError):
    """Error for a failed tool invocation.

    Inherits AgentError's defaults (MEDIUM severity, recoverable).
    NOTE(review): not raised anywhere in the visible code.
    """
    pass
class LLMError(AgentError):
    """Error for a failed LLM call.

    Inherits AgentError's defaults (MEDIUM severity, recoverable).
    NOTE(review): not raised anywhere in the visible code.
    """
    pass
class TimeoutError(AgentError):
    """Agent-level timeout: HIGH severity and never recoverable.

    NOTE(review): this name shadows the built-in ``TimeoutError`` for the rest
    of the module, and the class does not derive from it. So within this
    module ``except TimeoutError`` does not catch built-in timeouts, and
    code elsewhere catching the built-in does not catch this class. Consider
    a non-shadowing name such as ``AgentTimeoutError``.
    """

    def __init__(self, message: str):
        # HIGH severity makes ErrorHandler choose the "fallback" strategy.
        super().__init__(message, recoverable=False, severity=ErrorSeverity.HIGH)
class ErrorHandler:
    """Maps exceptions to handling strategies (unified error handling)."""

    @staticmethod
    def handle_error(
        error: Exception,
        context: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Log ``error`` and decide how the caller should proceed.

        Exceptions that are not AgentError are wrapped as MEDIUM and
        recoverable.

        Returns:
            A dict with "action" (terminate/fallback/retry/skip/ignore) and a
            human-readable "message".
        """
        agent_error = error if isinstance(error, AgentError) else AgentError(
            message=str(error),
            severity=ErrorSeverity.MEDIUM,
            recoverable=True,
            original_error=error
        )

        logger.error(
            f"Agent错误: {agent_error.message}",
            extra={
                "severity": agent_error.severity.value,
                "recoverable": agent_error.recoverable,
                "context": context,
                "traceback": traceback.format_exc()
            }
        )

        severity = agent_error.severity
        if severity is ErrorSeverity.CRITICAL:
            return {"action": "terminate", "message": "遇到严重错误,终止执行"}
        if severity is ErrorSeverity.HIGH:
            return {"action": "fallback", "message": "尝试降级处理"}
        if severity is ErrorSeverity.MEDIUM:
            if not agent_error.recoverable:
                return {"action": "skip", "message": "跳过当前步骤"}
            return {"action": "retry", "message": "尝试重试"}
        # LOW (or anything unrecognized).
        return {"action": "ignore", "message": "忽略错误,继续执行"}
# Integration into the agent
class RobustAgent(ReActAgent):
    """ReActAgent with error classification, retries and a tool-less fallback.

    ``ReActAgent.run`` never raises; it reports failures as ``success=False``
    with an ``error`` string. The old ``except AgentError`` branch was
    therefore dead code and failures were returned untouched. Failed results
    are now routed through ``ErrorHandler`` as well. Lacking the original
    exception, they get AgentError's defaults (MEDIUM, recoverable), which
    means they are retried.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.error_handler = ErrorHandler()
        self.fallback_response = "抱歉,我遇到了一些问题,无法完成您的请求。"

    def run(self, query: str) -> Dict[str, Any]:
        """Run the base agent and apply the error-handling strategy on failure."""
        try:
            result = super().run(query)
        except AgentError as e:
            return self._handle_agent_error(query, e)
        except Exception as e:
            # Unknown error.
            logger.exception("未知错误")
            return {
                "output": self.fallback_response,
                "success": False,
                "error": str(e)
            }
        if result.get("success"):
            return result
        return self._handle_agent_error(
            query,
            AgentError(message=result.get("error") or "未知错误")
        )

    def _handle_agent_error(self, query: str, error: AgentError) -> Dict[str, Any]:
        """Pick a strategy for ``error`` via ErrorHandler and carry it out."""
        handling = self.error_handler.handle_error(error, {"query": query})
        if handling["action"] == "fallback":
            # Degrade to a single tool-less LLM call.
            return self._fallback_response(query)
        if handling["action"] == "retry":
            return self._retry_with_backoff(query, max_retries=2)
        # terminate / skip / ignore: return a friendly error message.
        return {
            "output": self.fallback_response,
            "success": False,
            "error": error.message,
            "metadata": {"error_handling": handling}
        }

    def _fallback_response(self, query: str) -> Dict[str, Any]:
        """Degraded answer: one direct LLM call on a cheaper model, no tools."""
        try:
            response = self.llm.chat.completions.create(
                model="gpt-3.5-turbo",  # cheaper model
                messages=[{"role": "user", "content": query}],
                max_tokens=500
            )
            return {
                "output": response.choices[0].message.content,
                "success": True,
                "metadata": {"mode": "fallback"}
            }
        except Exception:
            # Previously a bare ``except:``, which also swallowed
            # KeyboardInterrupt/SystemExit and hid the cause.
            logger.exception("降级调用失败")
            return {
                "output": self.fallback_response,
                "success": False
            }

    def _retry_with_backoff(
        self,
        query: str,
        max_retries: int = 2
    ) -> Dict[str, Any]:
        """Re-run the base agent up to ``max_retries`` times with exponential backoff.

        Returns the first successful result, otherwise the last failed one.
        An exception raised by the final attempt propagates. A failed result
        (``success=False``) now also counts as a failure and triggers another
        attempt; before, it was returned as if the retry had worked.
        """
        result = None
        for attempt in range(max_retries):
            try:
                result = super().run(query)
            except Exception:
                if attempt == max_retries - 1:
                    raise
                result = None
            else:
                if result.get("success"):
                    return result
            if attempt < max_retries - 1:
                wait_time = 2 ** attempt
                logger.info(f"重试 {attempt + 1}/{max_retries},等待{wait_time}秒")
                time.sleep(wait_time)
        if result is None:
            # Only reachable when max_retries <= 0.
            return {
                "output": self.fallback_response,
                "success": False
            }
        return result
五、超时与熔断控制
5.1 多级超时控制
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncGenerator
class TimeoutController:
"""超时控制器"""
def __init__(
self,
tool_timeout: int = 30, # 单个工具超时
iteration_timeout: int = 60, # 单次迭代超时
total_timeout: int = 300 # 总体超时
):
self.tool_timeout = tool_timeout
self.iteration_timeout = iteration_timeout
self.total_timeout = total_timeout
@asynccontextmanager
async def timeout_context(
self,
timeout: int,
error_message: str
) -> AsyncGenerator:
"""超时上下文管理器"""
try:
async with asyncio.timeout(timeout):
yield
except asyncio.TimeoutError:
raise TimeoutError(error_message)
async def execute_with_timeout(
self,
func: callable,
timeout: int,
*args,
**kwargs
):
"""带超时执行函数"""
async with self.timeout_context(
timeout,
f"执行超时({timeout}s)"
):
if asyncio.iscoroutinefunction(func):
return await func(*args, **kwargs)
else:
# 同步函数转异步
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None,
lambda: func(*args, **kwargs)
)
# Async agent implementation
class AsyncAgent:
    """Async agent skeleton showing where each timeout level applies.

    Subclasses supply ``_get_next_action`` and ``_execute_tool``; the base
    versions raise NotImplementedError. They used to be missing entirely, so
    ``run`` failed with an AttributeError that its ``except TimeoutError``
    did not handle.
    """

    def __init__(
        self,
        llm,
        tools: List[Tool],
        timeout_controller: TimeoutController
    ):
        self.llm = llm
        self.tools = {tool.name: tool for tool in tools}
        self.timeout_ctrl = timeout_controller

    async def run(self, query: str) -> Dict[str, Any]:
        """Run under the total timeout; a timeout is returned as a failed result."""
        try:
            async with self.timeout_ctrl.timeout_context(
                self.timeout_ctrl.total_timeout,
                f"Agent总执行时间超时({self.timeout_ctrl.total_timeout}s)"
            ):
                return await self._execute(query)
        except TimeoutError as e:
            return {
                "output": "执行超时",
                "success": False,
                "error": str(e)
            }

    async def _execute(self, query: str) -> Dict[str, Any]:
        """One illustrative iteration: LLM step plus tool step.

        The tool call now sits inside the iteration context, so
        ``iteration_timeout`` bounds the whole iteration, not only the LLM
        call. The tool call additionally has its own ``tool_timeout``.
        """
        # ... full agent loop omitted in this skeleton ...
        async with self.timeout_ctrl.timeout_context(
            self.timeout_ctrl.iteration_timeout,
            "单次迭代超时"
        ):
            action = await self._get_next_action(query)
            observation = await self.timeout_ctrl.execute_with_timeout(
                self._execute_tool,
                self.timeout_ctrl.tool_timeout,
                action
            )
        # Placeholder result; a real loop would feed ``observation`` back to
        # the LLM and iterate.
        return {"output": "结果", "success": True}

    async def _get_next_action(self, query: str):
        """Return the next action for ``query``; implemented by subclasses."""
        raise NotImplementedError(f"{type(self).__name__}._get_next_action 未实现")

    async def _execute_tool(self, action):
        """Execute ``action`` and return its observation; implemented by subclasses."""
        raise NotImplementedError(f"{type(self).__name__}._execute_tool 未实现")
5.2 熔断器模式
from datetime import datetime, timedelta
from collections import deque
class CircuitBreaker:
    """Circuit breaker guarding calls to a flaky dependency.

    States:
        closed: calls pass through; expected failures are counted, and any
            success resets the count.
        open: calls are rejected immediately until ``timeout`` seconds have
            passed since the last failure.
        half_open: trial calls pass through; a success closes the circuit,
            a failure re-opens it.

    Args:
        failure_threshold: failures (without an intervening success) that
            open the circuit.
        timeout: seconds to stay open before allowing a trial call.
        expected_exception: exception type counted as a failure; other
            exceptions propagate without affecting the breaker.
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        timeout: int = 60,
        expected_exception: type = Exception
    ):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.expected_exception = expected_exception
        self.failure_count = 0
        self.last_failure_time = None
        self.state = "closed"  # closed, open, half_open
        # Timestamps of recent failures (informational; not used for tripping).
        self.recent_failures = deque(maxlen=failure_threshold * 2)

    def call(self, func: callable, *args, **kwargs):
        """Call ``func`` through the breaker.

        Raises:
            Exception: "熔断器开启,拒绝请求" while the circuit is open.
            Whatever ``func`` raises (after recording expected failures).
        """
        if self.state == "open":
            if self._should_attempt_reset():
                self.state = "half_open"
            else:
                raise Exception("熔断器开启,拒绝请求")
        try:
            result = func(*args, **kwargs)
        except self.expected_exception:
            self._on_failure()
            raise
        self._on_success()
        return result

    def _on_success(self):
        """Close the circuit and reset the failure counter.

        The reset is explicit on every success, not only after a half-open
        trial. Otherwise isolated failures spread over hours would accumulate
        until the breaker opened on a healthy dependency.
        """
        self.state = "closed"
        self.failure_count = 0
        self.recent_failures.clear()

    def _on_failure(self):
        """Record a failure and open the circuit when needed."""
        now = datetime.now()
        self.failure_count += 1
        self.last_failure_time = now
        self.recent_failures.append(now)
        # A failed half-open trial re-opens immediately.
        if self.state == "half_open" or self.failure_count >= self.failure_threshold:
            self.state = "open"
            # Same logger as the module-level ``logger = logging.getLogger(__name__)``.
            logging.getLogger(__name__).warning(f"熔断器开启,失败次数: {self.failure_count}")

    def _should_attempt_reset(self) -> bool:
        """Return True once ``timeout`` seconds have passed since the last failure."""
        if self.last_failure_time is None:
            return False
        return datetime.now() - self.last_failure_time > timedelta(seconds=self.timeout)
# Tools with a circuit breaker
class ProtectedTool(Tool):
    """Tool whose calls are routed through its own CircuitBreaker.

    Each instance owns a breaker configured with failure_threshold=5 and
    timeout=60; while the breaker is open, ``run`` raises instead of calling
    the wrapped function.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        breaker_settings = {"failure_threshold": 5, "timeout": 60}
        self.circuit_breaker = CircuitBreaker(**breaker_settings)

    def run(self, **kwargs):
        """Invoke the wrapped function through the breaker."""
        breaker = self.circuit_breaker
        return breaker.call(self.func, **kwargs)
六、成本优化策略
6.1 Token使用优化
class TokenOptimizer:
    """Keeps prompts within a token budget and picks a model under a cost cap.

    Args:
        max_tokens_per_request: maximum prompt tokens after truncation.
        max_cost_per_execution: USD cap; once ``current_cost`` reaches it,
            only the cheaper model is selected.
    """

    def __init__(
        self,
        max_tokens_per_request: int = 4000,
        max_cost_per_execution: float = 0.05
    ):
        self.max_tokens_per_request = max_tokens_per_request
        self.max_cost_per_execution = max_cost_per_execution
        self.current_cost = 0.0  # USD spent so far (see record_cost)

    def record_cost(self, cost: float) -> None:
        """Add ``cost`` (USD) to the running total checked by the model selector.

        Nothing ever updated ``current_cost`` before, so the cost cap in
        ``should_use_expensive_model`` could never trigger.
        """
        self.current_cost += cost

    def optimize_prompt(
        self,
        prompt: str,
        tokenizer
    ) -> str:
        """Truncate ``prompt`` to at most ``max_tokens_per_request`` tokens.

        Keeps the beginning and the end of the prompt and drops the middle.
        ``tokenizer`` must provide ``encode(str) -> list`` and
        ``decode(list) -> str`` (e.g. a tiktoken encoding).
        """
        tokens = tokenizer.encode(prompt)
        budget = self.max_tokens_per_request
        if len(tokens) <= budget:
            return prompt
        # The tail is sliced with an explicit start index: ``tokens[-0:]`` is
        # the whole list, so a budget of 1 used to return the untruncated
        # prompt. An odd budget now uses its last token too.
        tail_len = budget // 2
        head_len = budget - tail_len
        truncated_tokens = tokens[:head_len] + tokens[len(tokens) - tail_len:]
        return tokenizer.decode(truncated_tokens)

    def should_use_expensive_model(
        self,
        complexity: str
    ) -> bool:
        """Return True unless the task is "simple" or the cost cap is reached."""
        if complexity == "simple":
            return False
        if self.current_cost >= self.max_cost_per_execution:
            # Same logger as the module-level ``logger = logging.getLogger(__name__)``.
            logging.getLogger(__name__).warning("达到成本上限,使用便宜模型")
            return False
        return True

    def select_model(self, task_complexity: str) -> str:
        """Return the model name for ``task_complexity``."""
        if self.should_use_expensive_model(task_complexity):
            return "gpt-4"
        else:
            return "gpt-3.5-turbo"
# Tiered model calls
class TieredLLMCaller:
    """Routes a prompt to a model tier chosen by task complexity.

    Tiers: "simple" -> gpt-3.5-turbo, "medium" -> gpt-4,
    "complex" -> gpt-4-32k. Unknown labels fall back to "simple".
    NOTE(review): the cost estimate applies one per-1K rate to total tokens.
    ReActAgent._calculate_cost prices prompt and completion tokens
    separately, so the two estimates are not directly comparable.
    """

    def __init__(self):
        def tier(model, cost_per_1k, max_tokens):
            return {"model": model, "cost_per_1k": cost_per_1k, "max_tokens": max_tokens}

        self.models = {
            "simple": tier("gpt-3.5-turbo", 0.002, 4096),
            "medium": tier("gpt-4", 0.03, 8192),
            "complex": tier("gpt-4-32k", 0.06, 32768),
        }

    def call(
        self,
        prompt: str,
        complexity: str = "simple"
    ):
        """Call the tier's model; returns the response, its estimated cost and the model."""
        if complexity in self.models:
            config = self.models[complexity]
        else:
            config = self.models["simple"]
        response = openai.chat.completions.create(
            model=config["model"],
            messages=[{"role": "user", "content": prompt}],
            max_tokens=min(1000, config["max_tokens"])
        )
        used_tokens = response.usage.total_tokens
        return {
            "response": response,
            "cost": used_tokens / 1000 * config["cost_per_1k"],
            "model": config["model"]
        }
6.2 缓存策略
import hashlib
import redis
import json
class AgentCache:
    """Redis-backed cache of agent results keyed by query plus context.

    Args:
        redis_client: connected Redis client.
        ttl: entry lifetime in seconds (default one hour).
    """

    def __init__(
        self,
        redis_client: redis.Redis,
        ttl: int = 3600
    ):
        self.redis = redis_client
        self.ttl = ttl

    def _generate_key(self, query: str, context: Dict = None) -> str:
        """Derive a deterministic key from the canonical JSON of query and context.

        MD5 is used only to shorten the key, not for security.
        """
        payload = json.dumps({"query": query, "context": context or {}}, sort_keys=True)
        digest = hashlib.md5(payload.encode()).hexdigest()
        return f"agent:cache:{digest}"

    def get(self, query: str, context: Dict = None) -> Optional[Dict]:
        """Return the cached result, or None on a miss."""
        key = self._generate_key(query, context)
        raw = self.redis.get(key)
        if not raw:
            return None
        logger.info(f"命中缓存: {key}")
        return json.loads(raw)

    def set(
        self,
        query: str,
        result: Dict,
        context: Dict = None
    ):
        """Store ``result`` as JSON with the configured TTL."""
        key = self._generate_key(query, context)
        serialized = json.dumps(result, ensure_ascii=False)
        self.redis.setex(key, self.ttl, serialized)
# Cache integration
class CachedAgent:
    """Wraps an agent with a read-through cache; only successful runs are stored."""

    def __init__(
        self,
        agent: ReActAgent,
        cache: AgentCache
    ):
        self.agent, self.cache = agent, cache

    def run(self, query: str) -> Dict[str, Any]:
        """Return a cached result when available, otherwise run the agent.

        The returned dict carries an extra "from_cache" flag.
        """
        hit = self.cache.get(query)
        if hit:
            return {**hit, "from_cache": True}
        fresh = self.agent.run(query)
        if fresh["success"]:
            self.cache.set(query, fresh)
        return {**fresh, "from_cache": False}
七、可观测性与监控
7.1 结构化日志
import logging
import json
from datetime import datetime
class StructuredLogger:
    """Emits one JSON document per event through a standard ``logging`` logger."""

    def __init__(self, logger_name: str):
        self.logger = logging.getLogger(logger_name)

    @staticmethod
    def _utc_now_iso() -> str:
        """Current UTC time in ISO-8601 with an explicit +00:00 offset.

        Replaces ``datetime.utcnow()``, which is deprecated since Python 3.12
        and produced a naive timestamp without any timezone marker.
        """
        from datetime import timezone
        return datetime.now(timezone.utc).isoformat()

    @staticmethod
    def _dumps(entry: Dict[str, Any]) -> str:
        """Serialize a log entry.

        ``default=str`` is deliberate: tool inputs and traces can hold
        arbitrary objects, and logging must never raise because of them.
        """
        return json.dumps(entry, ensure_ascii=False, default=str)

    def log_agent_execution(
        self,
        query: str,
        result: Dict,
        trace: List[Dict]
    ):
        """Log one agent run.

        Missing metadata fields are logged as null instead of raising
        KeyError inside the logging call. For example, fallback results carry
        ``{"mode": "fallback"}`` instead of execution statistics.
        """
        metadata = result.get("metadata") or {}
        log_entry = {
            "timestamp": self._utc_now_iso(),
            "event": "agent_execution",
            "query": query,
            "success": result.get("success"),
            "execution_time": metadata.get("execution_time"),
            "iterations": metadata.get("iterations"),
            "total_tokens": metadata.get("total_tokens"),
            "total_cost": metadata.get("total_cost"),
            "trace": trace
        }
        self.logger.info(self._dumps(log_entry))

    def log_tool_execution(
        self,
        tool_name: str,
        inputs: Dict,
        output: Any,
        execution_time: float,
        success: bool,
        error: Optional[str] = None
    ):
        """Log one tool call at INFO on success and at ERROR on failure."""
        log_entry = {
            "timestamp": self._utc_now_iso(),
            "event": "tool_execution",
            "tool_name": tool_name,
            "inputs": inputs,
            "output": str(output)[:500],  # cap the logged output size
            "execution_time": execution_time,
            "success": success,
            "error": error
        }
        level = logging.INFO if success else logging.ERROR
        self.logger.log(level, self._dumps(log_entry))
7.2 Metrics收集
from prometheus_client import Counter, Histogram, Gauge
# --- Agent-level metrics ---
# NOTE(review): MonitoredAgent increments status='started' and then
# 'success'/'error' on this same counter, so summing over all statuses
# double-counts requests; filter by status in queries.
agent_requests_total = Counter(
    name='agent_requests_total',
    documentation='Total agent requests',
    labelnames=['status', 'complexity'],
)
agent_execution_time = Histogram(
    name='agent_execution_time_seconds',
    documentation='Agent execution time',
    labelnames=['complexity'],
    buckets=[1, 5, 10, 30, 60, 120, 300],
)
agent_iterations = Histogram(
    name='agent_iterations',
    documentation='Number of iterations per execution',
    buckets=[1, 2, 3, 5, 10, 15, 20],
)
agent_cost = Histogram(
    name='agent_cost_usd',
    documentation='Agent execution cost in USD',
    buckets=[0.001, 0.005, 0.01, 0.05, 0.1, 0.5],
)

# --- Tool-level metrics ---
tool_calls_total = Counter(
    name='tool_calls_total',
    documentation='Total tool calls',
    labelnames=['tool_name', 'status'],
)
tool_execution_time = Histogram(
    name='tool_execution_time_seconds',
    documentation='Tool execution time',
    labelnames=['tool_name'],
    buckets=[0.1, 0.5, 1, 5, 10, 30],
)
# Usage example
class MonitoredAgent:
    """Mixin that records Prometheus metrics around ``run``.

    Place it before a concrete agent in the bases, e.g.
    ``class MyAgent(MonitoredAgent, ReActAgent)``: ``run`` delegates to the
    next class in the MRO via ``super()``. Used on its own, ``super().run``
    resolves to ``object`` and raises AttributeError.
    """

    def run(self, query: str, complexity: str = "medium") -> Dict:
        """Run the wrapped agent and record request, latency, iteration and cost metrics."""
        agent_requests_total.labels(
            status='started',
            complexity=complexity
        ).inc()
        start_time = time.time()
        try:
            result = super().run(query)
        except Exception:
            agent_requests_total.labels(
                status='error',
                complexity=complexity
            ).inc()
            raise
        finally:
            # Record latency for failed runs too; before, failures and
            # timeouts were missing from the latency histogram.
            agent_execution_time.labels(complexity=complexity).observe(
                time.time() - start_time
            )
        # ReActAgent.run reports failures as success=False instead of
        # raising; count those as errors rather than successes.
        status = 'success' if result.get("success") else 'error'
        agent_requests_total.labels(
            status=status,
            complexity=complexity
        ).inc()
        # Fallback results may lack execution statistics.
        metadata = result.get("metadata") or {}
        if "iterations" in metadata:
            agent_iterations.observe(metadata["iterations"])
        if "total_cost" in metadata:
            agent_cost.observe(metadata["total_cost"])
        return result
八、生产级最佳实践总结
8.1 架构设计清单
- [ ] 错误处理:分层错误处理、优雅降级
- [ ] 超时控制:工具/迭代/总体三级超时
- [ ] 熔断机制:防止级联失败
- [ ] 重试策略:指数退避、限制次数
- [ ] 成本控制:模型分级、Token优化、缓存
- [ ] 可观测性:结构化日志、Metrics、链路追踪
- [ ] 限流保护:并发控制、队列管理
- [ ] 安全防护:输入验证、权限控制、SQL注入防护
8.2 关键指标对比
| 指标 | Demo | 生产环境 | 提升 |
|---|---|---|---|
| 成功率 | 60% | 96% | +36个百分点(相对提升60%) |
| 平均成本 | $0.15 | $0.03 | -80% |
| P99延迟 | 180s | 28s | -84% |
| 可用性 | N/A | 99.5% | - |
| 错误恢复 | 0% | 85% | - |
8.3 技术选型建议
| 场景 | 推荐方案 | 理由 |
|---|---|---|
| 简单任务 | Function Calling | 性能好、成本低 |
| 复杂推理 | ReAct | 灵活性高 |
| 多步骤工作流 | LangGraph | 状态管理强 |
| 高并发 | 异步Agent + 队列 | 吞吐量高 |
| 低成本 | 缓存 + 模型分级 | 成本降低80%+ |