HiHuo
首页
博客
手册
工具
关于
首页
博客
手册
工具
关于
  • AI 完整学习路径

    • AI教程 - 从零到一的完整学习路径
    • 第00章:AI基础与发展史
    • 第01章:Python与AI开发环境
    • 第02章:数学基础-线性代数与微积分
    • 03-数据集详解-从获取到预处理
    • 04-从零训练第一个模型
    • 05-模型文件详解
    • 06-分布式训练-多GPU与多机
    • 07-模型调度与资源管理
    • 08-Transformer架构深度解析
    • 09-大语言模型原理与架构
    • 10-Token与Tokenization详解
    • 11-Prompt Engineering完全指南
    • 第12章:模型微调与LoRA技术
    • 第13章:RLHF与对齐技术
    • 第14章 AI编程助手原理与实现
    • 15-RAG系统设计与实现
    • 16-Agent智能体与工具调用
    • 17-多模态大模型
    • 第18章:AI前沿技术趋势
    • 第19章 AI热门话题与应用案例

07-模型调度与资源管理

引言

在生产环境中,AI训练和推理不仅需要高效的算法,还需要完善的资源管理和调度系统。本章将介绍如何构建企业级的AI训练平台和推理服务。

1. 训练任务调度

1.1 GPU资源分配

资源需求评估

# resource_calculator.py - 计算训练资源需求

import math

class ResourceCalculator:
    """计算模型训练所需资源"""

    @staticmethod
    def estimate_memory(model_params, batch_size, seq_length=512, precision='fp32'):
        """
        估算显存需求

        Args:
            model_params: 模型参数量(单位:M)
            batch_size: 批量大小
            seq_length: 序列长度
            precision: 精度(fp32/fp16/int8)

        Returns:
            显存需求(单位:GB)
        """
        # 参数占用
        bytes_per_param = {'fp32': 4, 'fp16': 2, 'int8': 1}[precision]
        param_memory = model_params * 1e6 * bytes_per_param

        # 梯度占用(与参数相同)
        grad_memory = param_memory

        # 优化器状态(Adam需要2倍参数)
        optimizer_memory = param_memory * 2

        # 激活值占用(粗略估计)
        # 假设每层激活值为 batch_size * seq_length * hidden_size
        hidden_size = int(math.sqrt(model_params * 1e6 / 12))  # 粗略估计
        activation_memory = batch_size * seq_length * hidden_size * bytes_per_param * 12

        # 总显存
        total_memory = param_memory + grad_memory + optimizer_memory + activation_memory

        # 转换为GB,并加20%缓冲
        total_memory_gb = total_memory / (1024 ** 3) * 1.2

        return {
            'param_memory_gb': param_memory / (1024 ** 3),
            'grad_memory_gb': grad_memory / (1024 ** 3),
            'optimizer_memory_gb': optimizer_memory / (1024 ** 3),
            'activation_memory_gb': activation_memory / (1024 ** 3),
            'total_memory_gb': total_memory_gb
        }

    @staticmethod
    def estimate_training_time(dataset_size, batch_size, num_gpus, samples_per_sec_per_gpu):
        """
        估算训练时间

        Args:
            dataset_size: 数据集大小
            batch_size: 每个GPU的batch size
            num_gpus: GPU数量
            samples_per_sec_per_gpu: 每个GPU每秒处理的样本数

        Returns:
            训练时间(小时)
        """
        effective_batch_size = batch_size * num_gpus
        iterations_per_epoch = dataset_size / effective_batch_size
        samples_per_sec = samples_per_sec_per_gpu * num_gpus
        seconds_per_epoch = dataset_size / samples_per_sec

        return {
            'iterations_per_epoch': iterations_per_epoch,
            'seconds_per_epoch': seconds_per_epoch,
            'hours_per_epoch': seconds_per_epoch / 3600
        }

    @staticmethod
    def recommend_gpu(model_params, batch_size):
        """
        推荐GPU型号

        Args:
            model_params: 模型参数量(M)
            batch_size: 批量大小

        Returns:
            推荐的GPU配置
        """
        memory_req = ResourceCalculator.estimate_memory(
            model_params, batch_size, precision='fp16'
        )['total_memory_gb']

        gpu_specs = {
            'V100-16GB': {'memory': 16, 'tflops': 125},
            'V100-32GB': {'memory': 32, 'tflops': 125},
            'A100-40GB': {'memory': 40, 'tflops': 312},
            'A100-80GB': {'memory': 80, 'tflops': 312},
            'H100-80GB': {'memory': 80, 'tflops': 1000},
        }

        recommendations = []
        for gpu_name, spec in gpu_specs.items():
            if spec['memory'] >= memory_req:
                recommendations.append({
                    'gpu': gpu_name,
                    'memory': spec['memory'],
                    'memory_util': f"{memory_req / spec['memory'] * 100:.1f}%",
                    'tflops': spec['tflops']
                })

        return recommendations

# 使用示例
calculator = ResourceCalculator()

# GPT-2 Medium (345M参数)
result = calculator.estimate_memory(
    model_params=345,
    batch_size=16,
    seq_length=1024,
    precision='fp16'
)

print("GPT-2 Medium (345M) 显存需求:")
for key, value in result.items():
    print(f"  {key}: {value:.2f} GB")

# 推荐GPU
gpus = calculator.recommend_gpu(model_params=345, batch_size=16)
print("\n推荐GPU:")
for gpu in gpus:
    print(f"  {gpu['gpu']}: {gpu['memory']}GB (利用率 {gpu['memory_util']})")

GPU分配策略

# gpu_allocator.py - GPU分配器

import subprocess
import re
from typing import List, Dict, Optional

class GPUAllocator:
    """GPU资源分配器"""

    def __init__(self):
        self.gpu_info = self._get_gpu_info()

    def _get_gpu_info(self) -> List[Dict]:
        """获取GPU信息"""
        try:
            result = subprocess.run(
                ['nvidia-smi', '--query-gpu=index,name,memory.total,memory.used,memory.free,utilization.gpu',
                 '--format=csv,noheader,nounits'],
                capture_output=True,
                text=True
            )

            gpus = []
            for line in result.stdout.strip().split('\n'):
                if not line:
                    continue

                parts = [p.strip() for p in line.split(',')]
                gpus.append({
                    'index': int(parts[0]),
                    'name': parts[1],
                    'memory_total': int(parts[2]),
                    'memory_used': int(parts[3]),
                    'memory_free': int(parts[4]),
                    'utilization': int(parts[5])
                })

            return gpus

        except Exception as e:
            print(f"Error getting GPU info: {e}")
            return []

    def find_free_gpus(self, num_gpus: int, min_memory: int = 0,
                       max_utilization: int = 10) -> Optional[List[int]]:
        """
        查找空闲GPU

        Args:
            num_gpus: 需要的GPU数量
            min_memory: 最小空闲显存(MB)
            max_utilization: 最大利用率(%)

        Returns:
            GPU索引列表,如果找不到则返回None
        """
        # 过滤满足条件的GPU
        available_gpus = [
            gpu for gpu in self.gpu_info
            if gpu['memory_free'] >= min_memory and gpu['utilization'] <= max_utilization
        ]

        # 按空闲显存排序
        available_gpus.sort(key=lambda x: x['memory_free'], reverse=True)

        if len(available_gpus) >= num_gpus:
            return [gpu['index'] for gpu in available_gpus[:num_gpus]]
        else:
            return None

    def allocate_gpus(self, job_id: str, num_gpus: int, memory_required: int) -> Optional[str]:
        """
        为任务分配GPU

        Args:
            job_id: 任务ID
            num_gpus: 需要的GPU数量
            memory_required: 需要的显存(MB)

        Returns:
            CUDA_VISIBLE_DEVICES环境变量值
        """
        gpu_indices = self.find_free_gpus(num_gpus, min_memory=memory_required)

        if gpu_indices is None:
            print(f"无法为任务 {job_id} 分配 {num_gpus} 个GPU")
            return None

        cuda_visible_devices = ','.join(map(str, gpu_indices))
        print(f"为任务 {job_id} 分配GPU: {cuda_visible_devices}")

        return cuda_visible_devices

    def get_gpu_topology(self) -> str:
        """获取GPU拓扑信息"""
        result = subprocess.run(
            ['nvidia-smi', 'topo', '-m'],
            capture_output=True,
            text=True
        )
        return result.stdout

    def print_status(self):
        """打印GPU状态"""
        print("\nGPU状态:")
        print("-" * 80)
        print(f"{'ID':<4} {'名称':<20} {'显存':<20} {'利用率':<10}")
        print("-" * 80)

        for gpu in self.gpu_info:
            memory_str = f"{gpu['memory_used']}/{gpu['memory_total']} MB"
            util_str = f"{gpu['utilization']}%"

            print(f"{gpu['index']:<4} {gpu['name']:<20} {memory_str:<20} {util_str:<10}")

        print("-" * 80)

# 使用示例
allocator = GPUAllocator()
allocator.print_status()

# 为训练任务分配GPU
cuda_devices = allocator.allocate_gpus(
    job_id='train_job_001',
    num_gpus=4,
    memory_required=20000  # 20GB
)

if cuda_devices:
    print(f"设置环境变量: CUDA_VISIBLE_DEVICES={cuda_devices}")

1.2 任务队列管理

# job_queue.py - 训练任务队列管理

import sqlite3
import json
from datetime import datetime
from enum import Enum
from typing import List, Optional, Dict
import threading
import time

class JobStatus(Enum):
    """任务状态"""
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"

class TrainingJob:
    """训练任务"""
    def __init__(self, job_id: str, name: str, command: str,
                 num_gpus: int, memory_required: int, priority: int = 0):
        self.job_id = job_id
        self.name = name
        self.command = command
        self.num_gpus = num_gpus
        self.memory_required = memory_required
        self.priority = priority
        self.status = JobStatus.PENDING
        self.created_at = datetime.now()
        self.started_at = None
        self.completed_at = None
        self.gpu_ids = None

    def to_dict(self):
        return {
            'job_id': self.job_id,
            'name': self.name,
            'command': self.command,
            'num_gpus': self.num_gpus,
            'memory_required': self.memory_required,
            'priority': self.priority,
            'status': self.status.value,
            'created_at': self.created_at.isoformat(),
            'started_at': self.started_at.isoformat() if self.started_at else None,
            'completed_at': self.completed_at.isoformat() if self.completed_at else None,
            'gpu_ids': self.gpu_ids
        }

class JobQueue:
    """任务队列"""
    def __init__(self, db_path: str = 'jobs.db'):
        self.db_path = db_path
        self._init_db()
        self.lock = threading.Lock()

    def _init_db(self):
        """初始化数据库"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        cursor.execute('''
            CREATE TABLE IF NOT EXISTS jobs (
                job_id TEXT PRIMARY KEY,
                name TEXT,
                command TEXT,
                num_gpus INTEGER,
                memory_required INTEGER,
                priority INTEGER,
                status TEXT,
                created_at TEXT,
                started_at TEXT,
                completed_at TEXT,
                gpu_ids TEXT,
                error_message TEXT
            )
        ''')

        conn.commit()
        conn.close()

    def submit_job(self, job: TrainingJob) -> str:
        """提交任务"""
        with self.lock:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()

            cursor.execute('''
                INSERT INTO jobs VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                job.job_id,
                job.name,
                job.command,
                job.num_gpus,
                job.memory_required,
                job.priority,
                job.status.value,
                job.created_at.isoformat(),
                None,
                None,
                None,
                None
            ))

            conn.commit()
            conn.close()

            print(f"任务已提交: {job.job_id}")
            return job.job_id

    def get_pending_jobs(self) -> List[TrainingJob]:
        """获取待处理任务(按优先级排序)"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        cursor.execute('''
            SELECT * FROM jobs
            WHERE status = ?
            ORDER BY priority DESC, created_at ASC
        ''', (JobStatus.PENDING.value,))

        jobs = []
        for row in cursor.fetchall():
            job = TrainingJob(
                job_id=row[0],
                name=row[1],
                command=row[2],
                num_gpus=row[3],
                memory_required=row[4],
                priority=row[5]
            )
            job.status = JobStatus(row[6])
            job.created_at = datetime.fromisoformat(row[7])
            jobs.append(job)

        conn.close()
        return jobs

    def update_job_status(self, job_id: str, status: JobStatus,
                         gpu_ids: Optional[str] = None,
                         error_message: Optional[str] = None):
        """更新任务状态"""
        with self.lock:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()

            now = datetime.now().isoformat()

            if status == JobStatus.RUNNING:
                cursor.execute('''
                    UPDATE jobs
                    SET status = ?, started_at = ?, gpu_ids = ?
                    WHERE job_id = ?
                ''', (status.value, now, gpu_ids, job_id))

            elif status in [JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELLED]:
                cursor.execute('''
                    UPDATE jobs
                    SET status = ?, completed_at = ?, error_message = ?
                    WHERE job_id = ?
                ''', (status.value, now, error_message, job_id))

            else:
                cursor.execute('''
                    UPDATE jobs
                    SET status = ?
                    WHERE job_id = ?
                ''', (status.value, job_id))

            conn.commit()
            conn.close()

    def get_job(self, job_id: str) -> Optional[TrainingJob]:
        """获取任务信息"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        cursor.execute('SELECT * FROM jobs WHERE job_id = ?', (job_id,))
        row = cursor.fetchone()

        if row:
            job = TrainingJob(
                job_id=row[0],
                name=row[1],
                command=row[2],
                num_gpus=row[3],
                memory_required=row[4],
                priority=row[5]
            )
            job.status = JobStatus(row[6])
            job.created_at = datetime.fromisoformat(row[7])
            job.started_at = datetime.fromisoformat(row[8]) if row[8] else None
            job.completed_at = datetime.fromisoformat(row[9]) if row[9] else None
            job.gpu_ids = row[10]

            conn.close()
            return job

        conn.close()
        return None

    def list_jobs(self, status: Optional[JobStatus] = None) -> List[Dict]:
        """列出所有任务"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        if status:
            cursor.execute('SELECT * FROM jobs WHERE status = ? ORDER BY created_at DESC',
                         (status.value,))
        else:
            cursor.execute('SELECT * FROM jobs ORDER BY created_at DESC')

        jobs = []
        for row in cursor.fetchall():
            jobs.append({
                'job_id': row[0],
                'name': row[1],
                'num_gpus': row[3],
                'priority': row[5],
                'status': row[6],
                'created_at': row[7],
                'started_at': row[8],
                'gpu_ids': row[10]
            })

        conn.close()
        return jobs

    def cancel_job(self, job_id: str):
        """取消任务"""
        job = self.get_job(job_id)
        if job and job.status == JobStatus.PENDING:
            self.update_job_status(job_id, JobStatus.CANCELLED)
            print(f"任务已取消: {job_id}")
            return True
        return False

# 使用示例
queue = JobQueue()

# 提交任务
job1 = TrainingJob(
    job_id='job_001',
    name='ResNet50训练',
    command='python train.py --model resnet50',
    num_gpus=4,
    memory_required=20000,
    priority=10
)

job2 = TrainingJob(
    job_id='job_002',
    name='GPT-2训练',
    command='python train_gpt.py',
    num_gpus=8,
    memory_required=40000,
    priority=5
)

queue.submit_job(job1)
queue.submit_job(job2)

# 查看待处理任务
pending = queue.get_pending_jobs()
for job in pending:
    print(f"{job.job_id}: {job.name} (优先级: {job.priority})")

1.3 优先级调度

# scheduler.py - 任务调度器

import subprocess
import threading
import time
from typing import Optional

class JobScheduler:
    """任务调度器"""
    def __init__(self, gpu_allocator: GPUAllocator, job_queue: JobQueue):
        self.gpu_allocator = gpu_allocator
        self.job_queue = job_queue
        self.running = False
        self.thread = None
        self.running_jobs = {}  # job_id -> process

    def start(self):
        """启动调度器"""
        if self.running:
            print("调度器已经在运行")
            return

        self.running = True
        self.thread = threading.Thread(target=self._schedule_loop, daemon=True)
        self.thread.start()
        print("调度器已启动")

    def stop(self):
        """停止调度器"""
        self.running = False
        if self.thread:
            self.thread.join()
        print("调度器已停止")

    def _schedule_loop(self):
        """调度循环"""
        while self.running:
            try:
                # 检查运行中的任务
                self._check_running_jobs()

                # 尝试调度新任务
                self._schedule_pending_jobs()

                # 等待一段时间
                time.sleep(5)

            except Exception as e:
                print(f"调度器错误: {e}")

    def _check_running_jobs(self):
        """检查运行中的任务状态"""
        completed_jobs = []

        for job_id, process in self.running_jobs.items():
            # 检查进程是否结束
            returncode = process.poll()

            if returncode is not None:
                # 任务完成
                if returncode == 0:
                    self.job_queue.update_job_status(job_id, JobStatus.COMPLETED)
                    print(f"任务完成: {job_id}")
                else:
                    self.job_queue.update_job_status(
                        job_id,
                        JobStatus.FAILED,
                        error_message=f"Exit code: {returncode}"
                    )
                    print(f"任务失败: {job_id} (exit code: {returncode})")

                completed_jobs.append(job_id)

        # 移除完成的任务
        for job_id in completed_jobs:
            del self.running_jobs[job_id]

    def _schedule_pending_jobs(self):
        """调度待处理任务"""
        pending_jobs = self.job_queue.get_pending_jobs()

        for job in pending_jobs:
            # 尝试分配GPU
            gpu_ids = self.gpu_allocator.find_free_gpus(
                num_gpus=job.num_gpus,
                min_memory=job.memory_required
            )

            if gpu_ids is None:
                # 没有足够的GPU资源
                continue

            # 启动任务
            success = self._launch_job(job, gpu_ids)

            if success:
                cuda_devices = ','.join(map(str, gpu_ids))
                self.job_queue.update_job_status(
                    job.job_id,
                    JobStatus.RUNNING,
                    gpu_ids=cuda_devices
                )
                print(f"任务启动: {job.job_id} on GPU {cuda_devices}")

    def _launch_job(self, job: TrainingJob, gpu_ids: List[int]) -> bool:
        """启动任务"""
        try:
            # 设置环境变量
            env = os.environ.copy()
            env['CUDA_VISIBLE_DEVICES'] = ','.join(map(str, gpu_ids))

            # 创建日志目录
            log_dir = f'logs/{job.job_id}'
            os.makedirs(log_dir, exist_ok=True)

            # 启动进程
            log_file = open(f'{log_dir}/output.log', 'w')

            process = subprocess.Popen(
                job.command,
                shell=True,
                env=env,
                stdout=log_file,
                stderr=subprocess.STDOUT
            )

            self.running_jobs[job.job_id] = process

            return True

        except Exception as e:
            print(f"启动任务失败 {job.job_id}: {e}")
            return False

# 使用示例
allocator = GPUAllocator()
queue = JobQueue()
scheduler = JobScheduler(allocator, queue)

# 启动调度器
scheduler.start()

# 提交多个任务
jobs = [
    TrainingJob(f'job_{i}', f'Training {i}', f'python train.py --id {i}',
                num_gpus=2, memory_required=10000, priority=i)
    for i in range(10)
]

for job in jobs:
    queue.submit_job(job)

# 调度器会自动调度这些任务
# 让程序运行一段时间
time.sleep(60)

# 停止调度器
scheduler.stop()

2. Kubernetes + GPU

2.1 GPU Operator安装

# gpu-operator-values.yaml
# NVIDIA GPU Operator配置

operator:
  defaultRuntime: containerd

driver:
  enabled: true
  version: "525.105.17"

toolkit:
  enabled: true
  version: v1.13.5

devicePlugin:
  enabled: true
  version: v0.14.0

dcgm:
  enabled: true

dcgmExporter:
  enabled: true
  serviceMonitor:
    enabled: true

gfd:
  enabled: true

migManager:
  enabled: true

nodeStatusExporter:
  enabled: true

validator:
  plugin:
    env:
    - name: WITH_WORKLOAD
      value: "true"
# 安装GPU Operator
helm repo add nvidia https://nvidia.github.io/gpu-operator
helm repo update

helm install gpu-operator nvidia/gpu-operator \
  --namespace gpu-operator \
  --create-namespace \
  --values gpu-operator-values.yaml

# 验证安装
kubectl get pods -n gpu-operator

# 检查节点GPU标签
kubectl get nodes -o json | jq '.items[].metadata.labels' | grep nvidia

2.2 NVIDIA Device Plugin

# nvidia-device-plugin-daemonset.yaml
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: nvidia-device-plugin-daemonset
  namespace: kube-system
spec:
  selector:
    matchLabels:
      name: nvidia-device-plugin-ds
  updateStrategy:
    type: RollingUpdate
  template:
    metadata:
      labels:
        name: nvidia-device-plugin-ds
    spec:
      tolerations:
      - key: nvidia.com/gpu
        operator: Exists
        effect: NoSchedule
      priorityClassName: "system-node-critical"
      containers:
      - image: nvcr.io/nvidia/k8s-device-plugin:v0.14.0
        name: nvidia-device-plugin-ctr
        args:
        - "--fail-on-init-error=false"
        - "--pass-device-specs=true"
        - "--device-list-strategy=envvar"
        - "--nvidia-driver-root=/"
        securityContext:
          allowPrivilegeEscalation: false
          capabilities:
            drop: ["ALL"]
        volumeMounts:
        - name: device-plugin
          mountPath: /var/lib/kubelet/device-plugins
      volumes:
      - name: device-plugin
        hostPath:
          path: /var/lib/kubelet/device-plugins

2.3 GPU资源限制

# gpu-training-job.yaml
# GPU训练任务配置

apiVersion: batch/v1
kind: Job
metadata:
  name: resnet50-training
spec:
  template:
    metadata:
      labels:
        app: resnet50-training
    spec:
      restartPolicy: OnFailure
      containers:
      - name: pytorch
        image: pytorch/pytorch:2.0.0-cuda11.7-cudnn8-runtime
        command:
        - python
        - train.py
        args:
        - --batch-size=128
        - --epochs=90
        resources:
          limits:
            nvidia.com/gpu: 4  # 请求4个GPU
            memory: "64Gi"
            cpu: "16"
          requests:
            nvidia.com/gpu: 4
            memory: "32Gi"
            cpu: "8"
        env:
        - name: NVIDIA_VISIBLE_DEVICES
          value: "all"
        - name: NVIDIA_DRIVER_CAPABILITIES
          value: "compute,utility"
        - name: NCCL_DEBUG
          value: "INFO"
        volumeMounts:
        - name: dataset
          mountPath: /data
        - name: output
          mountPath: /output
        - name: shm
          mountPath: /dev/shm
      volumes:
      - name: dataset
        persistentVolumeClaim:
          claimName: imagenet-pvc
      - name: output
        persistentVolumeClaim:
          claimName: training-output-pvc
      - name: shm
        emptyDir:
          medium: Memory
          sizeLimit: "32Gi"
      nodeSelector:
        nvidia.com/gpu.product: "NVIDIA-A100-SXM4-40GB"

2.4 GPU共享和隔离

# gpu-sharing-config.yaml
# GPU共享配置(使用NVIDIA MPS)

apiVersion: v1
kind: ConfigMap
metadata:
  name: nvidia-mps-config
  namespace: kube-system
data:
  mps.conf: |
    # MPS配置
    CUDA_VISIBLE_DEVICES=0
    CUDA_MPS_PIPE_DIRECTORY=/tmp/nvidia-mps
    CUDA_MPS_LOG_DIRECTORY=/var/log/nvidia-mps

---
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: nvidia-mps
  namespace: kube-system
spec:
  selector:
    matchLabels:
      app: nvidia-mps
  template:
    metadata:
      labels:
        app: nvidia-mps
    spec:
      hostPID: true
      containers:
      - name: mps
        image: nvidia/cuda:11.7.1-base-ubuntu20.04
        command:
        - /bin/bash
        - -c
        - |
          # 启动MPS
          nvidia-cuda-mps-control -d

          # 设置MPS配置
          echo "set_default_active_thread_percentage 50" | nvidia-cuda-mps-control

          # 保持运行
          tail -f /dev/null
        securityContext:
          privileged: true
        resources:
          limits:
            nvidia.com/gpu: 1
        volumeMounts:
        - name: mps-pipe
          mountPath: /tmp/nvidia-mps
        - name: mps-log
          mountPath: /var/log/nvidia-mps
      volumes:
      - name: mps-pipe
        hostPath:
          path: /tmp/nvidia-mps
      - name: mps-log
        hostPath:
          path: /var/log/nvidia-mps
# shared-gpu-pod.yaml
# 使用共享GPU的Pod

apiVersion: v1
kind: Pod
metadata:
  name: inference-service-1
spec:
  containers:
  - name: inference
    image: inference:latest
    resources:
      limits:
        nvidia.com/gpu: 1
    env:
    - name: CUDA_MPS_PIPE_DIRECTORY
      value: /tmp/nvidia-mps
    - name: CUDA_MPS_LOG_DIRECTORY
      value: /var/log/nvidia-mps
    volumeMounts:
    - name: mps-pipe
      mountPath: /tmp/nvidia-mps
  volumes:
  - name: mps-pipe
    hostPath:
      path: /tmp/nvidia-mps

3. 训练平台

3.1 Kubeflow架构

# kubeflow-installation.sh
#!/bin/bash

# 安装Kubeflow

# 1. 安装kustomize
curl -s "https://raw.githubusercontent.com/kubernetes-sigs/kustomize/master/hack/install_kustomize.sh" | bash

# 2. 克隆Kubeflow manifests
git clone https://github.com/kubeflow/manifests.git
cd manifests

# 3. 安装Kubeflow组件
while ! kustomize build example | kubectl apply -f -; do echo "Retrying to apply resources"; sleep 10; done

# 4. 等待所有Pod就绪
kubectl wait --for=condition=Ready pods --all -n kubeflow --timeout=600s

# 5. 获取访问地址
kubectl port-forward -n istio-system svc/istio-ingressgateway 8080:80
# pytorch-training-job.yaml
# Kubeflow PyTorch训练任务

apiVersion: kubeflow.org/v1
kind: PyTorchJob
metadata:
  name: pytorch-dist-training
  namespace: kubeflow
spec:
  pytorchReplicaSpecs:
    Master:
      replicas: 1
      restartPolicy: OnFailure
      template:
        spec:
          containers:
          - name: pytorch
            image: pytorch/pytorch:2.0.0-cuda11.7-cudnn8-runtime
            command:
            - python
            - -m
            - torch.distributed.run
            - --nproc_per_node=8
            - train.py
            args:
            - --batch-size=32
            - --epochs=90
            resources:
              limits:
                nvidia.com/gpu: 8
                memory: "128Gi"
            volumeMounts:
            - name: data
              mountPath: /data
            - name: shm
              mountPath: /dev/shm
          volumes:
          - name: data
            persistentVolumeClaim:
              claimName: training-data
          - name: shm
            emptyDir:
              medium: Memory
              sizeLimit: "64Gi"

    Worker:
      replicas: 3
      restartPolicy: OnFailure
      template:
        spec:
          containers:
          - name: pytorch
            image: pytorch/pytorch:2.0.0-cuda11.7-cudnn8-runtime
            command:
            - python
            - -m
            - torch.distributed.run
            - --nproc_per_node=8
            - train.py
            args:
            - --batch-size=32
            - --epochs=90
            resources:
              limits:
                nvidia.com/gpu: 8
                memory: "128Gi"
            volumeMounts:
            - name: data
              mountPath: /data
            - name: shm
              mountPath: /dev/shm
          volumes:
          - name: data
            persistentVolumeClaim:
              claimName: training-data
          - name: shm
            emptyDir:
              medium: Memory
              sizeLimit: "64Gi"

3.2 Ray分布式训练

# ray_training.py - 使用Ray进行分布式训练

import ray
from ray import train
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, DistributedSampler

def train_func(config):
    """训练函数(在每个worker上运行)"""
    # 获取分布式信息
    rank = train.get_context().get_world_rank()
    world_size = train.get_context().get_world_size()
    local_rank = train.get_context().get_local_rank()

    # 设置设备
    device = torch.device(f"cuda:{local_rank}")

    # 创建模型
    model = create_model()
    model = model.to(device)

    # 包装为DDP
    model = torch.nn.parallel.DistributedDataParallel(
        model,
        device_ids=[local_rank]
    )

    # 创建数据加载器
    dataset = create_dataset()
    sampler = DistributedSampler(
        dataset,
        num_replicas=world_size,
        rank=rank
    )
    dataloader = DataLoader(
        dataset,
        batch_size=config['batch_size'],
        sampler=sampler
    )

    # 优化器
    optimizer = torch.optim.Adam(model.parameters(), lr=config['lr'])
    criterion = nn.CrossEntropyLoss()

    # 训练循环
    for epoch in range(config['epochs']):
        sampler.set_epoch(epoch)
        model.train()

        total_loss = 0
        for batch_idx, (data, target) in enumerate(dataloader):
            data = data.to(device)
            target = target.to(device)

            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        avg_loss = total_loss / len(dataloader)

        # 向Ray报告指标
        train.report({"loss": avg_loss, "epoch": epoch})

        if rank == 0:
            print(f"Epoch {epoch}: Loss = {avg_loss:.4f}")

# 配置Ray集群
ray.init(address="auto")  # 连接到Ray集群

# 创建训练器
trainer = TorchTrainer(
    train_func,
    train_loop_config={
        "batch_size": 32,
        "lr": 0.001,
        "epochs": 10
    },
    scaling_config=ScalingConfig(
        num_workers=4,          # 4个worker
        use_gpu=True,
        resources_per_worker={"GPU": 2}  # 每个worker 2个GPU
    )
)

# 运行训练
result = trainer.fit()

print(f"Training completed: {result.metrics}")
# ray-cluster.yaml
# Ray集群配置

apiVersion: ray.io/v1alpha1
kind: RayCluster
metadata:
  name: ray-cluster
spec:
  rayVersion: '2.5.0'
  headGroupSpec:
    serviceType: ClusterIP
    rayStartParams:
      dashboard-host: '0.0.0.0'
      num-cpus: '0'
    template:
      spec:
        containers:
        - name: ray-head
          image: rayproject/ray:2.5.0-gpu
          resources:
            limits:
              cpu: "4"
              memory: "16Gi"
            requests:
              cpu: "2"
              memory: "8Gi"
          ports:
          - containerPort: 6379
            name: gcs
          - containerPort: 8265
            name: dashboard
          - containerPort: 10001
            name: client

  workerGroupSpecs:
  - replicas: 4
    minReplicas: 1
    maxReplicas: 10
    groupName: gpu-workers
    rayStartParams: {}
    template:
      spec:
        containers:
        - name: ray-worker
          image: rayproject/ray:2.5.0-gpu
          resources:
            limits:
              nvidia.com/gpu: 4
              cpu: "16"
              memory: "64Gi"
            requests:
              nvidia.com/gpu: 4
              cpu: "8"
              memory: "32Gi"

3.3 MLOps流程

# mlops_pipeline.py - MLOps训练流水线

from kfp import dsl, compiler
from kfp.dsl import InputPath, OutputPath

@dsl.component(
    base_image='python:3.9',
    packages_to_install=['boto3', 'pandas']
)
def download_data(
    s3_path: str,
    output_path: OutputPath('Dataset')
):
    """下载数据集"""
    import boto3
    import os

    s3 = boto3.client('s3')

    # 解析S3路径
    bucket, key = s3_path.replace('s3://', '').split('/', 1)

    # 下载文件
    os.makedirs(output_path, exist_ok=True)
    s3.download_file(bucket, key, f'{output_path}/data.tar.gz')

    print(f"数据已下载到: {output_path}")

@dsl.component(
    base_image='python:3.9',
    packages_to_install=['pandas', 'scikit-learn']
)
def preprocess_data(
    input_path: InputPath('Dataset'),
    output_path: OutputPath('Dataset')
):
    """预处理数据"""
    import pandas as pd
    import os

    # 读取数据
    df = pd.read_csv(f'{input_path}/data.csv')

    # 数据清洗
    df = df.dropna()

    # 特征工程
    # ...

    # 保存处理后的数据
    os.makedirs(output_path, exist_ok=True)
    df.to_csv(f'{output_path}/processed.csv', index=False)

    print(f"数据预处理完成,样本数: {len(df)}")

@dsl.component(
    base_image='pytorch/pytorch:2.0.0-cuda11.7-cudnn8-runtime'
)
def train_model(
    data_path: InputPath('Dataset'),
    model_path: OutputPath('Model'),
    epochs: int,
    batch_size: int,
    learning_rate: float
):
    """训练模型"""
    import torch
    import torch.nn as nn

    # 加载数据
    # ...

    # 创建模型
    model = create_model()

    # 训练
    # ...

    # 保存模型
    torch.save(model.state_dict(), f'{model_path}/model.pt')

    print("模型训练完成")

@dsl.component(
    base_image='pytorch/pytorch:2.0.0-cuda11.7-cudnn8-runtime'
)
def evaluate_model(
    model_path: InputPath('Model'),
    test_data_path: InputPath('Dataset'),
    metrics_path: OutputPath('Metrics')
):
    """评估模型"""
    import torch
    import json

    # 加载模型
    model = create_model()
    model.load_state_dict(torch.load(f'{model_path}/model.pt'))

    # 评估
    accuracy = 0.95  # 示例
    f1_score = 0.93

    # 保存指标
    metrics = {
        'accuracy': accuracy,
        'f1_score': f1_score
    }

    with open(f'{metrics_path}/metrics.json', 'w') as f:
        json.dump(metrics, f)

    print(f"模型评估完成: Accuracy={accuracy:.4f}")

@dsl.component(
    base_image='python:3.9',
    packages_to_install=['boto3']
)
def deploy_model(
    model_path: InputPath('Model'),
    metrics_path: InputPath('Metrics'),
    deployment_target: str
):
    """部署模型"""
    import json

    # 读取指标
    with open(f'{metrics_path}/metrics.json', 'r') as f:
        metrics = json.load(f)

    # 检查指标是否满足部署条件
    if metrics['accuracy'] < 0.90:
        print("模型精度不足,不进行部署")
        return

    # 部署模型
    print(f"正在部署模型到: {deployment_target}")
    # ...

    print("模型部署完成")

@dsl.pipeline(
    name='Training Pipeline',
    description='Complete ML training pipeline'
)
def training_pipeline(
    s3_data_path: str = 's3://bucket/data.tar.gz',
    epochs: int = 10,
    batch_size: int = 32,
    learning_rate: float = 0.001
):
    """训练流水线"""
    # 下载数据
    download_task = download_data(s3_path=s3_data_path)

    # 预处理
    preprocess_task = preprocess_data(
        input_path=download_task.outputs['output_path']
    )

    # 训练
    train_task = train_model(
        data_path=preprocess_task.outputs['output_path'],
        epochs=epochs,
        batch_size=batch_size,
        learning_rate=learning_rate
    )

    # GPU节点选择器
    train_task.set_gpu_limit(4)
    train_task.add_node_selector_constraint('nvidia.com/gpu.product', 'A100')

    # 评估
    eval_task = evaluate_model(
        model_path=train_task.outputs['model_path'],
        test_data_path=preprocess_task.outputs['output_path']
    )

    # 部署
    deploy_task = deploy_model(
        model_path=train_task.outputs['model_path'],
        metrics_path=eval_task.outputs['metrics_path'],
        deployment_target='production'
    )

# 编译流水线
compiler.Compiler().compile(
    pipeline_func=training_pipeline,
    package_path='training_pipeline.yaml'
)

4. 模型推理调度

4.1 vLLM动态批处理

# vllm_server.py - vLLM推理服务

from vllm import LLM, SamplingParams
from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.engine.async_llm_engine import AsyncLLMEngine
from fastapi import FastAPI, Request
from typing import List
import asyncio

app = FastAPI()

# 创建vLLM引擎
engine_args = AsyncEngineArgs(
    model="meta-llama/Llama-2-7b-hf",
    tensor_parallel_size=4,  # 4卡模型并行
    dtype="float16",
    max_num_seqs=256,        # 最大并发序列数
    max_num_batched_tokens=8192,  # 最大批处理token数
    gpu_memory_utilization=0.9,
    swap_space=4,
)

engine = AsyncLLMEngine.from_engine_args(engine_args)

@app.post("/generate")
async def generate(request: Request):
    """生成接口"""
    data = await request.json()

    prompt = data.get("prompt", "")
    max_tokens = data.get("max_tokens", 100)
    temperature = data.get("temperature", 0.7)

    # 采样参数
    sampling_params = SamplingParams(
        temperature=temperature,
        top_p=0.9,
        max_tokens=max_tokens
    )

    # 添加到请求队列
    request_id = f"req_{asyncio.current_task().get_name()}"
    results_generator = engine.generate(prompt, sampling_params, request_id)

    # 收集结果
    final_output = None
    async for request_output in results_generator:
        final_output = request_output

    # 提取生成的文本
    generated_text = final_output.outputs[0].text

    return {
        "generated_text": generated_text,
        "request_id": request_id
    }

@app.post("/batch_generate")
async def batch_generate(request: Request):
    """批量生成接口"""
    data = await request.json()

    prompts = data.get("prompts", [])
    max_tokens = data.get("max_tokens", 100)

    sampling_params = SamplingParams(
        temperature=0.7,
        max_tokens=max_tokens
    )

    # 批量请求
    tasks = []
    for i, prompt in enumerate(prompts):
        request_id = f"batch_req_{i}"
        task = engine.generate(prompt, sampling_params, request_id)
        tasks.append(task)

    # 并发处理
    results = []
    for task in tasks:
        final_output = None
        async for request_output in task:
            final_output = request_output
        results.append(final_output.outputs[0].text)

    return {"results": results}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

4.2 TensorRT-LLM推理

# tensorrt_llm_engine.py - TensorRT-LLM推理引擎

import tensorrt_llm
import torch
from tensorrt_llm.runtime import ModelRunner
from tensorrt_llm.builder import Builder
from pathlib import Path

class TRTLLMEngine:
    """TensorRT-LLM推理引擎"""

    def __init__(self, engine_dir: str, tokenizer_dir: str):
        self.engine_dir = Path(engine_dir)
        self.runner = ModelRunner.from_dir(
            engine_dir=str(self.engine_dir),
            rank=torch.cuda.current_device()
        )

        # 加载tokenizer
        from transformers import AutoTokenizer
        self.tokenizer = AutoTokenizer.from_pretrained(tokenizer_dir)

    def generate(
        self,
        prompts: List[str],
        max_new_tokens: int = 100,
        temperature: float = 1.0,
        top_k: int = 50,
        top_p: float = 0.9
    ) -> List[str]:
        """
        生成文本

        Args:
            prompts: 输入提示列表
            max_new_tokens: 最大生成token数
            temperature: 温度参数
            top_k: Top-K采样
            top_p: Top-P采样

        Returns:
            生成的文本列表
        """
        # Tokenize输入
        batch_input_ids = []
        for prompt in prompts:
            input_ids = self.tokenizer.encode(prompt, return_tensors="pt")
            batch_input_ids.append(input_ids[0].tolist())

        # 推理
        outputs = self.runner.generate(
            batch_input_ids=batch_input_ids,
            max_new_tokens=max_new_tokens,
            temperature=temperature,
            top_k=top_k,
            top_p=top_p,
            end_id=self.tokenizer.eos_token_id,
            pad_id=self.tokenizer.pad_token_id
        )

        # Decode输出
        output_texts = []
        for output_ids in outputs:
            output_text = self.tokenizer.decode(
                output_ids[0],
                skip_special_tokens=True
            )
            output_texts.append(output_text)

        return output_texts

# 使用示例
engine = TRTLLMEngine(
    engine_dir="/path/to/trt/engine",
    tokenizer_dir="meta-llama/Llama-2-7b-hf"
)

prompts = [
    "What is machine learning?",
    "Explain neural networks.",
]

outputs = engine.generate(prompts, max_new_tokens=100)

for prompt, output in zip(prompts, outputs):
    print(f"Input: {prompt}")
    print(f"Output: {output}\n")

4.3 模型并发和排队

# inference_scheduler.py - 推理请求调度器

import asyncio
from asyncio import Queue, PriorityQueue
from typing import Optional, Dict
from dataclasses import dataclass, field
from enum import Enum
import time

class Priority(Enum):
    """请求优先级"""
    HIGH = 1
    MEDIUM = 2
    LOW = 3

@dataclass(order=True)
class InferenceRequest:
    """推理请求"""
    priority: int = field(compare=True)
    timestamp: float = field(compare=True)
    request_id: str = field(compare=False)
    prompt: str = field(compare=False)
    max_tokens: int = field(compare=False)
    future: asyncio.Future = field(compare=False, default=None)

class InferenceScheduler:
    """推理调度器"""

    def __init__(self, engine, max_batch_size: int = 32, max_wait_ms: int = 50):
        self.engine = engine
        self.max_batch_size = max_batch_size
        self.max_wait_ms = max_wait_ms

        # 请求队列(优先级队列)
        self.request_queue = PriorityQueue()

        # 运行中的批次
        self.running_batches = {}

        # 统计信息
        self.stats = {
            'total_requests': 0,
            'completed_requests': 0,
            'batch_count': 0,
            'avg_batch_size': 0,
        }

        # 启动调度循环
        self.running = True
        asyncio.create_task(self._schedule_loop())

    async def submit_request(
        self,
        request_id: str,
        prompt: str,
        max_tokens: int = 100,
        priority: Priority = Priority.MEDIUM
    ) -> str:
        """
        提交推理请求

        Args:
            request_id: 请求ID
            prompt: 输入提示
            max_tokens: 最大生成token数
            priority: 优先级

        Returns:
            生成的文本
        """
        # 创建future用于等待结果
        future = asyncio.Future()

        # 创建请求
        request = InferenceRequest(
            priority=priority.value,
            timestamp=time.time(),
            request_id=request_id,
            prompt=prompt,
            max_tokens=max_tokens,
            future=future
        )

        # 加入队列
        await self.request_queue.put(request)

        self.stats['total_requests'] += 1

        # 等待结果
        result = await future
        return result

    async def _schedule_loop(self):
        """调度循环"""
        while self.running:
            try:
                # 收集一个批次的请求
                batch = await self._collect_batch()

                if batch:
                    # 处理批次
                    asyncio.create_task(self._process_batch(batch))

            except Exception as e:
                print(f"调度错误: {e}")

    async def _collect_batch(self) -> List[InferenceRequest]:
        """收集一个批次的请求"""
        batch = []
        start_time = time.time()

        while len(batch) < self.max_batch_size:
            # 计算剩余等待时间
            elapsed_ms = (time.time() - start_time) * 1000
            remaining_ms = self.max_wait_ms - elapsed_ms

            if remaining_ms <= 0 and batch:
                # 超时,返回当前批次
                break

            try:
                # 等待新请求
                timeout = remaining_ms / 1000 if remaining_ms > 0 else 0.001
                request = await asyncio.wait_for(
                    self.request_queue.get(),
                    timeout=timeout
                )
                batch.append(request)

            except asyncio.TimeoutError:
                # 超时,返回当前批次
                if batch:
                    break

        return batch

    async def _process_batch(self, batch: List[InferenceRequest]):
        """处理一个批次"""
        try:
            # 提取prompts
            prompts = [req.prompt for req in batch]
            max_tokens = max(req.max_tokens for req in batch)

            # 批量推理
            outputs = await asyncio.to_thread(
                self.engine.generate,
                prompts=prompts,
                max_new_tokens=max_tokens
            )

            # 返回结果
            for request, output in zip(batch, outputs):
                request.future.set_result(output)

            # 更新统计
            self.stats['completed_requests'] += len(batch)
            self.stats['batch_count'] += 1
            self.stats['avg_batch_size'] = (
                self.stats['completed_requests'] / self.stats['batch_count']
            )

        except Exception as e:
            # 批次失败,通知所有请求
            for request in batch:
                request.future.set_exception(e)

    def get_stats(self) -> Dict:
        """获取统计信息"""
        return self.stats.copy()

    async def shutdown(self):
        """关闭调度器"""
        self.running = False

# 使用示例
async def main():
    # 创建引擎(示例)
    engine = TRTLLMEngine(...)

    # 创建调度器
    scheduler = InferenceScheduler(
        engine=engine,
        max_batch_size=32,
        max_wait_ms=50
    )

    # 提交多个请求
    tasks = []
    for i in range(100):
        task = scheduler.submit_request(
            request_id=f"req_{i}",
            prompt=f"Question {i}: What is AI?",
            max_tokens=100,
            priority=Priority.MEDIUM
        )
        tasks.append(task)

    # 等待所有请求完成
    results = await asyncio.gather(*tasks)

    # 查看统计
    stats = scheduler.get_stats()
    print(f"统计信息: {stats}")

    # 关闭
    await scheduler.shutdown()

asyncio.run(main())

4.4 KV Cache管理

# kv_cache_manager.py - KV Cache管理器

import torch
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass

@dataclass
class CacheBlock:
    """Cache块"""
    block_id: int
    ref_count: int
    data: Optional[torch.Tensor]

class KVCacheManager:
    """KV Cache管理器"""

    def __init__(
        self,
        num_blocks: int,
        block_size: int,
        num_layers: int,
        num_heads: int,
        head_dim: int,
        device: str = 'cuda'
    ):
        """
        初始化KV Cache管理器

        Args:
            num_blocks: Cache块数量
            block_size: 每个块的token数
            num_layers: 模型层数
            num_heads: 注意力头数
            head_dim: 每个头的维度
            device: 设备
        """
        self.num_blocks = num_blocks
        self.block_size = block_size
        self.num_layers = num_layers
        self.num_heads = num_heads
        self.head_dim = head_dim
        self.device = device

        # 预分配所有cache块
        self.cache_blocks = self._allocate_cache_blocks()

        # 空闲块队列
        self.free_blocks = list(range(num_blocks))

        # 已分配块 {request_id -> [block_ids]}
        self.allocated_blocks: Dict[str, List[int]] = {}

    def _allocate_cache_blocks(self) -> List[CacheBlock]:
        """预分配所有cache块"""
        blocks = []

        for block_id in range(self.num_blocks):
            # 每个块存储 [2, num_layers, num_heads, block_size, head_dim]
            # 2 表示 K 和 V
            data = torch.zeros(
                2, self.num_layers, self.num_heads, self.block_size, self.head_dim,
                dtype=torch.float16,
                device=self.device
            )

            block = CacheBlock(
                block_id=block_id,
                ref_count=0,
                data=data
            )

            blocks.append(block)

        return blocks

    def allocate(self, request_id: str, num_tokens: int) -> Optional[List[int]]:
        """
        为请求分配cache块

        Args:
            request_id: 请求ID
            num_tokens: 需要的token数

        Returns:
            分配的块ID列表,如果失败返回None
        """
        # 计算需要的块数
        num_blocks_needed = (num_tokens + self.block_size - 1) // self.block_size

        if len(self.free_blocks) < num_blocks_needed:
            # 尝试驱逐
            evicted = self._evict_blocks(num_blocks_needed)
            if not evicted:
                return None

        # 分配块
        allocated = []
        for _ in range(num_blocks_needed):
            block_id = self.free_blocks.pop(0)
            self.cache_blocks[block_id].ref_count = 1
            allocated.append(block_id)

        self.allocated_blocks[request_id] = allocated

        return allocated

    def free(self, request_id: str):
        """释放请求的cache"""
        if request_id not in self.allocated_blocks:
            return

        block_ids = self.allocated_blocks[request_id]

        for block_id in block_ids:
            block = self.cache_blocks[block_id]
            block.ref_count -= 1

            if block.ref_count == 0:
                self.free_blocks.append(block_id)

        del self.allocated_blocks[request_id]

    def _evict_blocks(self, num_blocks: int) -> bool:
        """
        驱逐块(LRU策略)

        Args:
            num_blocks: 需要驱逐的块数

        Returns:
            是否成功
        """
        # 简化版:找到ref_count=0的块
        evictable = [
            block.block_id
            for block in self.cache_blocks
            if block.ref_count == 0 and block.block_id not in self.free_blocks
        ]

        if len(evictable) < num_blocks:
            return False

        # 驱逐块
        for block_id in evictable[:num_blocks]:
            self.free_blocks.append(block_id)

        return True

    def get_cache(self, request_id: str) -> Optional[torch.Tensor]:
        """获取请求的cache数据"""
        if request_id not in self.allocated_blocks:
            return None

        block_ids = self.allocated_blocks[request_id]

        # 拼接所有块的数据
        caches = [self.cache_blocks[bid].data for bid in block_ids]
        return torch.cat(caches, dim=3)  # 在seq_len维度拼接

    def get_stats(self) -> Dict:
        """获取统计信息"""
        total_blocks = self.num_blocks
        used_blocks = total_blocks - len(self.free_blocks)
        utilization = used_blocks / total_blocks

        return {
            'total_blocks': total_blocks,
            'used_blocks': used_blocks,
            'free_blocks': len(self.free_blocks),
            'utilization': f"{utilization*100:.2f}%",
            'active_requests': len(self.allocated_blocks)
        }

# 使用示例
cache_manager = KVCacheManager(
    num_blocks=1000,
    block_size=16,
    num_layers=32,
    num_heads=32,
    head_dim=128,
    device='cuda:0'
)

# 分配cache
request_id = "req_001"
block_ids = cache_manager.allocate(request_id, num_tokens=100)

if block_ids:
    print(f"为请求 {request_id} 分配了 {len(block_ids)} 个块")

    # 获取cache
    cache = cache_manager.get_cache(request_id)
    print(f"Cache形状: {cache.shape}")

    # 释放cache
    cache_manager.free(request_id)

# 查看统计
stats = cache_manager.get_stats()
print(f"Cache统计: {stats}")

5. 资源监控

5.1 GPU利用率监控

# gpu_monitor.py - GPU实时监控

import pynvml
from prometheus_client import start_http_server, Gauge
import time

class GPUMonitor:
    """GPU监控器(Prometheus导出器)"""

    def __init__(self, port: int = 8000):
        # 初始化NVML
        pynvml.nvmlInit()

        self.device_count = pynvml.nvmlDeviceGetCount()

        # Prometheus指标
        self.gpu_utilization = Gauge(
            'gpu_utilization_percent',
            'GPU utilization percentage',
            ['gpu_id', 'gpu_name']
        )

        self.gpu_memory_used = Gauge(
            'gpu_memory_used_bytes',
            'GPU memory used in bytes',
            ['gpu_id', 'gpu_name']
        )

        self.gpu_memory_total = Gauge(
            'gpu_memory_total_bytes',
            'GPU memory total in bytes',
            ['gpu_id', 'gpu_name']
        )

        self.gpu_temperature = Gauge(
            'gpu_temperature_celsius',
            'GPU temperature in Celsius',
            ['gpu_id', 'gpu_name']
        )

        self.gpu_power = Gauge(
            'gpu_power_watts',
            'GPU power consumption in watts',
            ['gpu_id', 'gpu_name']
        )

        # 启动HTTP服务器
        start_http_server(port)
        print(f"Prometheus exporter started on port {port}")

    def collect_metrics(self):
        """收集GPU指标"""
        for i in range(self.device_count):
            handle = pynvml.nvmlDeviceGetHandleByIndex(i)

            # GPU名称
            name = pynvml.nvmlDeviceGetName(handle)
            if isinstance(name, bytes):
                name = name.decode('utf-8')

            # 利用率
            util = pynvml.nvmlDeviceGetUtilizationRates(handle)
            self.gpu_utilization.labels(gpu_id=str(i), gpu_name=name).set(util.gpu)

            # 显存
            mem_info = pynvml.nvmlDeviceGetMemoryInfo(handle)
            self.gpu_memory_used.labels(gpu_id=str(i), gpu_name=name).set(mem_info.used)
            self.gpu_memory_total.labels(gpu_id=str(i), gpu_name=name).set(mem_info.total)

            # 温度
            temp = pynvml.nvmlDeviceGetTemperature(handle, pynvml.NVML_TEMPERATURE_GPU)
            self.gpu_temperature.labels(gpu_id=str(i), gpu_name=name).set(temp)

            # 功耗
            power = pynvml.nvmlDeviceGetPowerUsage(handle) / 1000.0  # 转换为W
            self.gpu_power.labels(gpu_id=str(i), gpu_name=name).set(power)

    def run(self, interval: int = 5):
        """运行监控"""
        try:
            while True:
                self.collect_metrics()
                time.sleep(interval)
        except KeyboardInterrupt:
            print("\n停止监控")
        finally:
            pynvml.nvmlShutdown()

if __name__ == "__main__":
    monitor = GPUMonitor(port=9100)
    monitor.run(interval=5)
# prometheus-config.yaml
# Prometheus配置

global:
  scrape_interval: 15s
  evaluation_interval: 15s

scrape_configs:
  - job_name: 'gpu-metrics'
    static_configs:
      - targets: ['localhost:9100']
        labels:
          instance: 'gpu-node-1'

  - job_name: 'dcgm-exporter'
    static_configs:
      - targets: ['localhost:9400']
# grafana-dashboard.json
# Grafana仪表板配置(部分)

{
  "dashboard": {
    "title": "GPU Monitoring",
    "panels": [
      {
        "title": "GPU Utilization",
        "targets": [
          {
            "expr": "gpu_utilization_percent"
          }
        ],
        "type": "graph"
      },
      {
        "title": "GPU Memory Usage",
        "targets": [
          {
            "expr": "gpu_memory_used_bytes / gpu_memory_total_bytes * 100"
          }
        ],
        "type": "graph"
      },
      {
        "title": "GPU Temperature",
        "targets": [
          {
            "expr": "gpu_temperature_celsius"
          }
        ],
        "type": "graph"
      }
    ]
  }
}

5.2 显存使用跟踪

# memory_profiler.py - 显存使用分析

import torch
import functools
from collections import defaultdict
import time

class MemoryProfiler:
    """显存分析器"""

    def __init__(self, device='cuda:0'):
        self.device = torch.device(device)
        self.snapshots = []
        self.peak_memory = 0

    def snapshot(self, tag: str = ""):
        """记录当前显存快照"""
        torch.cuda.synchronize()

        allocated = torch.cuda.memory_allocated(self.device)
        reserved = torch.cuda.memory_reserved(self.device)
        peak = torch.cuda.max_memory_allocated(self.device)

        self.snapshots.append({
            'tag': tag,
            'timestamp': time.time(),
            'allocated_mb': allocated / 1024**2,
            'reserved_mb': reserved / 1024**2,
            'peak_mb': peak / 1024**2
        })

        self.peak_memory = max(self.peak_memory, peak)

    def print_summary(self):
        """打印摘要"""
        if not self.snapshots:
            print("没有快照数据")
            return

        print("\n显存使用摘要:")
        print("-" * 80)
        print(f"{'标签':<20} {'已分配(MB)':<15} {'已预留(MB)':<15} {'峰值(MB)':<15}")
        print("-" * 80)

        for snap in self.snapshots:
            print(f"{snap['tag']:<20} "
                  f"{snap['allocated_mb']:<15.2f} "
                  f"{snap['reserved_mb']:<15.2f} "
                  f"{snap['peak_mb']:<15.2f}")

        print("-" * 80)
        print(f"全局峰值显存: {self.peak_memory / 1024**2:.2f} MB\n")

    def reset(self):
        """重置统计"""
        self.snapshots = []
        self.peak_memory = 0
        torch.cuda.reset_peak_memory_stats(self.device)

def profile_memory(tag: str = ""):
    """装饰器:分析函数显存使用"""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            profiler = getattr(wrapper, '_profiler', None)
            if profiler is None:
                profiler = MemoryProfiler()
                wrapper._profiler = profiler

            # 记录前快照
            torch.cuda.reset_peak_memory_stats()
            profiler.snapshot(f"{tag or func.__name__}_start")

            # 执行函数
            result = func(*args, **kwargs)

            # 记录后快照
            profiler.snapshot(f"{tag or func.__name__}_end")

            return result

        return wrapper
    return decorator

# 使用示例
profiler = MemoryProfiler()

@profile_memory("forward")
def forward_pass(model, batch):
    """前向传播"""
    output = model(batch)
    return output

@profile_memory("backward")
def backward_pass(loss):
    """反向传播"""
    loss.backward()

# 训练循环
model = create_large_model()
model.cuda()

profiler.snapshot("初始化后")

for epoch in range(3):
    for batch in dataloader:
        batch = batch.cuda()

        profiler.snapshot(f"epoch{epoch}_batch_start")

        # Forward
        output = forward_pass(model, batch)
        loss = criterion(output, labels)

        # Backward
        backward_pass(loss)

        profiler.snapshot(f"epoch{epoch}_batch_end")

        # 清空缓存
        torch.cuda.empty_cache()

# 打印摘要
profiler.print_summary()

5.3 训练吞吐量分析

# throughput_analyzer.py - 吞吐量分析

import time
import torch
from collections import deque
from typing import Dict, List
import json

class ThroughputAnalyzer:
    """吞吐量分析器"""

    def __init__(self, window_size: int = 100):
        self.window_size = window_size

        # 时间窗口
        self.batch_times = deque(maxlen=window_size)
        self.samples_processed = deque(maxlen=window_size)

        # 累计统计
        self.total_samples = 0
        self.total_batches = 0
        self.total_time = 0

        self.start_time = None

    def start_batch(self):
        """开始一个batch"""
        self.batch_start_time = time.time()

    def end_batch(self, batch_size: int):
        """结束一个batch"""
        if not hasattr(self, 'batch_start_time'):
            return

        elapsed = time.time() - self.batch_start_time

        self.batch_times.append(elapsed)
        self.samples_processed.append(batch_size)

        self.total_samples += batch_size
        self.total_batches += 1
        self.total_time += elapsed

        if self.start_time is None:
            self.start_time = self.batch_start_time

    def get_current_throughput(self) -> Dict:
        """获取当前吞吐量(基于窗口)"""
        if not self.batch_times:
            return {}

        total_samples = sum(self.samples_processed)
        total_time = sum(self.batch_times)

        return {
            'samples_per_sec': total_samples / total_time if total_time > 0 else 0,
            'batches_per_sec': len(self.batch_times) / total_time if total_time > 0 else 0,
            'avg_batch_time_ms': (total_time / len(self.batch_times)) * 1000
        }

    def get_global_stats(self) -> Dict:
        """获取全局统计"""
        if self.start_time is None:
            return {}

        elapsed = time.time() - self.start_time

        return {
            'total_samples': self.total_samples,
            'total_batches': self.total_batches,
            'total_time_sec': elapsed,
            'global_samples_per_sec': self.total_samples / elapsed if elapsed > 0 else 0,
            'global_batches_per_sec': self.total_batches / elapsed if elapsed > 0 else 0
        }

    def print_stats(self):
        """打印统计信息"""
        current = self.get_current_throughput()
        global_stats = self.get_global_stats()

        print(f"\n当前吞吐量(最近{len(self.batch_times)}个batch):")
        print(f"  样本/秒: {current.get('samples_per_sec', 0):.2f}")
        print(f"  Batch/秒: {current.get('batches_per_sec', 0):.2f}")
        print(f"  平均Batch时间: {current.get('avg_batch_time_ms', 0):.2f} ms")

        print(f"\n全局统计:")
        print(f"  总样本数: {global_stats.get('total_samples', 0)}")
        print(f"  总Batch数: {global_stats.get('total_batches', 0)}")
        print(f"  总时间: {global_stats.get('total_time_sec', 0):.2f} 秒")
        print(f"  平均样本/秒: {global_stats.get('global_samples_per_sec', 0):.2f}")

    def save_stats(self, filename: str):
        """保存统计到文件"""
        stats = {
            'current': self.get_current_throughput(),
            'global': self.get_global_stats()
        }

        with open(filename, 'w') as f:
            json.dump(stats, f, indent=2)

# 使用示例
analyzer = ThroughputAnalyzer(window_size=100)

for epoch in range(10):
    for batch_idx, (data, target) in enumerate(dataloader):
        analyzer.start_batch()

        # 训练代码
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()

        analyzer.end_batch(batch_size=data.size(0))

        # 每100个batch打印一次
        if batch_idx % 100 == 0:
            analyzer.print_stats()

# 保存最终统计
analyzer.save_stats('training_throughput.json')
Prev
06-分布式训练-多GPU与多机
Next
08-Transformer架构深度解析