01 - AIOps Overview and Architecture

Overview

AIOps (Artificial Intelligence for IT Operations) is the practice of applying artificial intelligence and machine learning to IT operations. This chapter introduces the core concepts, architecture design, and key capabilities of AIOps.

1. AIOps Core Concepts

1.1 What Is AIOps

┌──────────────────────────────────────────────────────────────────────┐
│                           AIOps Definition                           │
├──────────────────────────────────────────────────────────────────────┤
│                                                                      │
│  AIOps = Big Data + Machine Learning + IT Operations                 │
│                                                                      │
│  Core capabilities:                                                  │
│                                                                      │
│    Data Aggregation      Anomaly Detection      Root Cause Analysis  │
│    ├─ Logs               ├─ Threshold checks    ├─ Topology analysis │
│    ├─ Metrics            ├─ Statistical         ├─ Time correlation  │
│    ├─ Traces             ├─ Pattern-based       ├─ Causal inference  │
│    └─ Events             └─ Predictive          └─ Knowledge graphs  │
│                                                                      │
│    Smart Alerting        Auto-Remediation       Capacity Planning    │
│    ├─ Alert aggregation  ├─ Self-healing        ├─ Resource forecast │
│    ├─ Alert suppression  ├─ Change rollback     ├─ Auto-scaling      │
│    └─ Alert correlation  └─ Failover            └─ Cost optimization │
│                                                                      │
└──────────────────────────────────────────────────────────────────────┘

1.2 AIOps Maturity Stages

┌──────────────────────────────────────────────────────────────────────┐
│                         AIOps Maturity Model                         │
├──────────────────────────────────────────────────────────────────────┤
│                                                                      │
│  Level 0: Manual operations                                          │
│  ├─ Human monitoring and alert response                              │
│  ├─ Simple threshold-based alerts                                    │
│  └─ Post-incident analysis and repair                                │
│                                                                      │
│  Level 1: Basic automation                                           │
│  ├─ Centralized monitoring platform                                  │
│  ├─ Basic alerting rules                                             │
│  └─ Scripted operations tasks                                        │
│                                                                      │
│  Level 2: Intelligent assistance                                     │
│  ├─ Anomaly detection algorithms                                     │
│  ├─ Alert aggregation and correlation                                │
│  └─ Decision-support recommendations                                 │
│                                                                      │
│  Level 3: Intelligent automation                                     │
│  ├─ Automated root cause analysis                                    │
│  ├─ Intelligent alert handling                                       │
│  └─ Partial auto-remediation                                         │
│                                                                      │
│  Level 4: Fully autonomous operations                                │
│  ├─ Predictive operations                                            │
│  ├─ Automated fault remediation                                      │
│  └─ Continuous learning and optimization                             │
│                                                                      │
└──────────────────────────────────────────────────────────────────────┘

1.3 Key Challenges

"""
AIOps 面临的关键挑战
"""

challenges = {
    "数据挑战": {
        "volume": "海量数据处理 - 每天 TB 级日志和指标",
        "variety": "多样性 - 结构化/半结构化/非结构化",
        "velocity": "实时性 - 秒级延迟要求",
        "veracity": "准确性 - 噪声数据过滤"
    },

    "算法挑战": {
        "cold_start": "冷启动 - 新系统缺乏历史数据",
        "concept_drift": "概念漂移 - 系统行为动态变化",
        "imbalance": "类别不平衡 - 异常样本稀少",
        "interpretability": "可解释性 - 运维人员需要理解决策"
    },

    "工程挑战": {
        "integration": "系统集成 - 对接多种监控工具",
        "scalability": "可扩展性 - 支持大规模部署",
        "reliability": "可靠性 - AIOps 系统自身的稳定性",
        "feedback": "反馈闭环 - 持续学习和优化"
    },

    "组织挑战": {
        "trust": "信任 - 运维人员对 AI 决策的信任",
        "adoption": "采纳 - 改变传统运维流程",
        "skills": "技能 - 跨领域人才需求",
        "roi": "ROI - 投入产出比证明"
    }
}

2. AIOps Architecture Design

2.1 Overall Architecture

┌─────────────────────────────────────────────────────────────────────────────┐
│                         AIOps Platform Architecture                         │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│  ┌───────────────────────────────────────────────────────────────────────┐ │
│  │                           Application layer                           │ │
│  │ Alerting │ Root cause analysis │ Forecasting │ Remediation │ Q&A      │ │
│  └───────────────────────────────────────────────────────────────────────┘ │
│                                      │                                      │
│  ┌───────────────────────────────────────────────────────────────────────┐ │
│  │                            AI engine layer                            │ │
│  │ Anomaly detection │ Correlation │ Forecasting │ NLP │ Knowledge graph │ │
│  └───────────────────────────────────────────────────────────────────────┘ │
│                                      │                                      │
│  ┌───────────────────────────────────────────────────────────────────────┐ │
│  │                         Data processing layer                         │ │
│  │ Stream processing │ Batch processing │ Feature engineering │ Cleansing│ │
│  └───────────────────────────────────────────────────────────────────────┘ │
│                                      │                                      │
│  ┌───────────────────────────────────────────────────────────────────────┐ │
│  │                         Data collection layer                         │ │
│  │ Agent │ API │ Webhook │ Kafka │ Log collectors                        │ │
│  └───────────────────────────────────────────────────────────────────────┘ │
│                                      │                                      │
│  ┌───────────────────────────────────────────────────────────────────────┐ │
│  │                              Data sources                             │ │
│  │ Prometheus │ Elasticsearch │ Jaeger │ CMDB │ Ticketing system         │ │
│  └───────────────────────────────────────────────────────────────────────┘ │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

2.2 Data-Flow Architecture

"""
AIOps 数据流架构实现
"""

from dataclasses import dataclass
from typing import Dict, List, Any, Optional
from enum import Enum
import asyncio
from abc import ABC, abstractmethod


class DataType(Enum):
    """数据类型"""
    METRIC = "metric"
    LOG = "log"
    TRACE = "trace"
    EVENT = "event"
    ALERT = "alert"


@dataclass
class DataPoint:
    """数据点"""
    data_type: DataType
    timestamp: float
    source: str
    tags: Dict[str, str]
    fields: Dict[str, Any]
    raw_data: Optional[str] = None


class DataCollector(ABC):
    """Base class for data collectors"""

    @abstractmethod
    async def collect(self) -> List[DataPoint]:
        """Collect data"""
        pass


class MetricCollector(DataCollector):
    """指标采集器"""

    def __init__(self, prometheus_url: str):
        self.prometheus_url = prometheus_url

    async def collect(self) -> List[DataPoint]:
        """Collect metrics from Prometheus"""
        import aiohttp

        async with aiohttp.ClientSession() as session:
            # Instant query; a real collector would iterate over a
            # configurable list of PromQL queries
            query = 'up'  # example query
            url = f"{self.prometheus_url}/api/v1/query"

            async with session.get(url, params={"query": query}) as resp:
                data = await resp.json()

                points = []
                for result in data.get("data", {}).get("result", []):
                    metric = result["metric"]
                    value = result["value"]

                    points.append(DataPoint(
                        data_type=DataType.METRIC,
                        timestamp=float(value[0]),
                        source="prometheus",
                        tags=metric,
                        fields={"value": float(value[1])}
                    ))

                return points


class LogCollector(DataCollector):
    """日志采集器"""

    def __init__(self, elasticsearch_url: str, index_pattern: str = "logs-*"):
        self.elasticsearch_url = elasticsearch_url
        self.index_pattern = index_pattern

    async def collect(self) -> List[DataPoint]:
        """Collect logs from Elasticsearch"""
        import aiohttp

        async with aiohttp.ClientSession() as session:
            url = f"{self.elasticsearch_url}/{self.index_pattern}/_search"

            query = {
                "query": {
                    "range": {
                        "@timestamp": {
                            "gte": "now-5m",
                            "lt": "now"
                        }
                    }
                },
                "size": 1000,
                "sort": [{"@timestamp": "desc"}]
            }

            async with session.post(url, json=query) as resp:
                data = await resp.json()

                points = []
                for hit in data.get("hits", {}).get("hits", []):
                    source = hit["_source"]

                    # Elasticsearch returns @timestamp as an ISO-8601 string;
                    # convert it to an epoch float to match DataPoint.timestamp
                    raw_ts = source.get("@timestamp")
                    if raw_ts:
                        from datetime import datetime
                        ts = datetime.fromisoformat(
                            str(raw_ts).replace("Z", "+00:00")).timestamp()
                    else:
                        ts = 0.0

                    points.append(DataPoint(
                        data_type=DataType.LOG,
                        timestamp=ts,
                        source="elasticsearch",
                        tags={
                            "host": source.get("host", ""),
                            "service": source.get("service", ""),
                            "level": source.get("level", "")
                        },
                        fields={
                            "message": source.get("message", "")
                        },
                        raw_data=source.get("message", "")
                    ))

                return points


class DataPipeline:
    """数据处理管道"""

    def __init__(self):
        self.collectors: List[DataCollector] = []
        self.processors: List['DataProcessor'] = []
        self.sinks: List['DataSink'] = []

    def add_collector(self, collector: DataCollector):
        """Register a collector"""
        self.collectors.append(collector)

    def add_processor(self, processor: 'DataProcessor'):
        """Register a processor"""
        self.processors.append(processor)

    def add_sink(self, sink: 'DataSink'):
        """Register a sink"""
        self.sinks.append(sink)

    async def run(self):
        """Run the pipeline"""
        while True:
            # Collect from all sources in parallel
            tasks = [collector.collect() for collector in self.collectors]
            results = await asyncio.gather(*tasks)

            # Merge data points
            all_points = []
            for points in results:
                all_points.extend(points)

            # Apply processors in order
            for processor in self.processors:
                all_points = await processor.process(all_points)

            # Write to all sinks
            for sink in self.sinks:
                await sink.write(all_points)

            await asyncio.sleep(10)  # Collection interval


class DataProcessor(ABC):
    """Base class for data processors"""

    @abstractmethod
    async def process(self, points: List[DataPoint]) -> List[DataPoint]:
        """Process data points"""
        pass


class FeatureExtractor(DataProcessor):
    """Feature extractor"""

    async def process(self, points: List[DataPoint]) -> List[DataPoint]:
        """Extract features"""
        for point in points:
            if point.data_type == DataType.LOG:
                # Derive simple features from the raw log message
                message = point.raw_data or ""
                point.fields["length"] = len(message)
                point.fields["word_count"] = len(message.split())
                point.fields["has_error"] = "error" in message.lower()
                point.fields["has_exception"] = "exception" in message.lower()

        return points


class DataSink(ABC):
    """Base class for data sinks"""

    @abstractmethod
    async def write(self, points: List[DataPoint]):
        """Write data points"""
        pass


class KafkaSink(DataSink):
    """Kafka 输出"""

    def __init__(self, bootstrap_servers: str, topic: str):
        self.bootstrap_servers = bootstrap_servers
        self.topic = topic
        self.producer = None

    async def write(self, points: List[DataPoint]):
        """写入 Kafka"""
        from aiokafka import AIOKafkaProducer
        import json

        if self.producer is None:
            self.producer = AIOKafkaProducer(
                bootstrap_servers=self.bootstrap_servers
            )
            await self.producer.start()

        for point in points:
            value = json.dumps({
                "type": point.data_type.value,
                "timestamp": point.timestamp,
                "source": point.source,
                "tags": point.tags,
                "fields": point.fields
            }).encode()

            await self.producer.send_and_wait(self.topic, value)
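
A minimal sketch of wiring the pipeline together. The endpoint URLs and topic name below are placeholders, not part of the original design:

"""
Example: assembling and running the data pipeline
"""

import asyncio


async def main():
    pipeline = DataPipeline()

    # Placeholder endpoints; substitute your own Prometheus / Elasticsearch / Kafka
    pipeline.add_collector(MetricCollector("http://prometheus:9090"))
    pipeline.add_collector(LogCollector("http://elasticsearch:9200"))
    pipeline.add_processor(FeatureExtractor())
    pipeline.add_sink(KafkaSink("kafka:9092", topic="aiops-observability"))

    await pipeline.run()  # Runs until cancelled


if __name__ == "__main__":
    asyncio.run(main())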

2.3 AI Engine Architecture

"""
AIOps AI 引擎架构
"""

from abc import ABC, abstractmethod
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass
from enum import Enum
import numpy as np


class ModelType(Enum):
    """模型类型"""
    ANOMALY_DETECTION = "anomaly_detection"
    ROOT_CAUSE = "root_cause"
    FORECASTING = "forecasting"
    CLASSIFICATION = "classification"
    CLUSTERING = "clustering"


@dataclass
class ModelConfig:
    """模型配置"""
    model_id: str
    model_type: ModelType
    model_class: str
    parameters: Dict[str, Any]
    features: List[str]
    target: Optional[str] = None


@dataclass
class PredictionResult:
    """预测结果"""
    model_id: str
    timestamp: float
    prediction: Any
    confidence: float
    explanation: Optional[Dict[str, Any]] = None


class AIModel(ABC):
    """Base class for AI models"""

    def __init__(self, config: ModelConfig):
        self.config = config
        self.model = None

    @abstractmethod
    def train(self, X: np.ndarray, y: Optional[np.ndarray] = None):
        """Train the model"""
        pass

    @abstractmethod
    def predict(self, X: np.ndarray) -> List[PredictionResult]:
        """Run inference"""
        pass

    def save(self, path: str):
        """Persist the model"""
        import joblib
        joblib.dump(self.model, path)

    def load(self, path: str):
        """Load a persisted model"""
        import joblib
        self.model = joblib.load(path)


class AIEngine:
    """AI engine"""

    def __init__(self):
        self.models: Dict[str, AIModel] = {}
        self.model_registry: Dict[str, ModelConfig] = {}

    def register_model(self, config: ModelConfig):
        """Register a model configuration"""
        self.model_registry[config.model_id] = config

    def load_model(self, model_id: str, model_path: str):
        """Load a model from disk"""
        if model_id not in self.model_registry:
            raise ValueError(f"Model {model_id} is not registered")

        config = self.model_registry[model_id]
        model = self._create_model(config)
        model.load(model_path)
        self.models[model_id] = model

    def _create_model(self, config: ModelConfig) -> AIModel:
        """Instantiate a model from its configuration"""
        model_classes = {
            "IsolationForestModel": IsolationForestModel,
            "LSTMForecastModel": LSTMForecastModel,
            "XGBClassifier": XGBClassifierModel,
            "KMeansCluster": KMeansClusterModel
        }

        model_class = model_classes.get(config.model_class)
        if model_class is None:
            raise ValueError(f"Unknown model class: {config.model_class}")

        return model_class(config)

    def predict(self, model_id: str, data: np.ndarray) -> List[PredictionResult]:
        """Predict with a loaded model"""
        if model_id not in self.models:
            raise ValueError(f"Model {model_id} is not loaded")

        return self.models[model_id].predict(data)

    def batch_predict(self, data: Dict[str, np.ndarray]) -> Dict[str, List[PredictionResult]]:
        """Run predictions for several models at once"""
        results = {}
        for model_id, model_data in data.items():
            if model_id in self.models:
                results[model_id] = self.predict(model_id, model_data)
        return results


class IsolationForestModel(AIModel):
    """Isolation Forest anomaly-detection model"""

    def train(self, X: np.ndarray, y: Optional[np.ndarray] = None):
        """Train the model"""
        from sklearn.ensemble import IsolationForest

        self.model = IsolationForest(
            n_estimators=self.config.parameters.get("n_estimators", 100),
            contamination=self.config.parameters.get("contamination", 0.1),
            random_state=42
        )
        self.model.fit(X)

    def predict(self, X: np.ndarray) -> List[PredictionResult]:
        """Detect anomalies"""
        import time

        predictions = self.model.predict(X)
        scores = self.model.score_samples(X)

        results = []
        for i, (pred, score) in enumerate(zip(predictions, scores)):
            is_anomaly = pred == -1
            # Lower score = more anomalous; normalize into [0, 1]
            # (epsilon guards against a zero range when all scores are equal)
            score_range = scores.max() - scores.min() + 1e-12
            confidence = 1 - (score - scores.min()) / score_range

            results.append(PredictionResult(
                model_id=self.config.model_id,
                timestamp=time.time(),
                prediction={"is_anomaly": is_anomaly, "anomaly_score": float(score)},
                confidence=float(confidence),
                explanation={"feature_contributions": self._get_feature_contributions(X[i])}
            ))

        return results

    def _get_feature_contributions(self, x: np.ndarray) -> Dict[str, float]:
        """Compute per-feature contributions (simplified: absolute magnitude)"""
        contributions = {}
        for i, feature in enumerate(self.config.features):
            contributions[feature] = float(abs(x[i]))
        return contributions


class LSTMForecastModel(AIModel):
    """LSTM forecasting model"""

    def train(self, X: np.ndarray, y: Optional[np.ndarray] = None):
        """Train the model"""
        import torch
        import torch.nn as nn

        # Define the LSTM network
        class LSTMNet(nn.Module):
            def __init__(self, input_size, hidden_size, num_layers, output_size):
                super().__init__()
                self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
                self.fc = nn.Linear(hidden_size, output_size)

            def forward(self, x):
                lstm_out, _ = self.lstm(x)
                return self.fc(lstm_out[:, -1, :])

        params = self.config.parameters
        self.model = LSTMNet(
            input_size=params.get("input_size", 1),
            hidden_size=params.get("hidden_size", 64),
            num_layers=params.get("num_layers", 2),
            output_size=params.get("output_size", 1)
        )

        # Training loop omitted here; see the sketch below

    def predict(self, X: np.ndarray) -> List[PredictionResult]:
        """Forecast"""
        import torch
        import time

        self.model.eval()
        with torch.no_grad():
            X_tensor = torch.FloatTensor(X)
            predictions = self.model(X_tensor).numpy()

        results = []
        for pred in predictions:
            results.append(PredictionResult(
                model_id=self.config.model_id,
                timestamp=time.time(),
                prediction={"forecast": float(pred[0])},
                confidence=0.9  # A proper confidence estimate needs more work
            ))

        return results
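
A minimal sketch of the training loop omitted above, assuming X has shape (batch, seq_len, input_size) and y has shape (batch, output_size). The optimizer, loss function, epoch count, and learning rate are illustrative choices, not prescribed by the original:

def train_lstm(model, X, y, epochs: int = 50, lr: float = 1e-3):
    """Simple MSE training loop for the LSTMNet defined above"""
    import torch
    import torch.nn as nn

    X_t = torch.FloatTensor(X)
    y_t = torch.FloatTensor(y)
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    loss_fn = nn.MSELoss()

    model.train()
    for _ in range(epochs):
        optimizer.zero_grad()
        loss = loss_fn(model(X_t), y_t)
        loss.backward()
        optimizer.step()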


class XGBClassifierModel(AIModel):
    """XGBoost classification model"""

    def train(self, X: np.ndarray, y: Optional[np.ndarray] = None):
        """Train the model"""
        import xgboost as xgb

        self.model = xgb.XGBClassifier(
            n_estimators=self.config.parameters.get("n_estimators", 100),
            max_depth=self.config.parameters.get("max_depth", 6),
            learning_rate=self.config.parameters.get("learning_rate", 0.1)
        )
        self.model.fit(X, y)

    def predict(self, X: np.ndarray) -> List[PredictionResult]:
        """Classify"""
        import time

        predictions = self.model.predict(X)
        probabilities = self.model.predict_proba(X)

        results = []
        for pred, prob in zip(predictions, probabilities):
            results.append(PredictionResult(
                model_id=self.config.model_id,
                timestamp=time.time(),
                prediction={"class": int(pred)},
                confidence=float(max(prob)),
                explanation=self._get_shap_explanation(X[0]) if len(X) == 1 else None
            ))

        return results

    def _get_shap_explanation(self, x: np.ndarray) -> Dict[str, Any]:
        """Build a SHAP explanation"""
        import shap

        explainer = shap.TreeExplainer(self.model)
        shap_values = explainer.shap_values(x.reshape(1, -1))

        return {
            "shap_values": shap_values[0].tolist() if isinstance(shap_values, list) else shap_values.tolist(),
            "base_value": float(explainer.expected_value) if not isinstance(explainer.expected_value, list) else float(explainer.expected_value[0])
        }


class KMeansClusterModel(AIModel):
    """KMeans clustering model"""

    def train(self, X: np.ndarray, y: Optional[np.ndarray] = None):
        """Train the model"""
        from sklearn.cluster import KMeans

        self.model = KMeans(
            n_clusters=self.config.parameters.get("n_clusters", 5),
            random_state=42
        )
        self.model.fit(X)

    def predict(self, X: np.ndarray) -> List[PredictionResult]:
        """Assign clusters"""
        import time

        clusters = self.model.predict(X)
        distances = self.model.transform(X)

        results = []
        for cluster, dist in zip(clusters, distances):
            results.append(PredictionResult(
                model_id=self.config.model_id,
                timestamp=time.time(),
                prediction={"cluster": int(cluster)},
                confidence=1 - float(dist[cluster]) / dist.sum(),
                explanation={"distances_to_centroids": dist.tolist()}
            ))

        return results
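
Putting the engine together end to end, a minimal sketch; the model_id, feature names, and synthetic data are illustrative:

import numpy as np

# Configure and register an anomaly-detection model (illustrative values)
config = ModelConfig(
    model_id="cpu-anomaly",
    model_type=ModelType.ANOMALY_DETECTION,
    model_class="IsolationForestModel",
    parameters={"n_estimators": 100, "contamination": 0.05},
    features=["cpu_usage", "load_1m"]
)

engine = AIEngine()
engine.register_model(config)

# Train directly and attach to the engine; load_model would instead
# restore a model previously persisted with AIModel.save
model = IsolationForestModel(config)
model.train(np.random.rand(500, 2))
engine.models[config.model_id] = model

for result in engine.predict("cpu-anomaly", np.array([[0.95, 8.0]])):
    print(result.prediction, result.confidence)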

3. Observability Data Model

3.1 Unified Data Model

"""
AIOps 统一可观测性数据模型
"""

from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional
from enum import Enum
import time
import uuid


class Severity(Enum):
    """严重程度"""
    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    INFO = "info"


@dataclass
class Resource:
    """资源标识"""
    type: str  # host, pod, service, etc.
    name: str
    namespace: Optional[str] = None
    attributes: Dict[str, str] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        return {
            "type": self.type,
            "name": self.name,
            "namespace": self.namespace,
            "attributes": self.attributes
        }


@dataclass
class Metric:
    """指标数据"""
    name: str
    value: float
    timestamp: float
    resource: Resource
    labels: Dict[str, str] = field(default_factory=dict)
    unit: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        return {
            "name": self.name,
            "value": self.value,
            "timestamp": self.timestamp,
            "resource": self.resource.to_dict(),
            "labels": self.labels,
            "unit": self.unit
        }


@dataclass
class LogEntry:
    """日志条目"""
    timestamp: float
    message: str
    resource: Resource
    level: str = "INFO"
    attributes: Dict[str, Any] = field(default_factory=dict)
    trace_id: Optional[str] = None
    span_id: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        return {
            "timestamp": self.timestamp,
            "message": self.message,
            "resource": self.resource.to_dict(),
            "level": self.level,
            "attributes": self.attributes,
            "trace_id": self.trace_id,
            "span_id": self.span_id
        }


@dataclass
class Span:
    """追踪 Span"""
    trace_id: str
    span_id: str
    parent_span_id: Optional[str]
    operation_name: str
    service_name: str
    start_time: float
    end_time: float
    status: str = "OK"
    attributes: Dict[str, Any] = field(default_factory=dict)
    events: List[Dict[str, Any]] = field(default_factory=list)

    @property
    def duration_ms(self) -> float:
        return (self.end_time - self.start_time) * 1000

    def to_dict(self) -> Dict[str, Any]:
        return {
            "trace_id": self.trace_id,
            "span_id": self.span_id,
            "parent_span_id": self.parent_span_id,
            "operation_name": self.operation_name,
            "service_name": self.service_name,
            "start_time": self.start_time,
            "end_time": self.end_time,
            "duration_ms": self.duration_ms,
            "status": self.status,
            "attributes": self.attributes,
            "events": self.events
        }


@dataclass
class Alert:
    """告警"""
    alert_id: str
    name: str
    severity: Severity
    resource: Resource
    message: str
    timestamp: float
    status: str = "firing"  # firing, resolved
    labels: Dict[str, str] = field(default_factory=dict)
    annotations: Dict[str, str] = field(default_factory=dict)
    fingerprint: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        return {
            "alert_id": self.alert_id,
            "name": self.name,
            "severity": self.severity.value,
            "resource": self.resource.to_dict(),
            "message": self.message,
            "timestamp": self.timestamp,
            "status": self.status,
            "labels": self.labels,
            "annotations": self.annotations,
            "fingerprint": self.fingerprint
        }


@dataclass
class Incident:
    """事件/故障"""
    incident_id: str
    title: str
    severity: Severity
    status: str  # open, investigating, resolved, closed
    created_at: float
    updated_at: float
    resolved_at: Optional[float] = None
    affected_resources: List[Resource] = field(default_factory=list)
    related_alerts: List[str] = field(default_factory=list)
    root_cause: Optional[str] = None
    resolution: Optional[str] = None
    timeline: List[Dict[str, Any]] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        return {
            "incident_id": self.incident_id,
            "title": self.title,
            "severity": self.severity.value,
            "status": self.status,
            "created_at": self.created_at,
            "updated_at": self.updated_at,
            "resolved_at": self.resolved_at,
            "affected_resources": [r.to_dict() for r in self.affected_resources],
            "related_alerts": self.related_alerts,
            "root_cause": self.root_cause,
            "resolution": self.resolution,
            "timeline": self.timeline
        }


class ObservabilityStore:
    """可观测性数据存储"""

    def __init__(self):
        self.metrics: List[Metric] = []
        self.logs: List[LogEntry] = []
        self.spans: List[Span] = []
        self.alerts: List[Alert] = []
        self.incidents: Dict[str, Incident] = {}

    def store_metric(self, metric: Metric):
        """Store a metric"""
        self.metrics.append(metric)

    def store_log(self, log: LogEntry):
        """Store a log entry"""
        self.logs.append(log)

    def store_span(self, span: Span):
        """Store a span"""
        self.spans.append(span)

    def store_alert(self, alert: Alert):
        """Store an alert"""
        self.alerts.append(alert)

    def create_incident(self, title: str, severity: Severity,
                        affected_resources: List[Resource],
                        related_alerts: List[str]) -> Incident:
        """Create an incident"""
        now = time.time()
        incident = Incident(
            incident_id=str(uuid.uuid4()),
            title=title,
            severity=severity,
            status="open",
            created_at=now,
            updated_at=now,
            affected_resources=affected_resources,
            related_alerts=related_alerts,
            timeline=[{
                "timestamp": now,
                "action": "created",
                "description": f"Incident created: {title}"
            }]
        )
        self.incidents[incident.incident_id] = incident
        return incident

    def query_metrics(self, name: str, resource_type: Optional[str] = None,
                      start_time: Optional[float] = None,
                      end_time: Optional[float] = None) -> List[Metric]:
        """查询指标"""
        results = []
        for metric in self.metrics:
            if metric.name != name:
                continue
            if resource_type and metric.resource.type != resource_type:
                continue
            if start_time and metric.timestamp < start_time:
                continue
            if end_time and metric.timestamp > end_time:
                continue
            results.append(metric)
        return results

    def query_logs(self, resource: Optional[Resource] = None,
                   level: Optional[str] = None,
                   start_time: Optional[float] = None,
                   end_time: Optional[float] = None,
                   keyword: Optional[str] = None) -> List[LogEntry]:
        """查询日志"""
        results = []
        for log in self.logs:
            if resource and log.resource.name != resource.name:
                continue
            if level and log.level != level:
                continue
            if start_time and log.timestamp < start_time:
                continue
            if end_time and log.timestamp > end_time:
                continue
            if keyword and keyword.lower() not in log.message.lower():
                continue
            results.append(log)
        return results

    def get_trace(self, trace_id: str) -> List[Span]:
        """获取完整追踪"""
        return [span for span in self.spans if span.trace_id == trace_id]

    def get_service_topology(self) -> Dict[str, List[str]]:
        """Build the service topology from spans"""
        topology = {}

        for span in self.spans:
            service = span.service_name
            if service not in topology:
                topology[service] = set()

            # Find child spans to discover downstream services
            child_services = set()
            for other_span in self.spans:
                if other_span.parent_span_id == span.span_id:
                    child_services.add(other_span.service_name)

            topology[service].update(child_services)

        return {k: list(v) for k, v in topology.items()}
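
A short usage sketch of the store; the service names and timings are made up for illustration:

import time

store = ObservabilityStore()

# Two spans of one trace: api-server calls user-service
now = time.time()
store.store_span(Span(trace_id="t1", span_id="s1", parent_span_id=None,
                      operation_name="GET /users", service_name="api-server",
                      start_time=now, end_time=now + 0.15))
store.store_span(Span(trace_id="t1", span_id="s2", parent_span_id="s1",
                      operation_name="SELECT users", service_name="user-service",
                      start_time=now + 0.01, end_time=now + 0.12))

print(store.get_trace("t1")[0].duration_ms)   # ~150 ms
print(store.get_service_topology())           # api-server -> user-service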

3.2 CMDB Integration

"""
CMDB (配置管理数据库) 集成
"""

from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional
from enum import Enum


class CIType(Enum):
    """配置项类型"""
    HOST = "host"
    VM = "vm"
    CONTAINER = "container"
    POD = "pod"
    SERVICE = "service"
    DATABASE = "database"
    NETWORK = "network"
    APPLICATION = "application"


@dataclass
class ConfigurationItem:
    """配置项"""
    ci_id: str
    ci_type: CIType
    name: str
    status: str  # active, inactive, maintenance
    attributes: Dict[str, Any] = field(default_factory=dict)
    relationships: List['CIRelationship'] = field(default_factory=list)
    tags: Dict[str, str] = field(default_factory=dict)


@dataclass
class CIRelationship:
    """配置项关系"""
    relationship_type: str  # runs_on, depends_on, connects_to, contains
    source_ci_id: str
    target_ci_id: str
    attributes: Dict[str, Any] = field(default_factory=dict)


class CMDB:
    """配置管理数据库"""

    def __init__(self):
        self.cis: Dict[str, ConfigurationItem] = {}
        self.relationships: List[CIRelationship] = []

    def add_ci(self, ci: ConfigurationItem):
        """Add a configuration item"""
        self.cis[ci.ci_id] = ci

    def add_relationship(self, relationship: CIRelationship):
        """Add a relationship"""
        self.relationships.append(relationship)

        # Keep the source CI's relationship list in sync
        if relationship.source_ci_id in self.cis:
            self.cis[relationship.source_ci_id].relationships.append(relationship)

    def get_dependencies(self, ci_id: str) -> List[ConfigurationItem]:
        """获取依赖项"""
        dependencies = []
        for rel in self.relationships:
            if rel.source_ci_id == ci_id and rel.relationship_type == "depends_on":
                if rel.target_ci_id in self.cis:
                    dependencies.append(self.cis[rel.target_ci_id])
        return dependencies

    def get_dependents(self, ci_id: str) -> List[ConfigurationItem]:
        """获取被依赖项"""
        dependents = []
        for rel in self.relationships:
            if rel.target_ci_id == ci_id and rel.relationship_type == "depends_on":
                if rel.source_ci_id in self.cis:
                    dependents.append(self.cis[rel.source_ci_id])
        return dependents

    def get_impact_radius(self, ci_id: str, depth: int = 3) -> List[ConfigurationItem]:
        """获取影响半径"""
        impacted = set()
        to_visit = [ci_id]
        current_depth = 0

        while to_visit and current_depth < depth:
            next_level = []
            for current_id in to_visit:
                dependents = self.get_dependents(current_id)
                for dep in dependents:
                    if dep.ci_id not in impacted:
                        impacted.add(dep.ci_id)
                        next_level.append(dep.ci_id)

            to_visit = next_level
            current_depth += 1

        return [self.cis[ci_id] for ci_id in impacted if ci_id in self.cis]

    def build_topology_graph(self) -> Dict[str, Any]:
        """构建拓扑图"""
        nodes = []
        edges = []

        for ci_id, ci in self.cis.items():
            nodes.append({
                "id": ci_id,
                "label": ci.name,
                "type": ci.ci_type.value,
                "status": ci.status
            })

        for rel in self.relationships:
            edges.append({
                "source": rel.source_ci_id,
                "target": rel.target_ci_id,
                "type": rel.relationship_type
            })

        return {"nodes": nodes, "edges": edges}

    def sync_from_kubernetes(self, k8s_client):
        """Synchronize CIs from Kubernetes (expects a kubernetes.client.CoreV1Api)"""

        # Sync Pods
        pods = k8s_client.list_pod_for_all_namespaces()
        for pod in pods.items:
            ci = ConfigurationItem(
                ci_id=f"pod:{pod.metadata.namespace}:{pod.metadata.name}",
                ci_type=CIType.POD,
                name=pod.metadata.name,
                status="active" if pod.status.phase == "Running" else "inactive",
                attributes={
                    "namespace": pod.metadata.namespace,
                    "node": pod.spec.node_name,
                    "ip": pod.status.pod_ip,
                    "containers": [c.name for c in pod.spec.containers]
                },
                tags=pod.metadata.labels or {}
            )
            self.add_ci(ci)

        # Sync Services
        services = k8s_client.list_service_for_all_namespaces()
        for svc in services.items:
            ci = ConfigurationItem(
                ci_id=f"service:{svc.metadata.namespace}:{svc.metadata.name}",
                ci_type=CIType.SERVICE,
                name=svc.metadata.name,
                status="active",
                attributes={
                    "namespace": svc.metadata.namespace,
                    "type": svc.spec.type,
                    "cluster_ip": svc.spec.cluster_ip,
                    "ports": [{"port": p.port, "target_port": p.target_port} for p in svc.spec.ports or []]
                },
                tags=svc.metadata.labels or {}
            )
            self.add_ci(ci)

            # Link the Service to the Pods matched by its selector
            if svc.spec.selector:
                for pod_ci_id, pod_ci in self.cis.items():
                    if pod_ci.ci_type == CIType.POD:
                        if all(pod_ci.tags.get(k) == v for k, v in svc.spec.selector.items()):
                            rel = CIRelationship(
                                relationship_type="contains",
                                source_ci_id=ci.ci_id,
                                target_ci_id=pod_ci_id
                            )
                            self.add_relationship(rel)
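
A brief usage sketch; the CI names here are illustrative:

cmdb = CMDB()

cmdb.add_ci(ConfigurationItem(ci_id="svc:web", ci_type=CIType.SERVICE,
                              name="web", status="active"))
cmdb.add_ci(ConfigurationItem(ci_id="db:users", ci_type=CIType.DATABASE,
                              name="users-db", status="active"))

# web depends on users-db
cmdb.add_relationship(CIRelationship(relationship_type="depends_on",
                                     source_ci_id="svc:web",
                                     target_ci_id="db:users"))

# If users-db fails, which CIs are impacted?
for ci in cmdb.get_impact_radius("db:users"):
    print(ci.ci_id)  # svc:web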

4. Platform Service Design

4.1 AIOps API Service

"""
AIOps 平台 API 服务
"""

from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from typing import Dict, List, Any, Optional
from datetime import datetime
import asyncio


app = FastAPI(title="AIOps Platform API", version="1.0.0")


# ==================== Data models ====================

class MetricQuery(BaseModel):
    """指标查询请求"""
    metric_name: str
    resource_type: Optional[str] = None
    labels: Optional[Dict[str, str]] = None
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None
    step: Optional[str] = "1m"


class LogQuery(BaseModel):
    """日志查询请求"""
    query: str
    resource: Optional[str] = None
    level: Optional[str] = None
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None
    limit: int = 100


class AlertQuery(BaseModel):
    """告警查询请求"""
    status: Optional[str] = None
    severity: Optional[str] = None
    resource: Optional[str] = None
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None


class AnomalyDetectionRequest(BaseModel):
    """异常检测请求"""
    metric_name: str
    resource: str
    lookback_minutes: int = 60
    sensitivity: float = 0.8


class RootCauseRequest(BaseModel):
    """根因分析请求"""
    incident_id: Optional[str] = None
    alert_ids: Optional[List[str]] = None
    time_window_minutes: int = 30


class ForecastRequest(BaseModel):
    """预测请求"""
    metric_name: str
    resource: str
    forecast_minutes: int = 60


# ==================== API endpoints ====================

@app.get("/api/v1/health")
async def health_check():
    """健康检查"""
    return {"status": "healthy", "timestamp": datetime.now().isoformat()}


@app.post("/api/v1/metrics/query")
async def query_metrics(request: MetricQuery):
    """查询指标"""
    # 实际实现会查询 Prometheus 或时序数据库
    return {
        "metric_name": request.metric_name,
        "data": [
            {"timestamp": "2024-01-01T00:00:00Z", "value": 100},
            {"timestamp": "2024-01-01T00:01:00Z", "value": 102}
        ]
    }


@app.post("/api/v1/logs/query")
async def query_logs(request: LogQuery):
    """查询日志"""
    # 实际实现会查询 Elasticsearch
    return {
        "total": 100,
        "logs": [
            {
                "timestamp": "2024-01-01T00:00:00Z",
                "level": "ERROR",
                "message": "Connection timeout",
                "resource": "api-server"
            }
        ]
    }


@app.post("/api/v1/alerts/query")
async def query_alerts(request: AlertQuery):
    """查询告警"""
    return {
        "total": 10,
        "alerts": [
            {
                "alert_id": "alert-001",
                "name": "HighCPUUsage",
                "severity": "high",
                "status": "firing",
                "message": "CPU usage > 90%"
            }
        ]
    }


@app.get("/api/v1/traces/{trace_id}")
async def get_trace(trace_id: str):
    """获取追踪详情"""
    return {
        "trace_id": trace_id,
        "spans": [
            {
                "span_id": "span-001",
                "operation_name": "HTTP GET /api/users",
                "service_name": "api-server",
                "duration_ms": 150
            }
        ]
    }


@app.post("/api/v1/anomaly/detect")
async def detect_anomaly(request: AnomalyDetectionRequest):
    """异常检测"""
    # 调用 AI 引擎进行异常检测
    return {
        "is_anomaly": True,
        "confidence": 0.92,
        "anomaly_score": -0.85,
        "explanation": {
            "contributing_factors": [
                {"factor": "sudden_spike", "contribution": 0.6},
                {"factor": "pattern_deviation", "contribution": 0.4}
            ]
        }
    }


@app.post("/api/v1/rootcause/analyze")
async def analyze_root_cause(request: RootCauseRequest):
    """根因分析"""
    return {
        "analysis_id": "analysis-001",
        "probable_root_causes": [
            {
                "rank": 1,
                "resource": "database-primary",
                "issue": "Connection pool exhausted",
                "confidence": 0.85,
                "evidence": [
                    "DB connection count spiked at 10:15",
                    "Multiple services reporting timeout errors"
                ]
            },
            {
                "rank": 2,
                "resource": "api-server",
                "issue": "Memory leak",
                "confidence": 0.65,
                "evidence": [
                    "Memory usage trending upward over 24h"
                ]
            }
        ],
        "impact_analysis": {
            "affected_services": ["user-service", "order-service"],
            "affected_users": 1500,
            "estimated_revenue_impact": "$5000"
        }
    }


@app.post("/api/v1/forecast/predict")
async def forecast(request: ForecastRequest):
    """预测"""
    return {
        "metric_name": request.metric_name,
        "forecast": [
            {"timestamp": "2024-01-01T01:00:00Z", "value": 105, "lower": 100, "upper": 110},
            {"timestamp": "2024-01-01T02:00:00Z", "value": 108, "lower": 102, "upper": 114}
        ],
        "trend": "increasing",
        "seasonality_detected": True
    }


@app.get("/api/v1/incidents")
async def list_incidents(status: Optional[str] = None):
    """列出事件"""
    return {
        "total": 5,
        "incidents": [
            {
                "incident_id": "inc-001",
                "title": "Database Performance Degradation",
                "severity": "high",
                "status": "investigating",
                "created_at": "2024-01-01T10:00:00Z"
            }
        ]
    }


@app.post("/api/v1/incidents")
async def create_incident(title: str, severity: str, description: str):
    """创建事件"""
    return {
        "incident_id": "inc-002",
        "title": title,
        "severity": severity,
        "status": "open",
        "created_at": datetime.now().isoformat()
    }


@app.get("/api/v1/topology")
async def get_topology():
    """获取服务拓扑"""
    return {
        "nodes": [
            {"id": "api-gateway", "type": "service", "status": "healthy"},
            {"id": "user-service", "type": "service", "status": "healthy"},
            {"id": "database", "type": "database", "status": "degraded"}
        ],
        "edges": [
            {"source": "api-gateway", "target": "user-service", "type": "http"},
            {"source": "user-service", "target": "database", "type": "sql"}
        ]
    }


@app.get("/api/v1/cmdb/ci/{ci_id}")
async def get_ci(ci_id: str):
    """获取配置项"""
    return {
        "ci_id": ci_id,
        "ci_type": "service",
        "name": "user-service",
        "status": "active",
        "attributes": {
            "version": "1.2.0",
            "replicas": 3
        }
    }


@app.get("/api/v1/cmdb/ci/{ci_id}/impact")
async def get_impact_radius(ci_id: str, depth: int = 3):
    """获取影响半径"""
    return {
        "source_ci": ci_id,
        "depth": depth,
        "impacted_cis": [
            {"ci_id": "api-gateway", "impact_level": 1},
            {"ci_id": "frontend", "impact_level": 2}
        ]
    }
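
A quick client-side sketch for exercising the API; the host/port are placeholders, and httpx is one possible HTTP client, not mandated by the original:

import httpx

BASE = "http://localhost:8000"  # placeholder address

resp = httpx.post(f"{BASE}/api/v1/anomaly/detect", json={
    "metric_name": "cpu_usage",
    "resource": "api-server",
    "lookback_minutes": 60,
    "sensitivity": 0.8
})
result = resp.json()
print(result["is_anomaly"], result["confidence"])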


# ==================== WebSocket real-time push ====================

from fastapi import WebSocket, WebSocketDisconnect

class ConnectionManager:
    """WebSocket 连接管理器"""

    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def broadcast(self, message: dict):
        for connection in self.active_connections:
            await connection.send_json(message)


manager = ConnectionManager()


@app.websocket("/ws/alerts")
async def websocket_alerts(websocket: WebSocket):
    """告警实时推送"""
    await manager.connect(websocket)
    try:
        while True:
            # 等待新告警并推送
            await asyncio.sleep(1)
            # 实际实现会从消息队列接收告警
    except WebSocketDisconnect:
        manager.disconnect(websocket)


@app.websocket("/ws/metrics/{metric_name}")
async def websocket_metrics(websocket: WebSocket, metric_name: str):
    """指标实时推送"""
    await websocket.accept()
    try:
        while True:
            # 实际实现会从时序数据库获取最新指标
            await websocket.send_json({
                "metric_name": metric_name,
                "timestamp": datetime.now().isoformat(),
                "value": 100
            })
            await asyncio.sleep(1)
    except WebSocketDisconnect:
        pass
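
On the consuming side, a minimal subscriber sketch using the third-party websockets package (an assumption, as is the URL; match it to the server's bind address):

import asyncio
import websockets


async def follow_metric(name: str):
    # Placeholder URL for the server above
    async with websockets.connect(f"ws://localhost:8000/ws/metrics/{name}") as ws:
        while True:
            print(await ws.recv())


asyncio.run(follow_metric("cpu_usage"))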

5. Best Practices

5.1 Architecture Design Principles

# AIOps architecture design principles

principles:
  scalability:
    - Microservice architecture; components scale independently
    - Horizontally scalable data processing
    - Multi-instance model serving

  reliability:
    - Highly available data-collection pipelines
    - Graceful degradation when the AI engine fails
    - Independently deployed alerting system

  real_time:
    - Streaming data processing
    - Incremental model updates
    - Real-time alert push

  observability:
    - Monitoring of the AIOps platform itself
    - Model performance tracking
    - Data quality monitoring

  security:
    - Data masking
    - Access control
    - Audit logging

5.2 Technology Stack Recommendations

┌────────────────────────────────────────────────────────────┐
│               AIOps Technology Stack Choices               │
├────────────────────────────────────────────────────────────┤
│                                                            │
│  Data collection:                                          │
│  ├─ Metrics: Prometheus / VictoriaMetrics / InfluxDB       │
│  ├─ Logs: Fluentd / Filebeat / Vector                      │
│  └─ Traces: Jaeger / Zipkin / OpenTelemetry                │
│                                                            │
│  Data storage:                                             │
│  ├─ Time series: InfluxDB / TimescaleDB / ClickHouse       │
│  ├─ Logs: Elasticsearch / Loki / ClickHouse                │
│  └─ Graph: Neo4j / JanusGraph / Amazon Neptune             │
│                                                            │
│  Message queues:                                           │
│  ├─ High throughput: Kafka / Pulsar                        │
│  └─ Lightweight: Redis Streams / RabbitMQ                  │
│                                                            │
│  AI frameworks:                                            │
│  ├─ Classical ML: scikit-learn / XGBoost / LightGBM        │
│  ├─ Deep learning: PyTorch / TensorFlow                    │
│  └─ Time series: Prophet / statsmodels / PyOD              │
│                                                            │
│  Visualization:                                            │
│  ├─ Dashboards: Grafana / Kibana                           │
│  └─ Topology: D3.js / ECharts / Cytoscape                  │
│                                                            │
└────────────────────────────────────────────────────────────┘

Summary

This chapter introduced the core concepts and architecture of AIOps:

  1. AIOps concept: the practice of applying AI to IT operations
  2. Maturity model: the evolution path from manual operations to full autonomy
  3. Architecture: a layered design spanning data collection, data processing, the AI engine, and applications
  4. Data model: a unified observability data model plus CMDB integration
  5. Platform services: RESTful APIs and WebSocket real-time push

The next chapter dives into anomaly detection algorithms, covering time-series, log, and multi-dimensional anomaly detection methods.
