07-模型调度与资源管理
引言
在生产环境中,AI训练和推理不仅需要高效的算法,还需要完善的资源管理和调度系统。本章将从训练任务调度、Kubernetes GPU管理、训练平台、推理调度到资源监控,介绍如何构建企业级的AI训练平台和推理服务。
1. 训练任务调度
1.1 GPU资源分配
资源需求评估
# resource_calculator.py - 计算训练资源需求
import math
class ResourceCalculator:
"""计算模型训练所需资源"""
@staticmethod
def estimate_memory(model_params, batch_size, seq_length=512, precision='fp32'):
"""
估算显存需求
Args:
model_params: 模型参数量(单位:M)
batch_size: 批量大小
seq_length: 序列长度
precision: 精度(fp32/fp16/int8)
Returns:
显存需求(单位:GB)
"""
# 参数占用
bytes_per_param = {'fp32': 4, 'fp16': 2, 'int8': 1}[precision]
param_memory = model_params * 1e6 * bytes_per_param
# 梯度占用(与参数相同)
grad_memory = param_memory
        # 优化器状态(Adam的一阶/二阶动量约为参数量的2倍;
        # 混合精度训练中这些状态通常以fp32保存,此处按同精度做粗略估计)
        optimizer_memory = param_memory * 2
        # 激活值占用(粗略估计,仅用于数量级估算)
        # 假设 params ≈ 12 * hidden_size^2,每层激活约为 batch_size * seq_length * hidden_size,
        # 并乘以系数12近似多层的累计激活
        hidden_size = int(math.sqrt(model_params * 1e6 / 12))
        activation_memory = batch_size * seq_length * hidden_size * bytes_per_param * 12
# 总显存
total_memory = param_memory + grad_memory + optimizer_memory + activation_memory
# 转换为GB,并加20%缓冲
total_memory_gb = total_memory / (1024 ** 3) * 1.2
return {
'param_memory_gb': param_memory / (1024 ** 3),
'grad_memory_gb': grad_memory / (1024 ** 3),
'optimizer_memory_gb': optimizer_memory / (1024 ** 3),
'activation_memory_gb': activation_memory / (1024 ** 3),
'total_memory_gb': total_memory_gb
}
@staticmethod
def estimate_training_time(dataset_size, batch_size, num_gpus, samples_per_sec_per_gpu):
"""
估算训练时间
Args:
dataset_size: 数据集大小
batch_size: 每个GPU的batch size
num_gpus: GPU数量
samples_per_sec_per_gpu: 每个GPU每秒处理的样本数
Returns:
训练时间(小时)
"""
effective_batch_size = batch_size * num_gpus
iterations_per_epoch = dataset_size / effective_batch_size
samples_per_sec = samples_per_sec_per_gpu * num_gpus
seconds_per_epoch = dataset_size / samples_per_sec
return {
'iterations_per_epoch': iterations_per_epoch,
'seconds_per_epoch': seconds_per_epoch,
'hours_per_epoch': seconds_per_epoch / 3600
}
@staticmethod
def recommend_gpu(model_params, batch_size):
"""
推荐GPU型号
Args:
model_params: 模型参数量(M)
batch_size: 批量大小
Returns:
推荐的GPU配置
"""
memory_req = ResourceCalculator.estimate_memory(
model_params, batch_size, precision='fp16'
)['total_memory_gb']
gpu_specs = {
'V100-16GB': {'memory': 16, 'tflops': 125},
'V100-32GB': {'memory': 32, 'tflops': 125},
'A100-40GB': {'memory': 40, 'tflops': 312},
'A100-80GB': {'memory': 80, 'tflops': 312},
'H100-80GB': {'memory': 80, 'tflops': 1000},
}
recommendations = []
for gpu_name, spec in gpu_specs.items():
if spec['memory'] >= memory_req:
recommendations.append({
'gpu': gpu_name,
'memory': spec['memory'],
'memory_util': f"{memory_req / spec['memory'] * 100:.1f}%",
'tflops': spec['tflops']
})
return recommendations
# 使用示例
calculator = ResourceCalculator()
# GPT-2 Medium (345M参数)
result = calculator.estimate_memory(
model_params=345,
batch_size=16,
seq_length=1024,
precision='fp16'
)
print("GPT-2 Medium (345M) 显存需求:")
for key, value in result.items():
print(f" {key}: {value:.2f} GB")
# 推荐GPU
gpus = calculator.recommend_gpu(model_params=345, batch_size=16)
print("\n推荐GPU:")
for gpu in gpus:
print(f" {gpu['gpu']}: {gpu['memory']}GB (利用率 {gpu['memory_util']})")
GPU分配策略
# gpu_allocator.py - GPU分配器
import subprocess
import re
from typing import List, Dict, Optional
class GPUAllocator:
"""GPU资源分配器"""
def __init__(self):
self.gpu_info = self._get_gpu_info()
def _get_gpu_info(self) -> List[Dict]:
"""获取GPU信息"""
try:
result = subprocess.run(
['nvidia-smi', '--query-gpu=index,name,memory.total,memory.used,memory.free,utilization.gpu',
'--format=csv,noheader,nounits'],
capture_output=True,
text=True
)
gpus = []
for line in result.stdout.strip().split('\n'):
if not line:
continue
parts = [p.strip() for p in line.split(',')]
gpus.append({
'index': int(parts[0]),
'name': parts[1],
'memory_total': int(parts[2]),
'memory_used': int(parts[3]),
'memory_free': int(parts[4]),
'utilization': int(parts[5])
})
return gpus
except Exception as e:
print(f"Error getting GPU info: {e}")
return []
def find_free_gpus(self, num_gpus: int, min_memory: int = 0,
max_utilization: int = 10) -> Optional[List[int]]:
"""
查找空闲GPU
Args:
num_gpus: 需要的GPU数量
min_memory: 最小空闲显存(MB)
max_utilization: 最大利用率(%)
Returns:
GPU索引列表,如果找不到则返回None
"""
# 过滤满足条件的GPU
available_gpus = [
gpu for gpu in self.gpu_info
if gpu['memory_free'] >= min_memory and gpu['utilization'] <= max_utilization
]
# 按空闲显存排序
available_gpus.sort(key=lambda x: x['memory_free'], reverse=True)
if len(available_gpus) >= num_gpus:
return [gpu['index'] for gpu in available_gpus[:num_gpus]]
else:
return None
def allocate_gpus(self, job_id: str, num_gpus: int, memory_required: int) -> Optional[str]:
"""
为任务分配GPU
Args:
job_id: 任务ID
num_gpus: 需要的GPU数量
memory_required: 需要的显存(MB)
Returns:
CUDA_VISIBLE_DEVICES环境变量值
"""
gpu_indices = self.find_free_gpus(num_gpus, min_memory=memory_required)
if gpu_indices is None:
print(f"无法为任务 {job_id} 分配 {num_gpus} 个GPU")
return None
cuda_visible_devices = ','.join(map(str, gpu_indices))
print(f"为任务 {job_id} 分配GPU: {cuda_visible_devices}")
return cuda_visible_devices
def get_gpu_topology(self) -> str:
"""获取GPU拓扑信息"""
result = subprocess.run(
['nvidia-smi', 'topo', '-m'],
capture_output=True,
text=True
)
return result.stdout
def print_status(self):
"""打印GPU状态"""
print("\nGPU状态:")
print("-" * 80)
print(f"{'ID':<4} {'名称':<20} {'显存':<20} {'利用率':<10}")
print("-" * 80)
for gpu in self.gpu_info:
memory_str = f"{gpu['memory_used']}/{gpu['memory_total']} MB"
util_str = f"{gpu['utilization']}%"
print(f"{gpu['index']:<4} {gpu['name']:<20} {memory_str:<20} {util_str:<10}")
print("-" * 80)
# 使用示例
allocator = GPUAllocator()
allocator.print_status()
# 为训练任务分配GPU
cuda_devices = allocator.allocate_gpus(
job_id='train_job_001',
num_gpus=4,
memory_required=20000 # 20GB
)
if cuda_devices:
print(f"设置环境变量: CUDA_VISIBLE_DEVICES={cuda_devices}")
1.2 任务队列管理
# job_queue.py - 训练任务队列管理
import sqlite3
import json
from datetime import datetime
from enum import Enum
from typing import List, Optional, Dict
import threading
import time
class JobStatus(Enum):
"""任务状态"""
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"
class TrainingJob:
"""训练任务"""
def __init__(self, job_id: str, name: str, command: str,
num_gpus: int, memory_required: int, priority: int = 0):
self.job_id = job_id
self.name = name
self.command = command
self.num_gpus = num_gpus
self.memory_required = memory_required
self.priority = priority
self.status = JobStatus.PENDING
self.created_at = datetime.now()
self.started_at = None
self.completed_at = None
self.gpu_ids = None
def to_dict(self):
return {
'job_id': self.job_id,
'name': self.name,
'command': self.command,
'num_gpus': self.num_gpus,
'memory_required': self.memory_required,
'priority': self.priority,
'status': self.status.value,
'created_at': self.created_at.isoformat(),
'started_at': self.started_at.isoformat() if self.started_at else None,
'completed_at': self.completed_at.isoformat() if self.completed_at else None,
'gpu_ids': self.gpu_ids
}
class JobQueue:
"""任务队列"""
def __init__(self, db_path: str = 'jobs.db'):
self.db_path = db_path
self._init_db()
self.lock = threading.Lock()
def _init_db(self):
"""初始化数据库"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS jobs (
job_id TEXT PRIMARY KEY,
name TEXT,
command TEXT,
num_gpus INTEGER,
memory_required INTEGER,
priority INTEGER,
status TEXT,
created_at TEXT,
started_at TEXT,
completed_at TEXT,
gpu_ids TEXT,
error_message TEXT
)
''')
conn.commit()
conn.close()
def submit_job(self, job: TrainingJob) -> str:
"""提交任务"""
with self.lock:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO jobs VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
job.job_id,
job.name,
job.command,
job.num_gpus,
job.memory_required,
job.priority,
job.status.value,
job.created_at.isoformat(),
None,
None,
None,
None
))
conn.commit()
conn.close()
print(f"任务已提交: {job.job_id}")
return job.job_id
def get_pending_jobs(self) -> List[TrainingJob]:
"""获取待处理任务(按优先级排序)"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
SELECT * FROM jobs
WHERE status = ?
ORDER BY priority DESC, created_at ASC
''', (JobStatus.PENDING.value,))
jobs = []
for row in cursor.fetchall():
job = TrainingJob(
job_id=row[0],
name=row[1],
command=row[2],
num_gpus=row[3],
memory_required=row[4],
priority=row[5]
)
job.status = JobStatus(row[6])
job.created_at = datetime.fromisoformat(row[7])
jobs.append(job)
conn.close()
return jobs
def update_job_status(self, job_id: str, status: JobStatus,
gpu_ids: Optional[str] = None,
error_message: Optional[str] = None):
"""更新任务状态"""
with self.lock:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
now = datetime.now().isoformat()
if status == JobStatus.RUNNING:
cursor.execute('''
UPDATE jobs
SET status = ?, started_at = ?, gpu_ids = ?
WHERE job_id = ?
''', (status.value, now, gpu_ids, job_id))
elif status in [JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELLED]:
cursor.execute('''
UPDATE jobs
SET status = ?, completed_at = ?, error_message = ?
WHERE job_id = ?
''', (status.value, now, error_message, job_id))
else:
cursor.execute('''
UPDATE jobs
SET status = ?
WHERE job_id = ?
''', (status.value, job_id))
conn.commit()
conn.close()
def get_job(self, job_id: str) -> Optional[TrainingJob]:
"""获取任务信息"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('SELECT * FROM jobs WHERE job_id = ?', (job_id,))
row = cursor.fetchone()
if row:
job = TrainingJob(
job_id=row[0],
name=row[1],
command=row[2],
num_gpus=row[3],
memory_required=row[4],
priority=row[5]
)
job.status = JobStatus(row[6])
job.created_at = datetime.fromisoformat(row[7])
job.started_at = datetime.fromisoformat(row[8]) if row[8] else None
job.completed_at = datetime.fromisoformat(row[9]) if row[9] else None
job.gpu_ids = row[10]
conn.close()
return job
conn.close()
return None
def list_jobs(self, status: Optional[JobStatus] = None) -> List[Dict]:
"""列出所有任务"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
if status:
cursor.execute('SELECT * FROM jobs WHERE status = ? ORDER BY created_at DESC',
(status.value,))
else:
cursor.execute('SELECT * FROM jobs ORDER BY created_at DESC')
jobs = []
for row in cursor.fetchall():
jobs.append({
'job_id': row[0],
'name': row[1],
'num_gpus': row[3],
'priority': row[5],
'status': row[6],
'created_at': row[7],
'started_at': row[8],
'gpu_ids': row[10]
})
conn.close()
return jobs
def cancel_job(self, job_id: str):
"""取消任务"""
job = self.get_job(job_id)
if job and job.status == JobStatus.PENDING:
self.update_job_status(job_id, JobStatus.CANCELLED)
print(f"任务已取消: {job_id}")
return True
return False
# 使用示例
queue = JobQueue()
# 提交任务
job1 = TrainingJob(
job_id='job_001',
name='ResNet50训练',
command='python train.py --model resnet50',
num_gpus=4,
memory_required=20000,
priority=10
)
job2 = TrainingJob(
job_id='job_002',
name='GPT-2训练',
command='python train_gpt.py',
num_gpus=8,
memory_required=40000,
priority=5
)
queue.submit_job(job1)
queue.submit_job(job2)
# 查看待处理任务
pending = queue.get_pending_jobs()
for job in pending:
print(f"{job.job_id}: {job.name} (优先级: {job.priority})")
1.3 优先级调度
# scheduler.py - 任务调度器
import os
import subprocess
import threading
import time
from typing import List, Optional
from gpu_allocator import GPUAllocator                    # 见前文 gpu_allocator.py
from job_queue import JobQueue, TrainingJob, JobStatus    # 见前文 job_queue.py
class JobScheduler:
"""任务调度器"""
def __init__(self, gpu_allocator: GPUAllocator, job_queue: JobQueue):
self.gpu_allocator = gpu_allocator
self.job_queue = job_queue
self.running = False
self.thread = None
self.running_jobs = {} # job_id -> process
def start(self):
"""启动调度器"""
if self.running:
print("调度器已经在运行")
return
self.running = True
self.thread = threading.Thread(target=self._schedule_loop, daemon=True)
self.thread.start()
print("调度器已启动")
def stop(self):
"""停止调度器"""
self.running = False
if self.thread:
self.thread.join()
print("调度器已停止")
    def _schedule_loop(self):
        """调度循环"""
        while self.running:
            try:
                # 检查运行中的任务
                self._check_running_jobs()
                # 尝试调度新任务
                self._schedule_pending_jobs()
            except Exception as e:
                print(f"调度器错误: {e}")
            # 等待一段时间(异常时也要等待,避免空转)
            time.sleep(5)
def _check_running_jobs(self):
"""检查运行中的任务状态"""
completed_jobs = []
for job_id, process in self.running_jobs.items():
# 检查进程是否结束
returncode = process.poll()
if returncode is not None:
# 任务完成
if returncode == 0:
self.job_queue.update_job_status(job_id, JobStatus.COMPLETED)
print(f"任务完成: {job_id}")
else:
self.job_queue.update_job_status(
job_id,
JobStatus.FAILED,
error_message=f"Exit code: {returncode}"
)
print(f"任务失败: {job_id} (exit code: {returncode})")
completed_jobs.append(job_id)
# 移除完成的任务
for job_id in completed_jobs:
del self.running_jobs[job_id]
    def _schedule_pending_jobs(self):
        """调度待处理任务"""
        # 刷新GPU状态,避免基于调度器启动时的过期信息做分配决策
        self.gpu_allocator.gpu_info = self.gpu_allocator._get_gpu_info()
        pending_jobs = self.job_queue.get_pending_jobs()
for job in pending_jobs:
# 尝试分配GPU
gpu_ids = self.gpu_allocator.find_free_gpus(
num_gpus=job.num_gpus,
min_memory=job.memory_required
)
if gpu_ids is None:
# 没有足够的GPU资源
continue
# 启动任务
success = self._launch_job(job, gpu_ids)
if success:
cuda_devices = ','.join(map(str, gpu_ids))
self.job_queue.update_job_status(
job.job_id,
JobStatus.RUNNING,
gpu_ids=cuda_devices
)
print(f"任务启动: {job.job_id} on GPU {cuda_devices}")
def _launch_job(self, job: TrainingJob, gpu_ids: List[int]) -> bool:
"""启动任务"""
try:
# 设置环境变量
env = os.environ.copy()
env['CUDA_VISIBLE_DEVICES'] = ','.join(map(str, gpu_ids))
# 创建日志目录
log_dir = f'logs/{job.job_id}'
os.makedirs(log_dir, exist_ok=True)
# 启动进程
log_file = open(f'{log_dir}/output.log', 'w')
process = subprocess.Popen(
job.command,
shell=True,
env=env,
stdout=log_file,
stderr=subprocess.STDOUT
)
self.running_jobs[job.job_id] = process
return True
except Exception as e:
print(f"启动任务失败 {job.job_id}: {e}")
return False
# 使用示例
allocator = GPUAllocator()
queue = JobQueue()
scheduler = JobScheduler(allocator, queue)
# 启动调度器
scheduler.start()
# 提交多个任务
jobs = [
TrainingJob(f'job_{i}', f'Training {i}', f'python train.py --id {i}',
num_gpus=2, memory_required=10000, priority=i)
for i in range(10)
]
for job in jobs:
queue.submit_job(job)
# 调度器会自动调度这些任务
# 让程序运行一段时间
time.sleep(60)
# 停止调度器
scheduler.stop()
2. Kubernetes + GPU
2.1 GPU Operator安装
# gpu-operator-values.yaml
# NVIDIA GPU Operator配置
operator:
defaultRuntime: containerd
driver:
enabled: true
version: "525.105.17"
toolkit:
enabled: true
version: v1.13.5
devicePlugin:
enabled: true
version: v0.14.0
dcgm:
enabled: true
dcgmExporter:
enabled: true
serviceMonitor:
enabled: true
gfd:
enabled: true
migManager:
enabled: true
nodeStatusExporter:
enabled: true
validator:
plugin:
env:
- name: WITH_WORKLOAD
value: "true"
# 安装GPU Operator
helm repo add nvidia https://nvidia.github.io/gpu-operator
helm repo update
helm install gpu-operator nvidia/gpu-operator \
--namespace gpu-operator \
--create-namespace \
--values gpu-operator-values.yaml
# 验证安装
kubectl get pods -n gpu-operator
# 检查节点GPU标签
kubectl get nodes -o json | jq '.items[].metadata.labels' | grep nvidia
2.2 NVIDIA Device Plugin
# nvidia-device-plugin-daemonset.yaml
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: nvidia-device-plugin-daemonset
namespace: kube-system
spec:
selector:
matchLabels:
name: nvidia-device-plugin-ds
updateStrategy:
type: RollingUpdate
template:
metadata:
labels:
name: nvidia-device-plugin-ds
spec:
tolerations:
- key: nvidia.com/gpu
operator: Exists
effect: NoSchedule
priorityClassName: "system-node-critical"
containers:
- image: nvcr.io/nvidia/k8s-device-plugin:v0.14.0
name: nvidia-device-plugin-ctr
args:
- "--fail-on-init-error=false"
- "--pass-device-specs=true"
- "--device-list-strategy=envvar"
- "--nvidia-driver-root=/"
securityContext:
allowPrivilegeEscalation: false
capabilities:
drop: ["ALL"]
volumeMounts:
- name: device-plugin
mountPath: /var/lib/kubelet/device-plugins
volumes:
- name: device-plugin
hostPath:
path: /var/lib/kubelet/device-plugins
2.3 GPU资源限制
# gpu-training-job.yaml
# GPU训练任务配置
apiVersion: batch/v1
kind: Job
metadata:
name: resnet50-training
spec:
template:
metadata:
labels:
app: resnet50-training
spec:
restartPolicy: OnFailure
containers:
- name: pytorch
image: pytorch/pytorch:2.0.0-cuda11.7-cudnn8-runtime
command:
- python
- train.py
args:
- --batch-size=128
- --epochs=90
resources:
limits:
nvidia.com/gpu: 4 # 请求4个GPU
memory: "64Gi"
cpu: "16"
requests:
nvidia.com/gpu: 4
memory: "32Gi"
cpu: "8"
env:
- name: NVIDIA_VISIBLE_DEVICES
value: "all"
- name: NVIDIA_DRIVER_CAPABILITIES
value: "compute,utility"
- name: NCCL_DEBUG
value: "INFO"
volumeMounts:
- name: dataset
mountPath: /data
- name: output
mountPath: /output
- name: shm
mountPath: /dev/shm
volumes:
- name: dataset
persistentVolumeClaim:
claimName: imagenet-pvc
- name: output
persistentVolumeClaim:
claimName: training-output-pvc
- name: shm
emptyDir:
medium: Memory
sizeLimit: "32Gi"
nodeSelector:
nvidia.com/gpu.product: "NVIDIA-A100-SXM4-40GB"
2.4 GPU共享和隔离
# gpu-sharing-config.yaml
# GPU共享配置(使用NVIDIA MPS)
apiVersion: v1
kind: ConfigMap
metadata:
name: nvidia-mps-config
namespace: kube-system
data:
mps.conf: |
# MPS配置
CUDA_VISIBLE_DEVICES=0
CUDA_MPS_PIPE_DIRECTORY=/tmp/nvidia-mps
CUDA_MPS_LOG_DIRECTORY=/var/log/nvidia-mps
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: nvidia-mps
namespace: kube-system
spec:
selector:
matchLabels:
app: nvidia-mps
template:
metadata:
labels:
app: nvidia-mps
spec:
hostPID: true
containers:
- name: mps
image: nvidia/cuda:11.7.1-base-ubuntu20.04
command:
- /bin/bash
- -c
- |
# 启动MPS
nvidia-cuda-mps-control -d
# 设置MPS配置
echo "set_default_active_thread_percentage 50" | nvidia-cuda-mps-control
# 保持运行
tail -f /dev/null
securityContext:
privileged: true
resources:
limits:
nvidia.com/gpu: 1
volumeMounts:
- name: mps-pipe
mountPath: /tmp/nvidia-mps
- name: mps-log
mountPath: /var/log/nvidia-mps
volumes:
- name: mps-pipe
hostPath:
path: /tmp/nvidia-mps
- name: mps-log
hostPath:
path: /var/log/nvidia-mps
# shared-gpu-pod.yaml
# 使用共享GPU的Pod
apiVersion: v1
kind: Pod
metadata:
name: inference-service-1
spec:
containers:
- name: inference
image: inference:latest
resources:
limits:
nvidia.com/gpu: 1
env:
- name: CUDA_MPS_PIPE_DIRECTORY
value: /tmp/nvidia-mps
- name: CUDA_MPS_LOG_DIRECTORY
value: /var/log/nvidia-mps
volumeMounts:
- name: mps-pipe
mountPath: /tmp/nvidia-mps
volumes:
- name: mps-pipe
hostPath:
path: /tmp/nvidia-mps
3. 训练平台
3.1 Kubeflow架构
# kubeflow-installation.sh
#!/bin/bash
# 安装Kubeflow
# 1. 安装kustomize
curl -s "https://raw.githubusercontent.com/kubernetes-sigs/kustomize/master/hack/install_kustomize.sh" | bash
# 2. 克隆Kubeflow manifests
git clone https://github.com/kubeflow/manifests.git
cd manifests
# 3. 安装Kubeflow组件
while ! kustomize build example | kubectl apply -f -; do echo "Retrying to apply resources"; sleep 10; done
# 4. 等待所有Pod就绪
kubectl wait --for=condition=Ready pods --all -n kubeflow --timeout=600s
# 5. 获取访问地址
kubectl port-forward -n istio-system svc/istio-ingressgateway 8080:80
# pytorch-training-job.yaml
# Kubeflow PyTorch训练任务
apiVersion: kubeflow.org/v1
kind: PyTorchJob
metadata:
name: pytorch-dist-training
namespace: kubeflow
spec:
pytorchReplicaSpecs:
Master:
replicas: 1
restartPolicy: OnFailure
template:
spec:
containers:
- name: pytorch
image: pytorch/pytorch:2.0.0-cuda11.7-cudnn8-runtime
command:
- python
- -m
- torch.distributed.run
- --nproc_per_node=8
- train.py
args:
- --batch-size=32
- --epochs=90
resources:
limits:
nvidia.com/gpu: 8
memory: "128Gi"
volumeMounts:
- name: data
mountPath: /data
- name: shm
mountPath: /dev/shm
volumes:
- name: data
persistentVolumeClaim:
claimName: training-data
- name: shm
emptyDir:
medium: Memory
sizeLimit: "64Gi"
Worker:
replicas: 3
restartPolicy: OnFailure
template:
spec:
containers:
- name: pytorch
image: pytorch/pytorch:2.0.0-cuda11.7-cudnn8-runtime
command:
- python
- -m
- torch.distributed.run
- --nproc_per_node=8
- train.py
args:
- --batch-size=32
- --epochs=90
resources:
limits:
nvidia.com/gpu: 8
memory: "128Gi"
volumeMounts:
- name: data
mountPath: /data
- name: shm
mountPath: /dev/shm
volumes:
- name: data
persistentVolumeClaim:
claimName: training-data
- name: shm
emptyDir:
medium: Memory
sizeLimit: "64Gi"
3.2 Ray分布式训练
# ray_training.py - 使用Ray进行分布式训练
import ray
from ray import train
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, DistributedSampler
def train_func(config):
"""训练函数(在每个worker上运行)"""
# 获取分布式信息
rank = train.get_context().get_world_rank()
world_size = train.get_context().get_world_size()
local_rank = train.get_context().get_local_rank()
# 设置设备
device = torch.device(f"cuda:{local_rank}")
    # 创建模型(create_model 为用户自定义的模型构建函数,此处省略实现)
    model = create_model()
model = model.to(device)
# 包装为DDP
model = torch.nn.parallel.DistributedDataParallel(
model,
device_ids=[local_rank]
)
    # 创建数据加载器(create_dataset 为用户自定义的数据集构建函数,此处省略实现)
    dataset = create_dataset()
sampler = DistributedSampler(
dataset,
num_replicas=world_size,
rank=rank
)
dataloader = DataLoader(
dataset,
batch_size=config['batch_size'],
sampler=sampler
)
# 优化器
optimizer = torch.optim.Adam(model.parameters(), lr=config['lr'])
criterion = nn.CrossEntropyLoss()
# 训练循环
for epoch in range(config['epochs']):
sampler.set_epoch(epoch)
model.train()
total_loss = 0
for batch_idx, (data, target) in enumerate(dataloader):
data = data.to(device)
target = target.to(device)
optimizer.zero_grad()
output = model(data)
loss = criterion(output, target)
loss.backward()
optimizer.step()
total_loss += loss.item()
avg_loss = total_loss / len(dataloader)
# 向Ray报告指标
train.report({"loss": avg_loss, "epoch": epoch})
if rank == 0:
print(f"Epoch {epoch}: Loss = {avg_loss:.4f}")
# 配置Ray集群
ray.init(address="auto") # 连接到Ray集群
# 创建训练器
trainer = TorchTrainer(
train_func,
train_loop_config={
"batch_size": 32,
"lr": 0.001,
"epochs": 10
},
scaling_config=ScalingConfig(
num_workers=4, # 4个worker
use_gpu=True,
resources_per_worker={"GPU": 2} # 每个worker 2个GPU
)
)
# 运行训练
result = trainer.fit()
print(f"Training completed: {result.metrics}")
# ray-cluster.yaml
# Ray集群配置
apiVersion: ray.io/v1alpha1
kind: RayCluster
metadata:
name: ray-cluster
spec:
rayVersion: '2.5.0'
headGroupSpec:
serviceType: ClusterIP
rayStartParams:
dashboard-host: '0.0.0.0'
num-cpus: '0'
template:
spec:
containers:
- name: ray-head
image: rayproject/ray:2.5.0-gpu
resources:
limits:
cpu: "4"
memory: "16Gi"
requests:
cpu: "2"
memory: "8Gi"
ports:
- containerPort: 6379
name: gcs
- containerPort: 8265
name: dashboard
- containerPort: 10001
name: client
workerGroupSpecs:
- replicas: 4
minReplicas: 1
maxReplicas: 10
groupName: gpu-workers
rayStartParams: {}
template:
spec:
containers:
- name: ray-worker
image: rayproject/ray:2.5.0-gpu
resources:
limits:
nvidia.com/gpu: 4
cpu: "16"
memory: "64Gi"
requests:
nvidia.com/gpu: 4
cpu: "8"
memory: "32Gi"
3.3 MLOps流程
# mlops_pipeline.py - MLOps训练流水线
from kfp import dsl, compiler
from kfp.dsl import InputPath, OutputPath
@dsl.component(
base_image='python:3.9',
packages_to_install=['boto3', 'pandas']
)
def download_data(
s3_path: str,
output_path: OutputPath('Dataset')
):
"""下载数据集"""
import boto3
import os
s3 = boto3.client('s3')
# 解析S3路径
bucket, key = s3_path.replace('s3://', '').split('/', 1)
# 下载文件
os.makedirs(output_path, exist_ok=True)
    s3.download_file(bucket, key, f'{output_path}/data.tar.gz')
    # 注意:下游组件读取的是 data.csv,实际流水线中此处还需解压归档(为简洁省略)
    print(f"数据已下载到: {output_path}")
@dsl.component(
base_image='python:3.9',
packages_to_install=['pandas', 'scikit-learn']
)
def preprocess_data(
input_path: InputPath('Dataset'),
output_path: OutputPath('Dataset')
):
"""预处理数据"""
import pandas as pd
import os
# 读取数据
df = pd.read_csv(f'{input_path}/data.csv')
# 数据清洗
df = df.dropna()
# 特征工程
# ...
# 保存处理后的数据
os.makedirs(output_path, exist_ok=True)
df.to_csv(f'{output_path}/processed.csv', index=False)
print(f"数据预处理完成,样本数: {len(df)}")
@dsl.component(
base_image='pytorch/pytorch:2.0.0-cuda11.7-cudnn8-runtime'
)
def train_model(
data_path: InputPath('Dataset'),
model_path: OutputPath('Model'),
epochs: int,
batch_size: int,
learning_rate: float
):
"""训练模型"""
import torch
import torch.nn as nn
# 加载数据
# ...
# 创建模型
model = create_model()
# 训练
# ...
# 保存模型
torch.save(model.state_dict(), f'{model_path}/model.pt')
print("模型训练完成")
@dsl.component(
base_image='pytorch/pytorch:2.0.0-cuda11.7-cudnn8-runtime'
)
def evaluate_model(
model_path: InputPath('Model'),
test_data_path: InputPath('Dataset'),
metrics_path: OutputPath('Metrics')
):
"""评估模型"""
import torch
import json
# 加载模型
model = create_model()
model.load_state_dict(torch.load(f'{model_path}/model.pt'))
# 评估
accuracy = 0.95 # 示例
f1_score = 0.93
# 保存指标
metrics = {
'accuracy': accuracy,
'f1_score': f1_score
}
with open(f'{metrics_path}/metrics.json', 'w') as f:
json.dump(metrics, f)
print(f"模型评估完成: Accuracy={accuracy:.4f}")
@dsl.component(
base_image='python:3.9',
packages_to_install=['boto3']
)
def deploy_model(
model_path: InputPath('Model'),
metrics_path: InputPath('Metrics'),
deployment_target: str
):
"""部署模型"""
import json
# 读取指标
with open(f'{metrics_path}/metrics.json', 'r') as f:
metrics = json.load(f)
# 检查指标是否满足部署条件
if metrics['accuracy'] < 0.90:
print("模型精度不足,不进行部署")
return
# 部署模型
print(f"正在部署模型到: {deployment_target}")
# ...
print("模型部署完成")
@dsl.pipeline(
name='Training Pipeline',
description='Complete ML training pipeline'
)
def training_pipeline(
s3_data_path: str = 's3://bucket/data.tar.gz',
epochs: int = 10,
batch_size: int = 32,
learning_rate: float = 0.001
):
"""训练流水线"""
# 下载数据
download_task = download_data(s3_path=s3_data_path)
# 预处理
preprocess_task = preprocess_data(
input_path=download_task.outputs['output_path']
)
# 训练
train_task = train_model(
data_path=preprocess_task.outputs['output_path'],
epochs=epochs,
batch_size=batch_size,
learning_rate=learning_rate
)
    # GPU资源与节点选择(注意:不同版本的KFP SDK接口略有差异,
    # KFP v2 中对应的是 set_accelerator_limit / set_accelerator_type)
    train_task.set_gpu_limit(4)
    train_task.add_node_selector_constraint('nvidia.com/gpu.product', 'A100')
# 评估
eval_task = evaluate_model(
model_path=train_task.outputs['model_path'],
test_data_path=preprocess_task.outputs['output_path']
)
# 部署
deploy_task = deploy_model(
model_path=train_task.outputs['model_path'],
metrics_path=eval_task.outputs['metrics_path'],
deployment_target='production'
)
# 编译流水线
compiler.Compiler().compile(
pipeline_func=training_pipeline,
package_path='training_pipeline.yaml'
)
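编译得到的 training_pipeline.yaml 需要提交到 Kubeflow Pipelines 服务才会真正运行。下面是一个提交示意,其中 kfp.Client 的 host 地址为假设值,不同 KFP SDK 版本返回对象的字段可能略有差异:
# 提交流水线运行(KFP服务地址为假设值)
import kfp

client = kfp.Client(host='http://ml-pipeline.kubeflow:8888')
run = client.create_run_from_pipeline_package(
    'training_pipeline.yaml',
    arguments={'s3_data_path': 's3://bucket/data.tar.gz', 'epochs': 10}
)
print(f"已提交流水线运行: {run.run_id}")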
4. 模型推理调度
4.1 vLLM动态批处理
# vllm_server.py - vLLM推理服务
import asyncio
import uuid
from fastapi import FastAPI, Request
from vllm import SamplingParams
from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.engine.async_llm_engine import AsyncLLMEngine
app = FastAPI()
# 创建vLLM引擎
engine_args = AsyncEngineArgs(
model="meta-llama/Llama-2-7b-hf",
tensor_parallel_size=4, # 4卡模型并行
dtype="float16",
max_num_seqs=256, # 最大并发序列数
max_num_batched_tokens=8192, # 最大批处理token数
gpu_memory_utilization=0.9,
swap_space=4,
)
engine = AsyncLLMEngine.from_engine_args(engine_args)
@app.post("/generate")
async def generate(request: Request):
"""生成接口"""
data = await request.json()
prompt = data.get("prompt", "")
max_tokens = data.get("max_tokens", 100)
temperature = data.get("temperature", 0.7)
# 采样参数
sampling_params = SamplingParams(
temperature=temperature,
top_p=0.9,
max_tokens=max_tokens
)
    # 添加到请求队列(request_id需要全局唯一)
    request_id = f"req_{uuid.uuid4().hex}"
results_generator = engine.generate(prompt, sampling_params, request_id)
# 收集结果
final_output = None
async for request_output in results_generator:
final_output = request_output
# 提取生成的文本
generated_text = final_output.outputs[0].text
return {
"generated_text": generated_text,
"request_id": request_id
}
@app.post("/batch_generate")
async def batch_generate(request: Request):
"""批量生成接口"""
data = await request.json()
prompts = data.get("prompts", [])
max_tokens = data.get("max_tokens", 100)
sampling_params = SamplingParams(
temperature=0.7,
max_tokens=max_tokens
)
    # 并发提交:每个请求由一个协程消费自己的输出流,由vLLM引擎内部进行连续批处理
    async def consume(prompt: str, request_id: str) -> str:
        final_output = None
        async for request_output in engine.generate(prompt, sampling_params, request_id):
            final_output = request_output
        return final_output.outputs[0].text
    tasks = [
        asyncio.create_task(consume(prompt, f"batch_req_{uuid.uuid4().hex[:8]}_{i}"))
        for i, prompt in enumerate(prompts)
    ]
    results = await asyncio.gather(*tasks)
    return {"results": results}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
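服务启动后,可以用任意HTTP客户端调用 /generate 接口。下面是一个最小的Python调用示例,地址与端口对应上面的本地部署(假设值):
# vllm_client.py - 调用/generate接口的最小示例
import requests

resp = requests.post(
    "http://localhost:8000/generate",
    json={"prompt": "What is machine learning?", "max_tokens": 100, "temperature": 0.7},
    timeout=60,
)
resp.raise_for_status()
print(resp.json()["generated_text"])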
4.2 TensorRT-LLM推理
# tensorrt_llm_engine.py - TensorRT-LLM推理引擎
import torch
from pathlib import Path
from typing import List
from tensorrt_llm.runtime import ModelRunner
class TRTLLMEngine:
"""TensorRT-LLM推理引擎"""
def __init__(self, engine_dir: str, tokenizer_dir: str):
self.engine_dir = Path(engine_dir)
self.runner = ModelRunner.from_dir(
engine_dir=str(self.engine_dir),
rank=torch.cuda.current_device()
)
# 加载tokenizer
from transformers import AutoTokenizer
self.tokenizer = AutoTokenizer.from_pretrained(tokenizer_dir)
def generate(
self,
prompts: List[str],
max_new_tokens: int = 100,
temperature: float = 1.0,
top_k: int = 50,
top_p: float = 0.9
) -> List[str]:
"""
生成文本
Args:
prompts: 输入提示列表
max_new_tokens: 最大生成token数
temperature: 温度参数
top_k: Top-K采样
top_p: Top-P采样
Returns:
生成的文本列表
"""
# Tokenize输入
batch_input_ids = []
for prompt in prompts:
input_ids = self.tokenizer.encode(prompt, return_tensors="pt")
batch_input_ids.append(input_ids[0].tolist())
# 推理
outputs = self.runner.generate(
batch_input_ids=batch_input_ids,
max_new_tokens=max_new_tokens,
temperature=temperature,
top_k=top_k,
top_p=top_p,
end_id=self.tokenizer.eos_token_id,
pad_id=self.tokenizer.pad_token_id
)
# Decode输出
output_texts = []
for output_ids in outputs:
output_text = self.tokenizer.decode(
output_ids[0],
skip_special_tokens=True
)
output_texts.append(output_text)
return output_texts
# 使用示例
engine = TRTLLMEngine(
engine_dir="/path/to/trt/engine",
tokenizer_dir="meta-llama/Llama-2-7b-hf"
)
prompts = [
"What is machine learning?",
"Explain neural networks.",
]
outputs = engine.generate(prompts, max_new_tokens=100)
for prompt, output in zip(prompts, outputs):
print(f"Input: {prompt}")
print(f"Output: {output}\n")
4.3 模型并发和排队
# inference_scheduler.py - 推理请求调度器
import asyncio
import time
from asyncio import PriorityQueue
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List, Optional
class Priority(Enum):
"""请求优先级"""
HIGH = 1
MEDIUM = 2
LOW = 3
@dataclass(order=True)
class InferenceRequest:
"""推理请求"""
priority: int = field(compare=True)
timestamp: float = field(compare=True)
request_id: str = field(compare=False)
prompt: str = field(compare=False)
max_tokens: int = field(compare=False)
future: asyncio.Future = field(compare=False, default=None)
class InferenceScheduler:
"""推理调度器"""
def __init__(self, engine, max_batch_size: int = 32, max_wait_ms: int = 50):
self.engine = engine
self.max_batch_size = max_batch_size
self.max_wait_ms = max_wait_ms
# 请求队列(优先级队列)
self.request_queue = PriorityQueue()
# 运行中的批次
self.running_batches = {}
# 统计信息
self.stats = {
'total_requests': 0,
'completed_requests': 0,
'batch_count': 0,
'avg_batch_size': 0,
}
# 启动调度循环
self.running = True
asyncio.create_task(self._schedule_loop())
async def submit_request(
self,
request_id: str,
prompt: str,
max_tokens: int = 100,
priority: Priority = Priority.MEDIUM
) -> str:
"""
提交推理请求
Args:
request_id: 请求ID
prompt: 输入提示
max_tokens: 最大生成token数
priority: 优先级
Returns:
生成的文本
"""
# 创建future用于等待结果
future = asyncio.Future()
# 创建请求
request = InferenceRequest(
priority=priority.value,
timestamp=time.time(),
request_id=request_id,
prompt=prompt,
max_tokens=max_tokens,
future=future
)
# 加入队列
await self.request_queue.put(request)
self.stats['total_requests'] += 1
# 等待结果
result = await future
return result
async def _schedule_loop(self):
"""调度循环"""
while self.running:
try:
# 收集一个批次的请求
batch = await self._collect_batch()
if batch:
# 处理批次
asyncio.create_task(self._process_batch(batch))
except Exception as e:
print(f"调度错误: {e}")
    async def _collect_batch(self) -> List[InferenceRequest]:
        """收集一个批次的请求:先阻塞等待第一个请求,再在时间窗口内继续攒批"""
        batch = [await self.request_queue.get()]
        start_time = time.time()
        while len(batch) < self.max_batch_size:
            # 计算剩余等待时间
            remaining_ms = self.max_wait_ms - (time.time() - start_time) * 1000
            if remaining_ms <= 0:
                # 超时,返回当前批次
                break
            try:
                # 在剩余窗口内等待新请求
                request = await asyncio.wait_for(
                    self.request_queue.get(),
                    timeout=remaining_ms / 1000
                )
                batch.append(request)
            except asyncio.TimeoutError:
                break
        return batch
async def _process_batch(self, batch: List[InferenceRequest]):
"""处理一个批次"""
try:
# 提取prompts
prompts = [req.prompt for req in batch]
max_tokens = max(req.max_tokens for req in batch)
# 批量推理
outputs = await asyncio.to_thread(
self.engine.generate,
prompts=prompts,
max_new_tokens=max_tokens
)
# 返回结果
for request, output in zip(batch, outputs):
request.future.set_result(output)
# 更新统计
self.stats['completed_requests'] += len(batch)
self.stats['batch_count'] += 1
self.stats['avg_batch_size'] = (
self.stats['completed_requests'] / self.stats['batch_count']
)
except Exception as e:
# 批次失败,通知所有请求
for request in batch:
request.future.set_exception(e)
def get_stats(self) -> Dict:
"""获取统计信息"""
return self.stats.copy()
async def shutdown(self):
"""关闭调度器"""
self.running = False
# 使用示例
async def main():
# 创建引擎(示例)
engine = TRTLLMEngine(...)
# 创建调度器
scheduler = InferenceScheduler(
engine=engine,
max_batch_size=32,
max_wait_ms=50
)
# 提交多个请求
tasks = []
for i in range(100):
task = scheduler.submit_request(
request_id=f"req_{i}",
prompt=f"Question {i}: What is AI?",
max_tokens=100,
priority=Priority.MEDIUM
)
tasks.append(task)
# 等待所有请求完成
results = await asyncio.gather(*tasks)
# 查看统计
stats = scheduler.get_stats()
print(f"统计信息: {stats}")
# 关闭
await scheduler.shutdown()
asyncio.run(main())
4.4 KV Cache管理
# kv_cache_manager.py - KV Cache管理器
import torch
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
@dataclass
class CacheBlock:
"""Cache块"""
block_id: int
ref_count: int
data: Optional[torch.Tensor]
class KVCacheManager:
"""KV Cache管理器"""
def __init__(
self,
num_blocks: int,
block_size: int,
num_layers: int,
num_heads: int,
head_dim: int,
device: str = 'cuda'
):
"""
初始化KV Cache管理器
Args:
num_blocks: Cache块数量
block_size: 每个块的token数
num_layers: 模型层数
num_heads: 注意力头数
head_dim: 每个头的维度
device: 设备
"""
self.num_blocks = num_blocks
self.block_size = block_size
self.num_layers = num_layers
self.num_heads = num_heads
self.head_dim = head_dim
self.device = device
# 预分配所有cache块
self.cache_blocks = self._allocate_cache_blocks()
# 空闲块队列
self.free_blocks = list(range(num_blocks))
# 已分配块 {request_id -> [block_ids]}
self.allocated_blocks: Dict[str, List[int]] = {}
def _allocate_cache_blocks(self) -> List[CacheBlock]:
"""预分配所有cache块"""
blocks = []
for block_id in range(self.num_blocks):
# 每个块存储 [2, num_layers, num_heads, block_size, head_dim]
# 2 表示 K 和 V
data = torch.zeros(
2, self.num_layers, self.num_heads, self.block_size, self.head_dim,
dtype=torch.float16,
device=self.device
)
block = CacheBlock(
block_id=block_id,
ref_count=0,
data=data
)
blocks.append(block)
return blocks
def allocate(self, request_id: str, num_tokens: int) -> Optional[List[int]]:
"""
为请求分配cache块
Args:
request_id: 请求ID
num_tokens: 需要的token数
Returns:
分配的块ID列表,如果失败返回None
"""
# 计算需要的块数
num_blocks_needed = (num_tokens + self.block_size - 1) // self.block_size
if len(self.free_blocks) < num_blocks_needed:
# 尝试驱逐
evicted = self._evict_blocks(num_blocks_needed)
if not evicted:
return None
# 分配块
allocated = []
for _ in range(num_blocks_needed):
block_id = self.free_blocks.pop(0)
self.cache_blocks[block_id].ref_count = 1
allocated.append(block_id)
self.allocated_blocks[request_id] = allocated
return allocated
def free(self, request_id: str):
"""释放请求的cache"""
if request_id not in self.allocated_blocks:
return
block_ids = self.allocated_blocks[request_id]
for block_id in block_ids:
block = self.cache_blocks[block_id]
block.ref_count -= 1
if block.ref_count == 0:
self.free_blocks.append(block_id)
del self.allocated_blocks[request_id]
def _evict_blocks(self, num_blocks: int) -> bool:
"""
        驱逐块(简化策略:回收引用计数为0且不在空闲队列中的块;生产实现通常采用LRU等策略)
Args:
num_blocks: 需要驱逐的块数
Returns:
是否成功
"""
# 简化版:找到ref_count=0的块
evictable = [
block.block_id
for block in self.cache_blocks
if block.ref_count == 0 and block.block_id not in self.free_blocks
]
if len(evictable) < num_blocks:
return False
# 驱逐块
for block_id in evictable[:num_blocks]:
self.free_blocks.append(block_id)
return True
def get_cache(self, request_id: str) -> Optional[torch.Tensor]:
"""获取请求的cache数据"""
if request_id not in self.allocated_blocks:
return None
block_ids = self.allocated_blocks[request_id]
# 拼接所有块的数据
caches = [self.cache_blocks[bid].data for bid in block_ids]
return torch.cat(caches, dim=3) # 在seq_len维度拼接
def get_stats(self) -> Dict:
"""获取统计信息"""
total_blocks = self.num_blocks
used_blocks = total_blocks - len(self.free_blocks)
utilization = used_blocks / total_blocks
return {
'total_blocks': total_blocks,
'used_blocks': used_blocks,
'free_blocks': len(self.free_blocks),
'utilization': f"{utilization*100:.2f}%",
'active_requests': len(self.allocated_blocks)
}
# 使用示例
cache_manager = KVCacheManager(
num_blocks=1000,
block_size=16,
num_layers=32,
num_heads=32,
head_dim=128,
device='cuda:0'
)
# 分配cache
request_id = "req_001"
block_ids = cache_manager.allocate(request_id, num_tokens=100)
if block_ids:
print(f"为请求 {request_id} 分配了 {len(block_ids)} 个块")
# 获取cache
cache = cache_manager.get_cache(request_id)
print(f"Cache形状: {cache.shape}")
# 释放cache
cache_manager.free(request_id)
# 查看统计
stats = cache_manager.get_stats()
print(f"Cache统计: {stats}")
5. 资源监控
5.1 GPU利用率监控
# gpu_monitor.py - GPU实时监控
import pynvml
from prometheus_client import start_http_server, Gauge
import time
class GPUMonitor:
"""GPU监控器(Prometheus导出器)"""
def __init__(self, port: int = 8000):
# 初始化NVML
pynvml.nvmlInit()
self.device_count = pynvml.nvmlDeviceGetCount()
# Prometheus指标
self.gpu_utilization = Gauge(
'gpu_utilization_percent',
'GPU utilization percentage',
['gpu_id', 'gpu_name']
)
self.gpu_memory_used = Gauge(
'gpu_memory_used_bytes',
'GPU memory used in bytes',
['gpu_id', 'gpu_name']
)
self.gpu_memory_total = Gauge(
'gpu_memory_total_bytes',
'GPU memory total in bytes',
['gpu_id', 'gpu_name']
)
self.gpu_temperature = Gauge(
'gpu_temperature_celsius',
'GPU temperature in Celsius',
['gpu_id', 'gpu_name']
)
self.gpu_power = Gauge(
'gpu_power_watts',
'GPU power consumption in watts',
['gpu_id', 'gpu_name']
)
# 启动HTTP服务器
start_http_server(port)
print(f"Prometheus exporter started on port {port}")
def collect_metrics(self):
"""收集GPU指标"""
for i in range(self.device_count):
handle = pynvml.nvmlDeviceGetHandleByIndex(i)
# GPU名称
name = pynvml.nvmlDeviceGetName(handle)
if isinstance(name, bytes):
name = name.decode('utf-8')
# 利用率
util = pynvml.nvmlDeviceGetUtilizationRates(handle)
self.gpu_utilization.labels(gpu_id=str(i), gpu_name=name).set(util.gpu)
# 显存
mem_info = pynvml.nvmlDeviceGetMemoryInfo(handle)
self.gpu_memory_used.labels(gpu_id=str(i), gpu_name=name).set(mem_info.used)
self.gpu_memory_total.labels(gpu_id=str(i), gpu_name=name).set(mem_info.total)
# 温度
temp = pynvml.nvmlDeviceGetTemperature(handle, pynvml.NVML_TEMPERATURE_GPU)
self.gpu_temperature.labels(gpu_id=str(i), gpu_name=name).set(temp)
# 功耗
power = pynvml.nvmlDeviceGetPowerUsage(handle) / 1000.0 # 转换为W
self.gpu_power.labels(gpu_id=str(i), gpu_name=name).set(power)
def run(self, interval: int = 5):
"""运行监控"""
try:
while True:
self.collect_metrics()
time.sleep(interval)
except KeyboardInterrupt:
print("\n停止监控")
finally:
pynvml.nvmlShutdown()
if __name__ == "__main__":
monitor = GPUMonitor(port=9100)
monitor.run(interval=5)
# prometheus-config.yaml
# Prometheus配置
global:
scrape_interval: 15s
evaluation_interval: 15s
scrape_configs:
- job_name: 'gpu-metrics'
static_configs:
- targets: ['localhost:9100']
labels:
instance: 'gpu-node-1'
- job_name: 'dcgm-exporter'
static_configs:
- targets: ['localhost:9400']
# grafana-dashboard.json
# Grafana仪表板配置(部分)
{
"dashboard": {
"title": "GPU Monitoring",
"panels": [
{
"title": "GPU Utilization",
"targets": [
{
"expr": "gpu_utilization_percent"
}
],
"type": "graph"
},
{
"title": "GPU Memory Usage",
"targets": [
{
"expr": "gpu_memory_used_bytes / gpu_memory_total_bytes * 100"
}
],
"type": "graph"
},
{
"title": "GPU Temperature",
"targets": [
{
"expr": "gpu_temperature_celsius"
}
],
"type": "graph"
}
]
}
}
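除了在Grafana中可视化,这些指标也可以通过Prometheus HTTP API以程序方式查询,例如让调度器避开高负载GPU。下面是一个查询示意,其中Prometheus地址为假设值:
# query_gpu_metrics.py - 查询Prometheus中的GPU利用率(地址为假设值)
import requests

resp = requests.get(
    "http://localhost:9090/api/v1/query",
    params={"query": "gpu_utilization_percent"},
    timeout=10,
)
for item in resp.json()["data"]["result"]:
    labels, value = item["metric"], item["value"][1]
    print(f"GPU {labels.get('gpu_id')} ({labels.get('gpu_name')}): {value}%")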
5.2 显存使用跟踪
# memory_profiler.py - 显存使用分析
import torch
import functools
from collections import defaultdict
import time
class MemoryProfiler:
"""显存分析器"""
def __init__(self, device='cuda:0'):
self.device = torch.device(device)
self.snapshots = []
self.peak_memory = 0
def snapshot(self, tag: str = ""):
"""记录当前显存快照"""
torch.cuda.synchronize()
allocated = torch.cuda.memory_allocated(self.device)
reserved = torch.cuda.memory_reserved(self.device)
peak = torch.cuda.max_memory_allocated(self.device)
self.snapshots.append({
'tag': tag,
'timestamp': time.time(),
'allocated_mb': allocated / 1024**2,
'reserved_mb': reserved / 1024**2,
'peak_mb': peak / 1024**2
})
self.peak_memory = max(self.peak_memory, peak)
def print_summary(self):
"""打印摘要"""
if not self.snapshots:
print("没有快照数据")
return
print("\n显存使用摘要:")
print("-" * 80)
print(f"{'标签':<20} {'已分配(MB)':<15} {'已预留(MB)':<15} {'峰值(MB)':<15}")
print("-" * 80)
for snap in self.snapshots:
print(f"{snap['tag']:<20} "
f"{snap['allocated_mb']:<15.2f} "
f"{snap['reserved_mb']:<15.2f} "
f"{snap['peak_mb']:<15.2f}")
print("-" * 80)
print(f"全局峰值显存: {self.peak_memory / 1024**2:.2f} MB\n")
def reset(self):
"""重置统计"""
self.snapshots = []
self.peak_memory = 0
torch.cuda.reset_peak_memory_stats(self.device)
def profile_memory(tag: str = ""):
"""装饰器:分析函数显存使用"""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
profiler = getattr(wrapper, '_profiler', None)
if profiler is None:
profiler = MemoryProfiler()
wrapper._profiler = profiler
# 记录前快照
torch.cuda.reset_peak_memory_stats()
profiler.snapshot(f"{tag or func.__name__}_start")
# 执行函数
result = func(*args, **kwargs)
# 记录后快照
profiler.snapshot(f"{tag or func.__name__}_end")
return result
return wrapper
return decorator
# 使用示例
profiler = MemoryProfiler()
@profile_memory("forward")
def forward_pass(model, batch):
"""前向传播"""
output = model(batch)
return output
@profile_memory("backward")
def backward_pass(loss):
"""反向传播"""
loss.backward()
# 训练循环
model = create_large_model()
model.cuda()
profiler.snapshot("初始化后")
for epoch in range(3):
    for data, labels in dataloader:
        data, labels = data.cuda(), labels.cuda()
        profiler.snapshot(f"epoch{epoch}_batch_start")
        # Forward
        output = forward_pass(model, data)
        loss = criterion(output, labels)
        # Backward
        backward_pass(loss)
        profiler.snapshot(f"epoch{epoch}_batch_end")
# 清空缓存
torch.cuda.empty_cache()
# 打印摘要
profiler.print_summary()
5.3 训练吞吐量分析
# throughput_analyzer.py - 吞吐量分析
import time
import torch
from collections import deque
from typing import Dict, List
import json
class ThroughputAnalyzer:
"""吞吐量分析器"""
def __init__(self, window_size: int = 100):
self.window_size = window_size
# 时间窗口
self.batch_times = deque(maxlen=window_size)
self.samples_processed = deque(maxlen=window_size)
# 累计统计
self.total_samples = 0
self.total_batches = 0
self.total_time = 0
self.start_time = None
def start_batch(self):
"""开始一个batch"""
self.batch_start_time = time.time()
def end_batch(self, batch_size: int):
"""结束一个batch"""
if not hasattr(self, 'batch_start_time'):
return
elapsed = time.time() - self.batch_start_time
self.batch_times.append(elapsed)
self.samples_processed.append(batch_size)
self.total_samples += batch_size
self.total_batches += 1
self.total_time += elapsed
if self.start_time is None:
self.start_time = self.batch_start_time
def get_current_throughput(self) -> Dict:
"""获取当前吞吐量(基于窗口)"""
if not self.batch_times:
return {}
total_samples = sum(self.samples_processed)
total_time = sum(self.batch_times)
return {
'samples_per_sec': total_samples / total_time if total_time > 0 else 0,
'batches_per_sec': len(self.batch_times) / total_time if total_time > 0 else 0,
'avg_batch_time_ms': (total_time / len(self.batch_times)) * 1000
}
def get_global_stats(self) -> Dict:
"""获取全局统计"""
if self.start_time is None:
return {}
elapsed = time.time() - self.start_time
return {
'total_samples': self.total_samples,
'total_batches': self.total_batches,
'total_time_sec': elapsed,
'global_samples_per_sec': self.total_samples / elapsed if elapsed > 0 else 0,
'global_batches_per_sec': self.total_batches / elapsed if elapsed > 0 else 0
}
def print_stats(self):
"""打印统计信息"""
current = self.get_current_throughput()
global_stats = self.get_global_stats()
print(f"\n当前吞吐量(最近{len(self.batch_times)}个batch):")
print(f" 样本/秒: {current.get('samples_per_sec', 0):.2f}")
print(f" Batch/秒: {current.get('batches_per_sec', 0):.2f}")
print(f" 平均Batch时间: {current.get('avg_batch_time_ms', 0):.2f} ms")
print(f"\n全局统计:")
print(f" 总样本数: {global_stats.get('total_samples', 0)}")
print(f" 总Batch数: {global_stats.get('total_batches', 0)}")
print(f" 总时间: {global_stats.get('total_time_sec', 0):.2f} 秒")
print(f" 平均样本/秒: {global_stats.get('global_samples_per_sec', 0):.2f}")
def save_stats(self, filename: str):
"""保存统计到文件"""
stats = {
'current': self.get_current_throughput(),
'global': self.get_global_stats()
}
with open(filename, 'w') as f:
json.dump(stats, f, indent=2)
# 使用示例
analyzer = ThroughputAnalyzer(window_size=100)
for epoch in range(10):
for batch_idx, (data, target) in enumerate(dataloader):
analyzer.start_batch()
        # 训练代码
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
analyzer.end_batch(batch_size=data.size(0))
# 每100个batch打印一次
if batch_idx % 100 == 0:
analyzer.print_stats()
# 保存最终统计
analyzer.save_stats('training_throughput.json')