Kubeflow Pipelines in Depth
Overview
Kubeflow Pipelines (KFP) is one of the most widely used ML workflow engines on Kubernetes, providing end-to-end orchestration for machine learning pipelines. This article takes a deep look at KFP's architecture, component development, and best practices.
KFP Architecture
Overall Architecture
┌─────────────────────────────────────────────────────────────────┐
│ Kubeflow Pipelines 架构 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 用户接口 │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ Web UI │ │ Python │ │ CLI │ │ REST │ │ │
│ │ │ │ │ SDK │ │ (kfp) │ │ API │ │ │
│ │ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ │ │
│ └───────┼─────────────┼─────────────┼─────────────┼────────┘ │
│ └─────────────┴──────┬──────┴─────────────┘ │
│ │ │
│ ════════════════════════════╪════════════════════════════════ │
│ │ │
│ KFP 服务层 │ │
│ ┌────────────────────────────┴─────────────────────────────┐ │
│ │ │ │
│ │ ┌─────────────────────────────────────────────────────┐ │ │
│ │ │ KFP API Server │ │ │
│ │ │ • Pipeline CRUD │ │ │
│ │ │ • Run 管理 │ │ │
│ │ │ • Experiment 管理 │ │ │
│ │ │ • Artifact 管理 │ │ │
│ │ └─────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ │ ┌───────────────────┐ ┌───────────────────────────┐ │ │
│ │ │ Persistence │ │ Visualization │ │ │
│ │ │ Agent │ │ Server │ │ │
│ │ │ │ │ │ │ │
│ │ │ • 状态持久化 │ │ • 可视化渲染 │ │ │
│ │ │ • 事件记录 │ │ • 指标图表 │ │ │
│ │ └───────────────────┘ └───────────────────────────┘ │ │
│ │ │ │
│ │ ┌─────────────────────────────────────────────────────┐ │ │
│ │ │ Scheduled Workflow │ │ │
│ │ │ Controller │ │ │
│ │ │ • 定时触发 │ │ │
│ │ │ • Cron 调度 │ │ │
│ │ └─────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │ │
│ ════════════════════════════╪════════════════════════════════ │
│ │ │
│ 执行层 (Argo Workflows) │ │
│ ┌────────────────────────────┴─────────────────────────────┐ │
│ │ │ │
│ │ ┌─────────────────────────────────────────────────────┐ │ │
│ │ │ Argo Workflow Controller │ │ │
│ │ │ • DAG 解析执行 │ │ │
│ │ │ • Pod 调度 │ │ │
│ │ │ • 状态管理 │ │ │
│ │ └─────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ Step 1 │ │ Step 2 │ │ Step 3 │ │ Step N │ │ │
│ │ │ Pod │ │ Pod │ │ Pod │ │ Pod │ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │ │
│ │ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │ │
│ ════════════════════════════╪════════════════════════════════ │
│ │ │
│ 存储层 │ │
│ ┌────────────────────────────┴─────────────────────────────┐ │
│ │ │ │
│ │ ┌───────────────┐ ┌───────────────┐ ┌──────────────┐ │ │
│ │ │ MySQL │ │ MinIO │ │ ML │ │ │
│ │ │ Metadata │ │ Artifacts │ │ Metadata │ │ │
│ │ │ │ │ │ │ (MLMD) │ │ │
│ │ │ • Pipeline │ │ • 模型文件 │ │ • 血缘追溯 │ │ │
│ │ │ • Run 记录 │ │ • 数据集 │ │ • 版本管理 │ │ │
│ │ │ • 参数配置 │ │ • 日志文件 │ │ • 指标记录 │ │ │
│ │ └───────────────┘ └───────────────┘ └──────────────┘ │ │
│ │ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
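Whichever entry point in the diagram is used (Web UI, Python SDK, CLI, or raw REST), the request ends up at the KFP API Server. A minimal sketch of the SDK path, assuming a port-forwarded API endpoint at http://localhost:8080:

from kfp.client import Client

# The host URL is an assumption; point it at your KFP API Server or UI proxy
client = Client(host="http://localhost:8080")

# These calls go through the same REST API that the Web UI and CLI use
print(client.list_experiments())
print(client.list_pipelines())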
KFP v2 SDK Architecture
┌─────────────────────────────────────────────────────────────────┐
│ KFP v2 SDK 架构 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Python 代码 │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ @dsl.component │ │
│ │ def preprocess(data: Input[Dataset]) -> Output[Dataset]:│ │
│ │ ... │ │
│ │ │ │
│ │ @dsl.pipeline │ │
│ │ def ml_pipeline(): │ │
│ │ preprocess_task = preprocess(...) │ │
│ │ train_task = train(preprocess_task.output) │ │
│ │ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ KFP Compiler │ │
│ │ ┌─────────────────────────────────────────────────────┐ │ │
│ │ │ • 类型检查 │ │ │
│ │ │ • DAG 构建 │ │ │
│ │ │ • IR 生成 │ │ │
│ │ └─────────────────────────────────────────────────────┘ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ Pipeline IR (YAML/JSON) │ │
│ │ ┌─────────────────────────────────────────────────────┐ │ │
│ │ │ pipelineSpec: │ │ │
│ │ │ components: │ │ │
│ │ │ - name: preprocess │ │ │
│ │ │ executorLabel: exec-preprocess │ │ │
│ │ │ deploymentSpec: │ │ │
│ │ │ executors: │ │ │
│ │ │ exec-preprocess: │ │ │
│ │ │ container: │ │ │
│ │ │ image: python:3.9 │ │ │
│ │ └─────────────────────────────────────────────────────┘ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ Argo Workflow │ │
│ │ ┌─────────────────────────────────────────────────────┐ │ │
│ │ │ apiVersion: argoproj.io/v1alpha1 │ │ │
│ │ │ kind: Workflow │ │ │
│ │ │ spec: │ │ │
│ │ │ templates: │ │ │
│ │ │ - name: preprocess │ │ │
│ │ │ container: ... │ │ │
│ │ └─────────────────────────────────────────────────────┘ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
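The compilation path shown above can be exercised in a few lines. A minimal sketch, using a trivial placeholder component and pipeline:

from kfp import dsl, compiler

@dsl.component(base_image="python:3.9")
def say_hello(name: str) -> str:
    return f"Hello, {name}"

@dsl.pipeline(name="hello-pipeline")
def hello_pipeline(name: str = "KFP"):
    say_hello(name=name)

# Produces the Pipeline IR (pipelineSpec) YAML shown above; the Argo Workflow
# object is generated by the backend only when a run is submitted
compiler.Compiler().compile(
    pipeline_func=hello_pipeline,
    package_path="hello_pipeline.yaml"
)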
Component Development
Basic Components
"""
KFP v2 component development
"""
from kfp import dsl
from kfp.dsl import (
component,
pipeline,
Input,
Output,
Dataset,
Model,
Metrics,
Artifact,
InputPath,
OutputPath,
)
from typing import NamedTuple
# 1. Lightweight Python component
@component(
base_image="python:3.9",
packages_to_install=["pandas", "scikit-learn"]
)
def preprocess_data(
input_path: str,
output_dataset: Output[Dataset],
test_size: float = 0.2
):
"""数据预处理组件"""
import pandas as pd
from sklearn.model_selection import train_test_split
    # Load the raw data
df = pd.read_csv(input_path)
    # Drop rows with missing values
df = df.dropna()
    # Train/test split
train_df, test_df = train_test_split(df, test_size=test_size, random_state=42)
    # Persist the processed splits
train_df.to_csv(f"{output_dataset.path}_train.csv", index=False)
test_df.to_csv(f"{output_dataset.path}_test.csv", index=False)
    # Record artifact metadata
output_dataset.metadata["train_samples"] = len(train_df)
output_dataset.metadata["test_samples"] = len(test_df)
# 2. Component with multiple outputs
@component(
base_image="python:3.9",
packages_to_install=["pandas", "scikit-learn"]
)
def train_model(
train_data: Input[Dataset],
model_output: Output[Model],
metrics_output: Output[Metrics],
learning_rate: float = 0.01,
n_estimators: int = 100
) -> NamedTuple("TrainOutput", [("accuracy", float), ("f1_score", float)]):
"""模型训练组件"""
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score
import pickle
    # Load data
train_df = pd.read_csv(f"{train_data.path}_train.csv")
X = train_df.drop("target", axis=1)
y = train_df["target"]
    # Train the model
model = RandomForestClassifier(
n_estimators=n_estimators,
random_state=42
)
model.fit(X, y)
    # Evaluate on the training set
predictions = model.predict(X)
acc = accuracy_score(y, predictions)
f1 = f1_score(y, predictions, average="weighted")
    # Save the model
with open(model_output.path, "wb") as f:
pickle.dump(model, f)
    # Log metrics
metrics_output.log_metric("accuracy", acc)
metrics_output.log_metric("f1_score", f1)
metrics_output.log_metric("n_estimators", n_estimators)
    # Record model metadata
model_output.metadata["framework"] = "sklearn"
model_output.metadata["model_type"] = "RandomForestClassifier"
from collections import namedtuple
output = namedtuple("TrainOutput", ["accuracy", "f1_score"])
return output(acc, f1)
# 3. Using a custom container image
@component(
base_image="pytorch/pytorch:2.0.0-cuda11.7-cudnn8-runtime"
)
def train_pytorch_model(
train_data: Input[Dataset],
model_output: Output[Model],
epochs: int = 10,
batch_size: int = 32,
learning_rate: float = 0.001
):
"""PyTorch 模型训练"""
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import pandas as pd
import numpy as np
    # Load data
df = pd.read_csv(f"{train_data.path}_train.csv")
X = torch.tensor(df.drop("target", axis=1).values, dtype=torch.float32)
y = torch.tensor(df["target"].values, dtype=torch.long)
dataset = TensorDataset(X, y)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
    # Define the model
model = nn.Sequential(
nn.Linear(X.shape[1], 128),
nn.ReLU(),
nn.Linear(128, 64),
nn.ReLU(),
nn.Linear(64, len(y.unique()))
)
    # Move the model to GPU if available; batches are moved inside the training loop
    if torch.cuda.is_available():
        model = model.cuda()
    # Training loop
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
for epoch in range(epochs):
for batch_X, batch_y in dataloader:
if torch.cuda.is_available():
batch_X, batch_y = batch_X.cuda(), batch_y.cuda()
optimizer.zero_grad()
outputs = model(batch_X)
loss = criterion(outputs, batch_y)
loss.backward()
optimizer.step()
    # Save model weights
torch.save(model.state_dict(), model_output.path)
model_output.metadata["framework"] = "pytorch"
model_output.metadata["epochs"] = epochs
# 4. Container component (for complex dependencies)
@dsl.container_component
def distributed_training_op(
    world_size: int,
    epochs: int,
    model: Output[Model]
):
    """Container component that wraps a prebuilt training image."""
    return dsl.ContainerSpec(
        image="my-registry/distributed-training:v1.0",
        command=["python", "train_distributed.py"],
        args=[
            "--world-size", world_size,
            "--epochs", epochs,
            "--model-path", model.path
        ]
    )
# 5. Components used for conditional deployment
@component(
    base_image="python:3.9",
    packages_to_install=["pandas", "scikit-learn"]
)
def evaluate_model(
model: Input[Model],
test_data: Input[Dataset],
metrics: Output[Metrics]
) -> float:
"""模型评估"""
import pickle
import pandas as pd
from sklearn.metrics import accuracy_score
    # Load the model
with open(model.path, "rb") as f:
clf = pickle.load(f)
    # Load test data
test_df = pd.read_csv(f"{test_data.path}_test.csv")
X_test = test_df.drop("target", axis=1)
y_test = test_df["target"]
    # Predict and score
predictions = clf.predict(X_test)
accuracy = accuracy_score(y_test, predictions)
metrics.log_metric("test_accuracy", accuracy)
return accuracy
@component(
    base_image="python:3.9",
    packages_to_install=["kubernetes"]
)
def deploy_model(
    model: Input[Model],
    endpoint_name: str,
    min_replicas: int = 1
):
    """Deploy the model as an InferenceService (KFServing/KServe)."""
    from kubernetes import client, config
    # Create the InferenceService through the Kubernetes API instead of shelling
    # out to kubectl, which is not available in the component image
    config.load_incluster_config()
    api = client.CustomObjectsApi()
    inference_service = {
        "apiVersion": "serving.kubeflow.org/v1beta1",
        "kind": "InferenceService",
        "metadata": {"name": endpoint_name},
        "spec": {
            "predictor": {
                "minReplicas": min_replicas,
                "sklearn": {
                    "storageUri": model.uri,
                    "resources": {
                        "requests": {"cpu": "100m", "memory": "256Mi"}
                    }
                }
            }
        }
    }
    api.create_namespaced_custom_object(
        group="serving.kubeflow.org",
        version="v1beta1",
        namespace="kubeflow",
        plural="inferenceservices",
        body=inference_service
    )
# 6. Component with built-in retry logic
@component(
base_image="python:3.9",
packages_to_install=["requests"]
)
def fetch_data_with_retry(
url: str,
output_data: Output[Dataset],
max_retries: int = 3
):
"""带重试的数据获取"""
import requests
import time
for attempt in range(max_retries):
try:
response = requests.get(url, timeout=30)
response.raise_for_status()
with open(output_data.path, "w") as f:
f.write(response.text)
output_data.metadata["source_url"] = url
output_data.metadata["size_bytes"] = len(response.text)
return
except Exception as e:
if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # exponential backoff
else:
raise RuntimeError(f"Failed after {max_retries} attempts: {e}")
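Retries can also be configured at the task level instead of inside the component body. A minimal sketch using the KFP task API; the pipeline itself is illustrative only:

@dsl.pipeline(name="retry-at-task-level")
def retry_pipeline(url: str):
    fetch_task = fetch_data_with_retry(url=url)
    # Let the platform restart the whole task on failure,
    # in addition to (or instead of) the in-component retry loop
    fetch_task.set_retry(num_retries=3, backoff_duration="10s", backoff_factor=2.0)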
Pipeline Definition
"""
KFP pipeline definition
"""
from kfp import dsl
from kfp import compiler
from kfp.client import Client
@dsl.pipeline(
name="ml-training-pipeline",
description="End-to-end ML training pipeline"
)
def ml_pipeline(
data_path: str = "gs://bucket/data.csv",
learning_rate: float = 0.01,
epochs: int = 10,
deploy_threshold: float = 0.9
):
"""
ML 训练流水线
Args:
data_path: 输入数据路径
learning_rate: 学习率
epochs: 训练轮数
deploy_threshold: 部署阈值
"""
    # 1. Data preprocessing
preprocess_task = preprocess_data(
input_path=data_path,
test_size=0.2
)
preprocess_task.set_display_name("Data Preprocessing")
preprocess_task.set_cpu_limit("2")
preprocess_task.set_memory_limit("4Gi")
    # 2. Model training (on GPU)
train_task = train_pytorch_model(
train_data=preprocess_task.outputs["output_dataset"],
epochs=epochs,
learning_rate=learning_rate
)
train_task.set_display_name("Model Training")
    train_task.set_accelerator_type("nvidia.com/gpu")
    train_task.set_accelerator_limit(1)
    train_task.set_memory_limit("16Gi")
    # 3. Model evaluation
evaluate_task = evaluate_model(
model=train_task.outputs["model_output"],
test_data=preprocess_task.outputs["output_dataset"]
)
evaluate_task.set_display_name("Model Evaluation")
    # 4. Conditional deployment
with dsl.Condition(
evaluate_task.output >= deploy_threshold,
name="deploy-condition"
):
deploy_task = deploy_model(
model=train_task.outputs["model_output"],
endpoint_name="ml-model-endpoint"
)
deploy_task.set_display_name("Deploy Model")
# Train multiple models in parallel
@dsl.pipeline(
name="hyperparameter-search-pipeline",
description="Hyperparameter search with parallel training"
)
def hyperparameter_search_pipeline(
data_path: str,
learning_rates: list = [0.001, 0.01, 0.1],
n_estimators_list: list = [50, 100, 200]
):
"""超参数搜索流水线"""
    # Data preprocessing
preprocess_task = preprocess_data(input_path=data_path)
    # Parallel training
with dsl.ParallelFor(
items=learning_rates,
parallelism=3
) as lr:
with dsl.ParallelFor(
items=n_estimators_list,
parallelism=3
) as n_est:
train_task = train_model(
train_data=preprocess_task.outputs["output_dataset"],
learning_rate=lr,
n_estimators=n_est
)
    # Note: aggregating the ParallelFor outputs requires a fan-in step (see the sketch below)
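# A fan-in sketch for the note above, assuming a KFP SDK version that provides
# dsl.Collected (2.3+); the aggregation component is illustrative only.
@component(base_image="python:3.9")
def pick_best(accuracies: list) -> float:
    """Hypothetical aggregation step over the fanned-out results."""
    return max(accuracies) if accuracies else 0.0

@dsl.pipeline(name="parallelfor-fan-in")
def fan_in_pipeline(data_path: str, learning_rates: list = [0.001, 0.01, 0.1]):
    preprocess_task = preprocess_data(input_path=data_path)
    with dsl.ParallelFor(items=learning_rates) as lr:
        train_task = train_model(
            train_data=preprocess_task.outputs["output_dataset"],
            learning_rate=lr
        )
    # dsl.Collected gathers the per-iteration outputs into a single list input
    pick_best(accuracies=dsl.Collected(train_task.outputs["accuracy"]))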
# Pipeline with an exit handler
from kfp.dsl import PipelineTaskFinalStatus

@component(
    base_image="python:3.9",
    packages_to_install=["requests"]
)
def cleanup_task(status: PipelineTaskFinalStatus):
    """Cleanup / notification task that always runs when the pipeline exits."""
    import requests
    print(f"Pipeline finished with state: {status.state}")
    # Clean up temporary resources, then send a notification
    requests.post(
        "https://hooks.slack.com/...",
        json={"text": f"Pipeline completed: {status.state}"}
    )

@dsl.pipeline(name="pipeline-with-exit-handler")
def pipeline_with_exit_handler(data_path: str):
    """Pipeline whose main tasks are wrapped in an ExitHandler."""
    # The status parameter is injected by the backend; it is not passed by the caller
    exit_task = cleanup_task()
    with dsl.ExitHandler(exit_task=exit_task):
        # Main tasks must be created inside the ExitHandler block
        preprocess = preprocess_data(input_path=data_path)
        train = train_model(train_data=preprocess.outputs["output_dataset"])
# Compile and submit
def compile_and_submit():
"""编译并提交流水线"""
    # Compile to YAML
compiler.Compiler().compile(
pipeline_func=ml_pipeline,
package_path="ml_pipeline.yaml"
)
    # Submit to KFP
client = Client(host="http://localhost:8080")
    # Create an experiment
experiment = client.create_experiment(
name="ml-experiments",
description="ML training experiments"
)
    # Create a run
run = client.create_run_from_pipeline_func(
ml_pipeline,
experiment_name="ml-experiments",
run_name="ml-pipeline-run-001",
arguments={
"data_path": "gs://my-bucket/data.csv",
"learning_rate": 0.01,
"epochs": 20
}
)
print(f"Run created: {run.run_id}")
    # Wait for completion
client.wait_for_run_completion(run.run_id, timeout=3600)
if __name__ == "__main__":
compile_and_submit()
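The Scheduled Workflow Controller from the architecture section is driven by recurring runs. A minimal sketch, assuming the KFP 2.x SDK, the experiment created above, and an hourly schedule:

def schedule_pipeline():
    """Register a recurring (cron-triggered) run for the compiled pipeline."""
    client = Client(host="http://localhost:8080")
    experiment = client.create_experiment(name="ml-experiments")
    client.create_recurring_run(
        experiment_id=experiment.experiment_id,
        job_name="ml-pipeline-hourly",
        # Cron format used by KFP includes a leading seconds field
        cron_expression="0 0 * * * *",
        pipeline_package_path="ml_pipeline.yaml",
        params={"data_path": "gs://my-bucket/data.csv"}
    )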
Advanced Features
Caching and Reuse
"""
KFP caching and component reuse
"""
from kfp import dsl
from kfp.dsl import component
# Component with caching enabled
@component(
base_image="python:3.9",
packages_to_install=["pandas"]
)
def cacheable_preprocess(
input_path: str,
output_path: dsl.OutputPath(str)
) -> str:
"""可缓存的预处理组件"""
import pandas as pd
import hashlib
df = pd.read_csv(input_path)
df_processed = df.dropna()
df_processed.to_csv(output_path, index=False)
    # Return a data fingerprint that can be used for cache validation
return hashlib.md5(df_processed.to_json().encode()).hexdigest()
@dsl.pipeline(name="cached-pipeline")
def cached_pipeline(data_path: str):
"""使用缓存的流水线"""
preprocess_task = cacheable_preprocess(input_path=data_path)
    # Configure the caching policy
preprocess_task.set_caching_options(enable_caching=True)
    # Caching can also be disabled for an entire run
    # via the enable_caching argument at submission time (see the sketch below)
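# A sketch of run-level cache control: enable_caching at submission time
# overrides the per-task settings for the whole run (client construction as above).
def submit_without_cache(client, data_path: str):
    return client.create_run_from_pipeline_func(
        cached_pipeline,
        arguments={"data_path": data_path},
        enable_caching=False
    )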
# Component reuse - build a reusable component library
class MLComponents:
"""ML 组件库"""
@staticmethod
@component(
base_image="python:3.9",
packages_to_install=["pandas", "scikit-learn"]
)
def standard_scaler(
input_data: dsl.Input[dsl.Dataset],
output_data: dsl.Output[dsl.Dataset],
scaler_artifact: dsl.Output[dsl.Artifact]
):
"""标准化组件"""
import pandas as pd
from sklearn.preprocessing import StandardScaler
import pickle
df = pd.read_csv(input_data.path)
scaler = StandardScaler()
numeric_cols = df.select_dtypes(include=['float64', 'int64']).columns
df[numeric_cols] = scaler.fit_transform(df[numeric_cols])
df.to_csv(output_data.path, index=False)
with open(scaler_artifact.path, 'wb') as f:
pickle.dump(scaler, f)
@staticmethod
@component(
base_image="python:3.9",
packages_to_install=["pandas", "scikit-learn"]
)
def feature_selection(
input_data: dsl.Input[dsl.Dataset],
output_data: dsl.Output[dsl.Dataset],
n_features: int = 10,
method: str = "mutual_info"
):
"""特征选择组件"""
import pandas as pd
from sklearn.feature_selection import (
SelectKBest,
mutual_info_classif,
f_classif
)
df = pd.read_csv(input_data.path)
X = df.drop("target", axis=1)
y = df["target"]
selector_map = {
"mutual_info": mutual_info_classif,
"f_classif": f_classif
}
selector = SelectKBest(
score_func=selector_map[method],
k=n_features
)
X_selected = selector.fit_transform(X, y)
selected_features = X.columns[selector.get_support()].tolist()
result_df = pd.DataFrame(X_selected, columns=selected_features)
result_df["target"] = y.values
result_df.to_csv(output_data.path, index=False)
@staticmethod
@component(
base_image="python:3.9",
packages_to_install=["pandas", "scikit-learn", "imbalanced-learn"]
)
def handle_imbalance(
input_data: dsl.Input[dsl.Dataset],
output_data: dsl.Output[dsl.Dataset],
method: str = "smote",
sampling_strategy: str = "auto"
):
"""处理类别不平衡"""
import pandas as pd
from imblearn.over_sampling import SMOTE, RandomOverSampler
from imblearn.under_sampling import RandomUnderSampler
df = pd.read_csv(input_data.path)
X = df.drop("target", axis=1)
y = df["target"]
sampler_map = {
"smote": SMOTE(sampling_strategy=sampling_strategy),
"oversample": RandomOverSampler(sampling_strategy=sampling_strategy),
"undersample": RandomUnderSampler(sampling_strategy=sampling_strategy)
}
sampler = sampler_map[method]
X_resampled, y_resampled = sampler.fit_resample(X, y)
result_df = pd.DataFrame(X_resampled, columns=X.columns)
result_df["target"] = y_resampled
result_df.to_csv(output_data.path, index=False)
# Build a pipeline from the component library
@dsl.pipeline(name="feature-engineering-pipeline")
def feature_engineering_pipeline(data_path: str):
"""特征工程流水线"""
    # Data loading
load_task = preprocess_data(input_path=data_path)
    # Standardization
scale_task = MLComponents.standard_scaler(
input_data=load_task.outputs["output_dataset"]
)
    # Handle class imbalance
balance_task = MLComponents.handle_imbalance(
input_data=scale_task.outputs["output_data"],
method="smote"
)
    # Feature selection
select_task = MLComponents.feature_selection(
input_data=balance_task.outputs["output_data"],
n_features=20
)
    # Training
train_task = train_model(
train_data=select_task.outputs["output_data"]
)
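Components can also be shared as compiled YAML rather than as Python source. A minimal sketch, assuming standard_scaler.yaml is the file produced by the compile step:

from kfp import compiler, components

# Compile a single component into a shareable YAML definition
compiler.Compiler().compile(
    pipeline_func=MLComponents.standard_scaler,
    package_path="standard_scaler.yaml"
)

# Elsewhere (another repository or team), load it without the Python source
standard_scaler_op = components.load_component_from_file("standard_scaler.yaml")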
Distributed Training Integration
"""
KFP distributed training integration
"""
from kfp import dsl
from kfp.dsl import component
# PyTorch distributed training component
@component(
base_image="pytorch/pytorch:2.0.0-cuda11.7-cudnn8-runtime",
packages_to_install=["kubernetes"]
)
def create_pytorch_distributed_job(
train_data: dsl.Input[dsl.Dataset],
model_output: dsl.Output[dsl.Model],
image: str,
num_workers: int = 2,
gpus_per_worker: int = 1,
epochs: int = 10
):
"""创建 PyTorchJob 分布式训练"""
from kubernetes import client, config
config.load_incluster_config()
api = client.CustomObjectsApi()
pytorchjob = {
"apiVersion": "kubeflow.org/v1",
"kind": "PyTorchJob",
"metadata": {
"name": f"pytorch-train-{dsl.PIPELINE_RUN_ID}"
},
"spec": {
"pytorchReplicaSpecs": {
"Master": {
"replicas": 1,
"restartPolicy": "OnFailure",
"template": {
"spec": {
"containers": [{
"name": "pytorch",
"image": image,
"command": ["python", "train.py"],
"args": [
"--data-path", train_data.path,
"--epochs", str(epochs),
"--output-path", model_output.path
],
"resources": {
"limits": {
"nvidia.com/gpu": gpus_per_worker
}
}
}]
}
}
},
"Worker": {
"replicas": num_workers - 1,
"restartPolicy": "OnFailure",
"template": {
"spec": {
"containers": [{
"name": "pytorch",
"image": image,
"command": ["python", "train.py"],
"args": [
"--data-path", train_data.path,
"--epochs", str(epochs)
],
"resources": {
"limits": {
"nvidia.com/gpu": gpus_per_worker
}
}
}]
}
}
}
}
}
}
    # Create the PyTorchJob
api.create_namespaced_custom_object(
group="kubeflow.org",
version="v1",
namespace="kubeflow",
plural="pytorchjobs",
body=pytorchjob
)
    # Wait for completion
    import time
    while True:
        job = api.get_namespaced_custom_object(
            group="kubeflow.org",
            version="v1",
            namespace="kubeflow",
            plural="pytorchjobs",
            name=job_name
        )
        # PyTorchJob reports completion through status conditions rather than a phase field
        conditions = job.get("status", {}).get("conditions", [])
        if any(c.get("type") == "Succeeded" and c.get("status") == "True" for c in conditions):
            break
        if any(c.get("type") == "Failed" and c.get("status") == "True" for c in conditions):
            raise RuntimeError("PyTorchJob failed")
        time.sleep(30)
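# A sketch of wiring the launcher component above into a pipeline. Caching is
# disabled because the component has the side effect of creating a PyTorchJob;
# the image name is an assumption.
@dsl.pipeline(name="distributed-training-pipeline")
def distributed_training_pipeline(data_path: str, epochs: int = 10):
    preprocess_task = preprocess_data(input_path=data_path)
    launch_task = create_pytorch_distributed_job(
        train_data=preprocess_task.outputs["output_dataset"],
        image="my-registry/distributed-training:v1.0",
        num_workers=2,
        gpus_per_worker=1,
        epochs=epochs
    )
    launch_task.set_caching_options(False)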
# Hyperparameter optimization with Katib
@component(
base_image="python:3.9",
packages_to_install=["kubernetes"]
)
def run_katib_experiment(
experiment_name: str,
objective_metric: str,
objective_type: str = "maximize",
max_trial_count: int = 10,
parallel_trial_count: int = 3
) -> str:
"""运行 Katib 超参数优化实验"""
from kubernetes import client, config
config.load_incluster_config()
api = client.CustomObjectsApi()
experiment = {
"apiVersion": "kubeflow.org/v1beta1",
"kind": "Experiment",
"metadata": {
"name": experiment_name,
"namespace": "kubeflow"
},
"spec": {
"objective": {
"type": objective_type,
"goal": 0.99 if objective_type == "maximize" else 0.01,
"objectiveMetricName": objective_metric
},
"algorithm": {
"algorithmName": "bayesianoptimization"
},
"maxTrialCount": max_trial_count,
"parallelTrialCount": parallel_trial_count,
"parameters": [
{
"name": "learning_rate",
"parameterType": "double",
"feasibleSpace": {
"min": "0.0001",
"max": "0.1"
}
},
{
"name": "batch_size",
"parameterType": "categorical",
"feasibleSpace": {
"list": ["16", "32", "64", "128"]
}
},
{
"name": "num_layers",
"parameterType": "int",
"feasibleSpace": {
"min": "2",
"max": "10"
}
}
],
"trialTemplate": {
"primaryContainerName": "training-container",
"trialParameters": [
{"name": "learningRate", "reference": "learning_rate"},
{"name": "batchSize", "reference": "batch_size"},
{"name": "numLayers", "reference": "num_layers"}
],
"trialSpec": {
"apiVersion": "batch/v1",
"kind": "Job",
"spec": {
"template": {
"spec": {
"containers": [{
"name": "training-container",
"image": "pytorch/pytorch:2.0.0",
"command": ["python", "train.py"],
"args": [
"--lr=${trialParameters.learningRate}",
"--batch-size=${trialParameters.batchSize}",
"--layers=${trialParameters.numLayers}"
]
}],
"restartPolicy": "Never"
}
}
}
}
}
}
}
    # Create the experiment
api.create_namespaced_custom_object(
group="kubeflow.org",
version="v1beta1",
namespace="kubeflow",
plural="experiments",
body=experiment
)
    # Wait for completion and return the best parameters
import time
while True:
exp = api.get_namespaced_custom_object(
group="kubeflow.org",
version="v1beta1",
namespace="kubeflow",
plural="experiments",
name=experiment_name
)
status = exp.get("status", {})
if status.get("completionTime"):
best_trial = status.get("currentOptimalTrial", {})
return str(best_trial.get("parameterAssignments", {}))
time.sleep(60)
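The component above returns the optimal trial's parameterAssignments as a string. A downstream step can turn that back into usable values; a minimal sketch, assuming the str()-of-a-list format produced above:

@component(base_image="python:3.9")
def parse_best_parameters(raw_assignments: str) -> dict:
    """Convert the stringified parameterAssignments list into a name-to-value dict."""
    import ast
    assignments = ast.literal_eval(raw_assignments) if raw_assignments else []
    return {item["name"]: item["value"] for item in assignments}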
Deployment Configuration
KFP Deployment YAML
# Complete KFP deployment configuration
---
# Namespace
apiVersion: v1
kind: Namespace
metadata:
name: kubeflow
---
# KFP API Server Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
name: ml-pipeline-api-server
namespace: kubeflow
spec:
replicas: 2
selector:
matchLabels:
app: ml-pipeline-api-server
template:
metadata:
labels:
app: ml-pipeline-api-server
spec:
containers:
- name: api-server
image: gcr.io/ml-pipeline/api-server:2.0.0
ports:
- containerPort: 8888
- containerPort: 8887
env:
- name: POD_NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: DBCONFIG_USER
valueFrom:
secretKeyRef:
name: mysql-secret
key: username
- name: DBCONFIG_PASSWORD
valueFrom:
secretKeyRef:
name: mysql-secret
key: password
- name: DBCONFIG_DBNAME
value: mlpipeline
- name: DBCONFIG_HOST
value: mysql
- name: DBCONFIG_PORT
value: "3306"
- name: OBJECTSTORECONFIG_BUCKETNAME
value: mlpipeline
- name: OBJECTSTORECONFIG_HOST
value: minio-service
- name: OBJECTSTORECONFIG_PORT
value: "9000"
resources:
requests:
cpu: 250m
memory: 500Mi
limits:
cpu: 500m
memory: 1Gi
livenessProbe:
httpGet:
path: /apis/v1beta1/healthz
port: 8888
initialDelaySeconds: 30
periodSeconds: 10
---
# KFP UI Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
name: ml-pipeline-ui
namespace: kubeflow
spec:
replicas: 1
selector:
matchLabels:
app: ml-pipeline-ui
template:
metadata:
labels:
app: ml-pipeline-ui
spec:
containers:
- name: ui
image: gcr.io/ml-pipeline/frontend:2.0.0
ports:
- containerPort: 3000
env:
- name: VIEWER_TENSORBOARD_POD_TEMPLATE_SPEC_PATH
value: /etc/config/viewer-pod-template.json
- name: MINIO_NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: MINIO_HOST
value: minio-service
- name: MINIO_PORT
value: "9000"
resources:
requests:
cpu: 100m
memory: 256Mi
---
# Persistence Agent
apiVersion: apps/v1
kind: Deployment
metadata:
name: ml-pipeline-persistenceagent
namespace: kubeflow
spec:
replicas: 1
selector:
matchLabels:
app: ml-pipeline-persistenceagent
template:
metadata:
labels:
app: ml-pipeline-persistenceagent
spec:
containers:
- name: persistenceagent
image: gcr.io/ml-pipeline/persistenceagent:2.0.0
env:
- name: NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: TTL_SECONDS_AFTER_WORKFLOW_FINISH
value: "86400"
resources:
requests:
cpu: 100m
memory: 256Mi
---
# Scheduled Workflow controller
apiVersion: apps/v1
kind: Deployment
metadata:
name: ml-pipeline-scheduledworkflow
namespace: kubeflow
spec:
replicas: 1
selector:
matchLabels:
app: ml-pipeline-scheduledworkflow
template:
metadata:
labels:
app: ml-pipeline-scheduledworkflow
spec:
containers:
- name: scheduledworkflow
image: gcr.io/ml-pipeline/scheduledworkflow:2.0.0
env:
- name: NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
resources:
requests:
cpu: 50m
memory: 128Mi
---
# MySQL deployment
apiVersion: apps/v1
kind: Deployment
metadata:
name: mysql
namespace: kubeflow
spec:
replicas: 1
selector:
matchLabels:
app: mysql
template:
metadata:
labels:
app: mysql
spec:
containers:
- name: mysql
image: mysql:8.0
env:
- name: MYSQL_ROOT_PASSWORD
valueFrom:
secretKeyRef:
name: mysql-secret
key: password
- name: MYSQL_DATABASE
value: mlpipeline
ports:
- containerPort: 3306
volumeMounts:
- name: mysql-data
mountPath: /var/lib/mysql
resources:
requests:
cpu: 100m
memory: 512Mi
volumes:
- name: mysql-data
persistentVolumeClaim:
claimName: mysql-pvc
---
# MinIO deployment
apiVersion: apps/v1
kind: Deployment
metadata:
name: minio
namespace: kubeflow
spec:
replicas: 1
selector:
matchLabels:
app: minio
template:
metadata:
labels:
app: minio
spec:
containers:
- name: minio
image: minio/minio:RELEASE.2023-09-04T19-57-37Z
args:
- server
- /data
- --console-address
- ":9001"
env:
- name: MINIO_ROOT_USER
valueFrom:
secretKeyRef:
name: minio-secret
key: accesskey
- name: MINIO_ROOT_PASSWORD
valueFrom:
secretKeyRef:
name: minio-secret
key: secretkey
ports:
- containerPort: 9000
- containerPort: 9001
volumeMounts:
- name: minio-data
mountPath: /data
resources:
requests:
cpu: 100m
memory: 512Mi
volumes:
- name: minio-data
persistentVolumeClaim:
claimName: minio-pvc
---
# Service definitions
apiVersion: v1
kind: Service
metadata:
name: ml-pipeline-api-server
namespace: kubeflow
spec:
selector:
app: ml-pipeline-api-server
ports:
- name: http
port: 8888
- name: grpc
port: 8887
---
apiVersion: v1
kind: Service
metadata:
name: ml-pipeline-ui
namespace: kubeflow
spec:
selector:
app: ml-pipeline-ui
ports:
- port: 80
targetPort: 3000
type: LoadBalancer
---
apiVersion: v1
kind: Service
metadata:
name: minio-service
namespace: kubeflow
spec:
selector:
app: minio
ports:
- name: api
port: 9000
- name: console
port: 9001
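After applying these manifests, the control plane can be smoke-tested from the SDK. A minimal sketch, assuming kubectl port-forward svc/ml-pipeline-api-server 8888:8888 -n kubeflow is running locally:

from kfp.client import Client

# Port-forwarded API Server endpoint (adjust to your environment)
client = Client(host="http://localhost:8888")

# A healthy deployment answers the healthz endpoint and can list pipelines
print(client.get_kfp_healthz())
print(client.list_pipelines())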
Best Practices
Component Design Principles
# KFP component design best practices
component_design:
  # 1. Inputs and outputs
  io_design:
    - name: "Use strong typing"
      description: "Annotate inputs and outputs with Input/Output types"
      good: "data: Input[Dataset]"
      bad: "data: str"
    - name: "Pass small values as parameters"
      description: "Simple values are passed as pipeline parameters"
      limit: "< 1MB"
    - name: "Pass large data as artifacts"
      description: "Files are exchanged through artifacts"
      types: [Dataset, Model, Metrics]
  # 2. Container images
  image_design:
    - name: "Pin image versions"
      good: "pytorch/pytorch:2.0.0-cuda11.7"
      bad: "pytorch/pytorch:latest"
    - name: "Keep images minimal"
      description: "Include only the necessary dependencies"
    - name: "Use shared base images"
      description: "Build organization-wide base images"
  # 3. Error handling
  error_handling:
    - name: "Configure a retry policy"
      config:
        max_retries: 3
        backoff_duration: "10s"
    - name: "Log in detail"
      description: "Use structured logging"
    - name: "Return meaningful errors"
      description: "Error messages should include context"
  # 4. Resource management
  resource_management:
    - name: "Declare resource requirements explicitly (see the sketch after this list)"
      fields: [cpu_request, memory_request, gpu_limit]
    - name: "Set timeouts"
      description: "Prevent tasks from running indefinitely"
    - name: "Clean up temporary files"
      description: "Clean up when the task finishes"
---
# Pipeline design principles
pipeline_design:
  # 1. Modularity
  modularity:
    - Split large pipelines into sub-pipelines
    - Use conditional branches to avoid unnecessary execution
    - Reuse common components
  # 2. Observability
  observability:
    - Set meaningful display_name values
    - Log key metrics as Metrics artifacts
    - Use artifacts to track data lineage
  # 3. Maintainability
  maintainability:
    - Parameterize all variable configuration
    - Use version control
    - Write component tests
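Several of the task-level practices above (explicit resources, display names, caching) can be applied in one place. A minimal sketch of a helper that the pipelines in this article could call on each task; the default values are illustrative:

def apply_task_defaults(task, display_name: str, cpu: str = "1", memory: str = "2Gi"):
    """Apply the resource, naming, and caching conventions listed above to a task."""
    task.set_display_name(display_name)
    task.set_cpu_request(cpu)
    task.set_cpu_limit(cpu)
    task.set_memory_request(memory)
    task.set_memory_limit(memory)
    task.set_caching_options(True)
    return task

# Example: apply_task_defaults(preprocess_task, "Data Preprocessing", cpu="2", memory="4Gi")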
Summary
Kubeflow Pipelines is a powerful tool for building production-grade ML pipelines:
- Architecture: built on Argo Workflows, Kubernetes-native
- Component development: components are defined with Python decorators and are type-safe
- Advanced features: caching, distributed training, hyperparameter optimization
- Deployment and operations: scalable, observable, easy to integrate
KFP is a good fit for building large-scale ML platforms that need full MLOps capabilities.