多 Agent 协作系统
概述
多 Agent 系统(Multi-Agent System, MAS)通过多个专业化 Agent 的协作来解决复杂任务。本章深入讲解多 Agent 系统的架构模式、通信机制、协作策略和实战案例。
多 Agent 架构模式
架构对比
┌─────────────────────────────────────────────────────────────────────────────┐
│ Multi-Agent Architecture Patterns │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ 1. Hierarchical (层级式) │
│ ┌───────────────────────────────────────────────────────────────────────┐ │
│ │ ┌──────────────┐ │ │
│ │ │ Supervisor │ │ │
│ │ │ Agent │ │ │
│ │ └───────┬──────┘ │ │
│ │ ┌──────────────┼──────────────┐ │ │
│ │ ▼ ▼ ▼ │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ Worker A │ │ Worker B │ │ Worker C │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ │ │ │
│ │ 特点: 中心化控制,明确分工 │ │
│ │ 优点: 易于管理和调试 │ │
│ │ 缺点: 单点瓶颈,扩展性受限 │ │
│ └───────────────────────────────────────────────────────────────────────┘ │
│ │
│ 2. Collaborative (协作式) │
│ ┌───────────────────────────────────────────────────────────────────────┐ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ Agent A │◀───────▶│ Agent B │◀───────▶│ Agent C │ │ │
│ │ │(Research)│ │(Writing) │ │(Review) │ │ │
│ │ └────┬─────┘ └────┬─────┘ └────┬─────┘ │ │
│ │ │ │ │ │ │
│ │ └────────────────────┴────────────────────┘ │ │
│ │ Shared Workspace │ │
│ │ │ │
│ │ 特点: 对等通信,共享工作区 │ │
│ │ 优点: 灵活协作,无单点故障 │ │
│ │ 缺点: 协调复杂,可能冲突 │ │
│ └───────────────────────────────────────────────────────────────────────┘ │
│ │
│ 3. Pipeline (流水线式) │
│ ┌───────────────────────────────────────────────────────────────────────┐ │
│ │ Input ──▶ Agent A ──▶ Agent B ──▶ Agent C ──▶ Agent D ──▶ Output │ │
│ │ (Parse) (Plan) (Execute) (Validate) │ │
│ │ │ │
│ │ 特点: 顺序处理,明确流程 │ │
│ │ 优点: 流程清晰,易于优化 │ │
│ │ 缺点: 延迟累积,不适合交互 │ │
│ └───────────────────────────────────────────────────────────────────────┘ │
│ │
│ 4. Debate/Adversarial (辩论式) │
│ ┌───────────────────────────────────────────────────────────────────────┐ │
│ │ ┌──────────┐ ┌──────────┐ │ │
│ │ │ Agent A │◀────── Debate ─────────▶│ Agent B │ │ │
│ │ │(Pro) │ │(Con) │ │ │
│ │ └────┬─────┘ └────┬─────┘ │ │
│ │ │ │ │ │
│ │ └──────────────┬──────────────────────┘ │ │
│ │ ▼ │ │
│ │ ┌──────────┐ │ │
│ │ │ Judge │ │ │
│ │ └──────────┘ │ │
│ │ │ │
│ │ 特点: 对抗性验证,多角度思考 │ │
│ │ 优点: 提高准确性,减少偏见 │ │
│ │ 缺点: 资源消耗大,可能僵局 │ │
│ └───────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
核心组件实现
Agent 基础框架
"""
多 Agent 系统核心框架
"""
from typing import List, Dict, Optional, Any, Callable
from dataclasses import dataclass, field
from abc import ABC, abstractmethod
from enum import Enum
import asyncio
import json
from datetime import datetime
class MessageType(Enum):
    """Kinds of messages agents exchange; each member's value is its string name."""

    # Work requests and their outcomes.
    TASK = "task"
    RESULT = "result"

    # Question / answer style exchanges.
    QUERY = "query"
    RESPONSE = "response"

    # Remaining categories.
    BROADCAST = "broadcast"
    ERROR = "error"
@dataclass
class Message:
    """Envelope for a single message passed between agents."""

    # Unique identifier of this message.
    id: str
    # Category used by the receiving agent to pick a handler.
    type: MessageType
    # Name of the agent (or external party) that produced the message.
    sender: str
    # Name of the target agent; "*" means broadcast.
    receiver: str
    # Payload; the handlers in this file read it as a dict.
    content: Any
    # Creation time, taken from datetime.now() when not supplied.
    timestamp: datetime = field(default_factory=datetime.now)
    # Free-form extra data; a fresh dict per instance.
    metadata: Dict = field(default_factory=dict)
    # Id of the message this one answers, when it is a reply.
    reply_to: Optional[str] = None
@dataclass
class AgentConfig:
    """Static configuration describing one agent."""

    # Identity: the name doubles as the routing key on the message bus.
    name: str
    role: str
    description: str
    # Skills listed in the agent's system prompt.
    capabilities: List[str] = field(default_factory=list)
    # Model settings stored for the LLM client.
    model: str = "gpt-4"
    temperature: float = 0.7
    max_tokens: int = 4000
    # Names of the tools this agent is configured with.
    tools: List[str] = field(default_factory=list)
class BaseAgent(ABC):
    """Abstract base for agents that exchange ``Message`` objects through queues.

    Incoming messages are placed on ``inbox`` (see ``receive``). A background
    loop started by ``start`` dispatches each one to the handler registered
    for its ``MessageType``. Replies are placed on ``outbox`` (see ``send``)
    for an external router to deliver. Subclasses must implement
    ``handle_task``.
    """

    def __init__(self, config: AgentConfig, llm, tools: Optional[Dict] = None):
        """Store collaborators and create the message queues.

        Args:
            config: Identity and settings for this agent.
            llm: Chat client; this base class only stores it.
            tools: Optional mapping of tools; defaults to an empty dict.
        """
        self.config = config
        self.llm = llm
        self.tools = tools or {}
        self.inbox: asyncio.Queue = asyncio.Queue()
        self.outbox: asyncio.Queue = asyncio.Queue()
        self.running = False
        self.message_handlers: Dict[MessageType, Callable] = {}
        # Strong reference to the background loop task. The event loop only
        # keeps weak references to tasks, so an unreferenced task may be
        # garbage-collected while it is still running.
        self._loop_task: Optional[asyncio.Task] = None
        # Install the default handlers.
        self._register_handlers()

    def _register_handlers(self):
        """Map the message types handled by default to their handler methods."""
        self.message_handlers[MessageType.TASK] = self.handle_task
        self.message_handlers[MessageType.QUERY] = self.handle_query
        self.message_handlers[MessageType.RESULT] = self.handle_result

    async def start(self):
        """Start the background message loop (no-op if it is already running)."""
        self.running = True
        if self._loop_task is not None and not self._loop_task.done():
            # A previous loop is still alive; setting ``running`` back to
            # True is enough to keep it going, so do not spawn a duplicate.
            return
        self._loop_task = asyncio.create_task(self._message_loop())

    async def stop(self):
        """Ask the message loop to exit.

        Only clears the ``running`` flag; the loop notices it on its next
        iteration (at most one inbox poll timeout later). This method does
        not wait for the loop to finish.
        """
        self.running = False

    async def _message_loop(self):
        """Pull messages from ``inbox`` and process them until stopped."""
        while self.running:
            try:
                # Poll with a timeout so a cleared ``running`` flag is seen
                # even when no messages arrive.
                message = await asyncio.wait_for(
                    self.inbox.get(),
                    timeout=1.0
                )
                await self._process_message(message)
            except asyncio.TimeoutError:
                continue
            except Exception as e:
                print(f"Agent {self.config.name} error: {e}")

    async def _process_message(self, message: Message):
        """Dispatch ``message`` to its handler and send back any reply.

        Messages whose type has no registered handler are ignored. If the
        handler (or sending its reply) raises, an ``ERROR`` message carrying
        the exception text is sent to the original sender.
        """
        handler = self.message_handlers.get(message.type)
        if not handler:
            return
        try:
            response = await handler(message)
            if response:
                await self.send(response)
        except Exception as e:
            error_msg = Message(
                id=self._generate_id(),
                type=MessageType.ERROR,
                sender=self.config.name,
                receiver=message.sender,
                content={"error": str(e)},
                reply_to=message.id
            )
            await self.send(error_msg)

    async def send(self, message: Message):
        """Queue ``message`` on the outbox for delivery."""
        await self.outbox.put(message)

    async def receive(self, message: Message):
        """Queue ``message`` on the inbox for processing."""
        await self.inbox.put(message)

    @abstractmethod
    async def handle_task(self, message: Message) -> Optional[Message]:
        """Handle a ``TASK`` message; return a reply to send, or ``None``."""
        pass

    async def handle_query(self, message: Message) -> Optional[Message]:
        """Handle a ``QUERY`` message; the default implementation ignores it."""
        return None

    async def handle_result(self, message: Message) -> Optional[Message]:
        """Handle a ``RESULT`` message; the default implementation ignores it."""
        return None

    def get_system_prompt(self) -> str:
        """Build the system prompt from this agent's configuration."""
        return f"""You are {self.config.name}, a {self.config.role}.
Description: {self.config.description}
Your capabilities:
{chr(10).join(f'- {c}' for c in self.config.capabilities)}
Respond professionally and stay focused on your role."""

    def _generate_id(self) -> str:
        """Return a fresh random UUID4 string for use as a message id."""
        import uuid
        return str(uuid.uuid4())
class MessageBus:
    """Routes messages from agents' outboxes to the inboxes of their receivers.

    Every routed message is also appended to ``message_history``, including
    messages addressed to names that are not registered.
    """

    # Seconds to sleep when a routing pass found every outbox empty.
    _IDLE_POLL_SECONDS = 0.05

    def __init__(self):
        self.agents: Dict[str, BaseAgent] = {}
        self.message_history: List[Message] = []
        self.running = False
        # Strong reference to the routing task so it cannot be
        # garbage-collected while it is running.
        self._routing_task: Optional[asyncio.Task] = None

    def register(self, agent: BaseAgent):
        """Register ``agent`` under its configured name."""
        self.agents[agent.config.name] = agent

    def unregister(self, agent_name: str):
        """Remove the agent called ``agent_name``; unknown names are ignored."""
        self.agents.pop(agent_name, None)

    async def start(self):
        """Start every registered agent, then the routing loop."""
        self.running = True
        # Iterate over a snapshot: agents may be (un)registered while we await.
        for agent in list(self.agents.values()):
            await agent.start()
        # Reuse a routing loop that is still alive instead of spawning a
        # second one that would compete for the same outboxes.
        if self._routing_task is None or self._routing_task.done():
            self._routing_task = asyncio.create_task(self._routing_loop())

    async def stop(self):
        """Signal the routing loop to exit and stop every registered agent."""
        self.running = False
        for agent in list(self.agents.values()):
            await agent.stop()

    async def _routing_loop(self):
        """Drain all agents' outboxes and deliver the messages until stopped.

        Outboxes are read without blocking so one idle agent cannot delay
        the others. The loop always yields to the event loop once per pass,
        sleeping briefly when there was nothing to route, so it never spins
        without awaiting (not even when no agents are registered).
        """
        while self.running:
            routed_any = False
            # Snapshot so registration changes during an await are safe.
            for agent in list(self.agents.values()):
                while True:
                    try:
                        message = agent.outbox.get_nowait()
                    except asyncio.QueueEmpty:
                        break
                    await self._route_message(message)
                    routed_any = True
            await asyncio.sleep(0 if routed_any else self._IDLE_POLL_SECONDS)

    async def _route_message(self, message: Message):
        """Record ``message`` and deliver it to its receiver(s).

        A receiver of ``"*"`` delivers to every registered agent except the
        sender. Otherwise the message goes to the named agent; if no such
        agent is registered it is only recorded in the history.
        """
        self.message_history.append(message)
        if message.receiver == "*":
            # Broadcast.
            for name, agent in list(self.agents.items()):
                if name != message.sender:
                    await agent.receive(message)
            return
        # Point-to-point.
        target = self.agents.get(message.receiver)
        if target:
            await target.receive(message)

    async def send_to(self, agent_name: str, message: Message):
        """Deliver ``message`` directly to ``agent_name``'s inbox.

        Bypasses routing, so the message is not added to the history.
        Unknown agent names are ignored.
        """
        agent = self.agents.get(agent_name)
        if agent:
            await agent.receive(message)
层级式多 Agent 系统
"""
层级式多 Agent 系统实现
"""
from typing import List, Dict, Optional
from dataclasses import dataclass
import json
@dataclass
class Task:
    """A unit of work tracked by a supervisor."""

    # Identifier of the task.
    id: str
    # Human-readable description of the work.
    description: str
    # Child tasks, or None when the task has not been broken down.
    subtasks: Optional[List["Task"]] = None
    # Name of the agent the task was given to, or None if unassigned.
    assigned_to: Optional[str] = None
    # Lifecycle state; starts as "pending".
    status: str = "pending"
    # Outcome of the task once one has been recorded.
    result: Any = None
class SupervisorAgent(BaseAgent):
    """Supervisor that splits a task, delegates the parts and merges the results.

    On a ``TASK`` message it asks the LLM for subtasks, assigns each one to a
    worker and sends the worker a ``TASK`` message. Worker ``RESULT`` (and
    ``ERROR``) replies are matched to the outstanding subtasks through
    ``reply_to``. When none remain, the collected results are synthesized by
    the LLM and sent back to the requester.
    """

    def __init__(self, config: AgentConfig, llm, worker_agents: List[str]):
        """Create the supervisor.

        Args:
            config: Identity and settings for this agent.
            llm: Chat client exposing an awaitable ``chat(messages)``.
            worker_agents: Names of the agents subtasks may be assigned to.
        """
        super().__init__(config, llm)
        self.worker_agents = worker_agents
        self.pending_tasks: Dict[str, Task] = {}
        self.completed_tasks: Dict[str, Task] = {}
        # Who receives the final result; updated by each incoming task.
        self._requester = "user"
        # A failing worker answers with ERROR instead of RESULT. Without a
        # handler that subtask would stay pending forever.
        self.message_handlers[MessageType.ERROR] = self.handle_error

    async def handle_task(self, message: Message) -> Optional[Message]:
        """Decompose the incoming task and dispatch the subtasks to workers."""
        task_description = message.content.get("task", "")
        self._requester = message.sender

        # 1. Break the task down.
        subtasks = await self._decompose_task(task_description)
        # 2. Choose a worker for each part.
        assignments = await self._assign_tasks(subtasks)

        # Starting a fresh batch: drop results of earlier batches so they
        # are not mixed into this batch's synthesis.
        if not self.pending_tasks:
            self.completed_tasks.clear()

        # 3. Hand the parts to the workers.
        for assignment in assignments:
            task_msg = Message(
                id=self._generate_id(),
                type=MessageType.TASK,
                sender=self.config.name,
                receiver=assignment["agent"],
                content={
                    "task": assignment["task"],
                    "context": assignment.get("context", "")
                }
            )
            # Track the subtask before sending so a reply can never arrive
            # for a task that is not yet recorded.
            self.pending_tasks[task_msg.id] = Task(
                id=task_msg.id,
                description=assignment["task"],
                assigned_to=assignment["agent"]
            )
            await self.send(task_msg)
        return None

    async def handle_result(self, message: Message) -> Optional[Message]:
        """Record a worker's result; reply with the synthesis once all are in."""
        result = message.content.get("result", "")
        return await self._record_outcome(message.reply_to, result, "completed")

    async def handle_error(self, message: Message) -> Optional[Message]:
        """Record a worker's failure so the batch can still finish."""
        error_text = message.content.get("error", "")
        return await self._record_outcome(
            message.reply_to, f"Error: {error_text}", "failed"
        )

    async def _record_outcome(self, task_id, result, status: str) -> Optional[Message]:
        """Move ``task_id`` from pending to completed and finish the batch if done.

        Replies that do not match a pending subtask (unknown or duplicate
        ids) are ignored, so they cannot trigger a second synthesis.
        """
        task = self.pending_tasks.pop(task_id, None)
        if task is None:
            return None
        task.status = status
        task.result = result
        self.completed_tasks[task_id] = task

        if self.pending_tasks:
            return None
        # Every subtask has reported back: merge the results.
        final_result = await self._synthesize_results()
        return Message(
            id=self._generate_id(),
            type=MessageType.RESULT,
            sender=self.config.name,
            receiver=self._requester,
            content={"result": final_result}
        )

    async def _decompose_task(self, task: str) -> List[Dict]:
        """Ask the LLM to split ``task`` into subtasks.

        Returns:
            A non-empty list of dicts that each contain a ``"task"`` key.
            If the reply is not valid JSON, is not a list, or holds no usable
            entry, the whole task is returned as a single subtask.
        """
        workers_info = "\n".join([
            f"- {name}" for name in self.worker_agents
        ])
        prompt = f"""You are a task manager. Decompose the following task into subtasks
that can be assigned to specialized workers.
Available workers:
{workers_info}
Task: {task}
Output a JSON array of subtasks:
[
{{"task": "subtask description", "worker_type": "preferred worker"}},
...
]
Only output the JSON array, nothing else."""
        response = await self.llm.chat([{"role": "user", "content": prompt}])

        default_worker = self.worker_agents[0] if self.worker_agents else ""
        fallback = [{"task": task, "worker_type": default_worker}]
        try:
            parsed = json.loads(response)
        except json.JSONDecodeError:
            return fallback
        if not isinstance(parsed, list):
            return fallback
        subtasks = [
            item for item in parsed
            if isinstance(item, dict) and "task" in item
        ]
        return subtasks or fallback

    async def _assign_tasks(self, subtasks: List[Dict]) -> List[Dict]:
        """Pick a worker for each subtask.

        The first worker whose name contains the subtask's ``worker_type``
        (case-insensitive) is chosen; otherwise the first worker is used.
        Entries that are not dicts or lack a ``"task"`` key are skipped.

        Raises:
            ValueError: If no worker agents are configured.
        """
        if not self.worker_agents:
            raise ValueError("SupervisorAgent has no worker agents to assign tasks to")

        assignments = []
        for subtask in subtasks:
            if not isinstance(subtask, dict) or "task" not in subtask:
                continue
            # ``worker_type`` may be missing or null in the LLM output.
            preferred = str(subtask.get("worker_type") or "").lower()
            assigned = next(
                (agent for agent in self.worker_agents if preferred in agent.lower()),
                self.worker_agents[0],
            )
            assignments.append({
                "agent": assigned,
                "task": subtask["task"],
                "context": subtask.get("context", "")
            })
        return assignments

    async def _synthesize_results(self) -> str:
        """Ask the LLM to merge all completed subtask results into one answer."""
        results_text = [
            f"Task: {task.description}\nResult: {task.result}\n"
            for task in self.completed_tasks.values()
        ]
        prompt = f"""Synthesize the following task results into a coherent final response:
{chr(10).join(results_text)}
Final Response:"""
        response = await self.llm.chat([{"role": "user", "content": prompt}])
        return response
class WorkerAgent(BaseAgent):
    """Worker that carries out a single task with the LLM and reports back."""

    async def handle_task(self, message: Message) -> Optional[Message]:
        """Run the task in ``message`` and reply to its sender with the result."""
        payload = message.content
        outcome = await self._execute_task(
            payload.get("task", ""),
            payload.get("context", ""),
        )
        # Address the reply to whoever sent the task and link it through
        # ``reply_to`` so the sender can match it to the request.
        reply = Message(
            id=self._generate_id(),
            type=MessageType.RESULT,
            sender=self.config.name,
            receiver=message.sender,
            content={"result": outcome},
            reply_to=message.id
        )
        return reply

    async def _execute_task(self, task: str, context: str) -> str:
        """Send the task (preceded by the context, when given) to the LLM."""
        if context:
            user_prompt = f"Context: {context}\n\nTask: {task}"
        else:
            user_prompt = f"Task: {task}"
        conversation = [
            {"role": "system", "content": self.get_system_prompt()},
            {"role": "user", "content": user_prompt}
        ]
        return await self.llm.chat(conversation)
class HierarchicalSystem:
    """Hierarchical multi-agent system: one supervisor and three workers.

    The workers (``researcher``, ``writer``, ``coder``) and the supervisor
    are registered on a shared ``MessageBus``. ``run`` submits a task to the
    supervisor and waits for its final result.
    """

    def __init__(self, llm):
        """Build the agents and register them on a fresh message bus.

        Args:
            llm: Chat client shared by all agents.
        """
        self.llm = llm
        self.bus = MessageBus()

        # (name, role, description, capabilities) for each worker.
        worker_specs = [
            (
                "researcher",
                "Research Specialist",
                "Gathers and analyzes information",
                ["web search", "data analysis", "fact checking"],
            ),
            (
                "writer",
                "Content Writer",
                "Creates well-written content",
                ["writing", "editing", "formatting"],
            ),
            (
                "coder",
                "Software Developer",
                "Writes and reviews code",
                ["coding", "debugging", "code review"],
            ),
        ]
        self.workers = {
            name: WorkerAgent(
                AgentConfig(
                    name=name,
                    role=role,
                    description=description,
                    capabilities=capabilities
                ),
                llm
            )
            for name, role, description, capabilities in worker_specs
        }

        # The supervisor delegates to the workers by name.
        self.supervisor = SupervisorAgent(
            AgentConfig(
                name="supervisor",
                role="Project Manager",
                description="Coordinates tasks and synthesizes results",
                capabilities=["task decomposition", "delegation", "synthesis"]
            ),
            llm,
            list(self.workers.keys())
        )

        # Register every agent on the bus.
        self.bus.register(self.supervisor)
        for worker in self.workers.values():
            self.bus.register(worker)

    async def run(self, task: str) -> str:
        """Submit ``task`` to the supervisor and return the final result.

        The bus is stopped again even if waiting fails.
        """
        await self.bus.start()
        try:
            # Remember where the history stands so results left over from
            # an earlier run are not mistaken for this run's answer.
            history_mark = len(self.bus.message_history)
            initial_msg = Message(
                id="initial",
                type=MessageType.TASK,
                sender="user",
                receiver="supervisor",
                content={"task": task}
            )
            await self.bus.send_to("supervisor", initial_msg)
            return await self._wait_for_result(start_index=history_mark)
        finally:
            await self.bus.stop()

    async def _wait_for_result(self, timeout: float = 300, start_index: int = 0) -> str:
        """Poll the bus history until a message for ``"user"`` appears.

        Args:
            timeout: Maximum number of seconds to wait.
            start_index: Only history entries at or after this index are
                considered; ``0`` inspects the whole history.

        Returns:
            The ``"result"`` of the newest ``RESULT`` message addressed to
            ``"user"``; ``"Error: <text>"`` if an ``ERROR`` message addressed
            to ``"user"`` is found instead; or
            ``"Timeout waiting for result"`` when the deadline passes.
        """
        import time

        # Monotonic clock: unaffected by system clock adjustments.
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            # Newest first.
            for msg in reversed(self.bus.message_history[start_index:]):
                if msg.receiver != "user":
                    continue
                if msg.type == MessageType.RESULT:
                    return msg.content.get("result", "")
                if msg.type == MessageType.ERROR:
                    # The supervisor failed; waiting longer cannot help.
                    return f"Error: {msg.content.get('error', '')}"
            await asyncio.sleep(0.5)
        return "Timeout waiting for result"
协作式多 Agent 系统
"""
协作式多 Agent 系统实现
"""
from typing import List, Dict, Set, Optional
import asyncio
@dataclass
class SharedState:
"""共享状态"""
data: Dict = field(default_factory=dict)
lock: asyncio.Lock = field(default_factory=asyncio.Lock)
async def get(self, key: str) -> Any:
async with self.lock:
return self.data.get(key)
async def set(self, key: str, value: Any):
async with self.lock:
self.data[key] = value
async def update(self, key: str, updater: Callable):
async with self.lock:
if key in self.data:
self.data[key] = updater(self.data[key])
class CollaborativeAgent(BaseAgent):
    """Peer agent that contributes to a shared workspace and reacts to peers.

    On a ``TASK`` message the agent produces a contribution, appends it to
    the ``"work_in_progress"`` list in the shared state and notifies its
    peers with ``RESPONSE`` messages. On a peer's ``RESPONSE`` it may add a
    supplement, which is saved and announced the same way.
    """

    def __init__(
        self,
        config: AgentConfig,
        llm,
        shared_state: SharedState,
        peers: List[str] = None,
        max_supplements: int = 2
    ):
        """Create the agent.

        Args:
            config: Identity and settings for this agent.
            llm: Chat client exposing an awaitable ``chat(messages)``.
            shared_state: Workspace shared with the other agents.
            peers: Names of the agents to notify; defaults to none.
            max_supplements: Upper bound on how many supplements this agent
                will produce. Every supplement is announced to all peers,
                who may answer with supplements of their own, so without a
                bound the exchange need not terminate.
        """
        super().__init__(config, llm)
        self.shared_state = shared_state
        self.peers = peers or []
        self.contributions: List[Dict] = []
        self.max_supplements = max_supplements
        self._supplements_made = 0
        # The base class does not route RESPONSE messages; without this
        # registration peer notifications would be dropped unprocessed.
        self.message_handlers[MessageType.RESPONSE] = self.handle_response

    async def handle_task(self, message: Message) -> Optional[Message]:
        """Contribute to the task, store the contribution and tell the peers."""
        task = message.content.get("task", "")
        # 1. Work out what this agent can add.
        my_contribution = await self._analyze_and_contribute(task)
        # 2. Put it in the shared workspace.
        await self._save_contribution(my_contribution)
        # 3. Let the other agents know.
        await self._notify_peers(my_contribution)
        return None

    async def handle_response(self, message: Message) -> Optional[Message]:
        """React to a peer's contribution, possibly adding a supplement."""
        if self._supplements_made >= self.max_supplements:
            # Budget used up: stop answering so the exchange settles.
            return None
        contribution = message.content.get("contribution", {})
        supplement = await self._consider_supplement(contribution)
        if supplement:
            self._supplements_made += 1
            await self._save_contribution(supplement)
            await self._notify_peers(supplement)
        return None

    async def _analyze_and_contribute(self, task: str) -> Dict:
        """Ask the LLM for this agent's contribution to ``task``.

        Returns:
            The contribution dict, stamped with a fresh ``id`` and this
            agent's name as ``author``. If the reply is not a JSON object,
            the raw reply is wrapped as a ``"general"`` contribution.
        """
        # What the team has produced so far.
        current_work = await self.shared_state.get("work_in_progress") or []
        prompt = f"""You are {self.config.name}, a {self.config.role}.
Task: {task}
Current progress from other team members:
{json.dumps(current_work, indent=2) if current_work else "No progress yet."}
Based on your expertise in {', '.join(self.config.capabilities)},
what unique contribution can you make?
Provide your contribution in JSON format:
{{
"type": "your contribution type",
"content": "your contribution",
"builds_on": ["ids of contributions you're building on"]
}}"""
        response = await self.llm.chat([{"role": "user", "content": prompt}])
        try:
            contribution = json.loads(response)
        except json.JSONDecodeError:
            contribution = None
        if isinstance(contribution, dict):
            contribution["id"] = self._generate_id()
            contribution["author"] = self.config.name
            return contribution
        # Not a JSON object (plain text, a list, a number, ...): keep the
        # raw reply rather than failing on item assignment.
        return {
            "id": self._generate_id(),
            "type": "general",
            "content": response,
            "author": self.config.name,
            "builds_on": []
        }

    async def _save_contribution(self, contribution: Dict):
        """Append ``contribution`` to the shared list and to the local record."""
        await self.shared_state.update(
            "work_in_progress",
            lambda work: (work or []) + [contribution]
        )
        self.contributions.append(contribution)

    async def _notify_peers(self, contribution: Dict):
        """Send ``contribution`` to every peer as a ``RESPONSE`` message."""
        for peer in self.peers:
            msg = Message(
                id=self._generate_id(),
                type=MessageType.RESPONSE,
                sender=self.config.name,
                receiver=peer,
                content={"contribution": contribution}
            )
            await self.send(msg)

    async def _consider_supplement(self, other_contribution: Dict) -> Optional[Dict]:
        """Ask the LLM whether to add to a peer's contribution.

        Returns:
            The supplement dict, stamped with a fresh ``id`` and this
            agent's name as ``author``; or ``None`` when the LLM answers
            ``null`` or with anything that is not a JSON object.
        """
        # The contribution may come without an id; do not fail on that.
        if isinstance(other_contribution, dict):
            builds_on_id = other_contribution.get("id", "")
        else:
            builds_on_id = ""
        prompt = f"""You are {self.config.name}. Another team member made this contribution:
{json.dumps(other_contribution, indent=2)}
Based on your expertise in {', '.join(self.config.capabilities)},
do you have anything to add or improve?
If yes, provide your supplement in JSON format:
{{"type": "supplement", "content": "your addition", "builds_on": ["{builds_on_id}"]}}
If no, respond with: null"""
        response = await self.llm.chat([{"role": "user", "content": prompt}])

        text = response.strip() if isinstance(response, str) else ""
        if not text or text.lower() == "null":
            return None
        try:
            supplement = json.loads(text)
        except json.JSONDecodeError:
            return None
        if not isinstance(supplement, dict):
            return None
        supplement["id"] = self._generate_id()
        supplement["author"] = self.config.name
        return supplement
class CollaborativeSystem:
    """Collaborative multi-agent system built around a shared workspace."""

    def __init__(self, llm):
        """Create an empty system.

        Args:
            llm: Chat client shared by all agents and used for the final
                synthesis.
        """
        self.llm = llm
        self.shared_state = SharedState()
        self.bus = MessageBus()
        self.agents: Dict[str, CollaborativeAgent] = {}

    def add_agent(self, config: AgentConfig):
        """Create an agent from ``config`` and connect it to the existing ones.

        The new agent gets every other agent as a peer, and every other
        agent gets the new one as a peer. Adding a config whose name is
        already present replaces that agent; it is never listed as its own
        peer, and peer lists never receive duplicate entries.
        """
        peer_names = [name for name in self.agents if name != config.name]
        agent = CollaborativeAgent(
            config,
            self.llm,
            self.shared_state,
            peers=peer_names
        )
        # Tell the agents already present about the newcomer.
        for name, existing in self.agents.items():
            if name != config.name and config.name not in existing.peers:
                existing.peers.append(config.name)
        self.agents[config.name] = agent
        self.bus.register(agent)

    async def run(self, task: str, max_rounds: int = 3) -> str:
        """Run one collaborative session and return the synthesized output.

        Args:
            task: The task given to every agent.
            max_rounds: Number of two-second polling rounds to wait after
                the initial five-second pause.

        The bus is stopped again even if the session fails.
        """
        await self.bus.start()
        try:
            # Reset the shared workspace for this session.
            await self.shared_state.set("task", task)
            await self.shared_state.set("work_in_progress", [])

            # Hand the task to every agent.
            for agent_name in self.agents.keys():
                msg = Message(
                    id=self._generate_id(),
                    type=MessageType.TASK,
                    sender="coordinator",
                    receiver=agent_name,
                    content={"task": task}
                )
                await self.bus.send_to(agent_name, msg)

            # Give the agents time to collaborate.
            await asyncio.sleep(5)
            for _ in range(max_rounds):
                await asyncio.sleep(2)
                work = await self.shared_state.get("work_in_progress") or []
                # Stop early once there are two contributions per agent
                # on average.
                if len(work) >= len(self.agents) * 2:
                    break

            # Merge everything into the final answer.
            return await self._synthesize()
        finally:
            await self.bus.stop()

    async def _synthesize(self) -> str:
        """Ask the LLM to merge all recorded contributions into one output."""
        work = await self.shared_state.get("work_in_progress") or []
        prompt = f"""Synthesize the following contributions from a collaborative team:
{json.dumps(work, indent=2)}
Create a coherent final output that incorporates the best of each contribution."""
        response = await self.llm.chat([{"role": "user", "content": prompt}])
        return response

    def _generate_id(self) -> str:
        """Return a fresh random UUID4 string for use as a message id."""
        import uuid
        return str(uuid.uuid4())
辩论式多 Agent 系统
"""
辩论式多 Agent 系统
"""
from typing import List, Dict, Optional
from dataclasses import dataclass
@dataclass
class Argument:
    """One argument put forward in a debate."""

    # Identifier of the argument.
    id: str
    # Side taken: "pro" or "con".
    position: str
    # The main statement being argued.
    claim: str
    # Points offered in support of the claim.
    evidence: List[str]
    # Responses to the opponent's points; a fresh list per instance.
    rebuttals: List[str] = field(default_factory=list)
    # Name of the agent that made the argument.
    author: str = ""
class DebateAgent(BaseAgent):
    """Agent that argues one side ("pro" or "con") of a topic."""

    def __init__(
        self,
        config: AgentConfig,
        llm,
        position: str  # "pro" or "con"
    ):
        """Create the debater.

        Args:
            config: Identity and settings for this agent.
            llm: Chat client exposing an awaitable ``chat(messages)``.
            position: ``"pro"`` to support the topic; any other value is
                treated as opposing it.
        """
        super().__init__(config, llm)
        self.position = position
        self.arguments: List[Argument] = []

    async def handle_task(self, message: Message) -> Optional[Message]:
        """Make an argument for the topic in ``message`` and reply with it.

        ``BaseAgent`` declares ``handle_task`` abstract, so it has to be
        implemented for this class to be instantiable. The topic is read
        from ``message.content["topic"]`` and the reply carries the
        argument's fields under ``"argument"``.
        """
        topic = message.content.get("topic", "")
        argument = await self.make_argument(topic)
        return Message(
            id=self._generate_id(),
            type=MessageType.RESULT,
            sender=self.config.name,
            receiver=message.sender,
            content={"argument": vars(argument)},
            reply_to=message.id
        )

    async def make_argument(self, topic: str, opponent_args: List[Argument] = None) -> Argument:
        """Ask the LLM for an argument on ``topic`` and record it.

        Args:
            topic: The statement under debate.
            opponent_args: The other side's arguments so far, if any; they
                are quoted in the prompt so they can be rebutted.

        Returns:
            The new ``Argument``, also appended to ``self.arguments``. If
            the reply is not a JSON object with a ``"claim"``, the raw reply
            becomes the claim, with no evidence or rebuttals.
        """
        opponent_text = ""
        if opponent_args:
            opponent_text = "\n".join([
                f"- {arg.claim}: {', '.join(arg.evidence)}"
                for arg in opponent_args
            ])
        position_word = "support" if self.position == "pro" else "oppose"
        opponent_section = (
            "Opponent's arguments:" + chr(10) + opponent_text if opponent_text else ""
        )
        prompt = f"""You are arguing to {position_word} the following topic:
Topic: {topic}
{opponent_section}
Make a strong argument with:
1. A clear claim
2. Supporting evidence (2-3 points)
3. If opponent arguments exist, include rebuttals
Output in JSON format:
{{
"claim": "your main claim",
"evidence": ["evidence 1", "evidence 2"],
"rebuttals": ["rebuttal to opponent point 1", ...]
}}"""
        response = await self.llm.chat([{"role": "user", "content": prompt}])

        # Defaults used when the reply cannot be parsed as expected.
        claim, evidence, rebuttals = response, [], []
        try:
            data = json.loads(response)
        except json.JSONDecodeError:
            data = None
        if isinstance(data, dict) and "claim" in data:
            claim = data["claim"]
            evidence = self._as_str_list(data.get("evidence"))
            rebuttals = self._as_str_list(data.get("rebuttals"))

        arg = Argument(
            id=self._generate_id(),
            position=self.position,
            claim=claim,
            evidence=evidence,
            rebuttals=rebuttals,
            author=self.config.name
        )
        self.arguments.append(arg)
        return arg

    @staticmethod
    def _as_str_list(value) -> List[str]:
        """Coerce an LLM-supplied field into a list of strings.

        Evidence and rebuttals are later combined with ``', '.join(...)``,
        which needs a sequence of strings; a bare string would otherwise be
        joined character by character.
        """
        if value is None:
            return []
        if isinstance(value, str):
            return [value]
        if isinstance(value, (list, tuple)):
            return [str(item) for item in value]
        return [str(value)]
class JudgeAgent(BaseAgent):
    """Agent that evaluates both sides of a debate and gives a judgment."""

    async def handle_task(self, message: Message) -> Optional[Message]:
        """Evaluate the debate described in ``message`` and reply with the judgment.

        ``BaseAgent`` declares ``handle_task`` abstract, so it has to be
        implemented for this class to be instantiable. ``message.content``
        is read for ``"topic"``, ``"pro_args"`` and ``"con_args"`` (lists of
        ``Argument``; missing lists count as empty).
        """
        content = message.content
        judgment = await self.evaluate(
            content.get("topic", ""),
            content.get("pro_args") or [],
            content.get("con_args") or []
        )
        return Message(
            id=self._generate_id(),
            type=MessageType.RESULT,
            sender=self.config.name,
            receiver=message.sender,
            content={"judgment": judgment},
            reply_to=message.id
        )

    async def evaluate(
        self,
        topic: str,
        pro_args: List[Argument],
        con_args: List[Argument]
    ) -> Dict:
        """Ask the LLM to judge the debate.

        Args:
            topic: The statement that was debated.
            pro_args: Arguments made in favor of the topic.
            con_args: Arguments made against the topic.

        Returns:
            The judgment parsed from the LLM's JSON reply. If the reply is
            not a JSON object, a tie with scores of 50 each, no key
            insights, and the raw reply as the synthesis.
        """
        pro_text = self._format_arguments(pro_args)
        con_text = self._format_arguments(con_args)
        prompt = f"""As an impartial judge, evaluate the following debate:
Topic: {topic}
PRO Arguments:
{pro_text}
CON Arguments:
{con_text}
Evaluate based on:
1. Strength of evidence
2. Quality of reasoning
3. Effectiveness of rebuttals
Provide your judgment in JSON format:
{{
"winner": "pro" or "con" or "tie",
"score_pro": 0-100,
"score_con": 0-100,
"key_insights": ["insight 1", "insight 2"],
"synthesis": "A balanced conclusion considering both sides"
}}"""
        response = await self.llm.chat([{"role": "user", "content": prompt}])
        try:
            judgment = json.loads(response)
        except json.JSONDecodeError:
            judgment = None
        if isinstance(judgment, dict):
            return judgment
        # Unparseable or non-object reply: fall back to a neutral verdict
        # that has the same keys the prompt asks for.
        return {
            "winner": "tie",
            "score_pro": 50,
            "score_con": 50,
            "key_insights": [],
            "synthesis": response
        }

    @staticmethod
    def _format_arguments(arguments: List[Argument]) -> str:
        """Render arguments as text, one claim/evidence/rebuttals group each."""
        return "\n".join([
            f"Claim: {arg.claim}\n Evidence: {', '.join(arg.evidence)}\n Rebuttals: {', '.join(arg.rebuttals)}"
            for arg in arguments
        ])
class DebateSystem:
    """Runs a debate between a pro agent and a con agent, then has it judged."""

    def __init__(self, llm, num_rounds: int = 3):
        """Create the two debaters and the judge.

        Args:
            llm: Chat client shared by all three agents.
            num_rounds: Number of pro/con exchanges per debate.
        """
        self.llm = llm
        self.num_rounds = num_rounds
        debater_capabilities = ["argumentation", "evidence gathering", "persuasion"]
        self.pro_agent = DebateAgent(
            AgentConfig(
                name="pro_debater",
                role="Pro Debater",
                description="Argues in favor of the topic",
                capabilities=list(debater_capabilities)
            ),
            llm,
            position="pro"
        )
        self.con_agent = DebateAgent(
            AgentConfig(
                name="con_debater",
                role="Con Debater",
                description="Argues against the topic",
                capabilities=list(debater_capabilities)
            ),
            llm,
            position="con"
        )
        self.judge = JudgeAgent(
            AgentConfig(
                name="judge",
                role="Debate Judge",
                description="Impartially evaluates debate arguments",
                capabilities=["critical thinking", "fair evaluation", "synthesis"]
            ),
            llm
        )

    async def debate(self, topic: str) -> Dict:
        """Hold a full debate on ``topic`` and return the transcript and verdict.

        Returns:
            A dict with the ``"topic"``, each side's arguments as dicts
            (``"pro_arguments"``, ``"con_arguments"``) and the ``"judgment"``.
        """
        # Start from a clean slate so arguments from an earlier debate (on
        # a possibly different topic) are neither rebutted nor judged again.
        self.pro_agent.arguments = []
        self.con_agent.arguments = []

        print(f"Debate Topic: {topic}\n")
        for round_num in range(1, self.num_rounds + 1):
            print(f"=== Round {round_num} ===")
            # Pro speaks first; there is nothing to rebut in round one.
            pro_arg = await self.pro_agent.make_argument(
                topic,
                self.con_agent.arguments if round_num > 1 else None
            )
            print(f"\nPRO: {pro_arg.claim}")
            # Con answers with all of pro's arguments so far in view.
            con_arg = await self.con_agent.make_argument(
                topic,
                self.pro_agent.arguments
            )
            print(f"CON: {con_arg.claim}")

        # Verdict.
        print("\n=== Final Judgment ===")
        judgment = await self.judge.evaluate(
            topic,
            self.pro_agent.arguments,
            self.con_agent.arguments
        )
        print(f"Winner: {judgment.get('winner', 'tie')}")
        print(f"Synthesis: {judgment.get('synthesis', '')}")
        return {
            "topic": topic,
            "pro_arguments": [vars(a) for a in self.pro_agent.arguments],
            "con_arguments": [vars(a) for a in self.con_agent.arguments],
            "judgment": judgment
        }
实战案例:代码开发团队
"""
代码开发多 Agent 团队
"""
from typing import List, Dict
class DevelopmentTeam:
    """Software development team modeled as a supervisor with five workers.

    A technical lead (``SupervisorAgent``) coordinates a product manager,
    an architect, a developer, a code reviewer and a QA engineer, all
    registered on one ``MessageBus``.
    """

    # ``ToolRegistry`` is written as a string annotation because no
    # definition or import of it is visible in this file; a bare name would
    # be evaluated when the class is defined.
    def __init__(self, llm, tools: "ToolRegistry"):
        """Build the team and register every member on a fresh bus.

        Args:
            llm: Chat client shared by all agents.
            tools: Tool registry; stored on the instance and not otherwise
                used by this class.
        """
        self.llm = llm
        self.tools = tools
        self.bus = MessageBus()

        def make_worker(name, role, description, capabilities):
            # All team members share the same LLM client.
            return WorkerAgent(
                AgentConfig(
                    name=name,
                    role=role,
                    description=description,
                    capabilities=capabilities
                ),
                llm
            )

        # Product manager.
        self.pm = make_worker(
            "product_manager",
            "Product Manager",
            "Defines requirements and user stories",
            ["requirement analysis", "user story creation", "priority setting"],
        )
        # Architect.
        self.architect = make_worker(
            "architect",
            "Software Architect",
            "Designs system architecture",
            ["system design", "API design", "technology selection"],
        )
        # Developer.
        self.developer = make_worker(
            "developer",
            "Software Developer",
            "Implements features",
            ["coding", "debugging", "unit testing"],
        )
        # Code reviewer.
        self.reviewer = make_worker(
            "reviewer",
            "Code Reviewer",
            "Reviews code for quality and best practices",
            ["code review", "security analysis", "performance review"],
        )
        # QA engineer.
        self.qa = make_worker(
            "qa_engineer",
            "QA Engineer",
            "Tests and validates functionality",
            ["test planning", "test execution", "bug reporting"],
        )
        # Team coordinator; delegates to the workers by name.
        self.coordinator = SupervisorAgent(
            AgentConfig(
                name="tech_lead",
                role="Technical Lead",
                description="Coordinates the development team",
                capabilities=["project management", "technical decisions", "team coordination"]
            ),
            llm,
            ["product_manager", "architect", "developer", "reviewer", "qa_engineer"]
        )

        # Register every agent on the bus.
        for agent in [self.pm, self.architect, self.developer, self.reviewer, self.qa, self.coordinator]:
            self.bus.register(agent)

    async def develop_feature(self, feature_request: str) -> Dict:
        """Submit ``feature_request`` to the tech lead and return the outcome.

        The bus is stopped again even if waiting fails.
        """
        await self.bus.start()
        try:
            # Remember where the history stands so results left over from
            # an earlier request are not mistaken for this one's answer.
            history_mark = len(self.bus.message_history)
            msg = Message(
                id="feature_request",
                type=MessageType.TASK,
                sender="user",
                receiver="tech_lead",
                content={"task": f"Develop the following feature:\n{feature_request}"}
            )
            await self.bus.send_to("tech_lead", msg)
            return await self._wait_for_completion(start_index=history_mark)
        finally:
            await self.bus.stop()

    async def _wait_for_completion(self, timeout: float = 600, start_index: int = 0) -> Dict:
        """Poll the bus history until a message for ``"user"`` appears.

        Args:
            timeout: Maximum number of seconds to wait.
            start_index: Only history entries at or after this index are
                considered; ``0`` inspects the whole history.

        Returns:
            The content of the newest ``RESULT`` or ``ERROR`` message
            addressed to ``"user"``, or ``{"error": "Timeout"}`` when the
            deadline passes.
        """
        import time

        # Monotonic clock: unaffected by system clock adjustments.
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            # Newest first.
            for msg in reversed(self.bus.message_history[start_index:]):
                if msg.receiver != "user":
                    continue
                # An ERROR means the tech lead failed; its content has the
                # same {"error": ...} shape as the timeout value below.
                if msg.type in (MessageType.RESULT, MessageType.ERROR):
                    return msg.content
            await asyncio.sleep(1)
        return {"error": "Timeout"}
小结
本章深入讲解了多 Agent 协作系统:
- 架构模式:层级式、协作式、流水线式、辩论式
- 核心组件:消息总线、共享状态、Agent 基类
- 层级系统:Supervisor-Worker 模式
- 协作系统:对等协作与共享工作区
- 辩论系统:多角度验证与评判
- 实战案例:代码开发团队
下一章我们将探讨 LLM 安全与防护,讲解如何构建安全可靠的 LLM 应用。