05 - AIOps Platform in Practice

Overview

This chapter integrates the theory and techniques from the preceding chapters into a complete, hands-on AIOps platform solution, covering platform architecture design, core module implementation, system integration, and best practices.

1. Platform Architecture Design

1.1 Overall Architecture

┌─────────────────────────────────────────────────────────────────┐
│                   AIOps Platform Architecture                   │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │                     Frontend Layer                      │   │
│  │  Dashboard │ Alert Center │ Analysis Console │ Config   │   │
│  └──────────────────────┬──────────────────────────────────┘   │
│                         │ REST/WebSocket                       │
│  ┌──────────────────────┴──────────────────────────────────┐   │
│  │                    API Gateway Layer                    │   │
│  │  AuthN/AuthZ │ Rate Limiting │ Routing │ Load Balancing │   │
│  └──────────────────────┬──────────────────────────────────┘   │
│                         │                                      │
│  ┌──────────────────────┴──────────────────────────────────┐   │
│  │                Application Service Layer                │   │
│  │                                                         │   │
│  │  ┌───────────┐ ┌───────────┐ ┌───────────┐              │   │
│  │  │ Anomaly   │ │ Root Cause│ │ Alert     │              │   │
│  │  │ Detection │ │ Analysis  │ │ Management│              │   │
│  │  └───────────┘ └───────────┘ └───────────┘              │   │
│  │  ┌───────────┐ ┌───────────┐ ┌───────────┐              │   │
│  │  │ Capacity  │ │ Auto      │ │ Knowledge │              │   │
│  │  │ Forecast  │ │ Remediate │ │ Base      │              │   │
│  │  └───────────┘ └───────────┘ └───────────┘              │   │
│  └──────────────────────┬──────────────────────────────────┘   │
│                         │                                      │
│  ┌──────────────────────┴──────────────────────────────────┐   │
│  │                     AI Engine Layer                     │   │
│  │ Model Serving │ Training │ Feature Eng │ Model Registry │   │
│  └──────────────────────┬──────────────────────────────────┘   │
│                         │                                      │
│  ┌──────────────────────┴──────────────────────────────────┐   │
│  │                  Data Processing Layer                  │   │
│  │     Stream Processing (Flink) │ Batch (Spark) │ ETL     │   │
│  └──────────────────────┬──────────────────────────────────┘   │
│                         │                                      │
│  ┌──────────────────────┴──────────────────────────────────┐   │
│  │                   Data Storage Layer                    │   │
│  │      TSDB │ Logs │ Graph DB │ Object Store │ Cache      │   │
│  │      (VictoriaMetrics) (ES) (Neo4j) (MinIO) (Redis)     │   │
│  └──────────────────────┬──────────────────────────────────┘   │
│                         │                                      │
│  ┌──────────────────────┴──────────────────────────────────┐   │
│  │                  Data Collection Layer                  │   │
│  │          Prometheus │ Fluentd │ Jaeger │ Kafka          │   │
│  └─────────────────────────────────────────────────────────┘   │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

1.2 Kubernetes Deployment Architecture

# AIOps platform Kubernetes deployment

apiVersion: v1
kind: Namespace
metadata:
  name: aiops-platform

---
# Configuration management
apiVersion: v1
kind: ConfigMap
metadata:
  name: aiops-config
  namespace: aiops-platform
data:
  config.yaml: |
    database:
      timeseries:
        type: victoriametrics
        endpoints:
          - http://victoria-metrics:8428
      logs:
        type: elasticsearch
        endpoints:
          - http://elasticsearch:9200
      graph:
        type: neo4j
        endpoint: bolt://neo4j:7687

    kafka:
      brokers:
        - kafka:9092
      topics:
        metrics: aiops-metrics
        logs: aiops-logs
        alerts: aiops-alerts

    ai:
      model_registry: http://mlflow:5000
      inference_endpoint: http://triton:8000

    features:
      anomaly_detection: true
      root_cause_analysis: true
      auto_remediation: true
      capacity_planning: true

---
# Anomaly detection service
apiVersion: apps/v1
kind: Deployment
metadata:
  name: anomaly-detection
  namespace: aiops-platform
spec:
  replicas: 3
  selector:
    matchLabels:
      app: anomaly-detection
  template:
    metadata:
      labels:
        app: anomaly-detection
    spec:
      containers:
      - name: anomaly-detection
        image: aiops/anomaly-detection:v1.0
        ports:
        - containerPort: 8080
        resources:
          requests:
            memory: "512Mi"
            cpu: "500m"
          limits:
            memory: "2Gi"
            cpu: "2000m"
        env:
        - name: CONFIG_PATH
          value: /config/config.yaml
        volumeMounts:
        - name: config
          mountPath: /config
      volumes:
      - name: config
        configMap:
          name: aiops-config

---
# Root cause analysis service
apiVersion: apps/v1
kind: Deployment
metadata:
  name: rca-service
  namespace: aiops-platform
spec:
  replicas: 2
  selector:
    matchLabels:
      app: rca-service
  template:
    metadata:
      labels:
        app: rca-service
    spec:
      containers:
      - name: rca-service
        image: aiops/rca-service:v1.0
        ports:
        - containerPort: 8080
        resources:
          requests:
            memory: "1Gi"
            cpu: "1000m"
          limits:
            memory: "4Gi"
            cpu: "4000m"

---
# Alert management service
apiVersion: apps/v1
kind: Deployment
metadata:
  name: alert-manager
  namespace: aiops-platform
spec:
  replicas: 2
  selector:
    matchLabels:
      app: alert-manager
  template:
    metadata:
      labels:
        app: alert-manager
    spec:
      containers:
      - name: alert-manager
        image: aiops/alert-manager:v1.0
        ports:
        - containerPort: 8080

---
# API gateway
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: aiops-ingress
  namespace: aiops-platform
  annotations:
    nginx.ingress.kubernetes.io/rewrite-target: /
spec:
  rules:
  - host: aiops.example.com
    http:
      paths:
      - path: /api/anomaly
        pathType: Prefix
        backend:
          service:
            name: anomaly-detection
            port:
              number: 8080
      - path: /api/rca
        pathType: Prefix
        backend:
          service:
            name: rca-service
            port:
              number: 8080
      - path: /api/alerts
        pathType: Prefix
        backend:
          service:
            name: alert-manager
            port:
              number: 8080
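
One gap worth noting: the Ingress above routes to Services named anomaly-detection, rca-service, and alert-manager, but the manifests define only Deployments. A minimal ClusterIP Service sketch for one backend (the other two follow the same pattern):

---
# ClusterIP Service assumed by the Ingress (sketch; repeat for
# rca-service and alert-manager)
apiVersion: v1
kind: Service
metadata:
  name: anomaly-detection
  namespace: aiops-platform
spec:
  selector:
    app: anomaly-detection
  ports:
  - port: 8080
    targetPort: 8080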

2. Core Module Implementation

2.1 Unified Data Collection

"""
统一数据采集模块
"""

import asyncio
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, field
from abc import ABC, abstractmethod
from aiokafka import AIOKafkaProducer
import json
import time


@dataclass
class DataPoint:
    """数据点"""
    source: str
    data_type: str  # metric, log, trace, event
    timestamp: float
    resource: str
    data: Dict[str, Any]
    tags: Dict[str, str] = field(default_factory=dict)


class DataCollector(ABC):
    """数据采集器基类"""

    @abstractmethod
    async def collect(self) -> List[DataPoint]:
        """采集数据"""
        pass

    @abstractmethod
    async def start(self):
        """启动采集"""
        pass

    @abstractmethod
    async def stop(self):
        """停止采集"""
        pass


class PrometheusCollector(DataCollector):
    """Prometheus 指标采集器"""

    def __init__(self, prometheus_url: str,
                 scrape_interval: int = 15):
        self.prometheus_url = prometheus_url
        self.scrape_interval = scrape_interval
        self.running = False

    async def collect(self) -> List[DataPoint]:
        """采集 Prometheus 指标"""
        import aiohttp

        async with aiohttp.ClientSession() as session:
            # Query a fixed set of core metrics
            url = f"{self.prometheus_url}/api/v1/query"
            queries = [
                "up",
                "node_cpu_seconds_total",
                "node_memory_MemAvailable_bytes",
                "container_cpu_usage_seconds_total",
                "container_memory_usage_bytes"
            ]

            data_points = []
            for query in queries:
                async with session.get(url, params={"query": query}) as resp:
                    if resp.status == 200:
                        result = await resp.json()
                        for item in result.get("data", {}).get("result", []):
                            metric = item["metric"]
                            value = item["value"]

                            data_points.append(DataPoint(
                                source="prometheus",
                                data_type="metric",
                                timestamp=float(value[0]),
                                resource=metric.get("instance", "unknown"),
                                data={
                                    "metric_name": metric.get("__name__", query),
                                    "value": float(value[1])
                                },
                                tags=metric
                            ))

            return data_points

    async def start(self):
        """启动采集循环"""
        self.running = True
        while self.running:
            await self.collect()
            await asyncio.sleep(self.scrape_interval)

    async def stop(self):
        """停止采集"""
        self.running = False


class ElasticsearchCollector(DataCollector):
    """Elasticsearch 日志采集器"""

    def __init__(self, es_url: str,
                 index_pattern: str = "logs-*",
                 poll_interval: int = 5):
        self.es_url = es_url
        self.index_pattern = index_pattern
        self.poll_interval = poll_interval
        self.last_timestamp = None
        self.running = False

    async def collect(self) -> List[DataPoint]:
        """采集日志"""
        import aiohttp

        query = {
            "query": {
                "range": {
                    "@timestamp": {
                        "gte": self.last_timestamp or "now-5m",
                        "lt": "now"
                    }
                }
            },
            "size": 1000,
            "sort": [{"@timestamp": "asc"}]
        }

        async with aiohttp.ClientSession() as session:
            url = f"{self.es_url}/{self.index_pattern}/_search"
            async with session.post(url, json=query) as resp:
                if resp.status != 200:
                    return []

                result = await resp.json()
                data_points = []

                for hit in result.get("hits", {}).get("hits", []):
                    source = hit["_source"]
                    data_points.append(DataPoint(
                        source="elasticsearch",
                        data_type="log",
                        timestamp=source.get("@timestamp", time.time()),
                        resource=source.get("host", {}).get("name", "unknown"),
                        data={
                            "message": source.get("message", ""),
                            "level": source.get("level", "INFO"),
                            "service": source.get("service", {}).get("name", "")
                        },
                        tags={
                            "index": hit["_index"],
                            "log_type": source.get("type", "application")
                        }
                    ))

                if data_points:
                    self.last_timestamp = data_points[-1].timestamp

                return data_points

    async def start(self):
        self.running = True
        while self.running:
            await self.collect()
            await asyncio.sleep(self.poll_interval)

    async def stop(self):
        self.running = False


class UnifiedDataPipeline:
    """统一数据管道"""

    def __init__(self, kafka_brokers: List[str],
                 output_topic: str = "aiops-raw-data"):
        self.kafka_brokers = kafka_brokers
        self.output_topic = output_topic
        self.collectors: List[DataCollector] = []
        self.producer: Optional[AIOKafkaProducer] = None

    def add_collector(self, collector: DataCollector):
        """添加采集器"""
        self.collectors.append(collector)

    async def start(self):
        """启动管道"""
        # 初始化 Kafka 生产者
        self.producer = AIOKafkaProducer(
            bootstrap_servers=",".join(self.kafka_brokers),
            value_serializer=lambda v: json.dumps(v).encode()
        )
        await self.producer.start()

        # Start all collectors
        tasks = []
        for collector in self.collectors:
            task = asyncio.create_task(self._run_collector(collector))
            tasks.append(task)

        await asyncio.gather(*tasks)

    async def _run_collector(self, collector: DataCollector):
        """运行单个采集器"""
        while True:
            try:
                data_points = await collector.collect()
                for dp in data_points:
                    await self._send_to_kafka(dp)
            except Exception as e:
                print(f"采集错误: {e}")

            await asyncio.sleep(1)

    async def _send_to_kafka(self, data_point: DataPoint):
        """发送到 Kafka"""
        message = {
            "source": data_point.source,
            "data_type": data_point.data_type,
            "timestamp": data_point.timestamp,
            "resource": data_point.resource,
            "data": data_point.data,
            "tags": data_point.tags
        }
        await self.producer.send_and_wait(self.output_topic, message)

    async def stop(self):
        """停止管道"""
        for collector in self.collectors:
            await collector.stop()
        if self.producer:
            await self.producer.stop()
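
A minimal sketch of wiring the collectors into the pipeline (the endpoint URLs are placeholders for your environment):

# Usage sketch: assemble and run the pipeline (assumed endpoints)
async def main():
    pipeline = UnifiedDataPipeline(kafka_brokers=["kafka:9092"])
    pipeline.add_collector(PrometheusCollector("http://prometheus:9090"))
    pipeline.add_collector(ElasticsearchCollector("http://elasticsearch:9200"))
    try:
        await pipeline.start()
    finally:
        await pipeline.stop()

if __name__ == "__main__":
    asyncio.run(main())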

2.2 Real-Time Stream Processing

"""
实时流处理模块 (基于 Flink Python API)
"""

from typing import Dict, List, Any
import json


class StreamProcessor:
    """流处理器 (简化实现)"""

    def __init__(self):
        self.processors = []

    def add_processor(self, name: str, func):
        """添加处理函数"""
        self.processors.append((name, func))

    async def process(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """处理单条数据"""
        result = data.copy()

        for name, func in self.processors:
            try:
                result = func(result)
            except Exception as e:
                result["_processing_errors"] = result.get("_processing_errors", [])
                result["_processing_errors"].append({
                    "processor": name,
                    "error": str(e)
                })

        return result


def feature_extraction_processor(data: Dict[str, Any]) -> Dict[str, Any]:
    """特征提取处理器"""
    if data.get("data_type") == "metric":
        # 提取指标特征
        value = data.get("data", {}).get("value", 0)
        data["features"] = {
            "value": value,
            "is_high": value > 80,
            "is_low": value < 20
        }

    elif data.get("data_type") == "log":
        # Extract log features
        message = data.get("data", {}).get("message", "")
        data["features"] = {
            "length": len(message),
            "has_error": "error" in message.lower(),
            "has_exception": "exception" in message.lower(),
            "has_warning": "warning" in message.lower()
        }

    return data


def anomaly_scoring_processor(data: Dict[str, Any]) -> Dict[str, Any]:
    """异常评分处理器"""
    features = data.get("features", {})

    # Simple rule-based scoring
    score = 0

    if data.get("data_type") == "metric":
        if features.get("is_high"):
            score += 0.5
        if features.get("is_low"):
            score += 0.3

    elif data.get("data_type") == "log":
        if features.get("has_error"):
            score += 0.7
        if features.get("has_exception"):
            score += 0.8
        if features.get("has_warning"):
            score += 0.3

    data["anomaly_score"] = min(score, 1.0)
    data["is_anomaly"] = score > 0.5

    return data


def enrichment_processor(data: Dict[str, Any]) -> Dict[str, Any]:
    """数据富化处理器"""
    resource = data.get("resource", "")

    # Attach resource metadata (a real implementation would query a CMDB)
    data["resource_metadata"] = {
        "type": "pod" if "pod" in resource else "host",
        "namespace": resource.split("/")[0] if "/" in resource else "default",
        "cluster": "production"
    }

    return data


# Flink job definition (pseudocode)
"""
# flink_job.py (requires a PyFlink environment)

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer
from pyflink.common.serialization import SimpleStringSchema

def create_flink_job():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(4)

    # Kafka Source
    kafka_consumer = FlinkKafkaConsumer(
        topics='aiops-raw-data',
        deserialization_schema=SimpleStringSchema(),
        properties={
            'bootstrap.servers': 'kafka:9092',
            'group.id': 'aiops-stream-processor'
        }
    )

    # Process the stream
    stream = env.add_source(kafka_consumer)

    processed = stream \
        .map(lambda x: json.loads(x)) \
        .map(feature_extraction_processor) \
        .map(anomaly_scoring_processor) \
        .map(enrichment_processor)

    # Split the stream
    anomalies = processed.filter(lambda x: x.get('is_anomaly', False))
    normal = processed.filter(lambda x: not x.get('is_anomaly', False))

    # Sink the anomaly stream
    anomalies.add_sink(FlinkKafkaProducer(
        'aiops-anomalies',
        SimpleStringSchema(),
        {'bootstrap.servers': 'kafka:9092'}
    ))

    env.execute('AIOps Stream Processing')
"""

2.3 API Service Implementation

"""
AIOps API 服务
"""

from fastapi import FastAPI, HTTPException, BackgroundTasks, Depends
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Dict, List, Any, Optional
from datetime import datetime
import asyncio

app = FastAPI(
    title="AIOps Platform API",
    description="智能运维平台 API",
    version="1.0.0"
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# ==================== Data Models ====================

class AnomalyQuery(BaseModel):
    resource: Optional[str] = None
    metric: Optional[str] = None
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None
    severity: Optional[str] = None
    limit: int = 100


class RCARequest(BaseModel):
    incident_id: Optional[str] = None
    anomaly_ids: Optional[List[str]] = None
    time_window_minutes: int = 30


class AlertRule(BaseModel):
    name: str
    condition: Dict[str, Any]
    severity: str
    notification_channels: List[str]
    enabled: bool = True


class RemediationRequest(BaseModel):
    incident_id: str
    action: str
    parameters: Dict[str, Any] = {}
    dry_run: bool = False


class CapacityQuery(BaseModel):
    resources: List[str]
    metric: str
    forecast_hours: int = 24


# ==================== Dependency Injection ====================

async def get_anomaly_service():
    """Get the anomaly detection service."""
    # A real implementation would return a service instance
    return {}


async def get_rca_service():
    """获取根因分析服务"""
    return {}


async def get_alert_service():
    """获取告警服务"""
    return {}


# ==================== API Endpoints ====================

@app.get("/api/v1/health")
async def health_check():
    """健康检查"""
    return {
        "status": "healthy",
        "timestamp": datetime.now().isoformat(),
        "version": "1.0.0"
    }


# Anomaly detection API
@app.post("/api/v1/anomalies/detect")
async def detect_anomalies(
    data: Dict[str, Any],
    background_tasks: BackgroundTasks
):
    """实时异常检测"""
    # 调用异常检测服务
    result = {
        "request_id": "req-123",
        "is_anomaly": False,
        "anomaly_score": 0.2,
        "detection_time_ms": 15
    }
    return result


@app.get("/api/v1/anomalies")
async def list_anomalies(
    query: AnomalyQuery = Depends()
):
    """查询异常列表"""
    return {
        "total": 10,
        "anomalies": [
            {
                "anomaly_id": "anom-001",
                "resource": "api-server-pod-1",
                "metric": "cpu_usage",
                "value": 95.5,
                "threshold": 80,
                "severity": "high",
                "detected_at": datetime.now().isoformat()
            }
        ]
    }


@app.get("/api/v1/anomalies/{anomaly_id}")
async def get_anomaly_detail(anomaly_id: str):
    """获取异常详情"""
    return {
        "anomaly_id": anomaly_id,
        "resource": "api-server-pod-1",
        "metric": "cpu_usage",
        "timeline": [],
        "related_metrics": [],
        "suggested_causes": []
    }


# Root cause analysis API
@app.post("/api/v1/rca/analyze")
async def analyze_root_cause(request: RCARequest):
    """执行根因分析"""
    return {
        "analysis_id": "rca-001",
        "status": "completed",
        "root_causes": [
            {
                "rank": 1,
                "resource": "database-primary",
                "issue": "Connection pool exhausted",
                "confidence": 0.85,
                "evidence": [
                    "DB connection count spiked",
                    "Multiple timeout errors"
                ]
            }
        ],
        "impact_analysis": {
            "affected_services": ["api-server", "user-service"],
            "estimated_users_affected": 1500
        },
        "recommended_actions": [
            "增加数据库连接池大小",
            "检查慢查询"
        ]
    }


@app.get("/api/v1/rca/{analysis_id}")
async def get_rca_result(analysis_id: str):
    """获取根因分析结果"""
    return {"analysis_id": analysis_id, "status": "completed"}


# Alert management API
@app.get("/api/v1/alerts")
async def list_alerts(
    status: Optional[str] = None,
    severity: Optional[str] = None,
    limit: int = 100
):
    """查询告警列表"""
    return {
        "total": 5,
        "alerts": []
    }


@app.post("/api/v1/alerts/{alert_id}/acknowledge")
async def acknowledge_alert(alert_id: str, comment: Optional[str] = None):
    """确认告警"""
    return {"alert_id": alert_id, "status": "acknowledged"}


@app.post("/api/v1/alerts/{alert_id}/resolve")
async def resolve_alert(alert_id: str, resolution: str):
    """解决告警"""
    return {"alert_id": alert_id, "status": "resolved"}


@app.post("/api/v1/alert-rules")
async def create_alert_rule(rule: AlertRule):
    """创建告警规则"""
    return {"rule_id": "rule-001", **rule.dict()}


@app.get("/api/v1/alert-rules")
async def list_alert_rules():
    """列出告警规则"""
    return {"rules": []}


# Auto-remediation API
@app.post("/api/v1/remediation/execute")
async def execute_remediation(request: RemediationRequest):
    """执行自动修复"""
    if request.dry_run:
        return {
            "status": "dry_run",
            "would_execute": request.action,
            "parameters": request.parameters
        }

    return {
        "execution_id": "exec-001",
        "status": "running",
        "action": request.action
    }


@app.get("/api/v1/remediation/{execution_id}")
async def get_remediation_status(execution_id: str):
    """获取修复状态"""
    return {
        "execution_id": execution_id,
        "status": "completed",
        "result": "success"
    }


# Capacity planning API
@app.post("/api/v1/capacity/forecast")
async def forecast_capacity(query: CapacityQuery):
    """容量预测"""
    return {
        "forecasts": [
            {
                "resource": "api-server",
                "metric": "cpu_usage",
                "predictions": [
                    {"timestamp": datetime.now().isoformat(), "value": 65, "lower": 60, "upper": 70}
                ],
                "trend": "increasing",
                "saturation_time_hours": 48
            }
        ]
    }


@app.get("/api/v1/capacity/recommendations")
async def get_capacity_recommendations():
    """获取容量建议"""
    return {
        "recommendations": [
            {
                "resource": "api-server",
                "action": "scale_up",
                "reason": "预测 48 小时内达到容量上限",
                "suggested_replicas": 5
            }
        ]
    }


# Topology and dependency API
@app.get("/api/v1/topology")
async def get_topology():
    """获取服务拓扑"""
    return {
        "nodes": [],
        "edges": []
    }


@app.get("/api/v1/topology/{resource_id}/dependencies")
async def get_dependencies(resource_id: str, direction: str = "both"):
    """获取资源依赖"""
    return {
        "resource_id": resource_id,
        "upstream": [],
        "downstream": []
    }


# Knowledge base API
@app.get("/api/v1/knowledge/runbooks")
async def list_runbooks():
    """列出运维手册"""
    return {"runbooks": []}


@app.get("/api/v1/knowledge/search")
async def search_knowledge(query: str):
    """搜索知识库"""
    return {"results": []}


# WebSocket real-time push
from fastapi import WebSocket, WebSocketDisconnect

class ConnectionManager:
    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def broadcast(self, message: dict):
        for connection in self.active_connections:
            await connection.send_json(message)


manager = ConnectionManager()


@app.websocket("/ws/alerts")
async def websocket_alerts(websocket: WebSocket):
    """告警实时推送"""
    await manager.connect(websocket)
    try:
        while True:
            # Wait for and push new alerts (placeholder loop)
            await asyncio.sleep(1)
    except WebSocketDisconnect:
        manager.disconnect(websocket)


@app.websocket("/ws/metrics/{resource_id}")
async def websocket_metrics(websocket: WebSocket, resource_id: str):
    """指标实时推送"""
    await websocket.accept()
    try:
        while True:
            # Push live metrics (static sample values here)
            await websocket.send_json({
                "resource_id": resource_id,
                "timestamp": datetime.now().isoformat(),
                "metrics": {
                    "cpu": 45.5,
                    "memory": 60.2
                }
            })
            await asyncio.sleep(1)
    except WebSocketDisconnect:
        pass
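
To try the service locally, it can be launched programmatically with uvicorn (assuming the module above is saved as main.py; host and port are arbitrary):

# Launch sketch (assumed module name main.py)
import uvicorn

if __name__ == "__main__":
    uvicorn.run("main:app", host="0.0.0.0", port=8080)

A GET against /api/v1/health should then return the status payload defined above.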

3. System Integration

3.1 Monitoring System Integration

"""
监控系统集成
"""

from typing import Dict, List, Any, Optional
from abc import ABC, abstractmethod
import aiohttp


class MonitoringIntegration(ABC):
    """监控集成基类"""

    @abstractmethod
    async def get_metrics(self, query: str) -> List[Dict]:
        pass

    @abstractmethod
    async def create_alert_rule(self, rule: Dict) -> str:
        pass


class PrometheusIntegration(MonitoringIntegration):
    """Prometheus 集成"""

    def __init__(self, url: str):
        self.url = url

    async def get_metrics(self, query: str,
                          start: Optional[float] = None,
                          end: Optional[float] = None,
                          step: str = "1m") -> List[Dict]:
        """查询 Prometheus 指标"""
        async with aiohttp.ClientSession() as session:
            if start and end:
                # Range query
                url = f"{self.url}/api/v1/query_range"
                params = {
                    "query": query,
                    "start": start,
                    "end": end,
                    "step": step
                }
            else:
                # Instant query
                url = f"{self.url}/api/v1/query"
                params = {"query": query}

            async with session.get(url, params=params) as resp:
                data = await resp.json()
                return data.get("data", {}).get("result", [])

    async def create_alert_rule(self, rule: Dict) -> str:
        """Create an alert rule (via Alertmanager)."""
        # In practice this requires writing Prometheus rule files or using Thanos Ruler
        return "rule-created"


class GrafanaIntegration:
    """Grafana 集成"""

    def __init__(self, url: str, api_key: str):
        self.url = url
        self.headers = {"Authorization": f"Bearer {api_key}"}

    async def create_dashboard(self, dashboard: Dict) -> str:
        """创建仪表盘"""
        async with aiohttp.ClientSession() as session:
            url = f"{self.url}/api/dashboards/db"
            async with session.post(url, json=dashboard,
                                   headers=self.headers) as resp:
                result = await resp.json()
                return result.get("uid", "")

    async def add_annotation(self, dashboard_id: str,
                              text: str, tags: List[str],
                              time_from: int, time_to: int):
        """添加注释"""
        async with aiohttp.ClientSession() as session:
            url = f"{self.url}/api/annotations"
            annotation = {
                "dashboardId": dashboard_id,
                "text": text,
                "tags": tags,
                "time": time_from,
                "timeEnd": time_to
            }
            await session.post(url, json=annotation, headers=self.headers)


class AlertManagerIntegration:
    """Alertmanager 集成"""

    def __init__(self, url: str):
        self.url = url

    async def get_alerts(self, active: bool = True) -> List[Dict]:
        """获取告警"""
        async with aiohttp.ClientSession() as session:
            url = f"{self.url}/api/v2/alerts"
            params = {"active": str(active).lower()}
            async with session.get(url, params=params) as resp:
                return await resp.json()

    async def silence_alert(self, matchers: List[Dict],
                            starts_at: str, ends_at: str,
                            comment: str, created_by: str) -> str:
        """静默告警"""
        async with aiohttp.ClientSession() as session:
            url = f"{self.url}/api/v2/silences"
            silence = {
                "matchers": matchers,
                "startsAt": starts_at,
                "endsAt": ends_at,
                "comment": comment,
                "createdBy": created_by
            }
            async with session.post(url, json=silence) as resp:
                result = await resp.json()
                return result.get("silenceID", "")

3.2 Ticketing System Integration

"""
工单系统集成
"""

from typing import Dict, List, Any, Optional
from abc import ABC, abstractmethod
import aiohttp


class TicketingIntegration(ABC):
    """工单系统集成基类"""

    @abstractmethod
    async def create_ticket(self, ticket: Dict) -> str:
        pass

    @abstractmethod
    async def update_ticket(self, ticket_id: str, updates: Dict):
        pass

    @abstractmethod
    async def close_ticket(self, ticket_id: str, resolution: str):
        pass


class JiraIntegration(TicketingIntegration):
    """Jira 集成"""

    def __init__(self, url: str, username: str, api_token: str):
        self.url = url
        self.auth = aiohttp.BasicAuth(username, api_token)

    async def create_ticket(self, ticket: Dict) -> str:
        """创建 Jira Issue"""
        async with aiohttp.ClientSession() as session:
            url = f"{self.url}/rest/api/3/issue"
            issue = {
                "fields": {
                    "project": {"key": ticket.get("project", "OPS")},
                    "summary": ticket["title"],
                    "description": {
                        "type": "doc",
                        "version": 1,
                        "content": [
                            {
                                "type": "paragraph",
                                "content": [
                                    {"type": "text", "text": ticket["description"]}
                                ]
                            }
                        ]
                    },
                    "issuetype": {"name": ticket.get("type", "Bug")},
                    "priority": {"name": ticket.get("priority", "Medium")}
                }
            }

            async with session.post(url, json=issue, auth=self.auth) as resp:
                result = await resp.json()
                return result.get("key", "")

    async def update_ticket(self, ticket_id: str, updates: Dict):
        """更新 Issue"""
        async with aiohttp.ClientSession() as session:
            url = f"{self.url}/rest/api/3/issue/{ticket_id}"
            await session.put(url, json={"fields": updates}, auth=self.auth)

    async def close_ticket(self, ticket_id: str, resolution: str):
        """Close an issue."""
        async with aiohttp.ClientSession() as session:
            # Fetch the available transitions
            url = f"{self.url}/rest/api/3/issue/{ticket_id}/transitions"
            async with session.get(url, auth=self.auth) as resp:
                transitions = await resp.json()

            # 找到 "Done" 转换
            done_transition = None
            for t in transitions.get("transitions", []):
                if t["name"].lower() in ["done", "closed", "resolved"]:
                    done_transition = t["id"]
                    break

            if done_transition:
                await session.post(url, json={
                    "transition": {"id": done_transition},
                    "update": {
                        "comment": [
                            {
                                "add": {
                                    "body": {
                                        "type": "doc",
                                        "version": 1,
                                        "content": [
                                            {
                                                "type": "paragraph",
                                                "content": [
                                                    {"type": "text", "text": resolution}
                                                ]
                                            }
                                        ]
                                    }
                                }
                            }
                        ]
                    }
                }, auth=self.auth)


class ServiceNowIntegration(TicketingIntegration):
    """ServiceNow 集成"""

    def __init__(self, instance: str, username: str, password: str):
        self.base_url = f"https://{instance}.service-now.com/api/now"
        self.auth = aiohttp.BasicAuth(username, password)

    async def create_ticket(self, ticket: Dict) -> str:
        """创建 Incident"""
        async with aiohttp.ClientSession() as session:
            url = f"{self.base_url}/table/incident"
            incident = {
                "short_description": ticket["title"],
                "description": ticket["description"],
                "urgency": ticket.get("urgency", "2"),
                "impact": ticket.get("impact", "2"),
                "category": ticket.get("category", "software")
            }

            async with session.post(url, json=incident, auth=self.auth) as resp:
                result = await resp.json()
                return result.get("result", {}).get("number", "")

    async def update_ticket(self, ticket_id: str, updates: Dict):
        """更新 Incident"""
        async with aiohttp.ClientSession() as session:
            url = f"{self.base_url}/table/incident/{ticket_id}"
            await session.patch(url, json=updates, auth=self.auth)

    async def close_ticket(self, ticket_id: str, resolution: str):
        """关闭 Incident"""
        await self.update_ticket(ticket_id, {
            "state": "6",  # Resolved
            "close_code": "Solved",
            "close_notes": resolution
        })
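
Ticketing typically hangs off the RCA output; a sketch that turns the top-ranked root cause into a Jira issue (the URL, credentials, and field values are placeholders):

# Sketch: open an incident ticket from an RCA result (placeholder credentials)
async def open_incident(rca_result: dict) -> str:
    jira = JiraIntegration("https://example.atlassian.net",
                           "bot@example.com", "api-token")
    top = rca_result["root_causes"][0]
    return await jira.create_ticket({
        "project": "OPS",
        "type": "Bug",
        "priority": "High",
        "title": f"[AIOps] Root cause: {top['issue']} on {top['resource']}",
        "description": "Evidence: " + "; ".join(top["evidence"]),
    })

In the alert response workflow shown later, this coroutine would be awaited right after the RCA step.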

3.3 Notification System Integration

"""
通知系统集成
"""

from typing import Dict, List, Any, Optional
from abc import ABC, abstractmethod
import aiohttp


class NotificationChannel(ABC):
    """通知渠道基类"""

    @abstractmethod
    async def send(self, message: Dict) -> bool:
        pass


class SlackNotification(NotificationChannel):
    """Slack 通知"""

    def __init__(self, webhook_url: str):
        self.webhook_url = webhook_url

    async def send(self, message: Dict) -> bool:
        """发送 Slack 消息"""
        async with aiohttp.ClientSession() as session:
            # 构建 Slack 消息
            slack_message = {
                "text": message.get("title", "AIOps 通知"),
                "attachments": [
                    {
                        "color": self._severity_to_color(message.get("severity", "info")),
                        "title": message.get("title"),
                        "text": message.get("description"),
                        "fields": [
                            {"title": "资源", "value": message.get("resource", "N/A"), "short": True},
                            {"title": "严重程度", "value": message.get("severity", "N/A"), "short": True}
                        ],
                        "footer": "AIOps Platform",
                        "ts": message.get("timestamp")
                    }
                ]
            }

            async with session.post(self.webhook_url, json=slack_message) as resp:
                return resp.status == 200

    def _severity_to_color(self, severity: str) -> str:
        colors = {
            "critical": "#ff0000",
            "high": "#ff6600",
            "medium": "#ffcc00",
            "low": "#00ff00",
            "info": "#0066ff"
        }
        return colors.get(severity.lower(), "#808080")


class DingTalkNotification(NotificationChannel):
    """钉钉通知"""

    def __init__(self, webhook_url: str, secret: str = None):
        self.webhook_url = webhook_url
        self.secret = secret

    async def send(self, message: Dict) -> bool:
        """发送钉钉消息"""
        import hashlib
        import hmac
        import base64
        import urllib.parse
        import time

        url = self.webhook_url

        # Sign the request if a secret is configured
        if self.secret:
            timestamp = str(round(time.time() * 1000))
            string_to_sign = f"{timestamp}\n{self.secret}"
            hmac_code = hmac.new(
                self.secret.encode(),
                string_to_sign.encode(),
                digestmod=hashlib.sha256
            ).digest()
            sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
            url = f"{url}&timestamp={timestamp}&sign={sign}"

        # Build the message
        dingtalk_message = {
            "msgtype": "markdown",
            "markdown": {
                "title": message.get("title", "AIOps Notification"),
                "text": f"""### {message.get('title')}
> **Resource**: {message.get('resource', 'N/A')}
> **Severity**: {message.get('severity', 'N/A')}

{message.get('description', '')}
"""
            }
        }

        async with aiohttp.ClientSession() as session:
            async with session.post(url, json=dingtalk_message) as resp:
                result = await resp.json()
                return result.get("errcode") == 0


class EmailNotification(NotificationChannel):
    """邮件通知"""

    def __init__(self, smtp_host: str, smtp_port: int,
                 username: str, password: str,
                 from_addr: str):
        self.smtp_host = smtp_host
        self.smtp_port = smtp_port
        self.username = username
        self.password = password
        self.from_addr = from_addr

    async def send(self, message: Dict) -> bool:
        """发送邮件"""
        import aiosmtplib
        from email.mime.text import MIMEText
        from email.mime.multipart import MIMEMultipart

        msg = MIMEMultipart('alternative')
        msg['Subject'] = f"[AIOps] {message.get('title', 'Alert')}"
        msg['From'] = self.from_addr
        msg['To'] = ", ".join(message.get('recipients', []))

        # HTML body
        html = f"""
        <html>
        <body>
            <h2>{message.get('title')}</h2>
            <table border="1" cellpadding="5">
                <tr><td><b>资源</b></td><td>{message.get('resource', 'N/A')}</td></tr>
                <tr><td><b>严重程度</b></td><td>{message.get('severity', 'N/A')}</td></tr>
                <tr><td><b>时间</b></td><td>{message.get('timestamp', 'N/A')}</td></tr>
            </table>
            <p>{message.get('description', '')}</p>
        </body>
        </html>
        """
        msg.attach(MIMEText(html, 'html'))

        try:
            await aiosmtplib.send(
                msg,
                hostname=self.smtp_host,
                port=self.smtp_port,
                username=self.username,
                password=self.password,
                use_tls=True
            )
            return True
        except Exception:
            return False


class NotificationService:
    """通知服务"""

    def __init__(self):
        self.channels: Dict[str, NotificationChannel] = {}

    def add_channel(self, name: str, channel: NotificationChannel):
        """添加通知渠道"""
        self.channels[name] = channel

    async def notify(self, message: Dict, channels: Optional[List[str]] = None):
        """Send a notification through the given channels (default: all)."""
        if channels is None:
            channels = list(self.channels.keys())

        results = {}
        for channel_name in channels:
            channel = self.channels.get(channel_name)
            if channel:
                success = await channel.send(message)
                results[channel_name] = success

        return results
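
A usage sketch (the webhook URLs are placeholders):

# Usage sketch: register channels and fan out one alert
import asyncio, time

async def demo():
    service = NotificationService()
    service.add_channel("slack", SlackNotification(
        "https://hooks.slack.com/services/T000/B000/XXXX"))
    service.add_channel("dingtalk", DingTalkNotification(
        "https://oapi.dingtalk.com/robot/send?access_token=XXXX"))

    results = await service.notify({
        "title": "High CPU on api-server-pod-1",
        "description": "CPU usage above 95% for 10 minutes",
        "resource": "api-server-pod-1",
        "severity": "high",
        "timestamp": int(time.time()),
    }, channels=["slack"])
    print(results)  # e.g. {"slack": True}

asyncio.run(demo())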

4. Best Practices

4.1 Operations Workflow Automation

# Operations automation workflow configuration

workflows:
  # Alert response workflow
  alert_response:
    trigger:
      type: alert
      conditions:
        severity: [critical, high]

    steps:
      - name: enrich_alert
        action: fetch_context
        params:
          include:
            - recent_changes
            - related_alerts
            - resource_health

      - name: analyze_root_cause
        action: run_rca
        params:
          time_window: 30m
          methods: [topology, temporal]

      - name: create_incident
        action: create_ticket
        params:
          system: jira
          project: OPS
          type: Incident
          priority: "{{ alert.severity }}"

      - name: notify_team
        action: send_notification
        params:
          channels: [slack, pagerduty]
          escalation_policy: default

      - name: attempt_auto_remediation
        condition: "{{ rca.confidence > 0.8 }}"
        action: execute_remediation
        params:
          max_attempts: 1
          rollback_on_failure: true

  # Capacity alert workflow
  capacity_alert:
    trigger:
      type: schedule
      cron: "0 */6 * * *"  # 每6小时

    steps:
      - name: forecast_capacity
        action: run_capacity_forecast
        params:
          horizon: 7d
          resources: all

      - name: check_thresholds
        action: evaluate_thresholds
        params:
          warning: 70%
          critical: 85%

      - name: generate_recommendations
        condition: "{{ forecast.saturation_time < 7d }}"
        action: create_recommendations
        params:
          include_cost_analysis: true

      - name: create_capacity_ticket
        condition: "{{ forecast.saturation_time < 3d }}"
        action: create_ticket
        params:
          system: jira
          project: CAP
          type: Task

  # Deployment monitoring workflow
  deployment_monitoring:
    trigger:
      type: event
      source: kubernetes
      event_type: deployment

    steps:
      - name: track_deployment
        action: create_deployment_marker
        params:
          dashboards: [production-overview]

      - name: monitor_metrics
        action: start_monitoring
        params:
          duration: 30m
          metrics:
            - error_rate
            - latency_p99
            - cpu_usage

      - name: detect_regression
        action: compare_baselines
        params:
          baseline_window: 1h
          threshold_multiplier: 1.5

      - name: auto_rollback
        condition: "{{ regression.detected and regression.severity == 'critical' }}"
        action: rollback_deployment
        params:
          notify: true
          ticket: create
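
The condition fields above use a "{{ expr }}" template syntax; a toy sketch of evaluating such expressions against workflow context follows (a real engine would use a sandboxed template library such as Jinja2, and would also parse duration literals like 7d):

# Toy condition evaluator for "{{ expr }}" templates (illustrative only)
import re

def evaluate_condition(template: str, context: dict) -> bool:
    """Evaluate a '{{ expr }}' condition against a nested context dict."""
    match = re.fullmatch(r"\{\{\s*(.+?)\s*\}\}", template.strip())
    if not match:
        return False
    # Flatten dotted names into identifiers: rca.confidence -> rca_confidence
    flat = {}
    for key, value in context.items():
        if isinstance(value, dict):
            for sub, v in value.items():
                flat[f"{key}_{sub}"] = v
        else:
            flat[key] = value
    expr = re.sub(r"([A-Za-z_]\w*)\.([A-Za-z_]\w*)", r"\1_\2", match.group(1))
    # Sketch only; eval is not safe for untrusted input
    return bool(eval(expr, {"__builtins__": {}}, flat))

print(evaluate_condition("{{ rca.confidence > 0.8 }}",
                         {"rca": {"confidence": 0.85}}))  # True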

4.2 Model Operations Best Practices

"""
AIOps model operations best practices
"""

model_ops_config = {
    # Model monitoring
    "monitoring": {
        "metrics": [
            "prediction_latency",
            "prediction_accuracy",
            "feature_drift",
            "concept_drift",
            "model_staleness"
        ],
        "alerting": {
            "accuracy_drop": {
                "threshold": 0.1,
                "window": "1h",
                "action": "notify_ml_team"
            },
            "latency_spike": {
                "threshold": "100ms",
                "action": "scale_inference"
            }
        }
    },

    # Model update strategy
    "update_strategy": {
        "anomaly_detection": {
            "retrain_schedule": "weekly",
            "online_learning": True,
            "validation_required": True,
            "canary_rollout": True
        },
        "root_cause": {
            "retrain_schedule": "monthly",
            "requires_labeled_data": True,
            "human_review": True
        },
        "capacity_forecast": {
            "retrain_schedule": "daily",
            "incremental_training": True,
            "auto_deploy": True
        }
    },

    # Feature management
    "feature_management": {
        "feature_store": "feast",
        "version_control": True,
        "lineage_tracking": True,
        "freshness_requirements": {
            "real_time": "< 1s",
            "near_real_time": "< 1m",
            "batch": "< 1h"
        }
    },

    # Model version control
    "version_control": {
        "registry": "mlflow",
        "artifact_storage": "s3://aiops-models",
        "metadata": [
            "training_data_version",
            "feature_versions",
            "hyperparameters",
            "performance_metrics"
        ]
    }
}
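
As a concrete example of the feature_drift signal listed above, a common approach is a two-sample Kolmogorov-Smirnov test comparing live feature values against the training distribution (the threshold and data below are illustrative):

# Feature drift check via a two-sample KS test (illustrative threshold)
import numpy as np
from scipy import stats

def feature_drift_detected(train_values: np.ndarray,
                           live_values: np.ndarray,
                           p_threshold: float = 0.01) -> bool:
    """Return True when live values diverge from the training distribution."""
    _, p_value = stats.ks_2samp(train_values, live_values)
    return p_value < p_threshold

# A mean shift in the live window simulates drift
rng = np.random.default_rng(0)
train = rng.normal(0.0, 1.0, 5000)
live = rng.normal(0.5, 1.0, 1000)
print(feature_drift_detected(train, live))  # True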

4.3 Security and Compliance

# Security and compliance configuration

security:
  authentication:
    method: oidc
    provider: keycloak
    required_claims:
      - email
      - groups

  authorization:
    rbac:
      enabled: true
      roles:
        - name: viewer
          permissions:
            - read:alerts
            - read:dashboards
            - read:reports

        - name: operator
          permissions:
            - read:*
            - write:alerts
            - execute:remediation
            - acknowledge:incidents

        - name: admin
          permissions:
            - "*"

  audit:
    enabled: true
    events:
      - type: auth
        level: all
      - type: config_change
        level: all
      - type: remediation
        level: all
      - type: data_access
        level: sensitive

  data_protection:
    encryption:
      at_rest: aes-256-gcm
      in_transit: tls-1.3

    pii_handling:
      detection: enabled
      masking: enabled
      retention: 90d

    data_retention:
      metrics: 90d
      logs: 30d
      traces: 7d
      alerts: 365d

compliance:
  frameworks:
    - soc2
    - iso27001
    - gdpr

  controls:
    access_review:
      frequency: quarterly
      automated: true

    change_management:
      approval_required: true
      rollback_capability: required

    incident_response:
      sla:
        critical: 15m
        high: 1h
        medium: 4h
        low: 24h

5. Platform Evolution Roadmap

5.1 Maturity Evolution

┌─────────────────────────────────────────────────────────────────┐
│                AIOps Platform Evolution Roadmap                 │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Phase 1: Foundation (3-6 months)                               │
│  ├─ Unified data collection platform                            │
│  ├─ Basic anomaly detection (thresholds + statistics)           │
│  ├─ Alert aggregation and noise reduction                       │
│  └─ Dashboards and visualization                                │
│                                                                 │
│  Phase 2: Intelligence (6-12 months)                            │
│  ├─ ML anomaly detection (time series + logs)                   │
│  ├─ Basic root cause analysis                                   │
│  ├─ Capacity forecasting                                        │
│  └─ Knowledge base build-out                                    │
│                                                                 │
│  Phase 3: Automation (12-18 months)                             │
│  ├─ Auto-remediation engine                                     │
│  ├─ Intelligent scaling                                         │
│  ├─ Predictive maintenance                                      │
│  └─ ChatOps integration                                         │
│                                                                 │
│  Phase 4: Advanced (18-24 months)                               │
│  ├─ Causal inference for RCA                                    │
│  ├─ RL-based auto-tuning                                        │
│  ├─ Full-stack observability                                    │
│  └─ Self-adaptive learning systems                              │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

5.2 Key Metrics

"""
AIOps platform key metrics
"""

kpis = {
    "operational_efficiency": {
        "MTTR": {
            "definition": "Mean time to repair",
            "target": "< 30 minutes",
            "current": "2 hours"
        },
        "MTTD": {
            "definition": "Mean time to detect",
            "target": "< 5 minutes",
            "current": "15 minutes"
        },
        "incident_reduction": {
            "definition": "Incident reduction rate",
            "target": "> 50%",
            "current": "30%"
        }
    },

    "ai_effectiveness": {
        "anomaly_precision": {
            "definition": "Anomaly detection precision",
            "target": "> 90%",
            "current": "85%"
        },
        "rca_accuracy": {
            "definition": "Root cause analysis accuracy",
            "target": "> 80%",
            "current": "70%"
        },
        "auto_remediation_success": {
            "definition": "Auto-remediation success rate",
            "target": "> 95%",
            "current": "80%"
        }
    },

    "business_value": {
        "downtime_reduction": {
            "definition": "Downtime reduction",
            "target": "> 70%",
            "current": "40%"
        },
        "cost_savings": {
            "definition": "Cost savings",
            "target": "> 30%",
            "current": "15%"
        },
        "engineer_productivity": {
            "definition": "Engineer productivity gain",
            "target": "> 50%",
            "current": "25%"
        }
    }
}
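
MTTD and MTTR can be computed directly from incident records; a minimal sketch (the record field names are assumptions):

# Sketch: compute MTTD/MTTR from incident records (assumed field names)
from datetime import datetime

incidents = [
    {"occurred_at": "2024-01-01T10:00:00", "detected_at": "2024-01-01T10:08:00",
     "resolved_at": "2024-01-01T11:30:00"},
    {"occurred_at": "2024-01-02T02:00:00", "detected_at": "2024-01-02T02:03:00",
     "resolved_at": "2024-01-02T02:40:00"},
]

def mean_minutes(records, start_field, end_field):
    deltas = [
        (datetime.fromisoformat(r[end_field]) -
         datetime.fromisoformat(r[start_field])).total_seconds() / 60
        for r in records
    ]
    return sum(deltas) / len(deltas)

print(f"MTTD: {mean_minutes(incidents, 'occurred_at', 'detected_at'):.1f} min")  # 5.5
print(f"MTTR: {mean_minutes(incidents, 'detected_at', 'resolved_at'):.1f} min")  # 59.5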

Summary

This chapter walked through building an AIOps platform end to end:

  1. Platform architecture: layered architecture design and a Kubernetes deployment scheme
  2. Core modules: data collection, stream processing, and API service implementation
  3. System integration: wiring up monitoring, ticketing, and notification systems
  4. Best practices: operations workflow automation, model operations, and security and compliance

Key takeaways:

  • AIOps is a gradual, evolutionary process
  • Data quality is the foundation of AI effectiveness
  • Human-machine collaboration is more realistic than full automation
  • Continuous feedback and optimization are essential

After working through this chapter, you should be able to design and implement an enterprise-grade AIOps platform that improves operational efficiency and system reliability.
