01 - MLOps Maturity Model
Overview
MLOps (Machine Learning Operations) is the practice of applying DevOps principles to machine learning systems. This article introduces an MLOps maturity model that helps organizations assess their current state and plan an evolution path.
Core Challenges of MLOps
Why ML Systems Differ from Traditional Software
┌─────────────────────────────────────────────────────────────────────┐
│                  Unique Challenges of ML Systems                     │
├─────────────────────────────────────────────────────────────────────┤
│                                                                      │
│  ┌───────────────────────────────────────────────────────────────┐  │
│  │           Traditional Software vs. ML Systems                 │  │
│  │                                                                │  │
│  │  Traditional software:         ML system:                      │  │
│  │  ┌──────────┐                  ┌─────────────┐                 │  │
│  │  │   Code   │─► test ─► deploy │ Code + data │─► ? ─► deploy   │  │
│  │  └──────────┘                  │+model+config│                 │  │
│  │                                └─────────────┘                 │  │
│  │  • Deterministic behavior      • Probabilistic behavior        │  │
│  │  • Single artifact             • Multiple artifacts            │  │
│  │  • Mature version control      • Complex data/model versioning │  │
│  │  • Well-understood testing     • Testing is less well defined  │  │
│  └───────────────────────────────────────────────────────────────┘  │
│                                                                      │
│  ┌───────────────────────────────────────────────────────────────┐  │
│  │                 ML-Specific Technical Debt                     │  │
│  │                                                                │  │
│  │  Data deps     Feature eng.     Model complexity  Config mgmt  │  │
│  │     │               │                │                │        │  │
│  │     ▼               ▼                ▼                ▼        │  │
│  │  Data quality  Feature store    Model versioning  Hyperparams  │  │
│  │  Data drift    Feature lineage  Model drift       Env deps     │  │
│  │  Data testing  Feature reuse    A/B testing       Exp tracking │  │
│  └───────────────────────────────────────────────────────────────┘  │
│                                                                      │
└─────────────────────────────────────────────────────────────────────┘
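The "multiple artifacts" row is the crux: a reproducible ML release must pin code, data, and configuration together, not just a code revision. Below is a minimal, hypothetical sketch of that idea; the function, file names, and parameters are illustrative and not taken from any specific tool.
# artifact_fingerprint.py (illustrative sketch, not a specific tool's API)
import hashlib
import json
from pathlib import Path
from typing import Any, Dict

def fingerprint_run(code_rev: str, data_path: str, config: Dict[str, Any]) -> str:
    """Hash code revision + dataset bytes + hyperparameters into one lineage ID."""
    h = hashlib.sha256()
    h.update(code_rev.encode())                             # e.g. a git SHA
    h.update(Path(data_path).read_bytes())                  # dataset snapshot
    h.update(json.dumps(config, sort_keys=True).encode())   # hyperparameters
    return h.hexdigest()[:12]

# Changing any one input (code, data, or config) changes the fingerprint,
# which is why ML versioning cannot rely on the code revision alone:
# fingerprint_run("3f2a1c9", "data/train.parquet", {"max_depth": 10})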
Components of an ML System
┌─────────────────────────────────────────────────────────────────────┐
│    ML System Composition (Google, "Hidden Technical Debt in ML")    │
├─────────────────────────────────────────────────────────────────────┤
│                                                                      │
│                    ┌─────────────────┐                               │
│                    │     ML Code     │  ← only a small fraction      │
│                    │      (5%)       │                               │
│                    └─────────────────┘                               │
│                             │                                        │
│           ┌─────────────────┴───────────────────────┐                │
│           │                                         │                │
│           ▼                                         ▼                │
│  ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌──────────┐        │
│  │Data         │ │Feature      │ │Configuration│ │Resource  │        │
│  │Collection   │ │Extraction   │ │             │ │Management│        │
│  └─────────────┘ └─────────────┘ └─────────────┘ └──────────┘        │
│  ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌──────────┐        │
│  │Data         │ │Analysis     │ │Process      │ │Serving   │        │
│  │Verification │ │Tools        │ │Management   │ │Infra     │        │
│  └─────────────┘ └─────────────┘ └─────────────┘ └──────────┘        │
│  ┌─────────────┐ ┌─────────────┐                                     │
│  │Machine      │ │Monitoring   │  ← these make up the bulk           │
│  │Resource Mgmt│ │             │    of the system                    │
│  └─────────────┘ └─────────────┘                                     │
│                                                                      │
└─────────────────────────────────────────────────────────────────────┘
MLOps Maturity Levels
Maturity Model Overview
┌─────────────────────────────────────────────────────────────────────┐
│                        MLOps Maturity Levels                         │
├─────────────────────────────────────────────────────────────────────┤
│                                                                      │
│  Level 0: Manual process                                             │
│  ├── Data scientists train models by hand                            │
│  ├── Models trained via ad-hoc scripts or notebooks                  │
│  ├── Manual deployment to production                                 │
│  └── No continuous monitoring                                        │
│                                                                      │
│  Level 1: ML pipeline automation                                     │
│  ├── Automated training pipeline                                     │
│  ├── Experiment tracking and versioning                              │
│  ├── Model registry                                                  │
│  └── Basic monitoring                                                │
│                                                                      │
│  Level 2: CI/CD pipeline automation                                  │
│  ├── Full CI/CD workflow                                             │
│  ├── Automated tests (data, model, code)                             │
│  ├── Automated deployment                                            │
│  └── Feature store                                                   │
│                                                                      │
│  Level 3: Continuous training (CT)                                   │
│  ├── Data and model drift detection                                  │
│  ├── Automatically triggered retraining                              │
│  ├── Online learning capability                                      │
│  └── Full observability                                              │
│                                                                      │
└─────────────────────────────────────────────────────────────────────┘
Level 0: Manual Process
# level-0-characteristics.yaml
level: 0
name: "Manual Process"
characteristics:
  data_management:
    - Data collected and processed manually
    - No version control
    - Mostly stored locally
  experimentation:
    - Development in Jupyter notebooks
    - Experiment results recorded by hand
    - Hyperparameters tuned by intuition
  model_training:
    - Training scripts run manually
    - No standardized process
    - Depends on individual environments
  deployment:
    - Models deployed manually
    - No standardized packaging
    - Long deployment cycles (weeks to months)
  monitoring:
    - No systematic monitoring
    - Issues discovered through user feedback
    - No performance baseline
  team:
    - Led by data scientists
    - Disconnected from the engineering team
    - Knowledge hard to transfer
challenges:
  - Models are hard to reproduce
  - Long deployment cycles
  - No traceability
  - Slow iteration
  - Slow response to production incidents
typical_scenario: |
  A data scientist develops a model in a Jupyter notebook,
  then hands the model file and deployment scripts to the
  engineering team, which deploys them to production servers
  by hand. Updating the model repeats the entire process.
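To make the typical scenario concrete, the sketch below shows what Level 0 often looks like in practice. Every path, column name, and parameter here is hypothetical; the point is what is missing: tracking, versioning, and a registry.
# level_0_train.py (hypothetical sketch of the manual workflow above)
import pickle

import pandas as pd
from sklearn.ensemble import RandomForestClassifier

df = pd.read_csv("~/Downloads/fraud_2024_final_v3.csv")   # ad-hoc local data
X, y = df.drop(columns=["is_fraud"]), df["is_fraud"]

model = RandomForestClassifier(n_estimators=200)          # tuned by intuition
model.fit(X, y)

# No tracking, no registry: the pickle is handed to engineering by email,
# and nothing records which data or parameters produced it.
with open("model_final_v3.pkl", "wb") as f:
    pickle.dump(model, f)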
Level 1: ML Pipeline Automation
# level_1_implementation.py
"""
Level 1 MLOps example implementation:
ML pipeline automation with experiment tracking and a model registry.
"""
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
from datetime import datetime
import mlflow
from mlflow.tracking import MlflowClient
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
@dataclass
class PipelineConfig:
"""Pipeline 配置"""
experiment_name: str
model_name: str
data_path: str
target_column: str
test_size: float = 0.2
random_state: int = 42
@dataclass
class ModelMetrics:
"""模型指标"""
accuracy: float
precision: float
recall: float
f1: float
class Level1Pipeline:
"""Level 1 MLOps Pipeline"""
def __init__(
self,
config: PipelineConfig,
tracking_uri: str = "http://mlflow:5000"
):
self.config = config
mlflow.set_tracking_uri(tracking_uri)
mlflow.set_experiment(config.experiment_name)
self.client = MlflowClient()
def load_data(self) -> pd.DataFrame:
"""加载数据"""
return pd.read_parquet(self.config.data_path)
def prepare_data(self, df: pd.DataFrame) -> tuple:
"""数据准备"""
X = df.drop(columns=[self.config.target_column])
y = df[self.config.target_column]
return train_test_split(
X, y,
test_size=self.config.test_size,
random_state=self.config.random_state
)
def train_model(
self,
X_train: pd.DataFrame,
y_train: pd.Series,
params: Dict[str, Any]
) -> RandomForestClassifier:
"""训练模型"""
model = RandomForestClassifier(**params)
model.fit(X_train, y_train)
return model
def evaluate_model(
self,
model: RandomForestClassifier,
X_test: pd.DataFrame,
y_test: pd.Series
) -> ModelMetrics:
"""评估模型"""
y_pred = model.predict(X_test)
return ModelMetrics(
accuracy=accuracy_score(y_test, y_pred),
precision=precision_score(y_test, y_pred, average='weighted'),
recall=recall_score(y_test, y_pred, average='weighted'),
f1=f1_score(y_test, y_pred, average='weighted')
)
def run(self, params: Dict[str, Any]) -> str:
"""运行 Pipeline"""
with mlflow.start_run() as run:
            # Log parameters
mlflow.log_params(params)
mlflow.log_param("data_path", self.config.data_path)
mlflow.log_param("test_size", self.config.test_size)
            # Load and prepare the data
df = self.load_data()
X_train, X_test, y_train, y_test = self.prepare_data(df)
mlflow.log_param("train_samples", len(X_train))
mlflow.log_param("test_samples", len(X_test))
            # Train
model = self.train_model(X_train, y_train, params)
            # Evaluate
metrics = self.evaluate_model(model, X_test, y_test)
            # Log metrics
mlflow.log_metrics({
"accuracy": metrics.accuracy,
"precision": metrics.precision,
"recall": metrics.recall,
"f1": metrics.f1
})
            # Log and register the model
mlflow.sklearn.log_model(
model,
"model",
registered_model_name=self.config.model_name
)
return run.info.run_id
def promote_model(self, run_id: str, stage: str = "Staging"):
"""推广模型到指定阶段"""
        # Find the model version created by this run
filter_string = f"run_id='{run_id}'"
versions = self.client.search_model_versions(filter_string)
if versions:
version = versions[0].version
self.client.transition_model_version_stage(
name=self.config.model_name,
version=version,
stage=stage
)
return version
return None
class ExperimentComparator:
"""实验对比器"""
def __init__(self, tracking_uri: str = "http://mlflow:5000"):
mlflow.set_tracking_uri(tracking_uri)
self.client = MlflowClient()
def compare_runs(
self,
experiment_name: str,
metric: str = "f1",
top_n: int = 10
) -> pd.DataFrame:
"""对比实验运行"""
experiment = self.client.get_experiment_by_name(experiment_name)
if not experiment:
return pd.DataFrame()
runs = self.client.search_runs(
experiment_ids=[experiment.experiment_id],
order_by=[f"metrics.{metric} DESC"],
max_results=top_n
)
results = []
for run in runs:
results.append({
"run_id": run.info.run_id,
"run_name": run.info.run_name,
**run.data.params,
**run.data.metrics
})
return pd.DataFrame(results)
def get_best_run(
self,
experiment_name: str,
metric: str = "f1"
) -> Optional[str]:
"""获取最佳运行"""
df = self.compare_runs(experiment_name, metric, top_n=1)
if not df.empty:
return df.iloc[0]["run_id"]
return None
# Usage example
if __name__ == "__main__":
config = PipelineConfig(
experiment_name="fraud-detection",
model_name="fraud-classifier",
data_path="s3://data/fraud-dataset.parquet",
target_column="is_fraud"
)
pipeline = Level1Pipeline(config)
    # Run experiments over a small parameter grid
param_grid = [
{"n_estimators": 100, "max_depth": 5},
{"n_estimators": 200, "max_depth": 10},
{"n_estimators": 300, "max_depth": 15}
]
for params in param_grid:
run_id = pipeline.run(params)
print(f"Completed run: {run_id}")
    # Compare runs
comparator = ExperimentComparator()
results = comparator.compare_runs("fraud-detection")
print(results)
    # Promote the best model
best_run = comparator.get_best_run("fraud-detection")
if best_run:
version = pipeline.promote_model(best_run, "Staging")
print(f"Promoted version {version} to Staging")
Level 2: CI/CD Pipeline Automation
# level-2-cicd-pipeline.yaml
# GitLab CI/CD pipeline configuration
stages:
- validate
- test
- train
- evaluate
- deploy
variables:
MLFLOW_TRACKING_URI: "http://mlflow:5000"
MODEL_NAME: "fraud-classifier"
EXPERIMENT_NAME: "fraud-detection"
# Data validation
validate_data:
stage: validate
image: python:3.9
script:
- pip install great_expectations pandas
- python scripts/validate_data.py
  artifacts:
    paths:
      - data_validation_report.html
# Code tests
test_code:
stage: test
image: python:3.9
script:
- pip install pytest pytest-cov
- pytest tests/ --cov=src --cov-report=xml
coverage: '/TOTAL.*\s+(\d+%)/'
artifacts:
reports:
coverage_report:
coverage_format: cobertura
path: coverage.xml
# Model tests
test_model:
stage: test
image: python:3.9
script:
- pip install -r requirements.txt
- python scripts/test_model.py
  artifacts:
    paths:
      - model_test_report.json
# Model training
train_model:
stage: train
image: python:3.9
tags:
- gpu
script:
- pip install -r requirements.txt
- python scripts/train.py --config configs/production.yaml
artifacts:
paths:
- models/
- metrics/
# Model evaluation
evaluate_model:
stage: evaluate
image: python:3.9
script:
- pip install -r requirements.txt
- python scripts/evaluate.py
- python scripts/compare_with_baseline.py
  artifacts:
    paths:
      - evaluation_report.json
rules:
- if: $CI_COMMIT_BRANCH == "main"
# Deploy to staging
deploy_staging:
stage: deploy
image: python:3.9
script:
- pip install mlflow
- python scripts/deploy.py --stage staging
environment:
name: staging
url: https://staging.api.example.com
rules:
- if: $CI_COMMIT_BRANCH == "main"
when: manual
# Deploy to production
deploy_production:
stage: deploy
image: python:3.9
script:
- pip install mlflow
- python scripts/deploy.py --stage production
environment:
name: production
url: https://api.example.com
rules:
- if: $CI_COMMIT_BRANCH == "main"
when: manual
needs:
- deploy_staging
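The pipeline above delegates the actual promotion to scripts/deploy.py, which the original text does not show. One possible implementation is sketched below, under the assumption that the CI train stage registered the newest model version; the "newest version" heuristic and argument names are this sketch's choices, not a prescribed design.
# scripts/deploy.py (hypothetical implementation of the script the CI invokes)
import argparse
import os

import mlflow
from mlflow.tracking import MlflowClient

def main() -> None:
    parser = argparse.ArgumentParser(description="Promote the newest model version")
    parser.add_argument("--stage", choices=["staging", "production"], required=True)
    args = parser.parse_args()

    # MLFLOW_TRACKING_URI and MODEL_NAME come from the CI variables above
    mlflow.set_tracking_uri(os.environ["MLFLOW_TRACKING_URI"])
    client = MlflowClient()
    model_name = os.environ["MODEL_NAME"]

    # Assumption: the version registered most recently is the one to promote
    latest = max(
        client.search_model_versions(f"name='{model_name}'"),
        key=lambda v: int(v.version),
    )
    client.transition_model_version_stage(
        name=model_name,
        version=latest.version,
        stage=args.stage.capitalize(),  # "Staging" / "Production"
        archive_existing_versions=(args.stage == "production"),
    )
    print(f"Promoted {model_name} v{latest.version} to {args.stage}")

if __name__ == "__main__":
    main()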
# level_2_implementation.py
"""
Level 2 MLOps implementation:
CI/CD pipeline automation with data validation, model tests,
baseline comparison, and deployment management.
"""
import os
import json
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
import great_expectations as ge
from great_expectations.data_context import DataContext
import mlflow
from mlflow.tracking import MlflowClient
import pandas as pd
import numpy as np
@dataclass
class DataValidationResult:
"""数据验证结果"""
success: bool
statistics: Dict[str, Any]
failed_expectations: List[str]
@dataclass
class ModelTestResult:
"""模型测试结果"""
passed: bool
tests: List[Dict[str, Any]]
coverage: float
class DataValidator:
"""数据验证器"""
def __init__(self, context_root: str = "great_expectations"):
self.context = DataContext(context_root_dir=context_root)
def validate_dataset(
self,
df: pd.DataFrame,
expectation_suite: str
) -> DataValidationResult:
"""验证数据集"""
        # Convert to a Great Expectations DataFrame
ge_df = ge.from_pandas(df)
        # Run validation
results = ge_df.validate(
expectation_suite=self.context.get_expectation_suite(expectation_suite)
)
failed = [
r['expectation_config']['expectation_type']
for r in results['results']
if not r['success']
]
return DataValidationResult(
success=results['success'],
statistics=results['statistics'],
failed_expectations=failed
)
def create_expectations(
self,
df: pd.DataFrame,
suite_name: str
):
"""创建期望套件"""
ge_df = ge.from_pandas(df)
        # Auto-generate baseline expectations
ge_df.expect_table_row_count_to_be_between(min_value=1000)
for column in df.columns:
            # Not-null check
ge_df.expect_column_values_to_not_be_null(column)
if df[column].dtype in ['int64', 'float64']:
                # Numeric range check
ge_df.expect_column_values_to_be_between(
column,
min_value=df[column].min(),
max_value=df[column].max()
)
elif df[column].dtype == 'object':
                # Categorical value-set check
ge_df.expect_column_values_to_be_in_set(
column,
value_set=df[column].unique().tolist()
)
        # Persist the expectation suite
suite = ge_df.get_expectation_suite()
self.context.save_expectation_suite(suite, suite_name)
class ModelTester:
"""模型测试器"""
def __init__(self, model_uri: str):
self.model = mlflow.pyfunc.load_model(model_uri)
def test_prediction_shape(self, X: pd.DataFrame) -> Dict[str, Any]:
"""测试预测输出形状"""
predictions = self.model.predict(X)
return {
"name": "prediction_shape",
"passed": len(predictions) == len(X),
"expected": len(X),
"actual": len(predictions)
}
def test_prediction_range(
self,
X: pd.DataFrame,
min_val: float = 0,
max_val: float = 1
) -> Dict[str, Any]:
"""测试预测值范围"""
predictions = self.model.predict(X)
in_range = np.all((predictions >= min_val) & (predictions <= max_val))
return {
"name": "prediction_range",
"passed": in_range,
"min": float(predictions.min()),
"max": float(predictions.max())
}
def test_latency(
self,
X: pd.DataFrame,
max_latency_ms: float = 100
) -> Dict[str, Any]:
"""测试推理延迟"""
import time
        # Time single-row predictions
single_times = []
for i in range(min(100, len(X))):
start = time.time()
self.model.predict(X.iloc[[i]])
single_times.append((time.time() - start) * 1000)
avg_latency = np.mean(single_times)
return {
"name": "latency",
"passed": avg_latency <= max_latency_ms,
"avg_latency_ms": avg_latency,
"p99_latency_ms": np.percentile(single_times, 99),
"threshold_ms": max_latency_ms
}
def test_consistency(
self,
X: pd.DataFrame,
n_runs: int = 5
) -> Dict[str, Any]:
"""测试预测一致性"""
predictions = [self.model.predict(X) for _ in range(n_runs)]
        # All runs should produce identical results
consistent = all(
np.array_equal(predictions[0], p)
for p in predictions[1:]
)
return {
"name": "consistency",
"passed": consistent,
"runs": n_runs
}
def test_invariance(
self,
X: pd.DataFrame,
invariant_columns: List[str]
) -> Dict[str, Any]:
"""测试不变性(扰动不应改变预测)"""
original_pred = self.model.predict(X)
        # Add small noise to the invariant columns
X_perturbed = X.copy()
for col in invariant_columns:
if col in X.columns:
X_perturbed[col] = X_perturbed[col] + np.random.normal(0, 0.01, len(X))
perturbed_pred = self.model.predict(X_perturbed)
        # Predictions should remain unchanged
invariant = np.array_equal(original_pred, perturbed_pred)
return {
"name": "invariance",
"passed": invariant,
"columns_tested": invariant_columns
}
def run_all_tests(
self,
X: pd.DataFrame,
config: Dict[str, Any]
) -> ModelTestResult:
"""运行所有测试"""
tests = [
self.test_prediction_shape(X),
self.test_prediction_range(
X,
config.get("min_prediction", 0),
config.get("max_prediction", 1)
),
self.test_latency(X, config.get("max_latency_ms", 100)),
self.test_consistency(X, config.get("consistency_runs", 5))
]
if "invariant_columns" in config:
tests.append(self.test_invariance(X, config["invariant_columns"]))
passed = all(t["passed"] for t in tests)
coverage = sum(1 for t in tests if t["passed"]) / len(tests)
return ModelTestResult(
passed=passed,
tests=tests,
coverage=coverage
)
class ModelComparator:
"""模型对比器"""
def __init__(self, tracking_uri: str = "http://mlflow:5000"):
mlflow.set_tracking_uri(tracking_uri)
self.client = MlflowClient()
def compare_with_baseline(
self,
model_name: str,
new_version: str,
metrics: List[str] = ["accuracy", "f1"]
) -> Dict[str, Any]:
"""与基线模型对比"""
        # Use the current Production version as the baseline
baseline_versions = self.client.get_latest_versions(
model_name,
stages=["Production"]
)
if not baseline_versions:
return {
"has_baseline": False,
"comparison": None,
"recommendation": "promote"
}
baseline = baseline_versions[0]
        # Fetch run metrics for both versions
new_run = self.client.get_run(
self.client.get_model_version(model_name, new_version).run_id
)
baseline_run = self.client.get_run(baseline.run_id)
comparison = {}
improvements = 0
regressions = 0
for metric in metrics:
new_val = new_run.data.metrics.get(metric, 0)
baseline_val = baseline_run.data.metrics.get(metric, 0)
diff = new_val - baseline_val
diff_pct = (diff / baseline_val * 100) if baseline_val != 0 else 0
comparison[metric] = {
"new": new_val,
"baseline": baseline_val,
"diff": diff,
"diff_pct": diff_pct
}
if diff > 0:
improvements += 1
elif diff < 0:
regressions += 1
        # Recommendation
if regressions == 0 and improvements > 0:
recommendation = "promote"
elif regressions > improvements:
recommendation = "reject"
else:
recommendation = "review"
return {
"has_baseline": True,
"baseline_version": baseline.version,
"new_version": new_version,
"comparison": comparison,
"improvements": improvements,
"regressions": regressions,
"recommendation": recommendation
}
class DeploymentManager:
"""部署管理器"""
def __init__(self, tracking_uri: str = "http://mlflow:5000"):
mlflow.set_tracking_uri(tracking_uri)
self.client = MlflowClient()
def deploy_to_staging(
self,
model_name: str,
version: str
) -> bool:
"""部署到 Staging"""
try:
self.client.transition_model_version_stage(
name=model_name,
version=version,
stage="Staging"
)
return True
except Exception as e:
print(f"Staging deployment failed: {e}")
return False
def deploy_to_production(
self,
model_name: str,
version: str,
archive_existing: bool = True
) -> bool:
"""部署到 Production"""
try:
self.client.transition_model_version_stage(
name=model_name,
version=version,
stage="Production",
archive_existing_versions=archive_existing
)
return True
except Exception as e:
print(f"Production deployment failed: {e}")
return False
def rollback(
self,
model_name: str,
to_version: str
) -> bool:
"""回滚到指定版本"""
        # Archive the current Production version
current_prod = self.client.get_latest_versions(
model_name,
stages=["Production"]
)
if current_prod:
self.client.transition_model_version_stage(
name=model_name,
version=current_prod[0].version,
stage="Archived"
)
        # Promote the target version to Production
return self.deploy_to_production(model_name, to_version, archive_existing=False)
# Usage example
if __name__ == "__main__":
    # Data validation
validator = DataValidator()
df = pd.read_parquet("data/test.parquet")
validation_result = validator.validate_dataset(df, "training_data_suite")
if not validation_result.success:
print(f"Data validation failed: {validation_result.failed_expectations}")
exit(1)
    # Model tests
tester = ModelTester("models:/fraud-classifier/Staging")
test_result = tester.run_all_tests(df.drop(columns=["label"]), {
"max_latency_ms": 50,
"min_prediction": 0,
"max_prediction": 1
})
if not test_result.passed:
print(f"Model tests failed: {test_result.tests}")
exit(1)
    # Baseline comparison
comparator = ModelComparator()
comparison = comparator.compare_with_baseline(
"fraud-classifier",
"5",
metrics=["accuracy", "f1", "precision", "recall"]
)
print(f"Comparison result: {json.dumps(comparison, indent=2)}")
    # Deployment
if comparison["recommendation"] == "promote":
deployer = DeploymentManager()
deployer.deploy_to_production("fraud-classifier", "5")
print("Model promoted to Production")
Level 3: Continuous Training (CT)
# level_3_implementation.py
"""
Level 3 MLOps implementation:
continuous training (CT) with drift detection, performance
monitoring, and automated retraining.
"""
import numpy as np
import pandas as pd
from typing import Dict, List, Optional, Any, Callable
from dataclasses import dataclass
from datetime import datetime, timedelta
from scipy import stats
from sklearn.metrics import accuracy_score
import mlflow
from mlflow.tracking import MlflowClient
@dataclass
class DriftDetectionResult:
"""漂移检测结果"""
feature: str
drift_detected: bool
drift_score: float
p_value: float
threshold: float
@dataclass
class MonitoringMetrics:
"""监控指标"""
timestamp: datetime
prediction_count: int
avg_latency_ms: float
error_rate: float
drift_scores: Dict[str, float]
class DataDriftDetector:
"""数据漂移检测器"""
def __init__(self, reference_data: pd.DataFrame):
self.reference = reference_data
self.reference_stats = self._compute_stats(reference_data)
def _compute_stats(self, df: pd.DataFrame) -> Dict[str, Dict]:
"""计算数据统计"""
stats_dict = {}
for col in df.columns:
if df[col].dtype in ['int64', 'float64']:
stats_dict[col] = {
"type": "numeric",
"mean": df[col].mean(),
"std": df[col].std(),
"min": df[col].min(),
"max": df[col].max(),
"distribution": df[col].values
}
else:
stats_dict[col] = {
"type": "categorical",
"value_counts": df[col].value_counts(normalize=True).to_dict()
}
return stats_dict
def detect_drift_ks(
self,
current_data: pd.DataFrame,
column: str,
threshold: float = 0.05
) -> DriftDetectionResult:
"""使用 KS 检验检测漂移"""
if column not in self.reference_stats:
raise ValueError(f"Column {column} not in reference data")
ref_stats = self.reference_stats[column]
if ref_stats["type"] != "numeric":
raise ValueError(f"KS test only for numeric columns")
        # Two-sample KS test
statistic, p_value = stats.ks_2samp(
ref_stats["distribution"],
current_data[column].values
)
return DriftDetectionResult(
feature=column,
drift_detected=p_value < threshold,
drift_score=statistic,
p_value=p_value,
threshold=threshold
)
def detect_drift_psi(
self,
current_data: pd.DataFrame,
column: str,
buckets: int = 10,
threshold: float = 0.2
) -> DriftDetectionResult:
"""使用 PSI (Population Stability Index) 检测漂移"""
if column not in self.reference_stats:
raise ValueError(f"Column {column} not in reference data")
ref_stats = self.reference_stats[column]
if ref_stats["type"] == "numeric":
            # Numeric column: bucket into histogram bins
ref_values = ref_stats["distribution"]
cur_values = current_data[column].values
            # Build buckets from the reference distribution; widen the outer
            # edges so current values outside the reference range still count
            _, bin_edges = np.histogram(ref_values, bins=buckets)
            bin_edges[0], bin_edges[-1] = -np.inf, np.inf
            # Per-bucket proportions
            ref_counts, _ = np.histogram(ref_values, bins=bin_edges)
            cur_counts, _ = np.histogram(cur_values, bins=bin_edges)
ref_pct = ref_counts / len(ref_values)
cur_pct = cur_counts / len(cur_values)
else:
            # Categorical column: compare category proportions directly
ref_dist = ref_stats["value_counts"]
cur_dist = current_data[column].value_counts(normalize=True).to_dict()
all_categories = set(ref_dist.keys()) | set(cur_dist.keys())
ref_pct = np.array([ref_dist.get(c, 0.0001) for c in all_categories])
cur_pct = np.array([cur_dist.get(c, 0.0001) for c in all_categories])
        # PSI = sum((cur_pct - ref_pct) * ln(cur_pct / ref_pct)); the floor avoids log(0)
ref_pct = np.clip(ref_pct, 0.0001, None)
cur_pct = np.clip(cur_pct, 0.0001, None)
psi = np.sum((cur_pct - ref_pct) * np.log(cur_pct / ref_pct))
return DriftDetectionResult(
feature=column,
drift_detected=psi > threshold,
drift_score=psi,
            p_value=0.0,  # PSI does not produce a p-value
threshold=threshold
)
def detect_all_drift(
self,
current_data: pd.DataFrame,
method: str = "psi"
) -> List[DriftDetectionResult]:
"""检测所有特征的漂移"""
results = []
for column in self.reference.columns:
if column in current_data.columns:
try:
if method == "ks" and self.reference_stats[column]["type"] == "numeric":
result = self.detect_drift_ks(current_data, column)
else:
result = self.detect_drift_psi(current_data, column)
results.append(result)
except Exception as e:
print(f"Error detecting drift for {column}: {e}")
return results
class ModelPerformanceMonitor:
"""模型性能监控器"""
def __init__(
self,
model_name: str,
tracking_uri: str = "http://mlflow:5000"
):
self.model_name = model_name
mlflow.set_tracking_uri(tracking_uri)
self.client = MlflowClient()
self.baseline_metrics = self._get_baseline_metrics()
def _get_baseline_metrics(self) -> Dict[str, float]:
"""获取基线指标"""
versions = self.client.get_latest_versions(
self.model_name,
stages=["Production"]
)
if not versions:
return {}
run = self.client.get_run(versions[0].run_id)
return run.data.metrics
def compute_online_metrics(
self,
predictions: np.ndarray,
actuals: np.ndarray
) -> Dict[str, float]:
"""计算在线指标"""
return {
"online_accuracy": accuracy_score(actuals, predictions),
"prediction_positive_rate": np.mean(predictions),
"actual_positive_rate": np.mean(actuals)
}
def detect_performance_degradation(
self,
current_metrics: Dict[str, float],
threshold_pct: float = 0.05
) -> Dict[str, Any]:
"""检测性能下降"""
degradations = {}
for metric, baseline_value in self.baseline_metrics.items():
if metric in current_metrics:
current_value = current_metrics[metric]
degradation = (baseline_value - current_value) / baseline_value
if degradation > threshold_pct:
degradations[metric] = {
"baseline": baseline_value,
"current": current_value,
"degradation_pct": degradation * 100
}
return {
"degradation_detected": len(degradations) > 0,
"degraded_metrics": degradations
}
class ContinuousTrainingPipeline:
"""持续训练 Pipeline"""
def __init__(
self,
model_name: str,
experiment_name: str,
training_fn: Callable,
tracking_uri: str = "http://mlflow:5000"
):
self.model_name = model_name
self.experiment_name = experiment_name
self.training_fn = training_fn
mlflow.set_tracking_uri(tracking_uri)
self.client = MlflowClient()
def should_retrain(
self,
drift_results: List[DriftDetectionResult],
performance_result: Dict[str, Any],
drift_threshold: int = 3,
performance_threshold: bool = True
) -> Dict[str, Any]:
"""判断是否需要重训练"""
drifted_features = [r for r in drift_results if r.drift_detected]
drift_triggered = len(drifted_features) >= drift_threshold
performance_triggered = (
performance_threshold and
performance_result.get("degradation_detected", False)
)
should_retrain = drift_triggered or performance_triggered
return {
"should_retrain": should_retrain,
"triggers": {
"data_drift": drift_triggered,
"performance_degradation": performance_triggered
},
"details": {
"drifted_features": [r.feature for r in drifted_features],
"degraded_metrics": list(
performance_result.get("degraded_metrics", {}).keys()
)
}
}
def trigger_retraining(
self,
training_data: pd.DataFrame,
reason: str,
params: Optional[Dict[str, Any]] = None
) -> str:
"""触发重训练"""
mlflow.set_experiment(self.experiment_name)
with mlflow.start_run() as run:
            # Record why retraining was triggered
mlflow.set_tag("retrain_reason", reason)
mlflow.set_tag("retrain_timestamp", datetime.now().isoformat())
            # Train
model, metrics = self.training_fn(training_data, params or {})
            # Log parameters and metrics
mlflow.log_params(params or {})
mlflow.log_metrics(metrics)
            # Register the model
mlflow.sklearn.log_model(
model,
"model",
registered_model_name=self.model_name
)
return run.info.run_id
def auto_promote(
self,
run_id: str,
validation_metrics: Dict[str, float],
threshold_metrics: Dict[str, float]
) -> bool:
"""自动推广模型"""
        # Check the validation thresholds
for metric, threshold in threshold_metrics.items():
if validation_metrics.get(metric, 0) < threshold:
print(f"Metric {metric} below threshold: {validation_metrics.get(metric)} < {threshold}")
return False
        # Look up the newly registered version
filter_string = f"run_id='{run_id}'"
versions = self.client.search_model_versions(filter_string)
if not versions:
return False
version = versions[0].version
        # Move to Staging first
self.client.transition_model_version_stage(
name=self.model_name,
version=version,
stage="Staging"
)
        # Thresholds passed, so promote to Production
self.client.transition_model_version_stage(
name=self.model_name,
version=version,
stage="Production",
archive_existing_versions=True
)
return True
class RetrainingOrchestrator:
"""重训练编排器"""
def __init__(
self,
model_name: str,
reference_data: pd.DataFrame,
training_fn: Callable,
tracking_uri: str = "http://mlflow:5000"
):
self.model_name = model_name
self.drift_detector = DataDriftDetector(reference_data)
self.monitor = ModelPerformanceMonitor(model_name, tracking_uri)
self.ct_pipeline = ContinuousTrainingPipeline(
model_name,
f"{model_name}-ct",
training_fn,
tracking_uri
)
def run_monitoring_cycle(
self,
current_data: pd.DataFrame,
predictions: np.ndarray,
actuals: np.ndarray
) -> Dict[str, Any]:
"""运行监控周期"""
        # Detect data drift
drift_results = self.drift_detector.detect_all_drift(current_data)
        # Compute online metrics
online_metrics = self.monitor.compute_online_metrics(predictions, actuals)
        # Detect performance degradation
performance_result = self.monitor.detect_performance_degradation(online_metrics)
        # Decide whether to retrain
retrain_decision = self.ct_pipeline.should_retrain(
drift_results,
performance_result
)
return {
"timestamp": datetime.now().isoformat(),
"drift_results": [
{
"feature": r.feature,
"drift_detected": r.drift_detected,
"drift_score": r.drift_score
}
for r in drift_results
],
"online_metrics": online_metrics,
"performance_result": performance_result,
"retrain_decision": retrain_decision
}
def execute_retraining_if_needed(
self,
monitoring_result: Dict[str, Any],
training_data: pd.DataFrame,
validation_data: pd.DataFrame,
params: Optional[Dict[str, Any]] = None
) -> Optional[str]:
"""如果需要则执行重训练"""
if not monitoring_result["retrain_decision"]["should_retrain"]:
return None
        # Assemble the trigger reason
triggers = monitoring_result["retrain_decision"]["triggers"]
reasons = []
if triggers["data_drift"]:
reasons.append("data_drift")
if triggers["performance_degradation"]:
reasons.append("performance_degradation")
reason = ",".join(reasons)
        # Trigger retraining
run_id = self.ct_pipeline.trigger_retraining(
training_data,
reason,
params
)
        # Validate and auto-promote.
        # Placeholder: a real implementation would compute these metrics
        # on validation_data rather than hard-coding them.
validation_metrics = {"accuracy": 0.95, "f1": 0.92}
threshold_metrics = {"accuracy": 0.90, "f1": 0.85}
self.ct_pipeline.auto_promote(run_id, validation_metrics, threshold_metrics)
return run_id
# Usage example
if __name__ == "__main__":
    # Mock training function
def train_model(data: pd.DataFrame, params: Dict[str, Any]):
from sklearn.ensemble import RandomForestClassifier
X = data.drop(columns=["label"])
y = data["label"]
model = RandomForestClassifier(**params)
model.fit(X, y)
metrics = {"accuracy": 0.95, "f1": 0.92}
return model, metrics
    # Reference data
reference_data = pd.read_parquet("data/reference.parquet")
    # Create the orchestrator
orchestrator = RetrainingOrchestrator(
model_name="fraud-classifier",
reference_data=reference_data,
training_fn=train_model
)
    # Simulated monitoring inputs
current_data = pd.read_parquet("data/current.parquet")
predictions = np.random.randint(0, 2, 1000)
actuals = np.random.randint(0, 2, 1000)
    # Run a monitoring cycle
result = orchestrator.run_monitoring_cycle(
current_data.drop(columns=["label"]),
predictions,
actuals
)
print(f"Monitoring result: {result}")
    # Retrain if needed
if result["retrain_decision"]["should_retrain"]:
run_id = orchestrator.execute_retraining_if_needed(
result,
reference_data,
current_data,
params={"n_estimators": 100, "max_depth": 10}
)
print(f"Retrained model: {run_id}")
Maturity Assessment Framework
Assessment Dimensions
# maturity_assessment.py
"""
MLOps maturity assessment framework
"""
from typing import Any, Dict, List
from dataclasses import dataclass
from enum import Enum
class MaturityLevel(Enum):
LEVEL_0 = 0
LEVEL_1 = 1
LEVEL_2 = 2
LEVEL_3 = 3
@dataclass
class AssessmentQuestion:
"""评估问题"""
dimension: str
question: str
level_0: str
level_1: str
level_2: str
level_3: str
ASSESSMENT_QUESTIONS = [
    AssessmentQuestion(
        dimension="Data management",
        question="Data versioning",
        level_0="No version control",
        level_1="Basic version tagging",
        level_2="Systematic management with DVC/LakeFS",
        level_3="Automated data lineage tracking"
    ),
    AssessmentQuestion(
        dimension="Data management",
        question="Data quality",
        level_0="No systematic checks",
        level_1="Manual checks",
        level_2="Automated validation pipeline",
        level_3="Real-time data quality monitoring"
    ),
    AssessmentQuestion(
        dimension="Experimentation",
        question="Experiment tracking",
        level_0="Manual records or none",
        level_1="MLflow/W&B tracking",
        level_2="Automated experiment comparison",
        level_3="AutoML integration"
    ),
    AssessmentQuestion(
        dimension="Model development",
        question="Feature engineering",
        level_0="Ad-hoc scripts",
        level_1="Modular code",
        level_2="Feature store",
        level_3="Real-time feature serving"
    ),
    AssessmentQuestion(
        dimension="Model training",
        question="Training workflow",
        level_0="Manual script execution",
        level_1="Automated pipeline",
        level_2="Reproducible training",
        level_3="Distributed/incremental training"
    ),
    AssessmentQuestion(
        dimension="Model registry",
        question="Model management",
        level_0="Files on disk",
        level_1="Model registry",
        level_2="Versioning plus stage management",
        level_3="Approval workflows plus lineage"
    ),
    AssessmentQuestion(
        dimension="Deployment",
        question="Deployment workflow",
        level_0="Manual deployment",
        level_1="Scripted deployment",
        level_2="Automated CI/CD deployment",
        level_3="Progressive delivery"
    ),
    AssessmentQuestion(
        dimension="Monitoring",
        question="Model monitoring",
        level_0="No monitoring",
        level_1="Basic metric monitoring",
        level_2="Performance and drift monitoring",
        level_3="Automated alerting and retraining"
    ),
    AssessmentQuestion(
        dimension="Governance",
        question="Model governance",
        level_0="No governance",
        level_1="Basic documentation",
        level_2="Model cards and audits",
        level_3="Automated compliance"
    ),
    AssessmentQuestion(
        dimension="Team",
        question="Collaboration",
        level_0="Siloed work",
        level_1="Shared code",
        level_2="Platform-based collaboration",
        level_3="Self-service platform"
    )
]
class MaturityAssessor:
"""成熟度评估器"""
def __init__(self):
self.questions = ASSESSMENT_QUESTIONS
    def assess(self, answers: Dict[str, int]) -> Dict[str, Any]:
"""执行评估"""
dimension_scores = {}
for q in self.questions:
key = f"{q.dimension}_{q.question}"
score = answers.get(key, 0)
if q.dimension not in dimension_scores:
dimension_scores[q.dimension] = []
dimension_scores[q.dimension].append(score)
        # Average score per dimension
dimension_averages = {
dim: sum(scores) / len(scores)
for dim, scores in dimension_scores.items()
}
        # Overall maturity
overall_score = sum(dimension_averages.values()) / len(dimension_averages)
overall_level = MaturityLevel(int(overall_score))
return {
"overall_level": overall_level.name,
"overall_score": overall_score,
"dimension_scores": dimension_averages,
"recommendations": self._generate_recommendations(dimension_averages)
}
def _generate_recommendations(
self,
scores: Dict[str, float]
) -> List[str]:
"""生成改进建议"""
recommendations = []
        # Focus on the lowest-scoring dimensions
        sorted_dims = sorted(scores.items(), key=lambda x: x[1])
        for dim, score in sorted_dims[:3]:  # three lowest scores
            if score < 1:
                recommendations.append(f"{dim}: establish basic automated workflows")
            elif score < 2:
                recommendations.append(f"{dim}: implement full CI/CD integration")
            elif score < 3:
                recommendations.append(f"{dim}: add continuous monitoring and automated governance")
return recommendations
# Usage example
if __name__ == "__main__":
assessor = MaturityAssessor()
    # Example answers
    answers = {
        "Data management_Data versioning": 2,
        "Data management_Data quality": 1,
        "Experimentation_Experiment tracking": 2,
        "Model development_Feature engineering": 1,
        "Model training_Training workflow": 2,
        "Model registry_Model management": 2,
        "Deployment_Deployment workflow": 1,
        "Monitoring_Model monitoring": 1,
        "Governance_Model governance": 0,
        "Team_Collaboration": 1
    }
result = assessor.assess(answers)
print(f"Overall Level: {result['overall_level']}")
print(f"Overall Score: {result['overall_score']:.2f}")
print("\nDimension Scores:")
for dim, score in result['dimension_scores'].items():
print(f" {dim}: {score:.2f}")
print("\nRecommendations:")
for rec in result['recommendations']:
print(f" - {rec}")
Summary
An MLOps maturity model helps an organization to:
- Assess the current state - understand how mature its ML systems are today
- Plan the path - lay out a concrete evolution roadmap
- Prioritize - identify the areas most in need of improvement
- Measure progress - track the effect of improvements over time
Key takeaways:
- Level 0 → 1: establish basic automation
- Level 1 → 2: implement full CI/CD
- Level 2 → 3: introduce continuous training
The next chapter covers the design and implementation of a feature store.