HiHuo
首页
博客
手册
工具
关于
首页
博客
手册
工具
关于

MLOps:模型的 CI/CD

代码有 DevOps,模型有 MLOps。

这篇讲模型的开发、训练、部署怎么做工程化。


MLOps 是什么

MLOps = Machine Learning + Operations

把软件工程的最佳实践应用到机器学习:

  • 版本控制
  • 自动化测试
  • 持续集成/部署
  • 监控告警

MLOps 流程

数据准备 → 特征工程 → 模型训练 → 模型评估 → 模型部署 → 监控反馈
    ↑                                                    ↓
    ←←←←←←←←←←←← 持续迭代 ←←←←←←←←←←←←←

每个环节都需要工程化管理。


版本管理

代码版本

用 Git 管理训练代码:

git init
git add train.py model.py
git commit -m "Initial training code"

数据版本

用 DVC(Data Version Control)管理数据:

# 初始化
dvc init

# 添加数据
dvc add data/training_data.parquet

# 推送到远程存储
dvc push

DVC 文件记录数据的 hash,配合 Git 管理。

模型版本

用 MLflow 管理模型:

import mlflow

mlflow.set_experiment("llm-training")

with mlflow.start_run():
    # 记录参数
    mlflow.log_param("learning_rate", 1e-4)
    mlflow.log_param("batch_size", 32)

    # 训练...

    # 记录指标
    mlflow.log_metric("loss", final_loss)
    mlflow.log_metric("accuracy", accuracy)

    # 保存模型
    mlflow.pytorch.log_model(model, "model")

配置版本

用 Hydra 或 YAML 管理配置:

# config/train.yaml
model:
  name: llama-7b
  hidden_size: 4096

training:
  learning_rate: 1e-4
  batch_size: 32
  epochs: 3

data:
  train_path: data/train.parquet
  val_path: data/val.parquet

实验管理

实验追踪

每次训练记录:

  • 超参数
  • 数据版本
  • 代码版本
  • 评估指标
  • 模型文件

工具选择

工具特点
MLflow开源,功能全
Weights & BiasesSaaS,界面好
NeptuneSaaS,团队协作
TensorBoard简单,PyTorch 自带

MLflow 示例

import mlflow
from mlflow.tracking import MlflowClient

# 设置服务器
mlflow.set_tracking_uri("http://mlflow-server:5000")

# 创建实验
mlflow.create_experiment("llm-finetuning")

# 运行实验
with mlflow.start_run(run_name="lr-1e-4-bs-32"):
    mlflow.log_params({
        "learning_rate": 1e-4,
        "batch_size": 32,
        "model": "llama-7b",
    })

    for epoch in range(epochs):
        train_loss = train_epoch()
        mlflow.log_metric("train_loss", train_loss, step=epoch)

    # 注册模型
    mlflow.register_model(
        f"runs:/{mlflow.active_run().info.run_id}/model",
        "llama-7b-finetuned"
    )

训练流水线

流水线定义

用 Kubeflow Pipelines 定义训练流水线:

from kfp import dsl

@dsl.component
def prepare_data(input_path: str, output_path: str):
    # 数据预处理
    pass

@dsl.component
def train_model(data_path: str, model_path: str):
    # 模型训练
    pass

@dsl.component
def evaluate_model(model_path: str) -> float:
    # 模型评估
    return accuracy

@dsl.pipeline(name="llm-training-pipeline")
def training_pipeline(input_data: str):
    data_op = prepare_data(input_path=input_data, output_path="processed")
    train_op = train_model(data_path=data_op.output, model_path="model")
    eval_op = evaluate_model(model_path=train_op.output)

触发机制

# 定时触发
schedule: "0 0 * * *"  # 每天零点

# 数据更新触发
on_data_change:
  path: s3://bucket/training_data/

# 手动触发
manual: true

模型部署

部署流程

模型训练完成
    ↓
模型评估(自动化测试)
    ↓
模型打包(Docker 镜像)
    ↓
部署到测试环境
    ↓
人工验收
    ↓
部署到生产环境
    ↓
监控

模型打包

FROM nvidia/cuda:12.0-base

# 安装依赖
RUN pip install vllm transformers

# 复制模型
COPY model/ /app/model/

# 启动服务
CMD ["python", "-m", "vllm.entrypoints.openai.api_server", \
     "--model", "/app/model", "--port", "8000"]

部署方式

蓝绿部署:

生产环境有两套:蓝和绿
当前流量在蓝
部署新版本到绿
测试通过后切换流量到绿

金丝雀部署:

新版本先接 5% 流量
监控指标正常
逐步增加到 100%

K8s 滚动更新:

spec:
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 1
      maxUnavailable: 0

模型测试

单元测试

def test_model_output_shape():
    model = load_model()
    input_ids = torch.randint(0, 1000, (1, 10))
    output = model(input_ids)
    assert output.shape == (1, 10, vocab_size)

def test_model_generation():
    model = load_model()
    output = model.generate("Hello")
    assert len(output) > 0
    assert isinstance(output, str)

评估测试

def test_model_accuracy():
    model = load_model()
    accuracy = evaluate(model, test_dataset)
    assert accuracy > 0.8  # 准确率门槛

性能测试

def test_inference_latency():
    model = load_model()
    latencies = []
    for _ in range(100):
        start = time.time()
        model.generate("Hello")
        latencies.append(time.time() - start)

    p99 = np.percentile(latencies, 99)
    assert p99 < 1.0  # P99 延迟小于 1 秒

回归测试

def test_no_regression():
    new_model = load_model("new")
    old_model = load_model("old")

    for prompt in test_prompts:
        new_score = evaluate_response(new_model.generate(prompt))
        old_score = evaluate_response(old_model.generate(prompt))
        # 新模型不能比旧模型差太多
        assert new_score >= old_score * 0.95

CI/CD 配置

GitHub Actions 示例

name: Model CI/CD

on:
  push:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Run tests
        run: pytest tests/

  train:
    needs: test
    runs-on: [self-hosted, gpu]
    steps:
      - uses: actions/checkout@v3
      - name: Train model
        run: python train.py --config config/prod.yaml
      - name: Evaluate
        run: python evaluate.py
      - name: Upload model
        run: python upload_model.py

  deploy:
    needs: train
    runs-on: ubuntu-latest
    steps:
      - name: Deploy to staging
        run: kubectl apply -f k8s/staging/
      - name: Run integration tests
        run: pytest tests/integration/
      - name: Deploy to production
        run: kubectl apply -f k8s/prod/

监控和反馈

线上监控

# 记录推理结果
def inference(prompt):
    response = model.generate(prompt)

    # 记录到日志
    log_inference(prompt, response)

    # 上报指标
    metrics.inference_count.inc()
    metrics.inference_latency.observe(latency)

    return response

数据飞轮

用户反馈(点赞/点踩)
    ↓
收集低质量回答
    ↓
人工标注/修正
    ↓
加入训练数据
    ↓
重新训练
    ↓
模型改进

漂移检测

模型效果可能随时间下降:

def detect_drift():
    recent_metrics = get_recent_metrics(days=7)
    baseline_metrics = get_baseline_metrics()

    if recent_metrics.accuracy < baseline_metrics.accuracy * 0.95:
        alert("Model performance degradation detected")

小结

MLOps 核心实践:

版本管理:

  • 代码:Git
  • 数据:DVC
  • 模型:MLflow
  • 配置:YAML/Hydra

流水线:

  • 数据准备 → 训练 → 评估 → 部署
  • 自动化触发

测试:

  • 单元测试
  • 评估测试
  • 性能测试
  • 回归测试

部署:

  • 蓝绿/金丝雀/滚动
  • 自动化 CI/CD

监控:

  • 线上指标
  • 漂移检测
  • 数据飞轮

工程化篇结束。下一部分讲职业发展。