HiHuo
首页
博客
手册
工具
关于
首页
博客
手册
工具
关于
  • AI 基础设施深度教程

    • AI Infra 深度教程
    • GPU容器化

      • 01-GPU 架构基础
      • NVIDIA 容器运行时
      • GPU 共享与隔离
      • GPU 监控与调试
    • Kubernetes GPU调度

      • Device Plugin 机制深度解析
      • GPU 调度器实现
      • 拓扑感知调度
      • 弹性 GPU 调度
    • AI训练平台

      • 分布式训练框架
      • 训练任务调度
      • 模型存储与管理
      • 实验管理
      • 超参数优化
    • 推理服务

      • 推理引擎原理
      • 模型服务框架
      • 动态批处理
      • 推理优化技术
      • 多模型服务
    • 异构计算

      • 05-异构计算
      • 异构计算概述
      • GPU 虚拟化技术
      • NPU 与专用 AI 芯片
      • 设备拓扑感知调度
      • 算力池化与弹性调度
    • AI工作流引擎

      • 06-AI工作流引擎
      • AI 工作流引擎概述
      • Kubeflow Pipelines 深度实践
      • 03-Argo Workflows 深度实践
      • 04-数据版本管理
      • 05-实验跟踪与模型注册
    • MLOps实践

      • 07-MLOps实践
      • 01-MLOps 成熟度模型
      • 02-数据集工程
      • 03-Feature Store 特征存储
      • 04-模型评测体系
      • 05-模型安全与治理
    • AIOps实践

      • 08-AIOps实践
      • 01-AIOps概述与架构
      • 02-异常检测算法
      • 03-根因分析与告警聚合
      • 04-智能运维决策
      • 05-AIOps平台实战
    • 面试专题

      • 09-面试专题
      • 01-AI基础设施核心面试题
      • 02-大模型面试题
      • 03-系统设计面试题
    • CUDA编程与算子开发

      • 10-CUDA 编程与算子开发
      • 01-CUDA编程模型与内存层次
      • 02-高性能 Kernel 开发实战
      • 03-Tensor Core 与矩阵运算
      • 04-算子融合与优化技术
      • 05-Triton 编程入门
    • 通信与网络底层

      • 11-通信与网络底层
      • 01-NCCL 源码深度解析
      • 02-AllReduce 算法实现
      • 03-RDMA与InfiniBand原理
      • 04-网络拓扑与通信优化
      • 05-大规模集群网络架构
    • 框架源码解析

      • 12-框架源码解析
      • 01-PyTorch分布式源码解析
      • 02-DeepSpeed源码深度解析
      • 03-Megatron-LM源码解析
      • 04-vLLM推理引擎源码解析
      • 05-HuggingFace Transformers源码解析
    • 编译优化与图优化

      • 13-编译优化与图优化
      • 01-深度学习编译器概述
      • 02-TorchDynamo与torch.compile
      • 03-XLA编译器深度解析
      • 04-算子融合与Kernel优化
      • 05-自动调度与代码生成

多 Agent 协作系统

概述

多 Agent 系统(Multi-Agent System, MAS)通过多个专业化 Agent 的协作来解决复杂任务。本章深入讲解多 Agent 系统的架构模式、通信机制、协作策略和实战案例。

多 Agent 架构模式

架构对比

┌─────────────────────────────────────────────────────────────────────────────┐
│                    Multi-Agent Architecture Patterns                        │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│  1. Hierarchical (层级式)                                                   │
│  ┌───────────────────────────────────────────────────────────────────────┐ │
│  │                     ┌──────────────┐                                  │ │
│  │                     │  Supervisor  │                                  │ │
│  │                     │    Agent     │                                  │ │
│  │                     └───────┬──────┘                                  │ │
│  │              ┌──────────────┼──────────────┐                          │ │
│  │              ▼              ▼              ▼                          │ │
│  │       ┌──────────┐   ┌──────────┐   ┌──────────┐                      │ │
│  │       │ Worker A │   │ Worker B │   │ Worker C │                      │ │
│  │       └──────────┘   └──────────┘   └──────────┘                      │ │
│  │                                                                       │ │
│  │  特点: 中心化控制,明确分工                                             │ │
│  │  优点: 易于管理和调试                                                  │ │
│  │  缺点: 单点瓶颈,扩展性受限                                            │ │
│  └───────────────────────────────────────────────────────────────────────┘ │
│                                                                             │
│  2. Collaborative (协作式)                                                  │
│  ┌───────────────────────────────────────────────────────────────────────┐ │
│  │       ┌──────────┐         ┌──────────┐         ┌──────────┐          │ │
│  │       │ Agent A  │◀───────▶│ Agent B  │◀───────▶│ Agent C  │          │ │
│  │       │(Research)│         │(Writing) │         │(Review)  │          │ │
│  │       └────┬─────┘         └────┬─────┘         └────┬─────┘          │ │
│  │            │                    │                    │                │ │
│  │            └────────────────────┴────────────────────┘                │ │
│  │                         Shared Workspace                              │ │
│  │                                                                       │ │
│  │  特点: 对等通信,共享工作区                                             │ │
│  │  优点: 灵活协作,无单点故障                                            │ │
│  │  缺点: 协调复杂,可能冲突                                              │ │
│  └───────────────────────────────────────────────────────────────────────┘ │
│                                                                             │
│  3. Pipeline (流水线式)                                                     │
│  ┌───────────────────────────────────────────────────────────────────────┐ │
│  │   Input ──▶ Agent A ──▶ Agent B ──▶ Agent C ──▶ Agent D ──▶ Output    │ │
│  │            (Parse)     (Plan)      (Execute)   (Validate)             │ │
│  │                                                                       │ │
│  │  特点: 顺序处理,明确流程                                               │ │
│  │  优点: 流程清晰,易于优化                                              │ │
│  │  缺点: 延迟累积,不适合交互                                            │ │
│  └───────────────────────────────────────────────────────────────────────┘ │
│                                                                             │
│  4. Debate/Adversarial (辩论式)                                             │
│  ┌───────────────────────────────────────────────────────────────────────┐ │
│  │       ┌──────────┐                          ┌──────────┐              │ │
│  │       │ Agent A  │◀────── Debate ─────────▶│ Agent B  │              │ │
│  │       │(Pro)     │                          │(Con)     │              │ │
│  │       └────┬─────┘                          └────┬─────┘              │ │
│  │            │                                     │                    │ │
│  │            └──────────────┬──────────────────────┘                    │ │
│  │                           ▼                                           │ │
│  │                    ┌──────────┐                                       │ │
│  │                    │  Judge   │                                       │ │
│  │                    └──────────┘                                       │ │
│  │                                                                       │ │
│  │  特点: 对抗性验证,多角度思考                                          │ │
│  │  优点: 提高准确性,减少偏见                                            │ │
│  │  缺点: 资源消耗大,可能僵局                                            │ │
│  └───────────────────────────────────────────────────────────────────────┘ │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

核心组件实现

Agent 基础框架

"""
多 Agent 系统核心框架
"""
from typing import List, Dict, Optional, Any, Callable
from dataclasses import dataclass, field
from abc import ABC, abstractmethod
from enum import Enum
import asyncio
import json
from datetime import datetime

class MessageType(Enum):
    """消息类型"""
    TASK = "task"
    RESULT = "result"
    QUERY = "query"
    RESPONSE = "response"
    BROADCAST = "broadcast"
    ERROR = "error"

@dataclass
class Message:
    """Agent 间消息"""
    id: str
    type: MessageType
    sender: str
    receiver: str  # "*" 表示广播
    content: Any
    timestamp: datetime = field(default_factory=datetime.now)
    metadata: Dict = field(default_factory=dict)
    reply_to: Optional[str] = None

@dataclass
class AgentConfig:
    """Agent 配置"""
    name: str
    role: str
    description: str
    capabilities: List[str] = field(default_factory=list)
    model: str = "gpt-4"
    temperature: float = 0.7
    max_tokens: int = 4000
    tools: List[str] = field(default_factory=list)


class BaseAgent(ABC):
    """Agent 基类"""

    def __init__(self, config: AgentConfig, llm, tools: Dict = None):
        self.config = config
        self.llm = llm
        self.tools = tools or {}
        self.inbox: asyncio.Queue = asyncio.Queue()
        self.outbox: asyncio.Queue = asyncio.Queue()
        self.running = False
        self.message_handlers: Dict[MessageType, Callable] = {}

        # 注册默认处理器
        self._register_handlers()

    def _register_handlers(self):
        """注册消息处理器"""
        self.message_handlers[MessageType.TASK] = self.handle_task
        self.message_handlers[MessageType.QUERY] = self.handle_query
        self.message_handlers[MessageType.RESULT] = self.handle_result

    async def start(self):
        """启动 Agent"""
        self.running = True
        asyncio.create_task(self._message_loop())

    async def stop(self):
        """停止 Agent"""
        self.running = False

    async def _message_loop(self):
        """消息处理循环"""
        while self.running:
            try:
                message = await asyncio.wait_for(
                    self.inbox.get(),
                    timeout=1.0
                )
                await self._process_message(message)
            except asyncio.TimeoutError:
                continue
            except Exception as e:
                print(f"Agent {self.config.name} error: {e}")

    async def _process_message(self, message: Message):
        """处理消息"""
        handler = self.message_handlers.get(message.type)
        if handler:
            try:
                response = await handler(message)
                if response:
                    await self.send(response)
            except Exception as e:
                error_msg = Message(
                    id=self._generate_id(),
                    type=MessageType.ERROR,
                    sender=self.config.name,
                    receiver=message.sender,
                    content={"error": str(e)},
                    reply_to=message.id
                )
                await self.send(error_msg)

    async def send(self, message: Message):
        """发送消息"""
        await self.outbox.put(message)

    async def receive(self, message: Message):
        """接收消息"""
        await self.inbox.put(message)

    @abstractmethod
    async def handle_task(self, message: Message) -> Optional[Message]:
        """处理任务消息"""
        pass

    async def handle_query(self, message: Message) -> Optional[Message]:
        """处理查询消息"""
        # 默认实现
        return None

    async def handle_result(self, message: Message) -> Optional[Message]:
        """处理结果消息"""
        # 默认实现
        return None

    def get_system_prompt(self) -> str:
        """获取系统提示"""
        return f"""You are {self.config.name}, a {self.config.role}.

Description: {self.config.description}

Your capabilities:
{chr(10).join(f'- {c}' for c in self.config.capabilities)}

Respond professionally and stay focused on your role."""

    def _generate_id(self) -> str:
        import uuid
        return str(uuid.uuid4())


class MessageBus:
    """消息总线"""

    def __init__(self):
        self.agents: Dict[str, BaseAgent] = {}
        self.message_history: List[Message] = []
        self.running = False

    def register(self, agent: BaseAgent):
        """注册 Agent"""
        self.agents[agent.config.name] = agent

    def unregister(self, agent_name: str):
        """注销 Agent"""
        self.agents.pop(agent_name, None)

    async def start(self):
        """启动消息总线"""
        self.running = True

        # 启动所有 Agent
        for agent in self.agents.values():
            await agent.start()

        # 启动路由循环
        asyncio.create_task(self._routing_loop())

    async def stop(self):
        """停止消息总线"""
        self.running = False
        for agent in self.agents.values():
            await agent.stop()

    async def _routing_loop(self):
        """消息路由循环"""
        while self.running:
            for agent in self.agents.values():
                try:
                    message = await asyncio.wait_for(
                        agent.outbox.get(),
                        timeout=0.1
                    )
                    await self._route_message(message)
                except asyncio.TimeoutError:
                    continue

    async def _route_message(self, message: Message):
        """路由消息"""
        self.message_history.append(message)

        if message.receiver == "*":
            # 广播
            for name, agent in self.agents.items():
                if name != message.sender:
                    await agent.receive(message)
        else:
            # 点对点
            target = self.agents.get(message.receiver)
            if target:
                await target.receive(message)

    async def send_to(self, agent_name: str, message: Message):
        """发送消息给指定 Agent"""
        agent = self.agents.get(agent_name)
        if agent:
            await agent.receive(message)

层级式多 Agent 系统

"""
层级式多 Agent 系统实现
"""
from typing import List, Dict, Optional
from dataclasses import dataclass
import json

@dataclass
class Task:
    """任务定义"""
    id: str
    description: str
    subtasks: List["Task"] = None
    assigned_to: str = None
    status: str = "pending"
    result: Any = None


class SupervisorAgent(BaseAgent):
    """监督者 Agent"""

    def __init__(self, config: AgentConfig, llm, worker_agents: List[str]):
        super().__init__(config, llm)
        self.worker_agents = worker_agents
        self.pending_tasks: Dict[str, Task] = {}
        self.completed_tasks: Dict[str, Task] = {}

    async def handle_task(self, message: Message) -> Optional[Message]:
        """处理任务 - 分解并分配"""
        task_description = message.content.get("task", "")

        # 1. 分解任务
        subtasks = await self._decompose_task(task_description)

        # 2. 分配任务
        assignments = await self._assign_tasks(subtasks)

        # 3. 发送任务给 worker
        for assignment in assignments:
            task_msg = Message(
                id=self._generate_id(),
                type=MessageType.TASK,
                sender=self.config.name,
                receiver=assignment["agent"],
                content={
                    "task": assignment["task"],
                    "context": assignment.get("context", "")
                }
            )
            await self.send(task_msg)

            # 记录待处理任务
            self.pending_tasks[task_msg.id] = Task(
                id=task_msg.id,
                description=assignment["task"],
                assigned_to=assignment["agent"]
            )

        return None

    async def handle_result(self, message: Message) -> Optional[Message]:
        """处理 worker 返回的结果"""
        task_id = message.reply_to
        result = message.content.get("result", "")

        if task_id in self.pending_tasks:
            task = self.pending_tasks.pop(task_id)
            task.status = "completed"
            task.result = result
            self.completed_tasks[task_id] = task

        # 检查是否所有任务完成
        if not self.pending_tasks:
            # 综合结果
            final_result = await self._synthesize_results()

            return Message(
                id=self._generate_id(),
                type=MessageType.RESULT,
                sender=self.config.name,
                receiver="user",
                content={"result": final_result}
            )

        return None

    async def _decompose_task(self, task: str) -> List[Dict]:
        """分解任务"""
        workers_info = "\n".join([
            f"- {name}" for name in self.worker_agents
        ])

        prompt = f"""You are a task manager. Decompose the following task into subtasks
that can be assigned to specialized workers.

Available workers:
{workers_info}

Task: {task}

Output a JSON array of subtasks:
[
  {{"task": "subtask description", "worker_type": "preferred worker"}},
  ...
]

Only output the JSON array, nothing else."""

        response = await self.llm.chat([{"role": "user", "content": prompt}])

        try:
            subtasks = json.loads(response)
            return subtasks
        except json.JSONDecodeError:
            return [{"task": task, "worker_type": self.worker_agents[0]}]

    async def _assign_tasks(self, subtasks: List[Dict]) -> List[Dict]:
        """分配任务给 worker"""
        assignments = []

        for subtask in subtasks:
            # 简单分配逻辑
            preferred = subtask.get("worker_type", "")
            assigned = None

            for agent in self.worker_agents:
                if preferred.lower() in agent.lower():
                    assigned = agent
                    break

            if not assigned:
                assigned = self.worker_agents[0]

            assignments.append({
                "agent": assigned,
                "task": subtask["task"],
                "context": subtask.get("context", "")
            })

        return assignments

    async def _synthesize_results(self) -> str:
        """综合所有结果"""
        results_text = []

        for task_id, task in self.completed_tasks.items():
            results_text.append(f"Task: {task.description}\nResult: {task.result}\n")

        prompt = f"""Synthesize the following task results into a coherent final response:

{chr(10).join(results_text)}

Final Response:"""

        response = await self.llm.chat([{"role": "user", "content": prompt}])
        return response


class WorkerAgent(BaseAgent):
    """工作者 Agent"""

    async def handle_task(self, message: Message) -> Optional[Message]:
        """执行任务"""
        task = message.content.get("task", "")
        context = message.content.get("context", "")

        # 执行任务
        result = await self._execute_task(task, context)

        # 返回结果
        return Message(
            id=self._generate_id(),
            type=MessageType.RESULT,
            sender=self.config.name,
            receiver=message.sender,
            content={"result": result},
            reply_to=message.id
        )

    async def _execute_task(self, task: str, context: str) -> str:
        """执行具体任务"""
        system_prompt = self.get_system_prompt()

        user_prompt = f"Task: {task}"
        if context:
            user_prompt = f"Context: {context}\n\n{user_prompt}"

        response = await self.llm.chat([
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt}
        ])

        return response


class HierarchicalSystem:
    """层级式多 Agent 系统"""

    def __init__(self, llm):
        self.llm = llm
        self.bus = MessageBus()

        # 创建 Worker Agents
        self.workers = {
            "researcher": WorkerAgent(
                AgentConfig(
                    name="researcher",
                    role="Research Specialist",
                    description="Gathers and analyzes information",
                    capabilities=["web search", "data analysis", "fact checking"]
                ),
                llm
            ),
            "writer": WorkerAgent(
                AgentConfig(
                    name="writer",
                    role="Content Writer",
                    description="Creates well-written content",
                    capabilities=["writing", "editing", "formatting"]
                ),
                llm
            ),
            "coder": WorkerAgent(
                AgentConfig(
                    name="coder",
                    role="Software Developer",
                    description="Writes and reviews code",
                    capabilities=["coding", "debugging", "code review"]
                ),
                llm
            )
        }

        # 创建 Supervisor
        self.supervisor = SupervisorAgent(
            AgentConfig(
                name="supervisor",
                role="Project Manager",
                description="Coordinates tasks and synthesizes results",
                capabilities=["task decomposition", "delegation", "synthesis"]
            ),
            llm,
            list(self.workers.keys())
        )

        # 注册所有 Agent
        self.bus.register(self.supervisor)
        for worker in self.workers.values():
            self.bus.register(worker)

    async def run(self, task: str) -> str:
        """执行任务"""
        await self.bus.start()

        # 发送初始任务
        initial_msg = Message(
            id="initial",
            type=MessageType.TASK,
            sender="user",
            receiver="supervisor",
            content={"task": task}
        )
        await self.bus.send_to("supervisor", initial_msg)

        # 等待结果
        result = await self._wait_for_result()

        await self.bus.stop()

        return result

    async def _wait_for_result(self, timeout: float = 300) -> str:
        """等待最终结果"""
        import time
        start = time.time()

        while time.time() - start < timeout:
            # 检查消息历史中是否有最终结果
            for msg in reversed(self.bus.message_history):
                if msg.type == MessageType.RESULT and msg.receiver == "user":
                    return msg.content.get("result", "")

            await asyncio.sleep(0.5)

        return "Timeout waiting for result"

协作式多 Agent 系统

"""
协作式多 Agent 系统实现
"""
from typing import List, Dict, Set, Optional
import asyncio

@dataclass
class SharedState:
    """共享状态"""
    data: Dict = field(default_factory=dict)
    lock: asyncio.Lock = field(default_factory=asyncio.Lock)

    async def get(self, key: str) -> Any:
        async with self.lock:
            return self.data.get(key)

    async def set(self, key: str, value: Any):
        async with self.lock:
            self.data[key] = value

    async def update(self, key: str, updater: Callable):
        async with self.lock:
            if key in self.data:
                self.data[key] = updater(self.data[key])


class CollaborativeAgent(BaseAgent):
    """协作 Agent"""

    def __init__(
        self,
        config: AgentConfig,
        llm,
        shared_state: SharedState,
        peers: List[str] = None
    ):
        super().__init__(config, llm)
        self.shared_state = shared_state
        self.peers = peers or []
        self.contributions: List[Dict] = []

    async def handle_task(self, message: Message) -> Optional[Message]:
        """处理任务 - 协作模式"""
        task = message.content.get("task", "")

        # 1. 分析任务,决定自己能贡献什么
        my_contribution = await self._analyze_and_contribute(task)

        # 2. 保存到共享状态
        await self._save_contribution(my_contribution)

        # 3. 通知其他 Agent
        await self._notify_peers(my_contribution)

        return None

    async def handle_response(self, message: Message) -> Optional[Message]:
        """处理其他 Agent 的通知"""
        contribution = message.content.get("contribution", {})

        # 根据其他 Agent 的贡献,决定是否需要补充
        supplement = await self._consider_supplement(contribution)

        if supplement:
            await self._save_contribution(supplement)
            await self._notify_peers(supplement)

        return None

    async def _analyze_and_contribute(self, task: str) -> Dict:
        """分析任务并产出贡献"""
        # 获取当前共享状态
        current_work = await self.shared_state.get("work_in_progress") or []

        prompt = f"""You are {self.config.name}, a {self.config.role}.

Task: {task}

Current progress from other team members:
{json.dumps(current_work, indent=2) if current_work else "No progress yet."}

Based on your expertise in {', '.join(self.config.capabilities)},
what unique contribution can you make?

Provide your contribution in JSON format:
{{
    "type": "your contribution type",
    "content": "your contribution",
    "builds_on": ["ids of contributions you're building on"]
}}"""

        response = await self.llm.chat([{"role": "user", "content": prompt}])

        try:
            contribution = json.loads(response)
            contribution["id"] = self._generate_id()
            contribution["author"] = self.config.name
            return contribution
        except json.JSONDecodeError:
            return {
                "id": self._generate_id(),
                "type": "general",
                "content": response,
                "author": self.config.name,
                "builds_on": []
            }

    async def _save_contribution(self, contribution: Dict):
        """保存贡献到共享状态"""
        await self.shared_state.update(
            "work_in_progress",
            lambda work: (work or []) + [contribution]
        )
        self.contributions.append(contribution)

    async def _notify_peers(self, contribution: Dict):
        """通知其他 Agent"""
        for peer in self.peers:
            msg = Message(
                id=self._generate_id(),
                type=MessageType.RESPONSE,
                sender=self.config.name,
                receiver=peer,
                content={"contribution": contribution}
            )
            await self.send(msg)

    async def _consider_supplement(self, other_contribution: Dict) -> Optional[Dict]:
        """考虑是否需要补充"""
        prompt = f"""You are {self.config.name}. Another team member made this contribution:

{json.dumps(other_contribution, indent=2)}

Based on your expertise in {', '.join(self.config.capabilities)},
do you have anything to add or improve?

If yes, provide your supplement in JSON format:
{{"type": "supplement", "content": "your addition", "builds_on": ["{other_contribution['id']}"]}}

If no, respond with: null"""

        response = await self.llm.chat([{"role": "user", "content": prompt}])

        try:
            if response.strip().lower() == "null":
                return None
            supplement = json.loads(response)
            supplement["id"] = self._generate_id()
            supplement["author"] = self.config.name
            return supplement
        except:
            return None


class CollaborativeSystem:
    """协作式多 Agent 系统"""

    def __init__(self, llm):
        self.llm = llm
        self.shared_state = SharedState()
        self.bus = MessageBus()
        self.agents: Dict[str, CollaborativeAgent] = {}

    def add_agent(self, config: AgentConfig):
        """添加 Agent"""
        # 更新所有现有 Agent 的 peers 列表
        peer_names = list(self.agents.keys())

        agent = CollaborativeAgent(
            config,
            self.llm,
            self.shared_state,
            peers=peer_names
        )

        # 更新现有 Agent 的 peers
        for existing in self.agents.values():
            existing.peers.append(config.name)

        self.agents[config.name] = agent
        self.bus.register(agent)

    async def run(self, task: str, max_rounds: int = 3) -> str:
        """执行协作任务"""
        await self.bus.start()

        # 初始化共享状态
        await self.shared_state.set("task", task)
        await self.shared_state.set("work_in_progress", [])

        # 向所有 Agent 发送任务
        for agent_name in self.agents.keys():
            msg = Message(
                id=self._generate_id(),
                type=MessageType.TASK,
                sender="coordinator",
                receiver=agent_name,
                content={"task": task}
            )
            await self.bus.send_to(agent_name, msg)

        # 等待协作完成
        await asyncio.sleep(5)  # 给 Agent 时间协作

        for _ in range(max_rounds):
            await asyncio.sleep(2)

            # 检查是否有新贡献
            work = await self.shared_state.get("work_in_progress") or []
            if len(work) >= len(self.agents) * 2:  # 每个 Agent 至少贡献 2 次
                break

        # 综合结果
        result = await self._synthesize()

        await self.bus.stop()

        return result

    async def _synthesize(self) -> str:
        """综合所有贡献"""
        work = await self.shared_state.get("work_in_progress") or []

        prompt = f"""Synthesize the following contributions from a collaborative team:

{json.dumps(work, indent=2)}

Create a coherent final output that incorporates the best of each contribution."""

        response = await self.llm.chat([{"role": "user", "content": prompt}])
        return response

    def _generate_id(self) -> str:
        import uuid
        return str(uuid.uuid4())

辩论式多 Agent 系统

"""
辩论式多 Agent 系统
"""
from typing import List, Dict, Optional
from dataclasses import dataclass

@dataclass
class Argument:
    """论点"""
    id: str
    position: str  # "pro" or "con"
    claim: str
    evidence: List[str]
    rebuttals: List[str] = field(default_factory=list)
    author: str = ""


class DebateAgent(BaseAgent):
    """辩论 Agent"""

    def __init__(
        self,
        config: AgentConfig,
        llm,
        position: str  # "pro" or "con"
    ):
        super().__init__(config, llm)
        self.position = position
        self.arguments: List[Argument] = []

    async def make_argument(self, topic: str, opponent_args: List[Argument] = None) -> Argument:
        """提出论点"""
        opponent_text = ""
        if opponent_args:
            opponent_text = "\n".join([
                f"- {arg.claim}: {', '.join(arg.evidence)}"
                for arg in opponent_args
            ])

        position_word = "support" if self.position == "pro" else "oppose"

        prompt = f"""You are arguing to {position_word} the following topic:

Topic: {topic}

{"Opponent's arguments:" + chr(10) + opponent_text if opponent_text else ""}

Make a strong argument with:
1. A clear claim
2. Supporting evidence (2-3 points)
3. If opponent arguments exist, include rebuttals

Output in JSON format:
{{
    "claim": "your main claim",
    "evidence": ["evidence 1", "evidence 2"],
    "rebuttals": ["rebuttal to opponent point 1", ...]
}}"""

        response = await self.llm.chat([{"role": "user", "content": prompt}])

        try:
            data = json.loads(response)
            arg = Argument(
                id=self._generate_id(),
                position=self.position,
                claim=data["claim"],
                evidence=data.get("evidence", []),
                rebuttals=data.get("rebuttals", []),
                author=self.config.name
            )
            self.arguments.append(arg)
            return arg
        except json.JSONDecodeError:
            arg = Argument(
                id=self._generate_id(),
                position=self.position,
                claim=response,
                evidence=[],
                author=self.config.name
            )
            self.arguments.append(arg)
            return arg


class JudgeAgent(BaseAgent):
    """评判 Agent"""

    async def evaluate(
        self,
        topic: str,
        pro_args: List[Argument],
        con_args: List[Argument]
    ) -> Dict:
        """评估辩论"""
        pro_text = "\n".join([
            f"Claim: {arg.claim}\n  Evidence: {', '.join(arg.evidence)}\n  Rebuttals: {', '.join(arg.rebuttals)}"
            for arg in pro_args
        ])

        con_text = "\n".join([
            f"Claim: {arg.claim}\n  Evidence: {', '.join(arg.evidence)}\n  Rebuttals: {', '.join(arg.rebuttals)}"
            for arg in con_args
        ])

        prompt = f"""As an impartial judge, evaluate the following debate:

Topic: {topic}

PRO Arguments:
{pro_text}

CON Arguments:
{con_text}

Evaluate based on:
1. Strength of evidence
2. Quality of reasoning
3. Effectiveness of rebuttals

Provide your judgment in JSON format:
{{
    "winner": "pro" or "con" or "tie",
    "score_pro": 0-100,
    "score_con": 0-100,
    "key_insights": ["insight 1", "insight 2"],
    "synthesis": "A balanced conclusion considering both sides"
}}"""

        response = await self.llm.chat([{"role": "user", "content": prompt}])

        try:
            return json.loads(response)
        except json.JSONDecodeError:
            return {
                "winner": "tie",
                "score_pro": 50,
                "score_con": 50,
                "synthesis": response
            }


class DebateSystem:
    """辩论系统"""

    def __init__(self, llm, num_rounds: int = 3):
        self.llm = llm
        self.num_rounds = num_rounds

        self.pro_agent = DebateAgent(
            AgentConfig(
                name="pro_debater",
                role="Pro Debater",
                description="Argues in favor of the topic",
                capabilities=["argumentation", "evidence gathering", "persuasion"]
            ),
            llm,
            position="pro"
        )

        self.con_agent = DebateAgent(
            AgentConfig(
                name="con_debater",
                role="Con Debater",
                description="Argues against the topic",
                capabilities=["argumentation", "evidence gathering", "persuasion"]
            ),
            llm,
            position="con"
        )

        self.judge = JudgeAgent(
            AgentConfig(
                name="judge",
                role="Debate Judge",
                description="Impartially evaluates debate arguments",
                capabilities=["critical thinking", "fair evaluation", "synthesis"]
            ),
            llm
        )

    async def debate(self, topic: str) -> Dict:
        """进行辩论"""
        print(f"Debate Topic: {topic}\n")

        for round_num in range(1, self.num_rounds + 1):
            print(f"=== Round {round_num} ===")

            # Pro 先发言
            pro_arg = await self.pro_agent.make_argument(
                topic,
                self.con_agent.arguments if round_num > 1 else None
            )
            print(f"\nPRO: {pro_arg.claim}")

            # Con 回应
            con_arg = await self.con_agent.make_argument(
                topic,
                self.pro_agent.arguments
            )
            print(f"CON: {con_arg.claim}")

        # 评判
        print("\n=== Final Judgment ===")
        judgment = await self.judge.evaluate(
            topic,
            self.pro_agent.arguments,
            self.con_agent.arguments
        )

        print(f"Winner: {judgment.get('winner', 'tie')}")
        print(f"Synthesis: {judgment.get('synthesis', '')}")

        return {
            "topic": topic,
            "pro_arguments": [vars(a) for a in self.pro_agent.arguments],
            "con_arguments": [vars(a) for a in self.con_agent.arguments],
            "judgment": judgment
        }

实战案例:代码开发团队

"""
代码开发多 Agent 团队
"""
from typing import List, Dict

class DevelopmentTeam:
    """开发团队多 Agent 系统"""

    def __init__(self, llm, tools: ToolRegistry):
        self.llm = llm
        self.tools = tools
        self.bus = MessageBus()

        # 产品经理
        self.pm = WorkerAgent(
            AgentConfig(
                name="product_manager",
                role="Product Manager",
                description="Defines requirements and user stories",
                capabilities=["requirement analysis", "user story creation", "priority setting"]
            ),
            llm
        )

        # 架构师
        self.architect = WorkerAgent(
            AgentConfig(
                name="architect",
                role="Software Architect",
                description="Designs system architecture",
                capabilities=["system design", "API design", "technology selection"]
            ),
            llm
        )

        # 开发者
        self.developer = WorkerAgent(
            AgentConfig(
                name="developer",
                role="Software Developer",
                description="Implements features",
                capabilities=["coding", "debugging", "unit testing"]
            ),
            llm
        )

        # 代码审查员
        self.reviewer = WorkerAgent(
            AgentConfig(
                name="reviewer",
                role="Code Reviewer",
                description="Reviews code for quality and best practices",
                capabilities=["code review", "security analysis", "performance review"]
            ),
            llm
        )

        # QA 工程师
        self.qa = WorkerAgent(
            AgentConfig(
                name="qa_engineer",
                role="QA Engineer",
                description="Tests and validates functionality",
                capabilities=["test planning", "test execution", "bug reporting"]
            ),
            llm
        )

        # 团队协调者
        self.coordinator = SupervisorAgent(
            AgentConfig(
                name="tech_lead",
                role="Technical Lead",
                description="Coordinates the development team",
                capabilities=["project management", "technical decisions", "team coordination"]
            ),
            llm,
            ["product_manager", "architect", "developer", "reviewer", "qa_engineer"]
        )

        # 注册所有 Agent
        for agent in [self.pm, self.architect, self.developer, self.reviewer, self.qa, self.coordinator]:
            self.bus.register(agent)

    async def develop_feature(self, feature_request: str) -> Dict:
        """开发一个功能"""
        await self.bus.start()

        # 发起开发流程
        msg = Message(
            id="feature_request",
            type=MessageType.TASK,
            sender="user",
            receiver="tech_lead",
            content={"task": f"Develop the following feature:\n{feature_request}"}
        )
        await self.bus.send_to("tech_lead", msg)

        # 等待完成
        result = await self._wait_for_completion()

        await self.bus.stop()

        return result

    async def _wait_for_completion(self, timeout: float = 600) -> Dict:
        """等待开发完成"""
        import time
        start = time.time()

        while time.time() - start < timeout:
            for msg in reversed(self.bus.message_history):
                if msg.type == MessageType.RESULT and msg.receiver == "user":
                    return msg.content

            await asyncio.sleep(1)

        return {"error": "Timeout"}

小结

本章深入讲解了多 Agent 协作系统:

  1. 架构模式:层级式、协作式、流水线、辩论式
  2. 核心组件:消息总线、共享状态、Agent 基类
  3. 层级系统:Supervisor-Worker 模式
  4. 协作系统:对等协作与共享工作区
  5. 辩论系统:多角度验证与评判
  6. 实战案例:代码开发团队

下一章我们将探讨 LLM 安全与防护,讲解如何构建安全可靠的 LLM 应用。