03 - Root Cause Analysis and Alert Aggregation

Overview

Root Cause Analysis (RCA) is a core AIOps capability: it helps operations teams quickly locate the source of a failure. This chapter covers RCA approaches based on topology analysis, time-series correlation, and causal inference, together with intelligent alert aggregation techniques.

1. RCA Methodology

1.1 RCA Framework

┌─────────────────────────────────────────────────────────────────┐
│                          RCA Framework                          │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │                      Data Sources                       │   │
│  │  Alerts │ Metrics │ Logs │ Traces │ Topology             │   │
│  └──────────────────────┬──────────────────────────────────┘   │
│                         │                                       │
│                         ▼                                       │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │                    Analysis Methods                     │   │
│  │                                                         │   │
│  │   Topology           Temporal            Causal         │   │
│  │   ├─ Dependencies    ├─ Correlation      ├─ Granger     │   │
│  │   ├─ Propagation     ├─ Lag analysis     ├─ PC algorithm│   │
│  │   └─ Impact          └─ Pattern matching └─ Structural eq.  │
│  │                                                         │   │
│  │   Knowledge graph    Machine learning    Rule engine    │   │
│  │   ├─ Entity relations├─ Classification   ├─ Expert rules│   │
│  │   ├─ Reasoning       ├─ Clustering       ├─ Decision trees  │
│  │   └─ Path search     └─ Deep learning    └─ Rule learning   │
│  └──────────────────────┬──────────────────────────────────┘   │
│                         │                                       │
│                         ▼                                       │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │                        Outputs                          │   │
│  │  Ranked root causes │ Confidence │ Evidence │ Remediation   │
│  └─────────────────────────────────────────────────────────┘   │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

1.2 Data Model Definitions

"""
Root cause analysis data models
"""

from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional, Set
from enum import Enum
from datetime import datetime
import uuid


class ResourceType(Enum):
    """资源类型"""
    HOST = "host"
    POD = "pod"
    SERVICE = "service"
    DATABASE = "database"
    NETWORK = "network"
    STORAGE = "storage"


class RelationType(Enum):
    """关系类型"""
    DEPENDS_ON = "depends_on"
    RUNS_ON = "runs_on"
    CONNECTS_TO = "connects_to"
    CONTAINS = "contains"
    CALLS = "calls"


@dataclass
class Resource:
    """资源"""
    resource_id: str
    resource_type: ResourceType
    name: str
    attributes: Dict[str, Any] = field(default_factory=dict)
    metrics: Dict[str, float] = field(default_factory=dict)


@dataclass
class Relationship:
    """资源关系"""
    source_id: str
    target_id: str
    relation_type: RelationType
    weight: float = 1.0
    attributes: Dict[str, Any] = field(default_factory=dict)


@dataclass
class Anomaly:
    """异常事件"""
    anomaly_id: str
    resource_id: str
    anomaly_type: str
    severity: str
    timestamp: float
    value: float
    expected_value: Optional[float] = None
    description: str = ""
    metrics: Dict[str, float] = field(default_factory=dict)


@dataclass
class RootCauseCandidate:
    """根因候选"""
    resource_id: str
    resource_name: str
    confidence: float
    rank: int
    evidence: List[str]
    impact_chain: List[str]
    related_anomalies: List[str]
    recommended_actions: List[str]


@dataclass
class RCAResult:
    """根因分析结果"""
    analysis_id: str
    timestamp: float
    trigger_anomaly: Anomaly
    root_causes: List[RootCauseCandidate]
    affected_resources: List[str]
    analysis_method: str
    execution_time_ms: float
    details: Dict[str, Any] = field(default_factory=dict)
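
A small, hypothetical instantiation of these dataclasses (the IDs, names, and metric values below are invented) shows how the pieces fit together before the analyzers consume them:

# A checkout service that depends on a MySQL instance (illustrative only)
db = Resource("res-mysql-01", ResourceType.DATABASE, "mysql-01",
              metrics={"cpu_usage": 0.92})
svc = Resource("res-checkout", ResourceType.SERVICE, "checkout",
               metrics={"p99_latency_ms": 850.0})
dep = Relationship(source_id="res-checkout", target_id="res-mysql-01",
                   relation_type=RelationType.DEPENDS_ON)

latency_anomaly = Anomaly(
    anomaly_id=str(uuid.uuid4()),
    resource_id="res-checkout",
    anomaly_type="latency_spike",
    severity="high",
    timestamp=datetime.now().timestamp(),
    value=850.0,
    expected_value=120.0,
    description="p99 latency far above the expected baseline",
)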

2. Topology Analysis

2.1 Service Dependency Graph

"""
Service dependency graph analysis
"""

import networkx as nx
from typing import Dict, List, Set, Tuple, Optional
from collections import defaultdict
import numpy as np


class ServiceDependencyGraph:
    """服务依赖图"""

    def __init__(self):
        self.graph = nx.DiGraph()
        self.resources: Dict[str, Resource] = {}
        self.anomalies: Dict[str, List[Anomaly]] = defaultdict(list)

    def add_resource(self, resource: Resource):
        """添加资源"""
        self.resources[resource.resource_id] = resource
        self.graph.add_node(
            resource.resource_id,
            name=resource.name,
            type=resource.resource_type.value,
            **resource.attributes
        )

    def add_relationship(self, relationship: Relationship):
        """添加关系"""
        self.graph.add_edge(
            relationship.source_id,
            relationship.target_id,
            relation_type=relationship.relation_type.value,
            weight=relationship.weight,
            **relationship.attributes
        )

    def add_anomaly(self, anomaly: Anomaly):
        """添加异常"""
        self.anomalies[anomaly.resource_id].append(anomaly)

    def get_upstream_dependencies(self, resource_id: str,
                                   max_depth: int = 5) -> List[str]:
        """获取上游依赖"""
        upstream = []
        visited = set()

        def dfs(node_id: str, depth: int):
            if depth > max_depth or node_id in visited:
                return
            visited.add(node_id)

            for predecessor in self.graph.predecessors(node_id):
                upstream.append(predecessor)
                dfs(predecessor, depth + 1)

        dfs(resource_id, 0)
        return upstream

    def get_downstream_impact(self, resource_id: str,
                               max_depth: int = 5) -> List[str]:
        """获取下游影响"""
        downstream = []
        visited = set()

        def dfs(node_id: str, depth: int):
            if depth > max_depth or node_id in visited:
                return
            visited.add(node_id)

            for successor in self.graph.successors(node_id):
                downstream.append(successor)
                dfs(successor, depth + 1)

        dfs(resource_id, 0)
        return downstream

    def find_common_ancestors(self, resource_ids: List[str]) -> List[str]:
        """查找公共祖先"""
        if not resource_ids:
            return []

        # 获取第一个资源的所有祖先
        ancestors_sets = []
        for rid in resource_ids:
            ancestors = set(nx.ancestors(self.graph, rid))
            ancestors_sets.append(ancestors)

        # 求交集
        common = ancestors_sets[0]
        for ancestors in ancestors_sets[1:]:
            common = common.intersection(ancestors)

        return list(common)

    def calculate_pagerank(self) -> Dict[str, float]:
        """计算 PageRank 中心性"""
        return nx.pagerank(self.graph)

    def calculate_betweenness(self) -> Dict[str, float]:
        """计算介数中心性"""
        return nx.betweenness_centrality(self.graph)


class TopologyBasedRCA:
    """基于拓扑的根因分析"""

    def __init__(self, graph: ServiceDependencyGraph):
        self.graph = graph

    def analyze(self, anomalies: List[Anomaly],
                time_window_seconds: float = 300) -> RCAResult:
        """
        分析根因

        Args:
            anomalies: 异常列表
            time_window_seconds: 时间窗口

        Returns:
            RCAResult: 分析结果
        """
        import time
        start_time = time.time()

        # 按时间排序
        sorted_anomalies = sorted(anomalies, key=lambda x: x.timestamp)

        # 获取涉及的资源
        affected_resources = list(set(a.resource_id for a in anomalies))

        # 计算每个资源的根因评分
        candidate_scores = {}

        for resource_id in affected_resources:
            score = self._calculate_root_cause_score(
                resource_id, sorted_anomalies, time_window_seconds
            )
            candidate_scores[resource_id] = score

        # 排序并生成候选
        sorted_candidates = sorted(
            candidate_scores.items(),
            key=lambda x: x[1],
            reverse=True
        )

        root_causes = []
        for rank, (resource_id, score) in enumerate(sorted_candidates[:5], 1):
            resource = self.graph.resources.get(resource_id)
            if resource is None:
                continue

            # 收集证据
            evidence = self._collect_evidence(resource_id, sorted_anomalies)

            # 获取影响链
            impact_chain = self.graph.get_downstream_impact(resource_id)

            # 生成修复建议
            actions = self._generate_recommendations(resource_id, sorted_anomalies)

            root_causes.append(RootCauseCandidate(
                resource_id=resource_id,
                resource_name=resource.name,
                confidence=min(score, 1.0),
                rank=rank,
                evidence=evidence,
                impact_chain=impact_chain,
                related_anomalies=[a.anomaly_id for a in sorted_anomalies
                                  if a.resource_id == resource_id],
                recommended_actions=actions
            ))

        execution_time = (time.time() - start_time) * 1000

        return RCAResult(
            analysis_id=str(uuid.uuid4()),
            timestamp=time.time(),
            trigger_anomaly=sorted_anomalies[0] if sorted_anomalies else None,
            root_causes=root_causes,
            affected_resources=affected_resources,
            analysis_method="topology_based",
            execution_time_ms=execution_time,
            details={
                "total_anomalies": len(anomalies),
                "time_window": time_window_seconds,
                "candidate_scores": candidate_scores
            }
        )

    def _calculate_root_cause_score(self, resource_id: str,
                                     anomalies: List[Anomaly],
                                     time_window: float) -> float:
        """计算根因评分"""
        score = 0.0

        # 1. 时间优先级: 最早出现的异常更可能是根因
        resource_anomalies = [a for a in anomalies if a.resource_id == resource_id]
        if resource_anomalies:
            earliest = min(a.timestamp for a in resource_anomalies)
            all_earliest = min(a.timestamp for a in anomalies)

            time_score = 1.0 - (earliest - all_earliest) / time_window
            score += max(0, time_score) * 0.3

        # 2. 拓扑位置: 上游节点更可能是根因
        upstream = self.graph.get_upstream_dependencies(resource_id)
        downstream = self.graph.get_downstream_impact(resource_id)

        if len(downstream) > len(upstream):
            position_score = len(downstream) / (len(upstream) + len(downstream) + 1)
            score += position_score * 0.3

        # 3. 异常数量: 更多异常表示更严重的问题
        affected_downstream = sum(
            1 for d in downstream
            if any(a.resource_id == d for a in anomalies)
        )
        if downstream:
            propagation_score = affected_downstream / len(downstream)
            score += propagation_score * 0.2

        # 4. 中心性: 更中心的节点影响更大
        pagerank = self.graph.calculate_pagerank()
        centrality_score = pagerank.get(resource_id, 0)
        score += centrality_score * 0.2

        return score

    def _collect_evidence(self, resource_id: str,
                          anomalies: List[Anomaly]) -> List[str]:
        """收集证据"""
        evidence = []

        # 资源自身的异常
        resource_anomalies = [a for a in anomalies if a.resource_id == resource_id]
        if resource_anomalies:
            evidence.append(f"资源 {resource_id} 发现 {len(resource_anomalies)} 个异常")

        # 下游受影响
        downstream = self.graph.get_downstream_impact(resource_id)
        affected_downstream = [d for d in downstream
                              if any(a.resource_id == d for a in anomalies)]
        if affected_downstream:
            evidence.append(f"下游 {len(affected_downstream)} 个服务受影响")

        # 时间顺序
        if resource_anomalies:
            first_anomaly = min(resource_anomalies, key=lambda x: x.timestamp)
            evidence.append(f"首个异常发生于 {first_anomaly.timestamp}")

        return evidence

    def _generate_recommendations(self, resource_id: str,
                                   anomalies: List[Anomaly]) -> List[str]:
        """生成修复建议"""
        resource = self.graph.resources.get(resource_id)
        if resource is None:
            return []

        recommendations = []
        resource_type = resource.resource_type

        # 基于资源类型的建议
        if resource_type == ResourceType.DATABASE:
            recommendations.extend([
                "检查数据库连接池状态",
                "查看慢查询日志",
                "检查磁盘 I/O 和空间"
            ])
        elif resource_type == ResourceType.SERVICE:
            recommendations.extend([
                "检查服务日志中的错误",
                "查看 Pod 重启历史",
                "检查资源使用情况 (CPU/内存)"
            ])
        elif resource_type == ResourceType.NETWORK:
            recommendations.extend([
                "检查网络连通性",
                "查看 DNS 解析状态",
                "检查负载均衡器健康状态"
            ])

        # 基于异常类型的建议
        resource_anomalies = [a for a in anomalies if a.resource_id == resource_id]
        for anomaly in resource_anomalies:
            if "cpu" in anomaly.anomaly_type.lower():
                recommendations.append("考虑扩容或优化 CPU 密集型操作")
            elif "memory" in anomaly.anomaly_type.lower():
                recommendations.append("检查内存泄漏,考虑增加内存限制")
            elif "latency" in anomaly.anomaly_type.lower():
                recommendations.append("检查依赖服务响应时间和网络延迟")

        return list(set(recommendations))[:5]
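
To make the flow concrete, here is a hedged end-to-end sketch with a three-node toy topology (all IDs, metric values, and timestamps are invented). Note that the analyzer walks successors as "downstream impact", so the edges below are oriented in the direction faults propagate:

import time
import uuid

# Toy topology: orders-db -> orders-api -> frontend, oriented so that
# get_downstream_impact() returns the affected consumers.
graph = ServiceDependencyGraph()
graph.add_resource(Resource("db-1", ResourceType.DATABASE, "orders-db"))
graph.add_resource(Resource("api-1", ResourceType.SERVICE, "orders-api"))
graph.add_resource(Resource("web-1", ResourceType.SERVICE, "frontend"))
graph.add_relationship(Relationship("db-1", "api-1", RelationType.CONNECTS_TO))
graph.add_relationship(Relationship("api-1", "web-1", RelationType.CALLS))

# Anomalies observed within one five-minute window (timestamps are made up)
now = time.time()
anomalies = [
    Anomaly(str(uuid.uuid4()), "db-1", "disk_io_high", "critical", now, 0.98),
    Anomaly(str(uuid.uuid4()), "api-1", "latency_high", "high", now + 30, 900.0),
    Anomaly(str(uuid.uuid4()), "web-1", "error_rate_high", "high", now + 60, 0.12),
]

rca = TopologyBasedRCA(graph)
result = rca.analyze(anomalies, time_window_seconds=300)
for candidate in result.root_causes:
    print(candidate.rank, candidate.resource_name, f"{candidate.confidence:.2f}")
# Expected: orders-db ranks first (earliest anomaly, widest downstream impact)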

2.2 Fault Propagation Analysis

"""
Fault propagation analysis
"""

import numpy as np
from typing import Dict, List, Tuple, Optional, Any, Set
from collections import defaultdict
import networkx as nx


class FaultPropagationAnalyzer:
    """故障传播分析器"""

    def __init__(self, graph: ServiceDependencyGraph):
        self.graph = graph

    def analyze_propagation(self, anomalies: List[Anomaly],
                            time_resolution: float = 60) -> Dict[str, Any]:
        """
        分析故障传播路径

        Args:
            anomalies: 异常列表
            time_resolution: 时间分辨率 (秒)

        Returns:
            Dict: 传播分析结果
        """
        # 按时间分组
        time_groups = self._group_by_time(anomalies, time_resolution)

        # 构建传播路径
        propagation_paths = []

        sorted_times = sorted(time_groups.keys())
        for i in range(len(sorted_times) - 1):
            current_time = sorted_times[i]
            next_time = sorted_times[i + 1]

            current_resources = {a.resource_id for a in time_groups[current_time]}
            next_resources = {a.resource_id for a in time_groups[next_time]}

            # 查找从当前组到下一组的传播路径
            for curr_r in current_resources:
                for next_r in next_resources:
                    if curr_r != next_r:
                        # 检查是否有依赖关系
                        if self.graph.graph.has_edge(curr_r, next_r):
                            propagation_paths.append({
                                "from": curr_r,
                                "to": next_r,
                                "from_time": current_time,
                                "to_time": next_time,
                                "delay": next_time - current_time
                            })

        # 计算传播概率
        propagation_probs = self._calculate_propagation_probability(propagation_paths)

        # 识别传播模式
        patterns = self._identify_propagation_patterns(propagation_paths)

        return {
            "propagation_paths": propagation_paths,
            "propagation_probabilities": propagation_probs,
            "patterns": patterns,
            "root_candidates": self._identify_propagation_roots(propagation_paths)
        }

    def _group_by_time(self, anomalies: List[Anomaly],
                       resolution: float) -> Dict[float, List[Anomaly]]:
        """按时间分组"""
        groups = defaultdict(list)

        for anomaly in anomalies:
            # 向下取整到时间分辨率
            time_bucket = int(anomaly.timestamp / resolution) * resolution
            groups[time_bucket].append(anomaly)

        return dict(groups)

    def _calculate_propagation_probability(self,
                                            paths: List[Dict]) -> Dict[Tuple[str, str], float]:
        """计算传播概率"""
        edge_counts = defaultdict(int)
        edge_totals = defaultdict(int)

        for path in paths:
            edge = (path["from"], path["to"])
            edge_counts[edge] += 1

        # 基于历史数据计算概率
        for edge in edge_counts:
            edge_totals[edge] = max(edge_counts[edge], 1)

        probs = {
            edge: count / edge_totals[edge]
            for edge, count in edge_counts.items()
        }

        return probs

    def _identify_propagation_patterns(self, paths: List[Dict]) -> List[Dict]:
        """识别传播模式"""
        patterns = []

        # 识别链式传播
        chains = self._find_chains(paths)
        for chain in chains:
            if len(chain) >= 3:
                patterns.append({
                    "type": "chain",
                    "path": chain,
                    "description": f"链式传播: {' -> '.join(chain)}"
                })

        # 识别扇出传播
        fanouts = self._find_fanouts(paths)
        for source, targets in fanouts.items():
            if len(targets) >= 2:
                patterns.append({
                    "type": "fanout",
                    "source": source,
                    "targets": list(targets),
                    "description": f"扇出传播: {source} 影响了 {len(targets)} 个下游服务"
                })

        return patterns

    def _find_chains(self, paths: List[Dict]) -> List[List[str]]:
        """查找链式传播"""
        # 构建临时图
        G = nx.DiGraph()
        for path in paths:
            G.add_edge(path["from"], path["to"])

        chains = []
        for node in G.nodes():
            if G.in_degree(node) == 0:  # 起始节点
                for path in nx.all_simple_paths(G, node, list(G.nodes())):
                    if len(path) >= 3:
                        chains.append(path)

        return chains

    def _find_fanouts(self, paths: List[Dict]) -> Dict[str, Set[str]]:
        """查找扇出模式"""
        fanouts = defaultdict(set)
        for path in paths:
            fanouts[path["from"]].add(path["to"])
        return dict(fanouts)

    def _identify_propagation_roots(self, paths: List[Dict]) -> List[str]:
        """识别传播根源"""
        incoming = defaultdict(int)
        outgoing = defaultdict(int)

        for path in paths:
            outgoing[path["from"]] += 1
            incoming[path["to"]] += 1

        # 只有出边没有入边的节点是根源
        roots = []
        for node in outgoing:
            if incoming[node] == 0:
                roots.append(node)

        return roots

    def simulate_propagation(self, root_resource: str,
                             propagation_prob: float = 0.7,
                             max_steps: int = 10) -> List[Dict]:
        """
        模拟故障传播

        Args:
            root_resource: 根源资源
            propagation_prob: 传播概率
            max_steps: 最大步数

        Returns:
            List: 传播模拟结果
        """
        simulation = []
        affected = {root_resource}
        current_frontier = {root_resource}

        for step in range(max_steps):
            next_frontier = set()

            for resource in current_frontier:
                # 获取下游依赖
                downstream = list(self.graph.graph.successors(resource))

                for ds in downstream:
                    if ds not in affected:
                        # 随机决定是否传播
                        if np.random.random() < propagation_prob:
                            affected.add(ds)
                            next_frontier.add(ds)
                            simulation.append({
                                "step": step + 1,
                                "from": resource,
                                "to": ds
                            })

            if not next_frontier:
                break

            current_frontier = next_frontier

        return simulation
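
A short usage sketch, reusing the toy graph and anomaly list from the topology example above (both are assumptions carried over from that sketch):

analyzer = FaultPropagationAnalyzer(graph)

# Which paths did the observed anomalies follow, bucketed into 60 s windows?
report = analyzer.analyze_propagation(anomalies, time_resolution=60)
for path in report["propagation_paths"]:
    print(f"{path['from']} -> {path['to']} (delay {path['delay']:.0f}s)")
print("likely propagation roots:", report["root_candidates"])

# What-if simulation: how far could a fault in db-1 spread?
np.random.seed(42)  # make the random spread reproducible
spread = analyzer.simulate_propagation("db-1", propagation_prob=0.7, max_steps=5)
print(f"simulated spread reached {len(spread)} additional resources")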

3. Time-Series Correlation Analysis

3.1 Correlation Analysis

"""
Time-series correlation analysis
"""

import time
import uuid

import numpy as np
from typing import Dict, List, Tuple, Optional, Any
from scipy import stats
from scipy.signal import correlate


class TimeSeriesCorrelationAnalyzer:
    """时序相关性分析器"""

    def __init__(self, max_lag: int = 60):
        """
        初始化分析器

        Args:
            max_lag: 最大滞后时间 (数据点数)
        """
        self.max_lag = max_lag

    def pearson_correlation(self, series1: np.ndarray,
                            series2: np.ndarray) -> Tuple[float, float]:
        """
        皮尔逊相关系数

        Returns:
            (相关系数, p值)
        """
        if len(series1) != len(series2):
            raise ValueError("序列长度必须相同")

        corr, pvalue = stats.pearsonr(series1, series2)
        return float(corr), float(pvalue)

    def cross_correlation(self, series1: np.ndarray,
                          series2: np.ndarray) -> Dict[str, Any]:
        """
        互相关分析

        Args:
            series1: 时间序列 1
            series2: 时间序列 2

        Returns:
            Dict: 互相关结果
        """
        # 标准化
        s1 = (series1 - np.mean(series1)) / (np.std(series1) + 1e-10)
        s2 = (series2 - np.mean(series2)) / (np.std(series2) + 1e-10)

        # 计算互相关
        correlation = correlate(s1, s2, mode='full')
        correlation = correlation / len(s1)

        # 滞后范围
        lags = np.arange(-len(s1) + 1, len(s1))

        # 限制在 max_lag 范围内
        mask = np.abs(lags) <= self.max_lag
        lags = lags[mask]
        correlation = correlation[mask]

        # 找到最大相关性及其滞后
        max_idx = np.argmax(np.abs(correlation))
        max_corr = correlation[max_idx]
        max_lag = lags[max_idx]

        return {
            "lags": lags.tolist(),
            "correlations": correlation.tolist(),
            "max_correlation": float(max_corr),
            "optimal_lag": int(max_lag),
            "is_significant": abs(max_corr) > 0.5
        }

    def dtw_distance(self, series1: np.ndarray,
                     series2: np.ndarray) -> float:
        """
        动态时间规整 (DTW) 距离

        Args:
            series1: 时间序列 1
            series2: 时间序列 2

        Returns:
            float: DTW 距离
        """
        n, m = len(series1), len(series2)

        # 创建代价矩阵
        dtw_matrix = np.full((n + 1, m + 1), np.inf)
        dtw_matrix[0, 0] = 0

        # 填充矩阵
        for i in range(1, n + 1):
            for j in range(1, m + 1):
                cost = abs(series1[i-1] - series2[j-1])
                dtw_matrix[i, j] = cost + min(
                    dtw_matrix[i-1, j],    # 插入
                    dtw_matrix[i, j-1],    # 删除
                    dtw_matrix[i-1, j-1]   # 匹配
                )

        return float(dtw_matrix[n, m])

    def find_correlated_metrics(self, target_metric: np.ndarray,
                                 candidate_metrics: Dict[str, np.ndarray],
                                 threshold: float = 0.5) -> List[Dict]:
        """
        查找相关指标

        Args:
            target_metric: 目标指标
            candidate_metrics: 候选指标字典
            threshold: 相关性阈值

        Returns:
            List: 相关指标列表
        """
        results = []

        for name, series in candidate_metrics.items():
            if len(series) != len(target_metric):
                continue

            # 计算互相关
            corr_result = self.cross_correlation(target_metric, series)

            if abs(corr_result["max_correlation"]) >= threshold:
                results.append({
                    "metric_name": name,
                    "correlation": corr_result["max_correlation"],
                    "lag": corr_result["optimal_lag"],
                    "is_leading": corr_result["optimal_lag"] < 0
                })

        # 按相关性排序
        results.sort(key=lambda x: abs(x["correlation"]), reverse=True)

        return results


class TemporalRCA:
    """基于时序的根因分析"""

    def __init__(self, correlation_analyzer: TimeSeriesCorrelationAnalyzer):
        self.analyzer = correlation_analyzer

    def analyze(self, target_anomaly: Anomaly,
                metric_data: Dict[str, Dict[str, np.ndarray]],
                time_window: int = 300) -> RCAResult:
        """
        时序根因分析

        Args:
            target_anomaly: 目标异常
            metric_data: 指标数据 {resource_id: {metric_name: values}}
            time_window: 时间窗口

        Returns:
            RCAResult: 分析结果
        """
        import time
        start_time = time.time()

        # 获取目标指标
        target_resource = target_anomaly.resource_id
        target_metrics = metric_data.get(target_resource, {})

        if not target_metrics:
            return self._empty_result(target_anomaly)

        # 选择目标指标 (使用第一个作为参考)
        target_metric_name = list(target_metrics.keys())[0]
        target_series = target_metrics[target_metric_name]

        # 分析所有其他资源的指标
        correlations = []

        for resource_id, metrics in metric_data.items():
            if resource_id == target_resource:
                continue

            for metric_name, series in metrics.items():
                if len(series) != len(target_series):
                    continue

                corr_result = self.analyzer.cross_correlation(target_series, series)

                if corr_result["is_significant"]:
                    correlations.append({
                        "resource_id": resource_id,
                        "metric_name": metric_name,
                        "correlation": corr_result["max_correlation"],
                        "lag": corr_result["optimal_lag"]
                    })

        # 按相关性和滞后排序 (优先负滞后,表示领先)
        correlations.sort(key=lambda x: (-abs(x["correlation"]),
                                         x["lag"] if x["lag"] < 0 else float('inf')))

        # 生成根因候选
        root_causes = []
        seen_resources = set()

        for rank, corr in enumerate(correlations[:10], 1):
            if corr["resource_id"] in seen_resources:
                continue
            seen_resources.add(corr["resource_id"])

            # 领先指标更可能是根因
            is_leading = corr["lag"] < 0
            confidence = abs(corr["correlation"])
            if is_leading:
                confidence *= 1.2  # 提升领先指标的置信度

            root_causes.append(RootCauseCandidate(
                resource_id=corr["resource_id"],
                resource_name=corr["resource_id"],  # 需要从外部获取名称
                confidence=min(confidence, 1.0),
                rank=len(root_causes) + 1,
                evidence=[
                    f"与目标指标相关性: {corr['correlation']:.3f}",
                    f"时间滞后: {corr['lag']} 个数据点",
                    "指标变化领先于目标" if is_leading else "指标变化滞后于目标"
                ],
                impact_chain=[],
                related_anomalies=[],
                recommended_actions=self._generate_recommendations_from_correlation(corr)
            ))

            if len(root_causes) >= 5:
                break

        execution_time = (time.time() - start_time) * 1000

        return RCAResult(
            analysis_id=str(uuid.uuid4()),
            timestamp=time.time(),
            trigger_anomaly=target_anomaly,
            root_causes=root_causes,
            affected_resources=list(seen_resources),
            analysis_method="temporal_correlation",
            execution_time_ms=execution_time,
            details={
                "correlations_found": len(correlations),
                "target_metric": target_metric_name
            }
        )

    def _empty_result(self, anomaly: Anomaly) -> RCAResult:
        """返回空结果"""
        return RCAResult(
            analysis_id=str(uuid.uuid4()),
            timestamp=time.time(),
            trigger_anomaly=anomaly,
            root_causes=[],
            affected_resources=[],
            analysis_method="temporal_correlation",
            execution_time_ms=0,
            details={"error": "无可用指标数据"}
        )

    def _generate_recommendations_from_correlation(self, corr: Dict) -> List[str]:
        """基于相关性生成建议"""
        recommendations = []

        if "cpu" in corr["metric_name"].lower():
            recommendations.append("检查 CPU 使用率和进程状态")
        elif "memory" in corr["metric_name"].lower():
            recommendations.append("检查内存使用和潜在泄漏")
        elif "latency" in corr["metric_name"].lower():
            recommendations.append("检查网络延迟和服务响应时间")
        elif "error" in corr["metric_name"].lower():
            recommendations.append("检查错误日志和异常堆栈")

        if corr["lag"] < 0:
            recommendations.append("该资源的问题可能先于目标资源出现,建议优先检查")

        return recommendations
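
As a quick, self-contained sanity check of the correlation analyzer, the sketch below builds two synthetic series where one copies the other with a 5-sample offset; cross_correlation should report a strong correlation at roughly that lag (the sign follows this implementation's lag convention):

rng = np.random.default_rng(0)
n = 500
driver = np.sin(np.linspace(0, 20, n)) + 0.1 * rng.standard_normal(n)
follower = np.roll(driver, 5) + 0.1 * rng.standard_normal(n)  # trails the driver by 5 samples

analyzer = TimeSeriesCorrelationAnalyzer(max_lag=60)
result = analyzer.cross_correlation(follower, driver)
print(result["max_correlation"], result["optimal_lag"])  # expect |corr| near 1, |lag| near 5

related = analyzer.find_correlated_metrics(follower, {"driver": driver}, threshold=0.5)
print(related)  # the driver should show up as a strongly correlated metric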

3.2 Causal Inference

"""
Causal inference analysis
"""

import numpy as np
from typing import Dict, List, Tuple, Optional, Any
from statsmodels.tsa.stattools import grangercausalitytests
import warnings


class GrangerCausalityAnalyzer:
    """Granger 因果分析"""

    def __init__(self, max_lag: int = 10):
        """
        初始化分析器

        Args:
            max_lag: 最大滞后阶数
        """
        self.max_lag = max_lag

    def test_causality(self, cause: np.ndarray,
                       effect: np.ndarray,
                       significance_level: float = 0.05) -> Dict[str, Any]:
        """
        测试因果关系

        Args:
            cause: 原因序列
            effect: 结果序列
            significance_level: 显著性水平

        Returns:
            Dict: 因果测试结果
        """
        if len(cause) != len(effect):
            raise ValueError("序列长度必须相同")

        # 组合数据
        data = np.column_stack([effect, cause])

        # 执行 Granger 因果检验
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            try:
                results = grangercausalitytests(data, maxlag=self.max_lag, verbose=False)
            except Exception as e:
                return {
                    "is_causal": False,
                    "error": str(e)
                }

        # 提取结果
        best_lag = None
        best_pvalue = 1.0

        lag_results = []
        for lag in range(1, self.max_lag + 1):
            if lag not in results:
                continue

            # 使用 F-test 结果
            test_result = results[lag][0]
            f_test = test_result['ssr_ftest']
            pvalue = f_test[1]

            lag_results.append({
                "lag": lag,
                "f_statistic": f_test[0],
                "p_value": pvalue,
                "is_significant": pvalue < significance_level
            })

            if pvalue < best_pvalue:
                best_pvalue = pvalue
                best_lag = lag

        return {
            "is_causal": best_pvalue < significance_level,
            "best_lag": best_lag,
            "best_pvalue": best_pvalue,
            "lag_results": lag_results
        }

    def build_causal_graph(self, time_series: Dict[str, np.ndarray],
                           significance_level: float = 0.05) -> Dict[str, Any]:
        """
        构建因果图

        Args:
            time_series: 时序数据字典
            significance_level: 显著性水平

        Returns:
            Dict: 因果图
        """
        names = list(time_series.keys())
        n = len(names)

        edges = []
        adjacency_matrix = np.zeros((n, n))

        for i, name1 in enumerate(names):
            for j, name2 in enumerate(names):
                if i == j:
                    continue

                result = self.test_causality(
                    time_series[name1],
                    time_series[name2],
                    significance_level
                )

                if result.get("is_causal", False):
                    edges.append({
                        "from": name1,
                        "to": name2,
                        "lag": result["best_lag"],
                        "p_value": result["best_pvalue"]
                    })
                    adjacency_matrix[i, j] = 1

        return {
            "nodes": names,
            "edges": edges,
            "adjacency_matrix": adjacency_matrix.tolist()
        }


class PCAlgorithm:
    """PC 因果发现算法"""

    def __init__(self, significance_level: float = 0.05):
        """
        初始化 PC 算法

        Args:
            significance_level: 显著性水平
        """
        self.significance_level = significance_level

    def discover_causal_structure(self,
                                   data: np.ndarray,
                                   variable_names: List[str]) -> Dict[str, Any]:
        """
        发现因果结构

        Args:
            data: 数据矩阵 (样本 x 变量)
            variable_names: 变量名称

        Returns:
            Dict: 因果结构
        """
        n_vars = data.shape[1]

        # 步骤 1: 初始化完全图
        adjacency = np.ones((n_vars, n_vars)) - np.eye(n_vars)

        # 步骤 2: 移除条件独立的边
        for cond_size in range(n_vars):
            for i in range(n_vars):
                neighbors = np.where(adjacency[i] == 1)[0]

                for j in neighbors:
                    if adjacency[i, j] == 0:
                        continue

                    # 获取可能的条件集
                    other_neighbors = [n for n in neighbors if n != j]

                    if len(other_neighbors) >= cond_size:
                        # 测试条件独立性
                        from itertools import combinations
                        for cond_set in combinations(other_neighbors, cond_size):
                            if self._conditional_independence_test(
                                data, i, j, list(cond_set)
                            ):
                                adjacency[i, j] = 0
                                adjacency[j, i] = 0
                                break

        # 步骤 3: 定向边 (简化版)
        directed_edges = self._orient_edges(adjacency, data)

        return {
            "nodes": variable_names,
            "undirected_adjacency": adjacency.tolist(),
            "directed_edges": directed_edges,
            "causal_order": self._topological_sort(directed_edges, variable_names)
        }

    def _conditional_independence_test(self, data: np.ndarray,
                                        i: int, j: int,
                                        cond_set: List[int]) -> bool:
        """条件独立性检验"""
        from scipy import stats

        if len(cond_set) == 0:
            # 无条件相关性检验
            corr, pvalue = stats.pearsonr(data[:, i], data[:, j])
        else:
            # 偏相关检验
            corr = self._partial_correlation(data, i, j, cond_set)
            n = len(data)
            k = len(cond_set)

            # Fisher's z 变换
            z = 0.5 * np.log((1 + corr) / (1 - corr + 1e-10))
            se = 1 / np.sqrt(n - k - 3)
            z_stat = abs(z) / se
            pvalue = 2 * (1 - stats.norm.cdf(z_stat))

        return pvalue > self.significance_level

    def _partial_correlation(self, data: np.ndarray,
                              i: int, j: int,
                              cond_set: List[int]) -> float:
        """计算偏相关"""
        # 使用回归残差计算偏相关
        from sklearn.linear_model import LinearRegression

        X_cond = data[:, cond_set]

        # 回归 i 对条件集
        model_i = LinearRegression().fit(X_cond, data[:, i])
        residual_i = data[:, i] - model_i.predict(X_cond)

        # 回归 j 对条件集
        model_j = LinearRegression().fit(X_cond, data[:, j])
        residual_j = data[:, j] - model_j.predict(X_cond)

        # 计算残差相关性
        corr = np.corrcoef(residual_i, residual_j)[0, 1]
        return corr

    def _orient_edges(self, adjacency: np.ndarray,
                      data: np.ndarray) -> List[Dict]:
        """定向边"""
        n = len(adjacency)
        directed = []

        for i in range(n):
            for j in range(i + 1, n):
                if adjacency[i, j] == 1:
                    # 简单规则: 根据时间滞后相关性定向
                    lag_corr_ij = self._lagged_correlation(data[:, i], data[:, j])
                    lag_corr_ji = self._lagged_correlation(data[:, j], data[:, i])

                    if lag_corr_ij > lag_corr_ji:
                        directed.append({"from": i, "to": j})
                    else:
                        directed.append({"from": j, "to": i})

        return directed

    def _lagged_correlation(self, x: np.ndarray, y: np.ndarray,
                            lag: int = 1) -> float:
        """滞后相关性"""
        if lag > 0:
            return np.corrcoef(x[:-lag], y[lag:])[0, 1]
        else:
            return np.corrcoef(x, y)[0, 1]

    def _topological_sort(self, edges: List[Dict],
                          names: List[str]) -> List[str]:
        """拓扑排序"""
        import networkx as nx

        G = nx.DiGraph()
        G.add_nodes_from(range(len(names)))

        for edge in edges:
            G.add_edge(edge["from"], edge["to"])

        try:
            order = list(nx.topological_sort(G))
            return [names[i] for i in order]
        except nx.NetworkXUnfeasible:
            return names  # 存在环
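
A small synthetic check for the Granger analyzer: x drives y with a two-step delay, so the x → y direction should test as causal while the reverse usually should not (the construction below is illustrative only):

rng = np.random.default_rng(1)
n = 300
x = rng.standard_normal(n)
y = np.zeros(n)
for t in range(2, n):
    y[t] = 0.8 * x[t - 2] + 0.1 * rng.standard_normal()

granger = GrangerCausalityAnalyzer(max_lag=5)
print(granger.test_causality(cause=x, effect=y)["is_causal"])   # expected: True
print(granger.test_causality(cause=y, effect=x)["is_causal"])   # expected: usually False

# The same analyzer can sweep every ordered pair to sketch a causal graph
causal_graph = granger.build_causal_graph({"x": x, "y": y})
print(causal_graph["edges"])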

4. Alert Aggregation

4.1 Alert Compression and Deduplication

"""
Alert aggregation and compression
"""

from dataclasses import dataclass, field
from typing import Dict, List, Set, Optional, Callable
from collections import defaultdict
import time
import hashlib


@dataclass
class Alert:
    """告警"""
    alert_id: str
    name: str
    resource: str
    severity: str
    timestamp: float
    message: str
    labels: Dict[str, str] = field(default_factory=dict)
    fingerprint: Optional[str] = None

    def __post_init__(self):
        if self.fingerprint is None:
            self.fingerprint = self._compute_fingerprint()

    def _compute_fingerprint(self) -> str:
        """计算告警指纹"""
        key = f"{self.name}:{self.resource}:{sorted(self.labels.items())}"
        return hashlib.md5(key.encode()).hexdigest()


@dataclass
class AlertGroup:
    """告警组"""
    group_id: str
    fingerprint: str
    alerts: List[Alert]
    first_timestamp: float
    last_timestamp: float
    count: int
    severity: str
    resource: str
    name: str
    status: str = "firing"  # firing, resolved

    def add_alert(self, alert: Alert):
        """添加告警"""
        self.alerts.append(alert)
        self.count += 1
        self.last_timestamp = max(self.last_timestamp, alert.timestamp)
        # 更新严重程度为最高级别
        severity_order = {"critical": 4, "high": 3, "medium": 2, "low": 1, "info": 0}
        if severity_order.get(alert.severity, 0) > severity_order.get(self.severity, 0):
            self.severity = alert.severity


class AlertDeduplicator:
    """告警去重器"""

    def __init__(self, dedup_window: float = 300):
        """
        初始化去重器

        Args:
            dedup_window: 去重时间窗口 (秒)
        """
        self.dedup_window = dedup_window
        self.alert_groups: Dict[str, AlertGroup] = {}
        self.fingerprint_map: Dict[str, str] = {}  # fingerprint -> group_id

    def process(self, alert: Alert) -> Optional[AlertGroup]:
        """
        处理告警

        Args:
            alert: 告警

        Returns:
            AlertGroup: 如果是新组或更新了现有组则返回
        """
        fingerprint = alert.fingerprint

        # 检查是否存在相同指纹的组
        if fingerprint in self.fingerprint_map:
            group_id = self.fingerprint_map[fingerprint]
            group = self.alert_groups.get(group_id)

            if group:
                # 检查是否在时间窗口内
                if alert.timestamp - group.last_timestamp <= self.dedup_window:
                    group.add_alert(alert)
                    return group

        # 创建新组
        group = AlertGroup(
            group_id=f"group-{int(time.time())}-{len(self.alert_groups)}",
            fingerprint=fingerprint,
            alerts=[alert],
            first_timestamp=alert.timestamp,
            last_timestamp=alert.timestamp,
            count=1,
            severity=alert.severity,
            resource=alert.resource,
            name=alert.name
        )

        self.alert_groups[group.group_id] = group
        self.fingerprint_map[fingerprint] = group.group_id

        return group

    def get_active_groups(self, max_age: float = 3600) -> List[AlertGroup]:
        """获取活跃告警组"""
        now = time.time()
        active = []

        for group in self.alert_groups.values():
            if group.status == "firing" and now - group.last_timestamp <= max_age:
                active.append(group)

        return active


class AlertCorrelator:
    """告警关联器"""

    def __init__(self, time_window: float = 300,
                 correlation_rules: Optional[List[Dict]] = None):
        """
        初始化关联器

        Args:
            time_window: 关联时间窗口 (秒)
            correlation_rules: 关联规则
        """
        self.time_window = time_window
        self.correlation_rules = correlation_rules or []
        self.incidents: Dict[str, 'Incident'] = {}

    def add_rule(self, rule: Dict):
        """添加关联规则"""
        self.correlation_rules.append(rule)

    def correlate(self, alert_groups: List[AlertGroup]) -> List['Incident']:
        """
        关联告警组

        Args:
            alert_groups: 告警组列表

        Returns:
            List[Incident]: 事件列表
        """
        # 按时间排序
        sorted_groups = sorted(alert_groups, key=lambda x: x.first_timestamp)

        # 应用规则进行关联
        incidents = []

        for rule in self.correlation_rules:
            matched_groups = self._match_rule(rule, sorted_groups)

            if matched_groups:
                incident = self._create_incident(rule, matched_groups)
                incidents.append(incident)

        # 基于时间和资源的默认关联
        if not incidents:
            incidents = self._default_correlation(sorted_groups)

        return incidents

    def _match_rule(self, rule: Dict,
                    groups: List[AlertGroup]) -> List[AlertGroup]:
        """匹配规则"""
        matched = []

        for group in groups:
            # 检查名称匹配
            if "alert_names" in rule:
                if group.name not in rule["alert_names"]:
                    continue

            # 检查资源匹配
            if "resource_pattern" in rule:
                import re
                if not re.match(rule["resource_pattern"], group.resource):
                    continue

            # 检查标签匹配
            if "labels" in rule:
                label_match = all(
                    group.alerts[0].labels.get(k) == v
                    for k, v in rule["labels"].items()
                )
                if not label_match:
                    continue

            matched.append(group)

        return matched

    def _create_incident(self, rule: Dict,
                         groups: List[AlertGroup]) -> 'Incident':
        """Create an incident from matched alert groups"""
        return Incident(
            incident_id=f"inc-{int(time.time())}",
            title=rule.get("incident_title", "Correlated alert incident"),
            severity=self._max_severity(groups),
            alert_groups=groups,
            rule_name=rule.get("name", "unknown"),
            created_at=time.time()
        )

    @staticmethod
    def _max_severity(groups: List[AlertGroup]) -> str:
        """Pick the highest severity by rank rather than by string comparison"""
        order = {"critical": 4, "high": 3, "medium": 2, "low": 1, "info": 0}
        return max((g.severity for g in groups), key=lambda s: order.get(s, 0))

    def _default_correlation(self, groups: List[AlertGroup]) -> List['Incident']:
        """Default correlation strategy"""
        incidents = []

        # Group alert groups by resource prefix
        resource_groups = defaultdict(list)
        for group in groups:
            # Extract the resource prefix
            prefix = group.resource.split("-")[0] if "-" in group.resource else group.resource
            resource_groups[prefix].append(group)

        # Create one incident per resource group
        for prefix, related_groups in resource_groups.items():
            if len(related_groups) > 1:
                # Check the time window
                times = [g.first_timestamp for g in related_groups]
                if max(times) - min(times) <= self.time_window:
                    incidents.append(Incident(
                        incident_id=f"inc-{prefix}-{int(time.time())}",
                        title=f"Alerts across {prefix}-related services",
                        severity=self._max_severity(related_groups),
                        alert_groups=related_groups,
                        rule_name="default_resource_correlation",
                        created_at=time.time()
                    ))

        return incidents


@dataclass
class Incident:
    """事件"""
    incident_id: str
    title: str
    severity: str
    alert_groups: List[AlertGroup]
    rule_name: str
    created_at: float
    status: str = "open"
    root_cause: Optional[str] = None
    resolution: Optional[str] = None

    @property
    def total_alerts(self) -> int:
        return sum(g.count for g in self.alert_groups)

    @property
    def affected_resources(self) -> Set[str]:
        return {g.resource for g in self.alert_groups}
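
A hypothetical walk-through of deduplication followed by rule-based correlation; the alert names and resources are invented, and the rule mirrors the YAML examples in section 6.1:

now = time.time()
raw_alerts = [
    Alert("a1", "DatabaseSlowQuery", "mysql-orders-0", "high", now, "slow query > 2s"),
    Alert("a2", "DatabaseSlowQuery", "mysql-orders-0", "high", now + 30, "slow query > 2s"),
    Alert("a3", "DatabaseConnectionPoolExhausted", "mysql-orders-0", "critical", now + 45,
          "connection pool exhausted"),
]

dedup = AlertDeduplicator(dedup_window=300)
for alert in raw_alerts:
    dedup.process(alert)
groups = dedup.get_active_groups()
print(len(groups), "groups from", len(raw_alerts), "alerts")  # duplicates collapse by fingerprint

correlator = AlertCorrelator(time_window=300)
correlator.add_rule({
    "name": "database_issues",
    "incident_title": "Database performance issue",
    "alert_names": ["DatabaseSlowQuery", "DatabaseConnectionPoolExhausted"],
    "resource_pattern": ".*mysql.*",
})
incidents = correlator.correlate(groups)
for incident in incidents:
    print(incident.title, incident.severity, incident.total_alerts)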

4.2 Intelligent Alert Aggregation

"""
Intelligent alert aggregation based on machine learning
"""

import numpy as np
from typing import List, Dict, Tuple, Optional
from collections import defaultdict
from sklearn.cluster import DBSCAN
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import torch
import torch.nn as nn


class AlertEmbedding:
    """告警嵌入"""

    def __init__(self, embedding_dim: int = 64):
        """
        初始化告警嵌入

        Args:
            embedding_dim: 嵌入维度
        """
        self.embedding_dim = embedding_dim
        self.tfidf = TfidfVectorizer(max_features=1000)
        self.resource_encoder: Dict[str, int] = {}
        self.name_encoder: Dict[str, int] = {}

    def fit(self, alerts: List[Alert]):
        """拟合嵌入模型"""
        # 拟合 TF-IDF
        messages = [alert.message for alert in alerts]
        self.tfidf.fit(messages)

        # 构建编码器
        for alert in alerts:
            if alert.resource not in self.resource_encoder:
                self.resource_encoder[alert.resource] = len(self.resource_encoder)
            if alert.name not in self.name_encoder:
                self.name_encoder[alert.name] = len(self.name_encoder)

    def embed(self, alert: Alert) -> np.ndarray:
        """嵌入单个告警"""
        # 消息嵌入
        message_vec = self.tfidf.transform([alert.message]).toarray()[0]

        # 资源嵌入 (one-hot 简化)
        resource_id = self.resource_encoder.get(alert.resource, 0)
        resource_vec = np.zeros(len(self.resource_encoder))
        if resource_id < len(resource_vec):
            resource_vec[resource_id] = 1

        # 名称嵌入
        name_id = self.name_encoder.get(alert.name, 0)
        name_vec = np.zeros(len(self.name_encoder))
        if name_id < len(name_vec):
            name_vec[name_id] = 1

        # 时间特征
        time_features = np.array([
            alert.timestamp % 86400,  # 一天内的时间
            alert.timestamp % 3600,   # 一小时内的时间
        ])

        # 组合所有特征
        combined = np.concatenate([
            message_vec[:100],  # 限制消息向量长度
            resource_vec[:50],
            name_vec[:50],
            time_features
        ])

        return combined

    def embed_batch(self, alerts: List[Alert]) -> np.ndarray:
        """批量嵌入"""
        return np.array([self.embed(alert) for alert in alerts])


class ClusteringAlertAggregator:
    """基于聚类的告警聚合"""

    def __init__(self, eps: float = 0.3, min_samples: int = 2):
        """
        初始化聚合器

        Args:
            eps: DBSCAN epsilon 参数
            min_samples: 最小样本数
        """
        self.eps = eps
        self.min_samples = min_samples
        self.embedding = AlertEmbedding()

    def fit(self, historical_alerts: List[Alert]):
        """训练嵌入模型"""
        self.embedding.fit(historical_alerts)

    def aggregate(self, alerts: List[Alert]) -> List[List[Alert]]:
        """
        聚合告警

        Args:
            alerts: 告警列表

        Returns:
            List[List[Alert]]: 聚类后的告警组
        """
        if len(alerts) < 2:
            return [alerts] if alerts else []

        # 嵌入告警
        embeddings = self.embedding.embed_batch(alerts)

        # 标准化
        embeddings = (embeddings - embeddings.mean(axis=0)) / (embeddings.std(axis=0) + 1e-10)

        # DBSCAN 聚类
        clustering = DBSCAN(eps=self.eps, min_samples=self.min_samples, metric='cosine')
        labels = clustering.fit_predict(embeddings)

        # 分组
        groups = defaultdict(list)
        for i, label in enumerate(labels):
            groups[label].append(alerts[i])

        # 噪声点 (-1) 单独成组
        result = []
        for label, group_alerts in groups.items():
            if label == -1:
                # 每个噪声点单独成组
                for alert in group_alerts:
                    result.append([alert])
            else:
                result.append(group_alerts)

        return result


class GraphAttentionAggregator(nn.Module):
    """基于图注意力网络的告警聚合"""

    def __init__(self, input_dim: int, hidden_dim: int = 64, num_heads: int = 4):
        super().__init__()

        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.num_heads = num_heads

        # 特征变换
        self.W = nn.Linear(input_dim, hidden_dim * num_heads)

        # 注意力参数
        self.a = nn.Parameter(torch.zeros(size=(2 * hidden_dim, 1)))
        nn.init.xavier_uniform_(self.a.data)

        # 输出变换
        self.output = nn.Linear(hidden_dim * num_heads, hidden_dim)

    def forward(self, x: torch.Tensor, adj: torch.Tensor) -> torch.Tensor:
        """
        前向传播

        Args:
            x: 节点特征 (N, input_dim)
            adj: 邻接矩阵 (N, N)

        Returns:
            torch.Tensor: 聚合后的特征 (N, hidden_dim)
        """
        N = x.size(0)

        # 特征变换
        h = self.W(x)  # (N, hidden_dim * num_heads)
        h = h.view(N, self.num_heads, self.hidden_dim)  # (N, num_heads, hidden_dim)

        # 计算注意力
        attention_scores = []
        for head in range(self.num_heads):
            h_head = h[:, head, :]  # (N, hidden_dim)

            # 计算成对注意力
            a_input = torch.cat([
                h_head.repeat(1, N).view(N * N, -1),
                h_head.repeat(N, 1)
            ], dim=1).view(N, N, 2 * self.hidden_dim)

            e = torch.matmul(a_input, self.a).squeeze(-1)  # (N, N)
            e = torch.nn.functional.leaky_relu(e)

            # 应用邻接矩阵掩码
            e = e.masked_fill(adj == 0, float('-inf'))

            attention = torch.softmax(e, dim=1)
            attention_scores.append(attention)

        # 聚合
        h_prime = []
        for head in range(self.num_heads):
            h_head = h[:, head, :]
            h_agg = torch.matmul(attention_scores[head], h_head)
            h_prime.append(h_agg)

        h_prime = torch.cat(h_prime, dim=1)  # (N, hidden_dim * num_heads)
        output = self.output(h_prime)  # (N, hidden_dim)

        return output


class GATAlertAggregator:
    """基于 GAT 的告警聚合"""

    def __init__(self, input_dim: int = 128,
                 similarity_threshold: float = 0.5):
        """
        Initialize the aggregator

        Args:
            input_dim: nominal embedding dimension hint
            similarity_threshold: similarity threshold for the adjacency matrix
        """
        self.input_dim = input_dim
        self.similarity_threshold = similarity_threshold
        self.embedding = AlertEmbedding(input_dim)
        # Built lazily in aggregate(): the real embedding width depends on the
        # fitted TF-IDF vocabulary and encoders, not on input_dim.
        self.gat: Optional[GraphAttentionAggregator] = None

    def fit(self, alerts: List[Alert]):
        """训练模型"""
        self.embedding.fit(alerts)

    def aggregate(self, alerts: List[Alert]) -> List[List[Alert]]:
        """Aggregate alerts into groups"""
        if len(alerts) < 2:
            return [alerts] if alerts else []

        # Embed the alerts
        embeddings = self.embedding.embed_batch(alerts)
        x = torch.FloatTensor(embeddings)

        # Build the adjacency matrix from pairwise cosine similarity
        similarity = cosine_similarity(embeddings)
        adj = (similarity > self.similarity_threshold).astype(float)
        adj = torch.FloatTensor(adj)

        # Instantiate the GAT to match the actual embedding width, then aggregate
        if self.gat is None or self.gat.input_dim != x.size(1):
            self.gat = GraphAttentionAggregator(x.size(1))
        self.gat.eval()
        with torch.no_grad():
            h = self.gat(x, adj)

        # Cluster on the aggregated representations
        clustering = DBSCAN(eps=0.3, min_samples=2, metric='cosine')
        labels = clustering.fit_predict(h.numpy())

        # Group alerts by cluster label
        groups = defaultdict(list)
        for i, label in enumerate(labels):
            groups[label].append(alerts[i])

        return list(groups.values())
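
A rough usage sketch of the DBSCAN-based aggregator, reusing the raw_alerts list from the deduplication sketch above; eps and min_samples would need tuning on real alert volumes:

aggregator = ClusteringAlertAggregator(eps=0.5, min_samples=2)
aggregator.fit(raw_alerts)            # fit TF-IDF and the label encoders on historical alerts
clusters = aggregator.aggregate(raw_alerts)
for i, cluster in enumerate(clusters):
    names = {a.name for a in cluster}
    print(f"cluster {i}: {len(cluster)} alerts, names={names}")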

5. Integrated Root Cause Analysis System

5.1 System Architecture

"""
Integrated root cause analysis system
"""

import time
import asyncio
import numpy as np
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from collections import defaultdict


@dataclass
class RCASystemConfig:
    """系统配置"""
    enable_topology_rca: bool = True
    enable_temporal_rca: bool = True
    enable_causal_rca: bool = False
    alert_dedup_window: float = 300
    correlation_time_window: float = 300
    max_candidates: int = 5


class IntegratedRCASystem:
    """综合根因分析系统"""

    def __init__(self, config: RCASystemConfig):
        """
        初始化系统

        Args:
            config: 系统配置
        """
        self.config = config

        # 初始化组件
        self.dependency_graph = ServiceDependencyGraph()
        self.alert_deduplicator = AlertDeduplicator(config.alert_dedup_window)
        self.alert_correlator = AlertCorrelator(config.correlation_time_window)

        # 分析器
        if config.enable_topology_rca:
            self.topology_rca = TopologyBasedRCA(self.dependency_graph)
        if config.enable_temporal_rca:
            self.correlation_analyzer = TimeSeriesCorrelationAnalyzer()
            self.temporal_rca = TemporalRCA(self.correlation_analyzer)
        if config.enable_causal_rca:
            self.causal_analyzer = GrangerCausalityAnalyzer()

    def load_topology(self, resources: List[Resource],
                      relationships: List[Relationship]):
        """加载拓扑"""
        for resource in resources:
            self.dependency_graph.add_resource(resource)
        for relationship in relationships:
            self.dependency_graph.add_relationship(relationship)

    async def process_alert(self, alert: Alert) -> Optional[AlertGroup]:
        """处理单个告警"""
        return self.alert_deduplicator.process(alert)

    async def analyze(self, anomalies: List[Anomaly],
                      metric_data: Optional[Dict[str, Dict[str, np.ndarray]]] = None) -> Dict[str, Any]:
        """
        执行综合分析

        Args:
            anomalies: 异常列表
            metric_data: 指标数据 (用于时序分析)

        Returns:
            Dict: 综合分析结果
        """
        results = {
            "timestamp": time.time(),
            "total_anomalies": len(anomalies),
            "analyses": {}
        }

        # 并行执行多种分析
        tasks = []

        if self.config.enable_topology_rca:
            tasks.append(self._run_topology_analysis(anomalies))

        if self.config.enable_temporal_rca and metric_data:
            tasks.append(self._run_temporal_analysis(anomalies, metric_data))

        if self.config.enable_causal_rca and metric_data:
            tasks.append(self._run_causal_analysis(metric_data))

        analysis_results = await asyncio.gather(*tasks, return_exceptions=True)

        for result in analysis_results:
            if isinstance(result, Exception):
                continue
            if isinstance(result, dict):
                results["analyses"].update(result)

        # 融合结果
        results["fused_root_causes"] = self._fuse_results(results["analyses"])

        return results

    async def _run_topology_analysis(self, anomalies: List[Anomaly]) -> Dict[str, Any]:
        """运行拓扑分析"""
        result = self.topology_rca.analyze(anomalies)
        return {"topology": result}

    async def _run_temporal_analysis(self, anomalies: List[Anomaly],
                                      metric_data: Dict) -> Dict[str, Any]:
        """运行时序分析"""
        if not anomalies:
            return {}

        result = self.temporal_rca.analyze(anomalies[0], metric_data)
        return {"temporal": result}

    async def _run_causal_analysis(self, metric_data: Dict) -> Dict[str, Any]:
        """运行因果分析"""
        # 提取所有时序
        all_series = {}
        for resource_id, metrics in metric_data.items():
            for metric_name, values in metrics.items():
                key = f"{resource_id}:{metric_name}"
                all_series[key] = values

        if len(all_series) < 2:
            return {}

        causal_graph = self.causal_analyzer.build_causal_graph(all_series)
        return {"causal": causal_graph}

    def _fuse_results(self, analyses: Dict[str, Any]) -> List[RootCauseCandidate]:
        """融合多种分析结果"""
        candidate_scores = defaultdict(lambda: {"score": 0, "evidence": [], "sources": []})

        # 从拓扑分析提取候选
        if "topology" in analyses:
            topo_result = analyses["topology"]
            if hasattr(topo_result, 'root_causes'):
                for rc in topo_result.root_causes:
                    candidate_scores[rc.resource_id]["score"] += rc.confidence * 0.4
                    candidate_scores[rc.resource_id]["evidence"].extend(rc.evidence)
                    candidate_scores[rc.resource_id]["sources"].append("topology")

        # 从时序分析提取候选
        if "temporal" in analyses:
            temp_result = analyses["temporal"]
            if hasattr(temp_result, 'root_causes'):
                for rc in temp_result.root_causes:
                    candidate_scores[rc.resource_id]["score"] += rc.confidence * 0.3
                    candidate_scores[rc.resource_id]["evidence"].extend(rc.evidence)
                    candidate_scores[rc.resource_id]["sources"].append("temporal")

        # 从因果分析提取候选
        if "causal" in analyses:
            causal_result = analyses["causal"]
            if "edges" in causal_result:
                # 入度为 0 的节点可能是根因
                in_degree = defaultdict(int)
                out_degree = defaultdict(int)
                for edge in causal_result["edges"]:
                    in_degree[edge["to"]] += 1
                    out_degree[edge["from"]] += 1

                for node in set(out_degree.keys()) | set(in_degree.keys()):
                    if in_degree[node] == 0 and out_degree[node] > 0:
                        # 解析资源 ID
                        resource_id = node.split(":")[0]
                        candidate_scores[resource_id]["score"] += 0.3
                        candidate_scores[resource_id]["evidence"].append(f"因果分析: {node} 是因果链起点")
                        candidate_scores[resource_id]["sources"].append("causal")

        # 排序并返回
        sorted_candidates = sorted(
            candidate_scores.items(),
            key=lambda x: x[1]["score"],
            reverse=True
        )

        fused = []
        for rank, (resource_id, data) in enumerate(sorted_candidates[:self.config.max_candidates], 1):
            fused.append(RootCauseCandidate(
                resource_id=resource_id,
                resource_name=resource_id,
                confidence=min(data["score"], 1.0),
                rank=rank,
                evidence=list(set(data["evidence"])),
                impact_chain=[],
                related_anomalies=[],
                recommended_actions=[f"综合分析来源: {', '.join(set(data['sources']))}"]
            ))

        return fused
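
Finally, a hedged sketch of driving the integrated system. The resources, relationships, anomalies, and metric_data variables are placeholders that would come from the CMDB or service registry, the detectors of the previous chapter, and the metrics store:

async def main():
    config = RCASystemConfig(enable_topology_rca=True,
                             enable_temporal_rca=True,
                             enable_causal_rca=False)
    system = IntegratedRCASystem(config)

    # Placeholders: resources/relationships from the CMDB, anomalies from the
    # detectors, metric_data as {resource_id: {metric_name: np.ndarray}}.
    system.load_topology(resources, relationships)
    report = await system.analyze(anomalies, metric_data)

    for candidate in report["fused_root_causes"]:
        print(candidate.rank, candidate.resource_id,
              f"confidence={candidate.confidence:.2f}", candidate.recommended_actions)

asyncio.run(main())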

6. Best Practices

6.1 Example Alert Aggregation Rules

# Alert correlation rule configuration

correlation_rules:
  # Database-related alerts
  - name: "database_issues"
    incident_title: "Database performance issue"
    alert_names:
      - "DatabaseConnectionPoolExhausted"
      - "DatabaseSlowQuery"
      - "DatabaseReplicationLag"
      - "DatabaseCPUHigh"
    resource_pattern: ".*database.*|.*mysql.*|.*postgres.*"
    time_window: 300

  # Network-related alerts
  - name: "network_issues"
    incident_title: "Network connectivity issue"
    alert_names:
      - "NetworkLatencyHigh"
      - "NetworkPacketLoss"
      - "DNSResolutionFailed"
      - "ConnectionTimeout"
    time_window: 180

  # Pod health alerts
  - name: "pod_health"
    incident_title: "Pod health issue"
    alert_names:
      - "PodCrashLooping"
      - "PodOOMKilled"
      - "PodPending"
      - "ContainerRestarting"
    labels:
      namespace: "production"
    time_window: 300

  # Cascading failure
  - name: "cascade_failure"
    incident_title: "Cascading failure"
    conditions:
      - min_alerts: 5
      - affected_services: 3
      - time_window: 120
    escalation:
      severity: critical
      notify: ["oncall", "manager"]
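
One way to feed such a file into the AlertCorrelator from section 4.1 is sketched below; the file name and the PyYAML dependency are assumptions, deduplicator stands for an existing AlertDeduplicator instance, and only the keys the correlator actually reads (name, incident_title, alert_names, resource_pattern, labels) are consumed by the current implementation:

import yaml  # PyYAML, assumed to be available

with open("correlation_rules.yaml") as f:
    rule_config = yaml.safe_load(f)

correlator = AlertCorrelator(time_window=300)
for rule in rule_config.get("correlation_rules", []):
    correlator.add_rule(rule)

# Correlate whatever alert groups the deduplicator currently considers active
incidents = correlator.correlate(deduplicator.get_active_groups())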

6.2 RCA Tuning Parameters

"""
Root cause analysis tuning parameters
"""

rca_tuning_params = {
    "topology_analysis": {
        "max_depth": 5,            # maximum dependency depth
        "time_weight": 0.3,        # weight of temporal ordering
        "position_weight": 0.3,    # weight of topological position
        "propagation_weight": 0.2, # weight of downstream propagation
        "centrality_weight": 0.2   # weight of graph centrality
    },

    "temporal_analysis": {
        "max_lag": 60,                 # maximum lag (data points)
        "correlation_threshold": 0.5,  # correlation threshold
        "leading_bonus": 1.2           # confidence bonus for leading metrics
    },

    "causal_analysis": {
        "max_lag": 10,              # maximum Granger lag
        "significance_level": 0.05  # significance level
    },

    "alert_aggregation": {
        "dedup_window": 300,        # deduplication window (seconds)
        "correlation_window": 300,  # correlation window (seconds)
        "cluster_eps": 0.3,         # clustering epsilon
        "min_cluster_size": 2       # minimum cluster size
    }
}
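
These values map directly onto the constructor arguments used earlier in the chapter; a sketch of applying them follows (the topology weights are currently hard-coded inside _calculate_root_cause_score, so they serve only as documentation here):

temporal_cfg = rca_tuning_params["temporal_analysis"]
aggregation_cfg = rca_tuning_params["alert_aggregation"]

correlation_analyzer = TimeSeriesCorrelationAnalyzer(max_lag=temporal_cfg["max_lag"])
causal_analyzer = GrangerCausalityAnalyzer(
    max_lag=rca_tuning_params["causal_analysis"]["max_lag"])
deduplicator = AlertDeduplicator(dedup_window=aggregation_cfg["dedup_window"])
aggregator = ClusteringAlertAggregator(eps=aggregation_cfg["cluster_eps"],
                                       min_samples=aggregation_cfg["min_cluster_size"])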

Summary

This chapter took a deep look at root cause analysis and alert aggregation:

  1. Topology analysis: fault propagation analysis based on the service dependency graph
  2. Temporal correlation: correlation analysis and causal inference
  3. Alert aggregation: deduplication, correlation, and intelligent clustering
  4. Integrated system: root cause analysis that fuses multiple methods

Key takeaways:

  • Combining topology, temporal, and causal perspectives improves accuracy
  • Alert aggregation significantly reduces the operational burden
  • Automated root cause analysis shortens time to recovery
  • Continuous feedback keeps the analysis models improving

The next chapter covers intelligent operations decision-making, including automated remediation, capacity forecasting, and cost optimization.
