02 - Anomaly Detection Algorithms

Overview

Anomaly detection is one of the core capabilities of AIOps: it automatically identifies abnormal behavior in a running system. This chapter takes a deep look at the algorithms and implementations behind time-series anomaly detection, log anomaly detection, and multivariate anomaly detection.

1. Time-Series Anomaly Detection

1.1 A Taxonomy of Anomaly Types

┌─────────────────────────────────────────────────────────────────┐
│                     Time-Series Anomaly Types                   │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Point anomaly                                                  │
│  ┌──────────────────────────────────────────┐                   │
│  │    *                                     │                   │
│  │   ╱ ╲    ╱╲    ╱╲                        │  A single point   │
│  │  ╱   ╲  ╱  ╲  ╱  ╲                       │  far off the      │
│  │ ╱     ╲╱    ╲╱    ╲                      │  normal pattern   │
│  └──────────────────────────────────────────┘                   │
│                                                                 │
│  Contextual anomaly                                             │
│  ┌──────────────────────────────────────────┐                   │
│  │         ╱╲                               │                   │
│  │    ╱╲  ╱  ╲  ╱╲   abnormally high        │  A normal value   │
│  │   ╱  ╲╱    ╲╱  ╲  for this time slot     │  in an abnormal   │
│  │  ╱               ╲                       │  context          │
│  └──────────────────────────────────────────┘                   │
│                                                                 │
│  Collective anomaly                                             │
│  ┌──────────────────────────────────────────┐                   │
│  │          ════════                        │                   │
│  │    ╱╲   ╱        ╲   ╱╲                  │  An abnormal      │
│  │   ╱  ╲ ╱          ╲ ╱  ╲                 │  pattern over a   │
│  │  ╱    ╲            ╲    ╲                │  span of time     │
│  └──────────────────────────────────────────┘                   │
│                                                                 │
│  Trend anomaly                                                  │
│  ┌──────────────────────────────────────────┐                   │
│  │                        ╱                 │                   │
│  │    ══════════════    ╱                   │  Abrupt change    │
│  │                    ╱                     │  in trend         │
│  │                  ╱                       │                   │
│  └──────────────────────────────────────────┘                   │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
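
These four shapes are easy to reproduce on synthetic data, which is handy for sanity-checking a detector before wiring it to production metrics. A minimal sketch; the series shapes and magnitudes are illustrative assumptions, not taken from any real system:

import numpy as np

rng = np.random.default_rng(0)
t = np.arange(500)
base = 10 * np.sin(2 * np.pi * t / 50) + rng.normal(0, 1, len(t))

point = base.copy()
point[100] += 30                       # point anomaly: one sample far off

contextual = base.copy()
contextual[205:210] += 15              # contextual anomaly: a value that would be
                                       # fine elsewhere, abnormal at this phase

collective = base.copy()
collective[300:340] = collective[300]  # collective anomaly: a flat-lined segment

trend = base.copy()
trend[400:] += 0.5 * (t[400:] - 400)   # trend anomaly: sudden upward drift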

1.2 Statistical Methods

"""
基于统计的时序异常检测
"""

import numpy as np
import pandas as pd
from typing import Tuple, List, Optional, Dict, Any
from dataclasses import dataclass
from scipy import stats
from statsmodels.tsa.seasonal import seasonal_decompose
from statsmodels.tsa.stattools import adfuller


@dataclass
class AnomalyResult:
    """异常检测结果"""
    timestamp: float
    value: float
    is_anomaly: bool
    anomaly_score: float
    anomaly_type: str
    details: Dict[str, Any]


class StatisticalAnomalyDetector:
    """统计异常检测器"""

    def __init__(self, window_size: int = 100,
                 threshold_sigma: float = 3.0):
        """
        初始化检测器

        Args:
            window_size: 滑动窗口大小
            threshold_sigma: 标准差阈值
        """
        self.window_size = window_size
        self.threshold_sigma = threshold_sigma
        self.history: List[float] = []

    def z_score_detect(self, value: float) -> AnomalyResult:
        """
        Z-Score 异常检测

        Args:
            value: 当前值

        Returns:
            AnomalyResult: 检测结果
        """
        import time

        self.history.append(value)

        if len(self.history) < self.window_size:
            return AnomalyResult(
                timestamp=time.time(),
                value=value,
                is_anomaly=False,
                anomaly_score=0.0,
                anomaly_type="none",
                details={"reason": "insufficient_data"}
            )

        # 使用滑动窗口
        window = self.history[-self.window_size:]
        mean = np.mean(window[:-1])  # 不包含当前值
        std = np.std(window[:-1])

        if std == 0:
            z_score = 0
        else:
            z_score = (value - mean) / std

        is_anomaly = abs(z_score) > self.threshold_sigma

        return AnomalyResult(
            timestamp=time.time(),
            value=value,
            is_anomaly=is_anomaly,
            anomaly_score=abs(z_score) / self.threshold_sigma,
            anomaly_type="point" if is_anomaly else "none",
            details={
                "z_score": z_score,
                "mean": mean,
                "std": std,
                "threshold": self.threshold_sigma
            }
        )

    def mad_detect(self, value: float) -> AnomalyResult:
        """
        MAD (Median Absolute Deviation) 异常检测
        对离群值更鲁棒

        Args:
            value: 当前值

        Returns:
            AnomalyResult: 检测结果
        """
        import time

        self.history.append(value)

        if len(self.history) < self.window_size:
            return AnomalyResult(
                timestamp=time.time(),
                value=value,
                is_anomaly=False,
                anomaly_score=0.0,
                anomaly_type="none",
                details={"reason": "insufficient_data"}
            )

        window = self.history[-self.window_size:]
        median = np.median(window[:-1])
        mad = np.median(np.abs(window[:-1] - median))

        # 修正系数 (正态分布)
        mad_corrected = mad * 1.4826

        if mad_corrected == 0:
            modified_z = 0
        else:
            modified_z = (value - median) / mad_corrected

        is_anomaly = abs(modified_z) > self.threshold_sigma

        return AnomalyResult(
            timestamp=time.time(),
            value=value,
            is_anomaly=is_anomaly,
            anomaly_score=abs(modified_z) / self.threshold_sigma,
            anomaly_type="point" if is_anomaly else "none",
            details={
                "modified_z_score": modified_z,
                "median": median,
                "mad": mad_corrected,
                "threshold": self.threshold_sigma
            }
        )

    def ewma_detect(self, value: float, alpha: float = 0.3) -> AnomalyResult:
        """
        EWMA (Exponentially Weighted Moving Average) 异常检测

        Args:
            value: 当前值
            alpha: 平滑系数

        Returns:
            AnomalyResult: 检测结果
        """
        import time

        self.history.append(value)

        if len(self.history) < 2:
            return AnomalyResult(
                timestamp=time.time(),
                value=value,
                is_anomaly=False,
                anomaly_score=0.0,
                anomaly_type="none",
                details={"reason": "insufficient_data"}
            )

        # 计算 EWMA
        ewma = [self.history[0]]
        for v in self.history[1:-1]:
            ewma.append(alpha * v + (1 - alpha) * ewma[-1])

        predicted = alpha * self.history[-2] + (1 - alpha) * ewma[-1]

        # 计算残差标准差
        residuals = np.array(self.history[1:-1]) - np.array(ewma)
        residual_std = np.std(residuals) if len(residuals) > 0 else 1

        # 当前残差
        current_residual = value - predicted

        if residual_std == 0:
            z_score = 0
        else:
            z_score = current_residual / residual_std

        is_anomaly = abs(z_score) > self.threshold_sigma

        return AnomalyResult(
            timestamp=time.time(),
            value=value,
            is_anomaly=is_anomaly,
            anomaly_score=abs(z_score) / self.threshold_sigma,
            anomaly_type="point" if is_anomaly else "none",
            details={
                "predicted": predicted,
                "residual": current_residual,
                "z_score": z_score
            }
        )


class SeasonalAnomalyDetector:
    """季节性异常检测器"""

    def __init__(self, period: int = 24,
                 threshold_sigma: float = 3.0):
        """
        初始化检测器

        Args:
            period: 季节周期 (如 24 表示每日周期)
            threshold_sigma: 标准差阈值
        """
        self.period = period
        self.threshold_sigma = threshold_sigma
        self.seasonal_profile: Optional[np.ndarray] = None
        self.residual_std: float = 1.0

    def fit(self, data: np.ndarray):
        """
        拟合季节性模型

        Args:
            data: 历史数据
        """
        if len(data) < 2 * self.period:
            raise ValueError("数据长度不足以拟合季节性模型")

        # 季节分解
        series = pd.Series(data)
        decomposition = seasonal_decompose(
            series,
            model='additive',
            period=self.period,
            extrapolate_trend='freq'
        )

        # 提取季节性 profile
        self.seasonal_profile = decomposition.seasonal[:self.period].values

        # 计算残差标准差
        residuals = decomposition.resid.dropna()
        self.residual_std = residuals.std()

    def detect(self, value: float, position: int) -> AnomalyResult:
        """
        检测异常

        Args:
            value: 当前值
            position: 在周期中的位置

        Returns:
            AnomalyResult: 检测结果
        """
        import time

        if self.seasonal_profile is None:
            raise ValueError("请先调用 fit() 方法")

        # 获取季节性期望值
        expected = self.seasonal_profile[position % self.period]

        # 计算偏差
        deviation = value - expected
        z_score = deviation / self.residual_std if self.residual_std > 0 else 0

        is_anomaly = abs(z_score) > self.threshold_sigma

        return AnomalyResult(
            timestamp=time.time(),
            value=value,
            is_anomaly=is_anomaly,
            anomaly_score=abs(z_score) / self.threshold_sigma,
            anomaly_type="contextual" if is_anomaly else "none",
            details={
                "expected": expected,
                "deviation": deviation,
                "z_score": z_score,
                "position_in_cycle": position % self.period
            }
        )


class ChangePointDetector:
    """变点检测器"""

    def __init__(self, min_segment_length: int = 10):
        """
        初始化检测器

        Args:
            min_segment_length: 最小分段长度
        """
        self.min_segment_length = min_segment_length

    def detect_cusum(self, data: np.ndarray,
                     threshold: float = 5.0) -> List[int]:
        """
        CUSUM (Cumulative Sum) 变点检测

        Args:
            data: 时序数据
            threshold: 检测阈值

        Returns:
            List: 变点位置列表
        """
        mean = np.mean(data)
        std = np.std(data)

        if std == 0:
            return []

        normalized = (data - mean) / std

        # 正向和负向 CUSUM
        pos_cusum = np.zeros(len(data))
        neg_cusum = np.zeros(len(data))

        change_points = []

        for i in range(1, len(data)):
            pos_cusum[i] = max(0, pos_cusum[i-1] + normalized[i] - 0.5)
            neg_cusum[i] = max(0, neg_cusum[i-1] - normalized[i] - 0.5)

            if pos_cusum[i] > threshold or neg_cusum[i] > threshold:
                change_points.append(i)
                pos_cusum[i] = 0
                neg_cusum[i] = 0

        return change_points

    def detect_pelt(self, data: np.ndarray,
                    penalty: float = None) -> List[int]:
        """
        PELT (Pruned Exact Linear Time) 变点检测

        Args:
            data: 时序数据
            penalty: 惩罚参数

        Returns:
            List: 变点位置列表
        """
        import ruptures as rpt

        if penalty is None:
            penalty = np.log(len(data)) * np.var(data)

        # 使用 PELT 算法
        model = rpt.Pelt(model="rbf", min_size=self.min_segment_length)
        model.fit(data)
        change_points = model.predict(pen=penalty)

        # 移除最后一个点 (序列结尾)
        return change_points[:-1]

    def detect_bocpd(self, data: np.ndarray,
                     hazard: float = 1/200) -> List[Tuple[int, float]]:
        """
        BOCPD (Bayesian Online Changepoint Detection)

        Args:
            data: 时序数据
            hazard: 危险率

        Returns:
            List: (位置, 概率) 列表
        """
        n = len(data)

        # 运行长度概率
        R = np.zeros((n + 1, n + 1))
        R[0, 0] = 1

        change_points = []

        # 观测似然参数
        mu0 = 0
        kappa0 = 1
        alpha0 = 1
        beta0 = 1

        muT = mu0
        kappaT = kappa0
        alphaT = alpha0
        betaT = beta0

        for t in range(n):
            x = data[t]

            # 预测概率
            predprobs = self._student_t_pdf(
                x, muT, betaT * (kappaT + 1) / (alphaT * kappaT), 2 * alphaT
            )

            # 增长概率
            R[1:t+2, t+1] = R[:t+1, t] * predprobs * (1 - hazard)

            # 变点概率
            R[0, t+1] = np.sum(R[:t+1, t] * predprobs * hazard)

            # 归一化
            R[:t+2, t+1] = R[:t+2, t+1] / R[:t+2, t+1].sum()

            # 检测变点
            if R[0, t+1] > 0.5:
                change_points.append((t, R[0, t+1]))

            # 更新参数
            muT = (kappaT * muT + x) / (kappaT + 1)
            kappaT = kappaT + 1
            alphaT = alphaT + 0.5
            betaT = betaT + kappaT * (x - muT) ** 2 / (2 * (kappaT + 1))

        return change_points

    def _student_t_pdf(self, x: float, mu: float,
                       sigma: float, nu: float) -> float:
        """Student-t 分布概率密度"""
        return stats.t.pdf(x, nu, loc=mu, scale=np.sqrt(sigma))
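
A quick usage sketch of the detectors above; the window size, threshold, and injected spike are illustrative choices:

import numpy as np

# Stream values with one injected spike through the z-score detector
detector = StatisticalAnomalyDetector(window_size=50, threshold_sigma=3.0)
values = list(np.random.default_rng(0).normal(100, 5, 300))
values[200] = 160.0  # injected point anomaly

for v in values:
    result = detector.z_score_detect(v)
    if result.is_anomaly:
        print(f"anomaly at value={result.value:.1f}, "
              f"score={result.anomaly_score:.2f}")

# CUSUM change-point detection on a series with a level shift at t=200
rng = np.random.default_rng(1)
data = np.concatenate([rng.normal(0, 1, 200), rng.normal(3, 1, 200)])
print(ChangePointDetector().detect_cusum(data, threshold=5.0))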

1.3 Machine Learning Methods

"""
基于机器学习的时序异常检测
"""

import numpy as np
from typing import List, Tuple, Optional
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.svm import OneClassSVM
import torch
import torch.nn as nn


class IsolationForestDetector:
    """孤立森林异常检测"""

    def __init__(self, n_estimators: int = 100,
                 contamination: float = 0.1,
                 window_size: int = 10):
        """
        初始化检测器

        Args:
            n_estimators: 树的数量
            contamination: 异常比例
            window_size: 特征窗口大小
        """
        self.n_estimators = n_estimators
        self.contamination = contamination
        self.window_size = window_size
        self.model = None
        self.history: List[float] = []

    def _extract_features(self, data: List[float]) -> np.ndarray:
        """提取时序特征"""
        features = []

        for i in range(self.window_size, len(data)):
            window = data[i-self.window_size:i]

            feat = [
                np.mean(window),           # 均值
                np.std(window),            # 标准差
                np.min(window),            # 最小值
                np.max(window),            # 最大值
                np.percentile(window, 25), # 25分位数
                np.percentile(window, 75), # 75分位数
                data[i] - window[-1],      # 差分
                (data[i] - np.mean(window)) / (np.std(window) + 1e-10)  # z-score
            ]
            features.append(feat)

        return np.array(features)

    def fit(self, data: List[float]):
        """拟合模型"""
        self.history = list(data)
        X = self._extract_features(self.history)

        self.model = IsolationForest(
            n_estimators=self.n_estimators,
            contamination=self.contamination,
            random_state=42
        )
        self.model.fit(X)

    def detect(self, value: float) -> Tuple[bool, float]:
        """
        检测异常

        Args:
            value: 当前值

        Returns:
            (是否异常, 异常分数)
        """
        self.history.append(value)

        if len(self.history) < self.window_size + 1:
            return False, 0.0

        X = self._extract_features(self.history[-self.window_size-1:])

        prediction = self.model.predict(X[-1:])
        score = self.model.score_samples(X[-1:])

        is_anomaly = prediction[0] == -1

        return is_anomaly, float(-score[0])


class LOFDetector:
    """Local Outlier Factor 异常检测"""

    def __init__(self, n_neighbors: int = 20,
                 window_size: int = 10):
        """
        初始化检测器

        Args:
            n_neighbors: 近邻数量
            window_size: 特征窗口大小
        """
        self.n_neighbors = n_neighbors
        self.window_size = window_size
        self.model = None
        self.reference_data: Optional[np.ndarray] = None

    def fit(self, data: List[float]):
        """拟合模型"""
        features = self._extract_features(data)
        self.reference_data = features

        self.model = LocalOutlierFactor(
            n_neighbors=self.n_neighbors,
            novelty=True
        )
        self.model.fit(features)

    def _extract_features(self, data: List[float]) -> np.ndarray:
        """提取时序特征"""
        features = []

        for i in range(self.window_size, len(data)):
            window = data[i-self.window_size:i]
            feat = [
                np.mean(window),
                np.std(window),
                data[i],
                data[i] - window[-1]
            ]
            features.append(feat)

        return np.array(features)

    def detect(self, window: List[float]) -> Tuple[bool, float]:
        """检测异常"""
        if len(window) < self.window_size + 1:
            return False, 0.0

        X = self._extract_features(window)
        prediction = self.model.predict(X[-1:])
        score = self.model.score_samples(X[-1:])

        return prediction[0] == -1, float(-score[0])


class LSTMAutoencoder(nn.Module):
    """LSTM 自编码器"""

    def __init__(self, input_dim: int, hidden_dim: int,
                 latent_dim: int, num_layers: int = 2):
        super().__init__()

        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.latent_dim = latent_dim
        self.num_layers = num_layers

        # 编码器
        self.encoder_lstm = nn.LSTM(
            input_dim, hidden_dim, num_layers,
            batch_first=True, dropout=0.2
        )
        self.encoder_fc = nn.Linear(hidden_dim, latent_dim)

        # 解码器
        self.decoder_fc = nn.Linear(latent_dim, hidden_dim)
        self.decoder_lstm = nn.LSTM(
            hidden_dim, hidden_dim, num_layers,
            batch_first=True, dropout=0.2
        )
        self.output_fc = nn.Linear(hidden_dim, input_dim)

    def encode(self, x):
        """编码"""
        lstm_out, _ = self.encoder_lstm(x)
        latent = self.encoder_fc(lstm_out[:, -1, :])
        return latent

    def decode(self, latent, seq_len):
        """解码"""
        hidden = self.decoder_fc(latent)
        hidden = hidden.unsqueeze(1).repeat(1, seq_len, 1)
        lstm_out, _ = self.decoder_lstm(hidden)
        output = self.output_fc(lstm_out)
        return output

    def forward(self, x):
        """前向传播"""
        latent = self.encode(x)
        reconstructed = self.decode(latent, x.size(1))
        return reconstructed


class LSTMAutoencoderDetector:
    """基于 LSTM 自编码器的异常检测"""

    def __init__(self, sequence_length: int = 50,
                 hidden_dim: int = 64,
                 latent_dim: int = 16,
                 threshold_percentile: float = 95):
        """
        初始化检测器

        Args:
            sequence_length: 序列长度
            hidden_dim: 隐藏层维度
            latent_dim: 潜在空间维度
            threshold_percentile: 阈值百分位数
        """
        self.sequence_length = sequence_length
        self.hidden_dim = hidden_dim
        self.latent_dim = latent_dim
        self.threshold_percentile = threshold_percentile

        self.model = LSTMAutoencoder(1, hidden_dim, latent_dim)
        self.threshold = None
        self.scaler_mean = 0
        self.scaler_std = 1

    def fit(self, data: np.ndarray, epochs: int = 100,
            batch_size: int = 32, lr: float = 0.001):
        """训练模型"""
        # 数据标准化
        self.scaler_mean = np.mean(data)
        self.scaler_std = np.std(data)
        normalized = (data - self.scaler_mean) / (self.scaler_std + 1e-10)

        # 创建序列
        sequences = self._create_sequences(normalized)
        X = torch.FloatTensor(sequences).unsqueeze(-1)

        # 训练
        optimizer = torch.optim.Adam(self.model.parameters(), lr=lr)
        criterion = nn.MSELoss()

        self.model.train()
        for epoch in range(epochs):
            total_loss = 0
            for i in range(0, len(X), batch_size):
                batch = X[i:i+batch_size]

                optimizer.zero_grad()
                reconstructed = self.model(batch)
                loss = criterion(reconstructed, batch)
                loss.backward()
                optimizer.step()

                total_loss += loss.item()

        # 计算阈值
        self.model.eval()
        with torch.no_grad():
            reconstructed = self.model(X)
            reconstruction_errors = torch.mean((X - reconstructed) ** 2, dim=(1, 2))
            self.threshold = np.percentile(
                reconstruction_errors.numpy(),
                self.threshold_percentile
            )

    def _create_sequences(self, data: np.ndarray) -> np.ndarray:
        """创建序列"""
        sequences = []
        for i in range(len(data) - self.sequence_length + 1):
            sequences.append(data[i:i+self.sequence_length])
        return np.array(sequences)

    def detect(self, sequence: np.ndarray) -> Tuple[bool, float]:
        """检测异常"""
        # 标准化
        normalized = (sequence - self.scaler_mean) / (self.scaler_std + 1e-10)

        X = torch.FloatTensor(normalized).unsqueeze(0).unsqueeze(-1)

        self.model.eval()
        with torch.no_grad():
            reconstructed = self.model(X)
            error = torch.mean((X - reconstructed) ** 2).item()

        is_anomaly = error > self.threshold
        anomaly_score = error / self.threshold

        return is_anomaly, anomaly_score

    def detect_in_series(self, data: np.ndarray) -> List[Tuple[int, bool, float]]:
        """在整个序列中检测异常"""
        results = []

        for i in range(len(data) - self.sequence_length + 1):
            sequence = data[i:i+self.sequence_length]
            is_anomaly, score = self.detect(sequence)
            results.append((i + self.sequence_length - 1, is_anomaly, score))

        return results
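
The same pattern applies to the learned detectors. A short sketch on synthetic data; the epochs, window sizes, and injected outlier are illustrative:

import numpy as np

rng = np.random.default_rng(42)
train = list(rng.normal(50, 3, 1000))

# Isolation Forest over sliding-window features
iforest = IsolationForestDetector(window_size=10)
iforest.fit(train)
print(iforest.detect(80.0))   # far outside the training range
print(iforest.detect(50.5))   # note: the previous point stays in the window

# LSTM autoencoder: train on normal data, flag high reconstruction error
lstm_det = LSTMAutoencoderDetector(sequence_length=50)
lstm_det.fit(np.array(train), epochs=20)
window = np.array(train[-50:])
window[-1] = 80.0             # corrupt the last point of the window
print(lstm_det.detect(window))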

2. Log Anomaly Detection

2.1 Log Parsing

"""
日志解析与模板提取
"""

import re
from typing import List, Dict, Tuple, Optional
from collections import defaultdict
import hashlib


class DrainLogParser:
    """Drain 日志解析器"""

    def __init__(self, depth: int = 4,
                 sim_th: float = 0.4,
                 max_children: int = 100):
        """
        初始化解析器

        Args:
            depth: 解析树深度
            sim_th: 相似度阈值
            max_children: 最大子节点数
        """
        self.depth = depth
        self.sim_th = sim_th
        self.max_children = max_children

        # 解析树根节点
        self.root_node = Node()
        self.log_clusters: Dict[str, LogCluster] = {}

        # 正则预处理
        self.regex_patterns = [
            (r'\d{4}-\d{2}-\d{2}[ T]\d{2}:\d{2}:\d{2}', '<TIMESTAMP>'),
            (r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', '<IP>'),
            (r':\d+', ':<PORT>'),
            (r'/[a-zA-Z0-9_\-/]+', '<PATH>'),
            (r'\b[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\b', '<UUID>'),
            (r'\b\d+\b', '<NUM>'),
        ]

    def preprocess(self, log_line: str) -> List[str]:
        """预处理日志行"""
        # 应用正则替换
        for pattern, replacement in self.regex_patterns:
            log_line = re.sub(pattern, replacement, log_line)

        # 分词
        tokens = log_line.strip().split()
        return tokens

    def parse(self, log_line: str) -> Tuple[str, str]:
        """
        解析日志行

        Args:
            log_line: 原始日志行

        Returns:
            (模板 ID, 模板)
        """
        tokens = self.preprocess(log_line)

        if not tokens:
            return "empty", ""

        # 在解析树中查找匹配的簇
        cluster = self._tree_search(tokens)

        if cluster is None:
            # 创建新簇
            cluster = LogCluster(tokens)
            self._add_cluster(cluster, tokens)

        # 更新簇
        cluster.update(tokens)

        return cluster.cluster_id, cluster.get_template()

    def _tree_search(self, tokens: List[str]) -> Optional['LogCluster']:
        """在解析树中搜索匹配的簇"""
        # 第一层: 日志长度
        token_count = len(tokens)
        if token_count not in self.root_node.children:
            return None

        parent_node = self.root_node.children[token_count]

        # 中间层: 前几个 token
        current_depth = 1
        for token in tokens:
            if current_depth >= self.depth:
                break

            if token in parent_node.children:
                parent_node = parent_node.children[token]
            elif '<*>' in parent_node.children:
                parent_node = parent_node.children['<*>']
            else:
                return None

            current_depth += 1

        # 叶子节点: 查找最匹配的簇
        return self._fast_match(parent_node.clusters, tokens)

    def _fast_match(self, clusters: List['LogCluster'],
                    tokens: List[str]) -> Optional['LogCluster']:
        """快速匹配"""
        best_cluster = None
        best_sim = -1

        for cluster in clusters:
            sim = cluster.similarity(tokens)
            if sim > self.sim_th and sim > best_sim:
                best_sim = sim
                best_cluster = cluster

        return best_cluster

    def _add_cluster(self, cluster: 'LogCluster', tokens: List[str]):
        """添加簇到解析树"""
        token_count = len(tokens)

        # 第一层
        if token_count not in self.root_node.children:
            self.root_node.children[token_count] = Node()

        parent_node = self.root_node.children[token_count]

        # 中间层
        current_depth = 1
        for token in tokens:
            if current_depth >= self.depth:
                break

            # 检查是否为变量
            if self._is_variable(token):
                token = '<*>'

            if token not in parent_node.children:
                if len(parent_node.children) >= self.max_children:
                    token = '<*>'
                    if token not in parent_node.children:
                        parent_node.children[token] = Node()

                else:
                    parent_node.children[token] = Node()

            parent_node = parent_node.children[token]
            current_depth += 1

        # 添加到叶子节点
        parent_node.clusters.append(cluster)
        self.log_clusters[cluster.cluster_id] = cluster

    def _is_variable(self, token: str) -> bool:
        """判断 token 是否为变量"""
        variable_patterns = ['<', '>', '*', '=']
        for p in variable_patterns:
            if p in token:
                return True

        # 包含数字的 token 可能是变量
        if re.search(r'\d', token):
            return True

        return False

    def get_all_templates(self) -> Dict[str, str]:
        """获取所有模板"""
        return {
            cluster_id: cluster.get_template()
            for cluster_id, cluster in self.log_clusters.items()
        }


class Node:
    """解析树节点"""

    def __init__(self):
        self.children: Dict[str, 'Node'] = {}
        self.clusters: List['LogCluster'] = []


class LogCluster:
    """日志簇"""

    def __init__(self, tokens: List[str]):
        self.cluster_id = self._generate_id(tokens)
        self.template_tokens = list(tokens)
        self.count = 1

    def _generate_id(self, tokens: List[str]) -> str:
        """生成簇 ID"""
        template = " ".join(tokens)
        return hashlib.md5(template.encode()).hexdigest()[:8]

    def similarity(self, tokens: List[str]) -> float:
        """计算相似度"""
        if len(tokens) != len(self.template_tokens):
            return 0

        match_count = sum(
            1 for t1, t2 in zip(tokens, self.template_tokens)
            if t1 == t2 or t2 == '<*>'
        )

        return match_count / len(tokens)

    def update(self, tokens: List[str]):
        """更新模板"""
        self.count += 1

        # 合并模板: 不匹配的位置变为通配符
        for i, (t1, t2) in enumerate(zip(tokens, self.template_tokens)):
            if t1 != t2:
                self.template_tokens[i] = '<*>'

    def get_template(self) -> str:
        """获取模板"""
        return " ".join(self.template_tokens)


class LogSequenceBuilder:
    """日志序列构建器"""

    def __init__(self, parser: DrainLogParser,
                 window_size: int = 100):
        """
        初始化构建器

        Args:
            parser: 日志解析器
            window_size: 序列窗口大小
        """
        self.parser = parser
        self.window_size = window_size
        self.template_vocab: Dict[str, int] = {}
        self.next_id = 0

    def build_sequences(self, logs: List[str]) -> List[List[int]]:
        """构建日志序列"""
        # 解析所有日志
        template_ids = []
        for log in logs:
            template_id, _ = self.parser.parse(log)

            if template_id not in self.template_vocab:
                self.template_vocab[template_id] = self.next_id
                self.next_id += 1

            template_ids.append(self.template_vocab[template_id])

        # 构建序列
        sequences = []
        for i in range(len(template_ids) - self.window_size + 1):
            sequences.append(template_ids[i:i+self.window_size])

        return sequences
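
A short usage sketch of the Drain parser; the log lines are made-up examples:

parser = DrainLogParser(depth=4, sim_th=0.4)

logs = [
    "2024-01-01 10:00:01 Connection from 10.0.0.1:51234 established",
    "2024-01-01 10:00:02 Connection from 10.0.0.2:51240 established",
    "2024-01-01 10:00:03 Failed to read /var/log/app/current",
]

for line in logs:
    template_id, template = parser.parse(line)
    print(template_id, "->", template)

# Both connection lines preprocess to the same token sequence, so they share
# one template: "<TIMESTAMP> Connection from <IP>:<PORT> established"
print(parser.get_all_templates())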

2.2 Log Anomaly Detection Models

"""
日志异常检测模型
"""

import numpy as np
import torch
import torch.nn as nn
from typing import List, Dict, Tuple, Optional
from collections import Counter


class LogAnomaly:
    """基于计数的日志异常检测"""

    def __init__(self, threshold_percentile: float = 95):
        """
        初始化检测器

        Args:
            threshold_percentile: 阈值百分位数
        """
        self.threshold_percentile = threshold_percentile
        self.template_counts: Counter = Counter()
        self.total_count = 0
        self.thresholds: Dict[str, float] = {}

    def fit(self, template_sequences: List[List[str]]):
        """训练模型"""
        # 统计模板出现次数
        for sequence in template_sequences:
            self.template_counts.update(sequence)
            self.total_count += len(sequence)

        # 计算每个模板的正常频率范围
        sequence_template_counts = []
        for sequence in template_sequences:
            counts = Counter(sequence)
            sequence_template_counts.append(counts)

        # 计算每个模板的阈值
        all_templates = set(self.template_counts.keys())
        for template in all_templates:
            counts = [sc.get(template, 0) for sc in sequence_template_counts]
            self.thresholds[template] = np.percentile(counts, self.threshold_percentile)

    def detect(self, sequence: List[str]) -> Tuple[bool, Dict[str, float]]:
        """
        检测异常

        Args:
            sequence: 模板序列

        Returns:
            (是否异常, 异常详情)
        """
        counts = Counter(sequence)
        anomalies = {}

        for template, count in counts.items():
            if template in self.thresholds:
                if count > self.thresholds[template]:
                    anomalies[template] = {
                        "count": count,
                        "threshold": self.thresholds[template],
                        "type": "excess"
                    }
            else:
                # 新模板
                anomalies[template] = {
                    "count": count,
                    "threshold": 0,
                    "type": "new_template"
                }

        is_anomaly = len(anomalies) > 0
        return is_anomaly, anomalies


class DeepLogModel(nn.Module):
    """DeepLog LSTM 模型"""

    def __init__(self, vocab_size: int, embedding_dim: int = 64,
                 hidden_dim: int = 128, num_layers: int = 2):
        super().__init__()

        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(
            embedding_dim, hidden_dim, num_layers,
            batch_first=True, dropout=0.2
        )
        self.fc = nn.Linear(hidden_dim, vocab_size)

    def forward(self, x):
        embedded = self.embedding(x)
        lstm_out, _ = self.lstm(embedded)
        output = self.fc(lstm_out[:, -1, :])
        return output


class DeepLogDetector:
    """DeepLog 日志异常检测"""

    def __init__(self, vocab_size: int,
                 window_size: int = 10,
                 top_k: int = 9):
        """
        初始化检测器

        Args:
            vocab_size: 词表大小
            window_size: 窗口大小
            top_k: Top-K 候选
        """
        self.vocab_size = vocab_size
        self.window_size = window_size
        self.top_k = top_k

        self.model = DeepLogModel(vocab_size)

    def fit(self, sequences: List[List[int]],
            epochs: int = 50, batch_size: int = 64, lr: float = 0.001):
        """训练模型"""
        # 创建训练数据
        X, y = self._create_training_data(sequences)
        X = torch.LongTensor(X)
        y = torch.LongTensor(y)

        optimizer = torch.optim.Adam(self.model.parameters(), lr=lr)
        criterion = nn.CrossEntropyLoss()

        self.model.train()
        for epoch in range(epochs):
            total_loss = 0
            for i in range(0, len(X), batch_size):
                batch_X = X[i:i+batch_size]
                batch_y = y[i:i+batch_size]

                optimizer.zero_grad()
                outputs = self.model(batch_X)
                loss = criterion(outputs, batch_y)
                loss.backward()
                optimizer.step()

                total_loss += loss.item()

            if (epoch + 1) % 10 == 0:
                print(f"Epoch {epoch+1}, Loss: {total_loss/len(X):.4f}")

    def _create_training_data(self, sequences: List[List[int]]) -> Tuple[np.ndarray, np.ndarray]:
        """创建训练数据"""
        X, y = [], []

        for seq in sequences:
            for i in range(len(seq) - self.window_size):
                X.append(seq[i:i+self.window_size])
                y.append(seq[i+self.window_size])

        return np.array(X), np.array(y)

    def detect(self, sequence: List[int]) -> List[Tuple[int, bool, float]]:
        """
        检测异常

        Args:
            sequence: 日志 ID 序列

        Returns:
            List[(位置, 是否异常, 异常分数)]
        """
        results = []
        self.model.eval()

        with torch.no_grad():
            for i in range(len(sequence) - self.window_size):
                window = sequence[i:i+self.window_size]
                actual = sequence[i+self.window_size]

                X = torch.LongTensor([window])
                output = self.model(X)
                probs = torch.softmax(output, dim=1)

                # 获取 Top-K 预测
                top_k_probs, top_k_indices = torch.topk(probs, self.top_k)

                is_anomaly = actual not in top_k_indices[0].tolist()
                anomaly_score = 1 - probs[0, actual].item()

                results.append((i + self.window_size, is_anomaly, anomaly_score))

        return results


class LogBERTDetector:
    """基于 BERT 的日志异常检测"""

    def __init__(self, model_name: str = "bert-base-uncased",
                 max_length: int = 128):
        """
        初始化检测器

        Args:
            model_name: BERT 模型名称
            max_length: 最大序列长度
        """
        from transformers import BertTokenizer, BertForSequenceClassification

        self.tokenizer = BertTokenizer.from_pretrained(model_name)
        self.model = BertForSequenceClassification.from_pretrained(
            model_name,
            num_labels=2  # normal, anomaly
        )
        self.max_length = max_length

    def fit(self, logs: List[str], labels: List[int],
            epochs: int = 3, batch_size: int = 16, lr: float = 2e-5):
        """微调模型"""
        from torch.utils.data import DataLoader, TensorDataset

        # 编码
        encodings = self.tokenizer(
            logs,
            truncation=True,
            padding=True,
            max_length=self.max_length,
            return_tensors="pt"
        )

        dataset = TensorDataset(
            encodings['input_ids'],
            encodings['attention_mask'],
            torch.LongTensor(labels)
        )
        dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

        optimizer = torch.optim.AdamW(self.model.parameters(), lr=lr)

        self.model.train()
        for epoch in range(epochs):
            total_loss = 0
            for batch in dataloader:
                input_ids, attention_mask, batch_labels = batch

                optimizer.zero_grad()
                outputs = self.model(
                    input_ids=input_ids,
                    attention_mask=attention_mask,
                    labels=batch_labels
                )
                loss = outputs.loss
                loss.backward()
                optimizer.step()

                total_loss += loss.item()

            print(f"Epoch {epoch+1}, Loss: {total_loss/len(dataloader):.4f}")

    def detect(self, logs: List[str]) -> List[Tuple[bool, float]]:
        """检测异常"""
        self.model.eval()
        results = []

        with torch.no_grad():
            for log in logs:
                encoding = self.tokenizer(
                    log,
                    truncation=True,
                    padding=True,
                    max_length=self.max_length,
                    return_tensors="pt"
                )

                outputs = self.model(**encoding)
                probs = torch.softmax(outputs.logits, dim=1)

                is_anomaly = probs[0, 1] > 0.5
                anomaly_score = probs[0, 1].item()

                results.append((bool(is_anomaly), anomaly_score))

        return results
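
Tying the pieces together: parse raw logs into template-ID sequences, then train DeepLog on them. A sketch under the assumption that normal_logs is a list of raw log lines collected during healthy periods:

parser = DrainLogParser()
builder = LogSequenceBuilder(parser, window_size=20)
sequences = builder.build_sequences(normal_logs)  # normal_logs: assumed input

detector = DeepLogDetector(vocab_size=len(builder.template_vocab) + 1,
                           window_size=10)
detector.fit(sequences, epochs=30)

# At inference time, any event outside the model's top-k predictions is flagged
for position, is_anomaly, score in detector.detect(sequences[0]):
    if is_anomaly:
        print(f"unexpected log event at position {position}, score={score:.2f}")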

3. Multivariate Anomaly Detection

3.1 Modeling Multivariate Data

"""
多维异常检测
"""

import numpy as np
from typing import List, Dict, Tuple, Optional
from sklearn.covariance import EllipticEnvelope
from sklearn.decomposition import PCA
import torch
import torch.nn as nn


class MultivariateAnomalyDetector:
    """多维异常检测器"""

    def __init__(self, contamination: float = 0.1):
        """
        初始化检测器

        Args:
            contamination: 异常比例
        """
        self.contamination = contamination
        self.model = None
        self.feature_names: List[str] = []

    def fit(self, X: np.ndarray, feature_names: List[str]):
        """拟合模型"""
        self.feature_names = feature_names

        # 使用椭圆包络
        self.model = EllipticEnvelope(
            contamination=self.contamination,
            random_state=42
        )
        self.model.fit(X)

    def detect(self, x: np.ndarray) -> Tuple[bool, float, Dict[str, float]]:
        """
        检测异常

        Args:
            x: 多维数据点

        Returns:
            (是否异常, 异常分数, 特征贡献)
        """
        if x.ndim == 1:
            x = x.reshape(1, -1)

        prediction = self.model.predict(x)
        score = self.model.score_samples(x)

        is_anomaly = prediction[0] == -1
        anomaly_score = -score[0]

        # 计算特征贡献
        contributions = self._compute_contributions(x[0])

        return is_anomaly, anomaly_score, contributions

    def _compute_contributions(self, x: np.ndarray) -> Dict[str, float]:
        """计算各特征对异常的贡献"""
        mean = self.model.location_
        precision = self.model.precision_

        # 马氏距离分解
        diff = x - mean
        contributions = {}

        for i, name in enumerate(self.feature_names):
            # 单特征贡献
            contrib = abs(diff[i]) * np.sqrt(precision[i, i])
            contributions[name] = float(contrib)

        # 归一化
        total = sum(contributions.values())
        if total > 0:
            contributions = {k: v/total for k, v in contributions.items()}

        return contributions


class PCABasedDetector:
    """基于 PCA 的异常检测"""

    def __init__(self, n_components: float = 0.95,
                 threshold_percentile: float = 95):
        """
        初始化检测器

        Args:
            n_components: 保留的方差比例
            threshold_percentile: 异常阈值百分位
        """
        self.n_components = n_components
        self.threshold_percentile = threshold_percentile
        self.pca = None
        self.threshold = None

    def fit(self, X: np.ndarray):
        """拟合模型"""
        self.pca = PCA(n_components=self.n_components)
        self.pca.fit(X)

        # 计算重构误差
        X_transformed = self.pca.transform(X)
        X_reconstructed = self.pca.inverse_transform(X_transformed)
        reconstruction_errors = np.sum((X - X_reconstructed) ** 2, axis=1)

        self.threshold = np.percentile(reconstruction_errors, self.threshold_percentile)

    def detect(self, x: np.ndarray) -> Tuple[bool, float, np.ndarray]:
        """
        检测异常

        Args:
            x: 数据点

        Returns:
            (是否异常, 重构误差, 重构数据)
        """
        if x.ndim == 1:
            x = x.reshape(1, -1)

        x_transformed = self.pca.transform(x)
        x_reconstructed = self.pca.inverse_transform(x_transformed)
        error = np.sum((x - x_reconstructed) ** 2)

        is_anomaly = error > self.threshold
        return is_anomaly, error, x_reconstructed[0]


class VAEAnomalyDetector(nn.Module):
    """VAE 多维异常检测"""

    def __init__(self, input_dim: int, hidden_dims: List[int],
                 latent_dim: int):
        super().__init__()

        self.input_dim = input_dim
        self.latent_dim = latent_dim

        # 编码器
        encoder_layers = []
        prev_dim = input_dim
        for hidden_dim in hidden_dims:
            encoder_layers.extend([
                nn.Linear(prev_dim, hidden_dim),
                nn.ReLU(),
                nn.BatchNorm1d(hidden_dim)
            ])
            prev_dim = hidden_dim

        self.encoder = nn.Sequential(*encoder_layers)
        self.fc_mu = nn.Linear(prev_dim, latent_dim)
        self.fc_var = nn.Linear(prev_dim, latent_dim)

        # 解码器
        decoder_layers = []
        prev_dim = latent_dim
        for hidden_dim in reversed(hidden_dims):
            decoder_layers.extend([
                nn.Linear(prev_dim, hidden_dim),
                nn.ReLU(),
                nn.BatchNorm1d(hidden_dim)
            ])
            prev_dim = hidden_dim

        decoder_layers.append(nn.Linear(prev_dim, input_dim))
        self.decoder = nn.Sequential(*decoder_layers)

    def encode(self, x):
        h = self.encoder(x)
        return self.fc_mu(h), self.fc_var(h)

    def reparameterize(self, mu, log_var):
        std = torch.exp(0.5 * log_var)
        eps = torch.randn_like(std)
        return mu + eps * std

    def decode(self, z):
        return self.decoder(z)

    def forward(self, x):
        mu, log_var = self.encode(x)
        z = self.reparameterize(mu, log_var)
        return self.decode(z), mu, log_var


class VAEDetector:
    """VAE 异常检测封装"""

    def __init__(self, input_dim: int,
                 hidden_dims: List[int] = [128, 64],
                 latent_dim: int = 16,
                 threshold_percentile: float = 95):
        self.model = VAEAnomalyDetector(input_dim, hidden_dims, latent_dim)
        self.threshold_percentile = threshold_percentile
        self.threshold = None

    def fit(self, X: np.ndarray, epochs: int = 100,
            batch_size: int = 64, lr: float = 0.001):
        """训练模型"""
        X_tensor = torch.FloatTensor(X)

        optimizer = torch.optim.Adam(self.model.parameters(), lr=lr)

        self.model.train()
        for epoch in range(epochs):
            total_loss = 0
            for i in range(0, len(X_tensor), batch_size):
                batch = X_tensor[i:i+batch_size]

                optimizer.zero_grad()
                recon, mu, log_var = self.model(batch)

                # 重构损失
                recon_loss = nn.MSELoss()(recon, batch)
                # KL 散度
                kl_loss = -0.5 * torch.sum(1 + log_var - mu.pow(2) - log_var.exp())

                loss = recon_loss + 0.001 * kl_loss
                loss.backward()
                optimizer.step()

                total_loss += loss.item()

        # 计算阈值
        self.model.eval()
        with torch.no_grad():
            recon, _, _ = self.model(X_tensor)
            errors = torch.sum((X_tensor - recon) ** 2, dim=1).numpy()
            self.threshold = np.percentile(errors, self.threshold_percentile)

    def detect(self, x: np.ndarray) -> Tuple[bool, float]:
        """检测异常"""
        self.model.eval()
        x_tensor = torch.FloatTensor(x.reshape(1, -1))

        with torch.no_grad():
            recon, _, _ = self.model(x_tensor)
            error = torch.sum((x_tensor - recon) ** 2).item()

        is_anomaly = error > self.threshold
        return is_anomaly, error / self.threshold
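
A usage sketch with three synthetic, correlated metrics; the point passed to detect() is plausible in each dimension on its own but violates the joint correlation structure:

import numpy as np

rng = np.random.default_rng(0)
# Synthetic, correlated CPU / memory / latency metrics as training data
cpu = rng.normal(50, 5, 1000)
mem = cpu * 0.8 + rng.normal(0, 2, 1000)
lat = cpu * 0.1 + rng.normal(0, 0.5, 1000)
X = np.column_stack([cpu, mem, lat])

det = MultivariateAnomalyDetector(contamination=0.05)
det.fit(X, ["cpu", "mem", "latency"])

# Plausible per dimension, but mem breaks its usual relation to cpu
is_anomaly, score, contributions = det.detect(np.array([50.0, 20.0, 5.0]))
print(is_anomaly, score, contributions)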

3.2 Correlation Anomaly Detection

"""
关联异常检测 - 检测指标间的异常关联
"""

import numpy as np
from typing import Dict, List, Tuple, Optional
from scipy.stats import pearsonr, spearmanr
from sklearn.covariance import GraphicalLasso
import networkx as nx


class CorrelationAnomalyDetector:
    """相关性异常检测"""

    def __init__(self, window_size: int = 100,
                 correlation_method: str = "pearson"):
        """
        初始化检测器

        Args:
            window_size: 窗口大小
            correlation_method: 相关性方法 (pearson, spearman)
        """
        self.window_size = window_size
        self.correlation_method = correlation_method
        self.baseline_correlation: Optional[np.ndarray] = None
        self.feature_names: List[str] = []

    def fit(self, X: np.ndarray, feature_names: List[str]):
        """建立基线相关性"""
        self.feature_names = feature_names
        self.baseline_correlation = self._compute_correlation(X)

    def _compute_correlation(self, X: np.ndarray) -> np.ndarray:
        """计算相关性矩阵"""
        n_features = X.shape[1]
        corr_matrix = np.zeros((n_features, n_features))

        for i in range(n_features):
            for j in range(n_features):
                if self.correlation_method == "pearson":
                    corr, _ = pearsonr(X[:, i], X[:, j])
                else:
                    corr, _ = spearmanr(X[:, i], X[:, j])
                corr_matrix[i, j] = corr

        return corr_matrix

    def detect(self, X: np.ndarray,
               threshold: float = 0.3) -> Dict[str, Any]:
        """
        检测相关性异常

        Args:
            X: 当前窗口数据
            threshold: 相关性变化阈值

        Returns:
            Dict: 异常信息
        """
        current_correlation = self._compute_correlation(X)

        # 计算相关性变化
        correlation_change = np.abs(current_correlation - self.baseline_correlation)

        # 找出显著变化的特征对
        anomalous_pairs = []
        for i in range(len(self.feature_names)):
            for j in range(i + 1, len(self.feature_names)):
                if correlation_change[i, j] > threshold:
                    anomalous_pairs.append({
                        "feature_1": self.feature_names[i],
                        "feature_2": self.feature_names[j],
                        "baseline_corr": self.baseline_correlation[i, j],
                        "current_corr": current_correlation[i, j],
                        "change": correlation_change[i, j]
                    })

        return {
            "is_anomaly": len(anomalous_pairs) > 0,
            "anomalous_pairs": anomalous_pairs,
            "max_change": float(correlation_change.max()),
            "correlation_matrix": current_correlation.tolist()
        }


class GraphAnomalyDetector:
    """基于图的异常检测"""

    def __init__(self, alpha: float = 0.01):
        """
        初始化检测器

        Args:
            alpha: Graphical Lasso 正则化参数
        """
        self.alpha = alpha
        self.baseline_graph = None
        self.glasso = None
        self.feature_names: List[str] = []

    def fit(self, X: np.ndarray, feature_names: List[str]):
        """建立基线图"""
        self.feature_names = feature_names

        # 使用 Graphical Lasso 学习稀疏精度矩阵
        self.glasso = GraphicalLasso(alpha=self.alpha)
        self.glasso.fit(X)

        # 构建基线图
        self.baseline_graph = self._precision_to_graph(self.glasso.precision_)

    def _precision_to_graph(self, precision: np.ndarray,
                            threshold: float = 0.1) -> nx.Graph:
        """从精度矩阵构建图"""
        G = nx.Graph()

        # 添加节点
        for name in self.feature_names:
            G.add_node(name)

        # 添加边 (基于偏相关)
        n = len(self.feature_names)
        for i in range(n):
            for j in range(i + 1, n):
                # 偏相关
                partial_corr = -precision[i, j] / np.sqrt(precision[i, i] * precision[j, j])
                if abs(partial_corr) > threshold:
                    G.add_edge(
                        self.feature_names[i],
                        self.feature_names[j],
                        weight=abs(partial_corr)
                    )

        return G

    def detect(self, X: np.ndarray) -> Dict[str, Any]:
        """检测图结构异常"""
        # 学习当前图
        current_glasso = GraphicalLasso(alpha=self.alpha)
        current_glasso.fit(X)
        current_graph = self._precision_to_graph(current_glasso.precision_)

        # 比较图结构
        baseline_edges = set(self.baseline_graph.edges())
        current_edges = set(current_graph.edges())

        new_edges = current_edges - baseline_edges
        removed_edges = baseline_edges - current_edges

        # 计算图相似度
        jaccard = len(baseline_edges & current_edges) / len(baseline_edges | current_edges)

        return {
            "is_anomaly": len(new_edges) > 0 or len(removed_edges) > 0,
            "new_edges": list(new_edges),
            "removed_edges": list(removed_edges),
            "jaccard_similarity": jaccard,
            "baseline_edge_count": len(baseline_edges),
            "current_edge_count": len(current_edges)
        }
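
A sketch of the correlation detector: QPS and CPU are constructed to be strongly correlated in the baseline window, then CPU is decoupled from QPS (as might happen with a runaway background task); the metric names and magnitudes are illustrative:

import numpy as np

rng = np.random.default_rng(1)
qps = rng.normal(1000, 50, 500)
cpu = qps * 0.05 + rng.normal(0, 1, 500)   # CPU normally tracks QPS
det = CorrelationAnomalyDetector()
det.fit(np.column_stack([qps, cpu]), ["qps", "cpu"])

# Current window: CPU no longer tracks QPS
qps2 = rng.normal(1000, 50, 100)
cpu2 = rng.normal(90, 1, 100)
result = det.detect(np.column_stack([qps2, cpu2]), threshold=0.3)
print(result["is_anomaly"], result["anomalous_pairs"])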

4. Real-Time Anomaly Detection Systems

4.1 Stream Processing Architecture

"""
实时异常检测系统
"""

import asyncio
from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass
from collections import deque
import time


@dataclass
class AnomalyAlert:
    """异常告警"""
    alert_id: str
    metric_name: str
    resource: str
    timestamp: float
    value: float
    anomaly_score: float
    anomaly_type: str
    details: Dict[str, Any]


class StreamingAnomalyDetector:
    """流式异常检测器"""

    def __init__(self, detectors: Dict[str, Any],
                 window_size: int = 100,
                 alert_callback: Optional[Callable] = None):
        """
        初始化流式检测器

        Args:
            detectors: 检测器字典 {指标名: 检测器}
            window_size: 滑动窗口大小
            alert_callback: 告警回调函数
        """
        self.detectors = detectors
        self.window_size = window_size
        self.alert_callback = alert_callback

        # 数据缓冲
        self.buffers: Dict[str, deque] = {}
        for metric in detectors:
            self.buffers[metric] = deque(maxlen=window_size)

        # 告警抑制
        self.alert_cooldown: Dict[str, float] = {}
        self.cooldown_seconds = 60

    async def process(self, metric_name: str, resource: str,
                      timestamp: float, value: float) -> Optional[AnomalyAlert]:
        """
        处理数据点

        Args:
            metric_name: 指标名称
            resource: 资源标识
            timestamp: 时间戳
            value: 指标值

        Returns:
            Optional[AnomalyAlert]: 告警 (如果检测到异常)
        """
        key = f"{metric_name}:{resource}"

        # 添加到缓冲
        if metric_name not in self.buffers:
            self.buffers[metric_name] = deque(maxlen=self.window_size)
        self.buffers[metric_name].append((timestamp, value))

        # 获取检测器
        detector = self.detectors.get(metric_name)
        if detector is None:
            return None

        # 检测异常
        result = detector.detect(value)

        if result.is_anomaly:
            # 检查告警抑制
            if key in self.alert_cooldown:
                if time.time() - self.alert_cooldown[key] < self.cooldown_seconds:
                    return None

            # 生成告警
            alert = AnomalyAlert(
                alert_id=f"alert-{int(timestamp)}",
                metric_name=metric_name,
                resource=resource,
                timestamp=timestamp,
                value=value,
                anomaly_score=result.anomaly_score,
                anomaly_type=result.anomaly_type,
                details=result.details
            )

            # 更新抑制时间
            self.alert_cooldown[key] = time.time()

            # 回调
            if self.alert_callback:
                await self.alert_callback(alert)

            return alert

        return None

    async def process_batch(self, data_points: List[Dict[str, Any]]) -> List[AnomalyAlert]:
        """批量处理数据点"""
        alerts = []
        for dp in data_points:
            alert = await self.process(
                dp["metric_name"],
                dp["resource"],
                dp["timestamp"],
                dp["value"]
            )
            if alert:
                alerts.append(alert)
        return alerts


class AnomalyDetectionService:
    """异常检测服务"""

    def __init__(self):
        self.detectors: Dict[str, StreamingAnomalyDetector] = {}
        self.alert_handlers: List[Callable] = []

    def register_detector(self, name: str, detector: StreamingAnomalyDetector):
        """注册检测器"""
        self.detectors[name] = detector

    def add_alert_handler(self, handler: Callable):
        """添加告警处理器"""
        self.alert_handlers.append(handler)

    async def start(self, kafka_consumer):
        """启动服务"""
        async for message in kafka_consumer:
            data = message.value

            # 路由到对应检测器
            detector_name = data.get("detector", "default")
            detector = self.detectors.get(detector_name)

            if detector:
                alert = await detector.process(
                    data["metric_name"],
                    data["resource"],
                    data["timestamp"],
                    data["value"]
                )

                if alert:
                    # 触发告警处理
                    for handler in self.alert_handlers:
                        await handler(alert)
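
A minimal end-to-end sketch. Note that StreamingAnomalyDetector calls detect() on its per-metric detectors, while StatisticalAnomalyDetector above exposes z_score_detect(), so a small adapter is assumed here; the metric names and values are illustrative:

import asyncio
import numpy as np

class ZScoreAdapter(StatisticalAnomalyDetector):
    """Adapt StatisticalAnomalyDetector to the detect() interface."""
    def detect(self, value: float) -> AnomalyResult:
        return self.z_score_detect(value)

async def print_alert(alert: AnomalyAlert):
    print(f"[ALERT] {alert.metric_name}@{alert.resource} "
          f"value={alert.value:.1f} score={alert.anomaly_score:.2f}")

async def main():
    streaming = StreamingAnomalyDetector(
        detectors={"cpu_usage": ZScoreAdapter(window_size=50)},
        alert_callback=print_alert,
    )
    rng = np.random.default_rng(0)
    for i in range(300):
        value = 90.0 if i == 250 else float(rng.normal(40, 2))
        await streaming.process("cpu_usage", "node-1", float(i), value)

asyncio.run(main())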

Summary

This chapter covered the anomaly detection algorithms used in AIOps:

  1. Time-series anomaly detection: statistical methods (z-score, MAD, EWMA) and machine learning methods (Isolation Forest, LOF, LSTM autoencoder)
  2. Log anomaly detection: log parsing (Drain), DeepLog, LogBERT
  3. Multivariate anomaly detection: multivariate modeling, correlation anomalies, graph-based detection
  4. Real-time detection systems: streaming architecture design

Key takeaways:

  • Which detection method fits best depends on the data characteristics and the scenario
  • Combining several detection methods improves accuracy
  • Real-time detection has to account for throughput and latency
  • Alert suppression and aggregation are key to reducing alert noise

The next chapter covers root cause analysis, including topology analysis, time-series correlation, and causal inference.
