HiHuo
首页
博客
手册
工具
关于
首页
博客
手册
工具
关于
  • AI 基础设施深度教程

    • AI Infra 深度教程
    • GPU容器化

      • 01-GPU 架构基础
      • NVIDIA 容器运行时
      • GPU 共享与隔离
      • GPU 监控与调试
    • Kubernetes GPU调度

      • Device Plugin 机制深度解析
      • GPU 调度器实现
      • 拓扑感知调度
      • 弹性 GPU 调度
    • AI训练平台

      • 分布式训练框架
      • 训练任务调度
      • 模型存储与管理
      • 实验管理
      • 超参数优化
    • 推理服务

      • 推理引擎原理
      • 模型服务框架
      • 动态批处理
      • 推理优化技术
      • 多模型服务
    • 异构计算

      • 05-异构计算
      • 异构计算概述
      • GPU 虚拟化技术
      • NPU 与专用 AI 芯片
      • 设备拓扑感知调度
      • 算力池化与弹性调度
    • AI工作流引擎

      • 06-AI工作流引擎
      • AI 工作流引擎概述
      • Kubeflow Pipelines 深度实践
      • 03-Argo Workflows 深度实践
      • 04-数据版本管理
      • 05-实验跟踪与模型注册
    • MLOps实践

      • 07-MLOps实践
      • 01-MLOps 成熟度模型
      • 02-数据集工程
      • 03-Feature Store 特征存储
      • 04-模型评测体系
      • 05-模型安全与治理
    • AIOps实践

      • 08-AIOps实践
      • 01-AIOps概述与架构
      • 02-异常检测算法
      • 03-根因分析与告警聚合
      • 04-智能运维决策
      • 05-AIOps平台实战
    • 面试专题

      • 09-面试专题
      • 01-AI基础设施核心面试题
      • 02-大模型面试题
      • 03-系统设计面试题
    • CUDA编程与算子开发

      • 10-CUDA 编程与算子开发
      • 01-CUDA编程模型与内存层次
      • 02-高性能 Kernel 开发实战
      • 03-Tensor Core 与矩阵运算
      • 04-算子融合与优化技术
      • 05-Triton 编程入门
    • 通信与网络底层

      • 11-通信与网络底层
      • 01-NCCL 源码深度解析
      • 02-AllReduce 算法实现
      • 03-RDMA与InfiniBand原理
      • 04-网络拓扑与通信优化
      • 05-大规模集群网络架构
    • 框架源码解析

      • 12-框架源码解析
      • 01-PyTorch分布式源码解析
      • 02-DeepSpeed源码深度解析
      • 03-Megatron-LM源码解析
      • 04-vLLM推理引擎源码解析
      • 05-HuggingFace Transformers源码解析
    • 编译优化与图优化

      • 13-编译优化与图优化
      • 01-深度学习编译器概述
      • 02-TorchDynamo与torch.compile
      • 03-XLA编译器深度解析
      • 04-算子融合与Kernel优化
      • 05-自动调度与代码生成

01-MLOps 成熟度模型

概述

MLOps(Machine Learning Operations)是将 DevOps 原则应用于机器学习系统的实践。本文介绍 MLOps 成熟度模型,帮助组织评估当前状态并规划演进路径。

MLOps 核心挑战

为什么 ML 系统与传统软件不同

┌─────────────────────────────────────────────────────────────────────┐
│                    ML 系统的独特挑战                                 │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │                   传统软件 vs ML 系统                        │   │
│  │                                                             │   │
│  │  传统软件:                      ML 系统:                    │   │
│  │  ┌──────────┐                  ┌──────────┐                │   │
│  │  │   代码   │──► 测试 ──► 部署  │ 代码+数据 │──► ? ──► 部署  │   │
│  │  └──────────┘                  │ +模型+配置│                │   │
│  │                                └──────────┘                │   │
│  │  • 确定性行为                   • 概率性行为                 │   │
│  │  • 单一产物                     • 多种产物                   │   │
│  │  • 版本控制成熟                 • 数据/模型版本复杂          │   │
│  │  • 测试方法明确                 • 测试方法不明确             │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                                                     │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │                   ML 特有的技术债务                          │   │
│  │                                                             │   │
│  │  数据依赖      特征工程      模型复杂性    配置管理          │   │
│  │     │            │             │            │               │   │
│  │     ▼            ▼             ▼            ▼               │   │
│  │  数据质量      特征存储      模型版本    超参配置            │   │
│  │  数据漂移      特征血缘      模型漂移    环境依赖            │   │
│  │  数据测试      特征重用      A/B测试     实验追踪            │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

ML 系统的构成要素

┌─────────────────────────────────────────────────────────────────────┐
│                    ML 系统构成 (Google)                             │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│                    ┌─────────────────┐                             │
│                    │    ML Code      │ ← 只占很小一部分             │
│                    │    (5%)         │                             │
│                    └─────────────────┘                             │
│                              │                                      │
│    ┌────────────────────────┴────────────────────────┐             │
│    │                                                  │             │
│    ▼                                                  ▼             │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  ┌──────────┐  │
│  │Data         │  │Feature      │  │Configuration│  │Resource  │  │
│  │Collection   │  │Extraction   │  │             │  │Management│  │
│  └─────────────┘  └─────────────┘  └─────────────┘  └──────────┘  │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  ┌──────────┐  │
│  │Data         │  │Analysis     │  │Process      │  │Serving   │  │
│  │Verification │  │Tools        │  │Management   │  │Infra     │  │
│  └─────────────┘  └─────────────┘  └─────────────┘  └──────────┘  │
│  ┌─────────────┐  ┌─────────────┐                                  │
│  │Machine      │  │Monitoring   │  ← 这些才是系统的主体             │
│  │Resource Mgmt│  │             │                                  │
│  └─────────────┘  └─────────────┘                                  │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

MLOps 成熟度等级

成熟度模型概览

┌─────────────────────────────────────────────────────────────────────┐
│                    MLOps 成熟度等级                                  │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  Level 0: 手动流程                                                  │
│  ├── 数据科学家手动训练模型                                         │
│  ├── 模型通过脚本或笔记本训练                                       │
│  ├── 手动部署到生产环境                                             │
│  └── 无持续监控                                                     │
│                                                                     │
│  Level 1: ML Pipeline 自动化                                        │
│  ├── 自动化的训练 Pipeline                                          │
│  ├── 实验追踪和版本管理                                             │
│  ├── 模型注册中心                                                   │
│  └── 基础监控                                                       │
│                                                                     │
│  Level 2: CI/CD Pipeline 自动化                                     │
│  ├── 完整的 CI/CD 流程                                              │
│  ├── 自动化测试(数据、模型、代码)                                  │
│  ├── 自动化部署                                                     │
│  └── 特征存储                                                       │
│                                                                     │
│  Level 3: 持续训练 (CT)                                             │
│  ├── 数据和模型漂移检测                                             │
│  ├── 自动触发重训练                                                 │
│  ├── 在线学习能力                                                   │
│  └── 完整的可观测性                                                 │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

Level 0: 手动流程

# level-0-characteristics.yaml
level: 0
name: "手动流程 (Manual Process)"

characteristics:
  data_management:
    - 数据手动收集和处理
    - 无版本控制
    - 本地存储为主

  experimentation:
    - Jupyter Notebook 开发
    - 手动记录实验结果
    - 参数调优凭经验

  model_training:
    - 手动执行训练脚本
    - 无标准化流程
    - 依赖个人环境

  deployment:
    - 手动部署模型
    - 无标准化打包
    - 部署周期长(数周-数月)

  monitoring:
    - 无系统性监控
    - 问题靠用户反馈发现
    - 无性能基线

  team:
    - 数据科学家主导
    - 与工程团队脱节
    - 知识难以传递

challenges:
  - 模型难以复现
  - 部署周期长
  - 缺乏可追溯性
  - 无法快速迭代
  - 生产事故响应慢

typical_scenario: |
  数据科学家在 Jupyter Notebook 中开发模型,
  完成后将模型文件和部署脚本打包发送给工程团队,
  工程团队手动部署到生产服务器。
  当需要更新模型时,重复整个流程。

Level 1: ML Pipeline 自动化

# level_1_implementation.py
"""
Level 1 MLOps 实现示例
ML Pipeline 自动化
"""

from typing import Dict, List, Optional, Any
from dataclasses import dataclass
from datetime import datetime
import mlflow
from mlflow.tracking import MlflowClient
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score


@dataclass
class PipelineConfig:
    """Pipeline 配置"""
    experiment_name: str
    model_name: str
    data_path: str
    target_column: str
    test_size: float = 0.2
    random_state: int = 42


@dataclass
class ModelMetrics:
    """模型指标"""
    accuracy: float
    precision: float
    recall: float
    f1: float


class Level1Pipeline:
    """Level 1 MLOps Pipeline"""

    def __init__(
        self,
        config: PipelineConfig,
        tracking_uri: str = "http://mlflow:5000"
    ):
        self.config = config
        mlflow.set_tracking_uri(tracking_uri)
        mlflow.set_experiment(config.experiment_name)
        self.client = MlflowClient()

    def load_data(self) -> pd.DataFrame:
        """加载数据"""
        return pd.read_parquet(self.config.data_path)

    def prepare_data(self, df: pd.DataFrame) -> tuple:
        """数据准备"""
        X = df.drop(columns=[self.config.target_column])
        y = df[self.config.target_column]

        return train_test_split(
            X, y,
            test_size=self.config.test_size,
            random_state=self.config.random_state
        )

    def train_model(
        self,
        X_train: pd.DataFrame,
        y_train: pd.Series,
        params: Dict[str, Any]
    ) -> RandomForestClassifier:
        """训练模型"""
        model = RandomForestClassifier(**params)
        model.fit(X_train, y_train)
        return model

    def evaluate_model(
        self,
        model: RandomForestClassifier,
        X_test: pd.DataFrame,
        y_test: pd.Series
    ) -> ModelMetrics:
        """评估模型"""
        y_pred = model.predict(X_test)

        return ModelMetrics(
            accuracy=accuracy_score(y_test, y_pred),
            precision=precision_score(y_test, y_pred, average='weighted'),
            recall=recall_score(y_test, y_pred, average='weighted'),
            f1=f1_score(y_test, y_pred, average='weighted')
        )

    def run(self, params: Dict[str, Any]) -> str:
        """运行 Pipeline"""
        with mlflow.start_run() as run:
            # 记录参数
            mlflow.log_params(params)
            mlflow.log_param("data_path", self.config.data_path)
            mlflow.log_param("test_size", self.config.test_size)

            # 数据加载和准备
            df = self.load_data()
            X_train, X_test, y_train, y_test = self.prepare_data(df)

            mlflow.log_param("train_samples", len(X_train))
            mlflow.log_param("test_samples", len(X_test))

            # 训练
            model = self.train_model(X_train, y_train, params)

            # 评估
            metrics = self.evaluate_model(model, X_test, y_test)

            # 记录指标
            mlflow.log_metrics({
                "accuracy": metrics.accuracy,
                "precision": metrics.precision,
                "recall": metrics.recall,
                "f1": metrics.f1
            })

            # 记录模型
            mlflow.sklearn.log_model(
                model,
                "model",
                registered_model_name=self.config.model_name
            )

            return run.info.run_id

    def promote_model(self, run_id: str, stage: str = "Staging"):
        """推广模型到指定阶段"""
        # 获取模型版本
        filter_string = f"run_id='{run_id}'"
        versions = self.client.search_model_versions(filter_string)

        if versions:
            version = versions[0].version
            self.client.transition_model_version_stage(
                name=self.config.model_name,
                version=version,
                stage=stage
            )
            return version
        return None


class ExperimentComparator:
    """实验对比器"""

    def __init__(self, tracking_uri: str = "http://mlflow:5000"):
        mlflow.set_tracking_uri(tracking_uri)
        self.client = MlflowClient()

    def compare_runs(
        self,
        experiment_name: str,
        metric: str = "f1",
        top_n: int = 10
    ) -> pd.DataFrame:
        """对比实验运行"""
        experiment = self.client.get_experiment_by_name(experiment_name)
        if not experiment:
            return pd.DataFrame()

        runs = self.client.search_runs(
            experiment_ids=[experiment.experiment_id],
            order_by=[f"metrics.{metric} DESC"],
            max_results=top_n
        )

        results = []
        for run in runs:
            results.append({
                "run_id": run.info.run_id,
                "run_name": run.info.run_name,
                **run.data.params,
                **run.data.metrics
            })

        return pd.DataFrame(results)

    def get_best_run(
        self,
        experiment_name: str,
        metric: str = "f1"
    ) -> Optional[str]:
        """获取最佳运行"""
        df = self.compare_runs(experiment_name, metric, top_n=1)
        if not df.empty:
            return df.iloc[0]["run_id"]
        return None


# 使用示例
if __name__ == "__main__":
    config = PipelineConfig(
        experiment_name="fraud-detection",
        model_name="fraud-classifier",
        data_path="s3://data/fraud-dataset.parquet",
        target_column="is_fraud"
    )

    pipeline = Level1Pipeline(config)

    # 运行多组参数实验
    param_grid = [
        {"n_estimators": 100, "max_depth": 5},
        {"n_estimators": 200, "max_depth": 10},
        {"n_estimators": 300, "max_depth": 15}
    ]

    for params in param_grid:
        run_id = pipeline.run(params)
        print(f"Completed run: {run_id}")

    # 对比实验
    comparator = ExperimentComparator()
    results = comparator.compare_runs("fraud-detection")
    print(results)

    # 推广最佳模型
    best_run = comparator.get_best_run("fraud-detection")
    if best_run:
        version = pipeline.promote_model(best_run, "Staging")
        print(f"Promoted version {version} to Staging")

Level 2: CI/CD Pipeline 自动化

# level-2-cicd-pipeline.yaml
# GitLab CI/CD Pipeline 配置
stages:
  - validate
  - test
  - train
  - evaluate
  - deploy

variables:
  MLFLOW_TRACKING_URI: "http://mlflow:5000"
  MODEL_NAME: "fraud-classifier"
  EXPERIMENT_NAME: "fraud-detection"

# 数据验证
validate_data:
  stage: validate
  image: python:3.9
  script:
    - pip install great_expectations pandas
    - python scripts/validate_data.py
  artifacts:
    reports:
      - data_validation_report.html

# 代码测试
test_code:
  stage: test
  image: python:3.9
  script:
    - pip install pytest pytest-cov
    - pytest tests/ --cov=src --cov-report=xml
  coverage: '/TOTAL.*\s+(\d+%)/'
  artifacts:
    reports:
      coverage_report:
        coverage_format: cobertura
        path: coverage.xml

# 模型测试
test_model:
  stage: test
  image: python:3.9
  script:
    - pip install -r requirements.txt
    - python scripts/test_model.py
  artifacts:
    reports:
      - model_test_report.json

# 训练模型
train_model:
  stage: train
  image: python:3.9
  tags:
    - gpu
  script:
    - pip install -r requirements.txt
    - python scripts/train.py --config configs/production.yaml
  artifacts:
    paths:
      - models/
      - metrics/

# 评估模型
evaluate_model:
  stage: evaluate
  image: python:3.9
  script:
    - pip install -r requirements.txt
    - python scripts/evaluate.py
    - python scripts/compare_with_baseline.py
  artifacts:
    reports:
      - evaluation_report.json
  rules:
    - if: $CI_COMMIT_BRANCH == "main"

# 部署到 Staging
deploy_staging:
  stage: deploy
  image: python:3.9
  script:
    - pip install mlflow
    - python scripts/deploy.py --stage staging
  environment:
    name: staging
    url: https://staging.api.example.com
  rules:
    - if: $CI_COMMIT_BRANCH == "main"
      when: manual

# 部署到 Production
deploy_production:
  stage: deploy
  image: python:3.9
  script:
    - pip install mlflow
    - python scripts/deploy.py --stage production
  environment:
    name: production
    url: https://api.example.com
  rules:
    - if: $CI_COMMIT_BRANCH == "main"
      when: manual
  needs:
    - deploy_staging
# level_2_implementation.py
"""
Level 2 MLOps 实现
CI/CD Pipeline 自动化
"""

import os
import json
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
import great_expectations as ge
from great_expectations.data_context import DataContext
import mlflow
from mlflow.tracking import MlflowClient
import pandas as pd
import numpy as np


@dataclass
class DataValidationResult:
    """数据验证结果"""
    success: bool
    statistics: Dict[str, Any]
    failed_expectations: List[str]


@dataclass
class ModelTestResult:
    """模型测试结果"""
    passed: bool
    tests: List[Dict[str, Any]]
    coverage: float


class DataValidator:
    """数据验证器"""

    def __init__(self, context_root: str = "great_expectations"):
        self.context = DataContext(context_root_dir=context_root)

    def validate_dataset(
        self,
        df: pd.DataFrame,
        expectation_suite: str
    ) -> DataValidationResult:
        """验证数据集"""
        # 转换为 GE DataFrame
        ge_df = ge.from_pandas(df)

        # 运行验证
        results = ge_df.validate(
            expectation_suite=self.context.get_expectation_suite(expectation_suite)
        )

        failed = [
            r['expectation_config']['expectation_type']
            for r in results['results']
            if not r['success']
        ]

        return DataValidationResult(
            success=results['success'],
            statistics=results['statistics'],
            failed_expectations=failed
        )

    def create_expectations(
        self,
        df: pd.DataFrame,
        suite_name: str
    ):
        """创建期望套件"""
        ge_df = ge.from_pandas(df)

        # 自动生成基础期望
        ge_df.expect_table_row_count_to_be_between(min_value=1000)

        for column in df.columns:
            # 非空检查
            ge_df.expect_column_values_to_not_be_null(column)

            if df[column].dtype in ['int64', 'float64']:
                # 数值范围检查
                ge_df.expect_column_values_to_be_between(
                    column,
                    min_value=df[column].min(),
                    max_value=df[column].max()
                )
            elif df[column].dtype == 'object':
                # 类别值检查
                ge_df.expect_column_values_to_be_in_set(
                    column,
                    value_set=df[column].unique().tolist()
                )

        # 保存期望套件
        suite = ge_df.get_expectation_suite()
        self.context.save_expectation_suite(suite, suite_name)


class ModelTester:
    """模型测试器"""

    def __init__(self, model_uri: str):
        self.model = mlflow.pyfunc.load_model(model_uri)

    def test_prediction_shape(self, X: pd.DataFrame) -> Dict[str, Any]:
        """测试预测输出形状"""
        predictions = self.model.predict(X)

        return {
            "name": "prediction_shape",
            "passed": len(predictions) == len(X),
            "expected": len(X),
            "actual": len(predictions)
        }

    def test_prediction_range(
        self,
        X: pd.DataFrame,
        min_val: float = 0,
        max_val: float = 1
    ) -> Dict[str, Any]:
        """测试预测值范围"""
        predictions = self.model.predict(X)

        in_range = np.all((predictions >= min_val) & (predictions <= max_val))

        return {
            "name": "prediction_range",
            "passed": in_range,
            "min": float(predictions.min()),
            "max": float(predictions.max())
        }

    def test_latency(
        self,
        X: pd.DataFrame,
        max_latency_ms: float = 100
    ) -> Dict[str, Any]:
        """测试推理延迟"""
        import time

        # 单条预测
        single_times = []
        for i in range(min(100, len(X))):
            start = time.time()
            self.model.predict(X.iloc[[i]])
            single_times.append((time.time() - start) * 1000)

        avg_latency = np.mean(single_times)

        return {
            "name": "latency",
            "passed": avg_latency <= max_latency_ms,
            "avg_latency_ms": avg_latency,
            "p99_latency_ms": np.percentile(single_times, 99),
            "threshold_ms": max_latency_ms
        }

    def test_consistency(
        self,
        X: pd.DataFrame,
        n_runs: int = 5
    ) -> Dict[str, Any]:
        """测试预测一致性"""
        predictions = [self.model.predict(X) for _ in range(n_runs)]

        # 检查所有运行结果是否一致
        consistent = all(
            np.array_equal(predictions[0], p)
            for p in predictions[1:]
        )

        return {
            "name": "consistency",
            "passed": consistent,
            "runs": n_runs
        }

    def test_invariance(
        self,
        X: pd.DataFrame,
        invariant_columns: List[str]
    ) -> Dict[str, Any]:
        """测试不变性(扰动不应改变预测)"""
        original_pred = self.model.predict(X)

        # 对不变列添加噪声
        X_perturbed = X.copy()
        for col in invariant_columns:
            if col in X.columns:
                X_perturbed[col] = X_perturbed[col] + np.random.normal(0, 0.01, len(X))

        perturbed_pred = self.model.predict(X_perturbed)

        # 预测应该保持不变
        invariant = np.array_equal(original_pred, perturbed_pred)

        return {
            "name": "invariance",
            "passed": invariant,
            "columns_tested": invariant_columns
        }

    def run_all_tests(
        self,
        X: pd.DataFrame,
        config: Dict[str, Any]
    ) -> ModelTestResult:
        """运行所有测试"""
        tests = [
            self.test_prediction_shape(X),
            self.test_prediction_range(
                X,
                config.get("min_prediction", 0),
                config.get("max_prediction", 1)
            ),
            self.test_latency(X, config.get("max_latency_ms", 100)),
            self.test_consistency(X, config.get("consistency_runs", 5))
        ]

        if "invariant_columns" in config:
            tests.append(self.test_invariance(X, config["invariant_columns"]))

        passed = all(t["passed"] for t in tests)
        coverage = sum(1 for t in tests if t["passed"]) / len(tests)

        return ModelTestResult(
            passed=passed,
            tests=tests,
            coverage=coverage
        )


class ModelComparator:
    """模型对比器"""

    def __init__(self, tracking_uri: str = "http://mlflow:5000"):
        mlflow.set_tracking_uri(tracking_uri)
        self.client = MlflowClient()

    def compare_with_baseline(
        self,
        model_name: str,
        new_version: str,
        metrics: List[str] = ["accuracy", "f1"]
    ) -> Dict[str, Any]:
        """与基线模型对比"""
        # 获取生产版本作为基线
        baseline_versions = self.client.get_latest_versions(
            model_name,
            stages=["Production"]
        )

        if not baseline_versions:
            return {
                "has_baseline": False,
                "comparison": None,
                "recommendation": "promote"
            }

        baseline = baseline_versions[0]

        # 获取两个版本的运行指标
        new_run = self.client.get_run(
            self.client.get_model_version(model_name, new_version).run_id
        )
        baseline_run = self.client.get_run(baseline.run_id)

        comparison = {}
        improvements = 0
        regressions = 0

        for metric in metrics:
            new_val = new_run.data.metrics.get(metric, 0)
            baseline_val = baseline_run.data.metrics.get(metric, 0)
            diff = new_val - baseline_val
            diff_pct = (diff / baseline_val * 100) if baseline_val != 0 else 0

            comparison[metric] = {
                "new": new_val,
                "baseline": baseline_val,
                "diff": diff,
                "diff_pct": diff_pct
            }

            if diff > 0:
                improvements += 1
            elif diff < 0:
                regressions += 1

        # 推荐决策
        if regressions == 0 and improvements > 0:
            recommendation = "promote"
        elif regressions > improvements:
            recommendation = "reject"
        else:
            recommendation = "review"

        return {
            "has_baseline": True,
            "baseline_version": baseline.version,
            "new_version": new_version,
            "comparison": comparison,
            "improvements": improvements,
            "regressions": regressions,
            "recommendation": recommendation
        }


class DeploymentManager:
    """部署管理器"""

    def __init__(self, tracking_uri: str = "http://mlflow:5000"):
        mlflow.set_tracking_uri(tracking_uri)
        self.client = MlflowClient()

    def deploy_to_staging(
        self,
        model_name: str,
        version: str
    ) -> bool:
        """部署到 Staging"""
        try:
            self.client.transition_model_version_stage(
                name=model_name,
                version=version,
                stage="Staging"
            )
            return True
        except Exception as e:
            print(f"Staging deployment failed: {e}")
            return False

    def deploy_to_production(
        self,
        model_name: str,
        version: str,
        archive_existing: bool = True
    ) -> bool:
        """部署到 Production"""
        try:
            self.client.transition_model_version_stage(
                name=model_name,
                version=version,
                stage="Production",
                archive_existing_versions=archive_existing
            )
            return True
        except Exception as e:
            print(f"Production deployment failed: {e}")
            return False

    def rollback(
        self,
        model_name: str,
        to_version: str
    ) -> bool:
        """回滚到指定版本"""
        # 归档当前 Production 版本
        current_prod = self.client.get_latest_versions(
            model_name,
            stages=["Production"]
        )

        if current_prod:
            self.client.transition_model_version_stage(
                name=model_name,
                version=current_prod[0].version,
                stage="Archived"
            )

        # 提升目标版本到 Production
        return self.deploy_to_production(model_name, to_version, archive_existing=False)


# 使用示例
if __name__ == "__main__":
    # 数据验证
    validator = DataValidator()
    df = pd.read_parquet("data/test.parquet")
    validation_result = validator.validate_dataset(df, "training_data_suite")

    if not validation_result.success:
        print(f"Data validation failed: {validation_result.failed_expectations}")
        exit(1)

    # 模型测试
    tester = ModelTester("models:/fraud-classifier/Staging")
    test_result = tester.run_all_tests(df.drop(columns=["label"]), {
        "max_latency_ms": 50,
        "min_prediction": 0,
        "max_prediction": 1
    })

    if not test_result.passed:
        print(f"Model tests failed: {test_result.tests}")
        exit(1)

    # 基线对比
    comparator = ModelComparator()
    comparison = comparator.compare_with_baseline(
        "fraud-classifier",
        "5",
        metrics=["accuracy", "f1", "precision", "recall"]
    )

    print(f"Comparison result: {json.dumps(comparison, indent=2)}")

    # 部署
    if comparison["recommendation"] == "promote":
        deployer = DeploymentManager()
        deployer.deploy_to_production("fraud-classifier", "5")
        print("Model promoted to Production")

Level 3: 持续训练 (CT)

# level_3_implementation.py
"""
Level 3 MLOps 实现
持续训练 (Continuous Training)
"""

import numpy as np
import pandas as pd
from typing import Dict, List, Optional, Any, Callable
from dataclasses import dataclass
from datetime import datetime, timedelta
from scipy import stats
from sklearn.metrics import accuracy_score
import mlflow
from mlflow.tracking import MlflowClient


@dataclass
class DriftDetectionResult:
    """漂移检测结果"""
    feature: str
    drift_detected: bool
    drift_score: float
    p_value: float
    threshold: float


@dataclass
class MonitoringMetrics:
    """监控指标"""
    timestamp: datetime
    prediction_count: int
    avg_latency_ms: float
    error_rate: float
    drift_scores: Dict[str, float]


class DataDriftDetector:
    """数据漂移检测器"""

    def __init__(self, reference_data: pd.DataFrame):
        self.reference = reference_data
        self.reference_stats = self._compute_stats(reference_data)

    def _compute_stats(self, df: pd.DataFrame) -> Dict[str, Dict]:
        """计算数据统计"""
        stats_dict = {}
        for col in df.columns:
            if df[col].dtype in ['int64', 'float64']:
                stats_dict[col] = {
                    "type": "numeric",
                    "mean": df[col].mean(),
                    "std": df[col].std(),
                    "min": df[col].min(),
                    "max": df[col].max(),
                    "distribution": df[col].values
                }
            else:
                stats_dict[col] = {
                    "type": "categorical",
                    "value_counts": df[col].value_counts(normalize=True).to_dict()
                }
        return stats_dict

    def detect_drift_ks(
        self,
        current_data: pd.DataFrame,
        column: str,
        threshold: float = 0.05
    ) -> DriftDetectionResult:
        """使用 KS 检验检测漂移"""
        if column not in self.reference_stats:
            raise ValueError(f"Column {column} not in reference data")

        ref_stats = self.reference_stats[column]
        if ref_stats["type"] != "numeric":
            raise ValueError(f"KS test only for numeric columns")

        # KS 检验
        statistic, p_value = stats.ks_2samp(
            ref_stats["distribution"],
            current_data[column].values
        )

        return DriftDetectionResult(
            feature=column,
            drift_detected=p_value < threshold,
            drift_score=statistic,
            p_value=p_value,
            threshold=threshold
        )

    def detect_drift_psi(
        self,
        current_data: pd.DataFrame,
        column: str,
        buckets: int = 10,
        threshold: float = 0.2
    ) -> DriftDetectionResult:
        """使用 PSI (Population Stability Index) 检测漂移"""
        if column not in self.reference_stats:
            raise ValueError(f"Column {column} not in reference data")

        ref_stats = self.reference_stats[column]

        if ref_stats["type"] == "numeric":
            # 数值型:分桶计算
            ref_values = ref_stats["distribution"]
            cur_values = current_data[column].values

            # 创建分桶
            _, bin_edges = np.histogram(ref_values, bins=buckets)

            # 计算各桶比例
            ref_counts, _ = np.histogram(ref_values, bins=bin_edges)
            cur_counts, _ = np.histogram(cur_values, bins=bin_edges)

            ref_pct = ref_counts / len(ref_values)
            cur_pct = cur_counts / len(cur_values)

        else:
            # 类别型:直接计算
            ref_dist = ref_stats["value_counts"]
            cur_dist = current_data[column].value_counts(normalize=True).to_dict()

            all_categories = set(ref_dist.keys()) | set(cur_dist.keys())
            ref_pct = np.array([ref_dist.get(c, 0.0001) for c in all_categories])
            cur_pct = np.array([cur_dist.get(c, 0.0001) for c in all_categories])

        # 计算 PSI
        ref_pct = np.clip(ref_pct, 0.0001, None)
        cur_pct = np.clip(cur_pct, 0.0001, None)

        psi = np.sum((cur_pct - ref_pct) * np.log(cur_pct / ref_pct))

        return DriftDetectionResult(
            feature=column,
            drift_detected=psi > threshold,
            drift_score=psi,
            p_value=0,  # PSI 不产生 p-value
            threshold=threshold
        )

    def detect_all_drift(
        self,
        current_data: pd.DataFrame,
        method: str = "psi"
    ) -> List[DriftDetectionResult]:
        """检测所有特征的漂移"""
        results = []
        for column in self.reference.columns:
            if column in current_data.columns:
                try:
                    if method == "ks" and self.reference_stats[column]["type"] == "numeric":
                        result = self.detect_drift_ks(current_data, column)
                    else:
                        result = self.detect_drift_psi(current_data, column)
                    results.append(result)
                except Exception as e:
                    print(f"Error detecting drift for {column}: {e}")

        return results


class ModelPerformanceMonitor:
    """模型性能监控器"""

    def __init__(
        self,
        model_name: str,
        tracking_uri: str = "http://mlflow:5000"
    ):
        self.model_name = model_name
        mlflow.set_tracking_uri(tracking_uri)
        self.client = MlflowClient()
        self.baseline_metrics = self._get_baseline_metrics()

    def _get_baseline_metrics(self) -> Dict[str, float]:
        """获取基线指标"""
        versions = self.client.get_latest_versions(
            self.model_name,
            stages=["Production"]
        )

        if not versions:
            return {}

        run = self.client.get_run(versions[0].run_id)
        return run.data.metrics

    def compute_online_metrics(
        self,
        predictions: np.ndarray,
        actuals: np.ndarray
    ) -> Dict[str, float]:
        """计算在线指标"""
        return {
            "online_accuracy": accuracy_score(actuals, predictions),
            "prediction_positive_rate": np.mean(predictions),
            "actual_positive_rate": np.mean(actuals)
        }

    def detect_performance_degradation(
        self,
        current_metrics: Dict[str, float],
        threshold_pct: float = 0.05
    ) -> Dict[str, Any]:
        """检测性能下降"""
        degradations = {}

        for metric, baseline_value in self.baseline_metrics.items():
            if metric in current_metrics:
                current_value = current_metrics[metric]
                degradation = (baseline_value - current_value) / baseline_value

                if degradation > threshold_pct:
                    degradations[metric] = {
                        "baseline": baseline_value,
                        "current": current_value,
                        "degradation_pct": degradation * 100
                    }

        return {
            "degradation_detected": len(degradations) > 0,
            "degraded_metrics": degradations
        }


class ContinuousTrainingPipeline:
    """持续训练 Pipeline"""

    def __init__(
        self,
        model_name: str,
        experiment_name: str,
        training_fn: Callable,
        tracking_uri: str = "http://mlflow:5000"
    ):
        self.model_name = model_name
        self.experiment_name = experiment_name
        self.training_fn = training_fn
        mlflow.set_tracking_uri(tracking_uri)
        self.client = MlflowClient()

    def should_retrain(
        self,
        drift_results: List[DriftDetectionResult],
        performance_result: Dict[str, Any],
        drift_threshold: int = 3,
        performance_threshold: bool = True
    ) -> Dict[str, Any]:
        """判断是否需要重训练"""
        drifted_features = [r for r in drift_results if r.drift_detected]
        drift_triggered = len(drifted_features) >= drift_threshold

        performance_triggered = (
            performance_threshold and
            performance_result.get("degradation_detected", False)
        )

        should_retrain = drift_triggered or performance_triggered

        return {
            "should_retrain": should_retrain,
            "triggers": {
                "data_drift": drift_triggered,
                "performance_degradation": performance_triggered
            },
            "details": {
                "drifted_features": [r.feature for r in drifted_features],
                "degraded_metrics": list(
                    performance_result.get("degraded_metrics", {}).keys()
                )
            }
        }

    def trigger_retraining(
        self,
        training_data: pd.DataFrame,
        reason: str,
        params: Optional[Dict[str, Any]] = None
    ) -> str:
        """触发重训练"""
        mlflow.set_experiment(self.experiment_name)

        with mlflow.start_run() as run:
            # 记录触发原因
            mlflow.set_tag("retrain_reason", reason)
            mlflow.set_tag("retrain_timestamp", datetime.now().isoformat())

            # 执行训练
            model, metrics = self.training_fn(training_data, params or {})

            # 记录参数和指标
            mlflow.log_params(params or {})
            mlflow.log_metrics(metrics)

            # 注册模型
            mlflow.sklearn.log_model(
                model,
                "model",
                registered_model_name=self.model_name
            )

            return run.info.run_id

    def auto_promote(
        self,
        run_id: str,
        validation_metrics: Dict[str, float],
        threshold_metrics: Dict[str, float]
    ) -> bool:
        """自动推广模型"""
        # 检查是否满足阈值
        for metric, threshold in threshold_metrics.items():
            if validation_metrics.get(metric, 0) < threshold:
                print(f"Metric {metric} below threshold: {validation_metrics.get(metric)} < {threshold}")
                return False

        # 获取新版本
        filter_string = f"run_id='{run_id}'"
        versions = self.client.search_model_versions(filter_string)

        if not versions:
            return False

        version = versions[0].version

        # 先推到 Staging
        self.client.transition_model_version_stage(
            name=self.model_name,
            version=version,
            stage="Staging"
        )

        # 如果通过验证,推到 Production
        self.client.transition_model_version_stage(
            name=self.model_name,
            version=version,
            stage="Production",
            archive_existing_versions=True
        )

        return True


class RetrainingOrchestrator:
    """重训练编排器"""

    def __init__(
        self,
        model_name: str,
        reference_data: pd.DataFrame,
        training_fn: Callable,
        tracking_uri: str = "http://mlflow:5000"
    ):
        self.model_name = model_name
        self.drift_detector = DataDriftDetector(reference_data)
        self.monitor = ModelPerformanceMonitor(model_name, tracking_uri)
        self.ct_pipeline = ContinuousTrainingPipeline(
            model_name,
            f"{model_name}-ct",
            training_fn,
            tracking_uri
        )

    def run_monitoring_cycle(
        self,
        current_data: pd.DataFrame,
        predictions: np.ndarray,
        actuals: np.ndarray
    ) -> Dict[str, Any]:
        """运行监控周期"""
        # 检测数据漂移
        drift_results = self.drift_detector.detect_all_drift(current_data)

        # 计算在线指标
        online_metrics = self.monitor.compute_online_metrics(predictions, actuals)

        # 检测性能下降
        performance_result = self.monitor.detect_performance_degradation(online_metrics)

        # 判断是否需要重训练
        retrain_decision = self.ct_pipeline.should_retrain(
            drift_results,
            performance_result
        )

        return {
            "timestamp": datetime.now().isoformat(),
            "drift_results": [
                {
                    "feature": r.feature,
                    "drift_detected": r.drift_detected,
                    "drift_score": r.drift_score
                }
                for r in drift_results
            ],
            "online_metrics": online_metrics,
            "performance_result": performance_result,
            "retrain_decision": retrain_decision
        }

    def execute_retraining_if_needed(
        self,
        monitoring_result: Dict[str, Any],
        training_data: pd.DataFrame,
        validation_data: pd.DataFrame,
        params: Optional[Dict[str, Any]] = None
    ) -> Optional[str]:
        """如果需要则执行重训练"""
        if not monitoring_result["retrain_decision"]["should_retrain"]:
            return None

        # 构建触发原因
        triggers = monitoring_result["retrain_decision"]["triggers"]
        reasons = []
        if triggers["data_drift"]:
            reasons.append("data_drift")
        if triggers["performance_degradation"]:
            reasons.append("performance_degradation")

        reason = ",".join(reasons)

        # 触发重训练
        run_id = self.ct_pipeline.trigger_retraining(
            training_data,
            reason,
            params
        )

        # 验证并自动推广
        # 这里需要实际的验证逻辑
        validation_metrics = {"accuracy": 0.95, "f1": 0.92}
        threshold_metrics = {"accuracy": 0.90, "f1": 0.85}

        self.ct_pipeline.auto_promote(run_id, validation_metrics, threshold_metrics)

        return run_id


# 使用示例
if __name__ == "__main__":
    # 模拟训练函数
    def train_model(data: pd.DataFrame, params: Dict[str, Any]):
        from sklearn.ensemble import RandomForestClassifier
        X = data.drop(columns=["label"])
        y = data["label"]
        model = RandomForestClassifier(**params)
        model.fit(X, y)
        metrics = {"accuracy": 0.95, "f1": 0.92}
        return model, metrics

    # 参考数据
    reference_data = pd.read_parquet("data/reference.parquet")

    # 创建编排器
    orchestrator = RetrainingOrchestrator(
        model_name="fraud-classifier",
        reference_data=reference_data,
        training_fn=train_model
    )

    # 模拟监控周期
    current_data = pd.read_parquet("data/current.parquet")
    predictions = np.random.randint(0, 2, 1000)
    actuals = np.random.randint(0, 2, 1000)

    # 运行监控
    result = orchestrator.run_monitoring_cycle(
        current_data.drop(columns=["label"]),
        predictions,
        actuals
    )

    print(f"Monitoring result: {result}")

    # 如果需要,执行重训练
    if result["retrain_decision"]["should_retrain"]:
        run_id = orchestrator.execute_retraining_if_needed(
            result,
            reference_data,
            current_data,
            params={"n_estimators": 100, "max_depth": 10}
        )
        print(f"Retrained model: {run_id}")

成熟度评估框架

评估维度

# maturity_assessment.py
"""
MLOps 成熟度评估框架
"""

from typing import Dict, List
from dataclasses import dataclass
from enum import Enum


class MaturityLevel(Enum):
    LEVEL_0 = 0
    LEVEL_1 = 1
    LEVEL_2 = 2
    LEVEL_3 = 3


@dataclass
class AssessmentQuestion:
    """评估问题"""
    dimension: str
    question: str
    level_0: str
    level_1: str
    level_2: str
    level_3: str


ASSESSMENT_QUESTIONS = [
    AssessmentQuestion(
        dimension="数据管理",
        question="数据版本控制",
        level_0="无版本控制",
        level_1="基础版本标记",
        level_2="DVC/LakeFS 系统化管理",
        level_3="自动化数据血缘追踪"
    ),
    AssessmentQuestion(
        dimension="数据管理",
        question="数据质量",
        level_0="无系统检查",
        level_1="手动检查",
        level_2="自动化验证 Pipeline",
        level_3="实时数据质量监控"
    ),
    AssessmentQuestion(
        dimension="实验管理",
        question="实验追踪",
        level_0="手动记录/无记录",
        level_1="MLflow/W&B 追踪",
        level_2="自动化实验对比",
        level_3="AutoML 集成"
    ),
    AssessmentQuestion(
        dimension="模型开发",
        question="特征工程",
        level_0="临时脚本",
        level_1="模块化代码",
        level_2="特征存储",
        level_3="实时特征服务"
    ),
    AssessmentQuestion(
        dimension="模型训练",
        question="训练流程",
        level_0="手动脚本执行",
        level_1="自动化 Pipeline",
        level_2="可复现训练",
        level_3="分布式/增量训练"
    ),
    AssessmentQuestion(
        dimension="模型注册",
        question="模型管理",
        level_0="文件系统存储",
        level_1="模型注册中心",
        level_2="版本+阶段管理",
        level_3="审批流程+血缘"
    ),
    AssessmentQuestion(
        dimension="部署",
        question="部署流程",
        level_0="手动部署",
        level_1="脚本化部署",
        level_2="CI/CD 自动部署",
        level_3="渐进式发布"
    ),
    AssessmentQuestion(
        dimension="监控",
        question="模型监控",
        level_0="无监控",
        level_1="基础指标监控",
        level_2="性能+漂移监控",
        level_3="自动告警+重训练"
    ),
    AssessmentQuestion(
        dimension="治理",
        question="模型治理",
        level_0="无治理",
        level_1="基础文档",
        level_2="模型卡+审计",
        level_3="合规自动化"
    ),
    AssessmentQuestion(
        dimension="团队",
        question="协作模式",
        level_0="孤立工作",
        level_1="代码共享",
        level_2="平台化协作",
        level_3="自助服务平台"
    )
]


class MaturityAssessor:
    """成熟度评估器"""

    def __init__(self):
        self.questions = ASSESSMENT_QUESTIONS

    def assess(self, answers: Dict[str, int]) -> Dict[str, any]:
        """执行评估"""
        dimension_scores = {}

        for q in self.questions:
            key = f"{q.dimension}_{q.question}"
            score = answers.get(key, 0)

            if q.dimension not in dimension_scores:
                dimension_scores[q.dimension] = []
            dimension_scores[q.dimension].append(score)

        # 计算各维度平均分
        dimension_averages = {
            dim: sum(scores) / len(scores)
            for dim, scores in dimension_scores.items()
        }

        # 整体成熟度
        overall_score = sum(dimension_averages.values()) / len(dimension_averages)
        overall_level = MaturityLevel(int(overall_score))

        return {
            "overall_level": overall_level.name,
            "overall_score": overall_score,
            "dimension_scores": dimension_averages,
            "recommendations": self._generate_recommendations(dimension_averages)
        }

    def _generate_recommendations(
        self,
        scores: Dict[str, float]
    ) -> List[str]:
        """生成改进建议"""
        recommendations = []

        # 找出最低分维度
        sorted_dims = sorted(scores.items(), key=lambda x: x[1])

        for dim, score in sorted_dims[:3]:  # 前3个最低分
            if score < 1:
                recommendations.append(f"{dim}: 建立基础自动化流程")
            elif score < 2:
                recommendations.append(f"{dim}: 实现完整的 CI/CD 集成")
            elif score < 3:
                recommendations.append(f"{dim}: 引入持续监控和自动化治理")

        return recommendations


# 使用示例
if __name__ == "__main__":
    assessor = MaturityAssessor()

    # 模拟回答
    answers = {
        "数据管理_数据版本控制": 2,
        "数据管理_数据质量": 1,
        "实验管理_实验追踪": 2,
        "模型开发_特征工程": 1,
        "模型训练_训练流程": 2,
        "模型注册_模型管理": 2,
        "部署_部署流程": 1,
        "监控_模型监控": 1,
        "治理_模型治理": 0,
        "团队_协作模式": 1
    }

    result = assessor.assess(answers)
    print(f"Overall Level: {result['overall_level']}")
    print(f"Overall Score: {result['overall_score']:.2f}")
    print("\nDimension Scores:")
    for dim, score in result['dimension_scores'].items():
        print(f"  {dim}: {score:.2f}")
    print("\nRecommendations:")
    for rec in result['recommendations']:
        print(f"  - {rec}")

总结

MLOps 成熟度模型帮助组织:

  1. 评估现状 - 了解当前 ML 系统的成熟度
  2. 规划路径 - 制定明确的演进计划
  3. 优先级排序 - 识别最需要改进的领域
  4. 度量进展 - 跟踪改进效果

关键要点:

  • Level 0→1:建立基础自动化
  • Level 1→2:实现完整 CI/CD
  • Level 2→3:引入持续训练

下一章节将探讨 Feature Store(特征存储)的设计与实现。

Prev
07-MLOps实践
Next
02-数据集工程