02-异常检测算法
概述
异常检测是 AIOps 的核心能力之一,用于自动识别系统中的异常行为。本章深入探讨时序异常检测、日志异常检测和多维异常检测的算法与实现。
1. 时序异常检测
1.1 异常类型分类
时序异常主要分为以下四类:
- 点异常 (Point Anomaly):单个数据点明显偏离正常取值范围。
- 上下文异常 (Contextual Anomaly):取值相对其所处上下文异常,例如正常低峰时段出现的异常高值,或全局范围内正常的值出现在不该出现的时段。
- 集体异常 (Collective Anomaly):单点看似正常,但一段连续时间的数据整体呈现异常模式,例如指标长时间停留在同一水平。
- 趋势异常 (Trend Anomaly):时序的整体趋势发生突变,例如原本平稳的指标转为持续上升。
1.2 统计方法
"""
基于统计的时序异常检测
"""
import numpy as np
import pandas as pd
from typing import Tuple, List, Optional, Dict, Any
from dataclasses import dataclass
from scipy import stats
from statsmodels.tsa.seasonal import seasonal_decompose
from statsmodels.tsa.stattools import adfuller
@dataclass
class AnomalyResult:
"""异常检测结果"""
timestamp: float
value: float
is_anomaly: bool
anomaly_score: float
anomaly_type: str
details: Dict[str, Any]
class StatisticalAnomalyDetector:
"""统计异常检测器"""
def __init__(self, window_size: int = 100,
threshold_sigma: float = 3.0):
"""
初始化检测器
Args:
window_size: 滑动窗口大小
threshold_sigma: 标准差阈值
"""
self.window_size = window_size
self.threshold_sigma = threshold_sigma
self.history: List[float] = []
def z_score_detect(self, value: float) -> AnomalyResult:
"""
Z-Score 异常检测
Args:
value: 当前值
Returns:
AnomalyResult: 检测结果
"""
import time
self.history.append(value)
if len(self.history) < self.window_size:
return AnomalyResult(
timestamp=time.time(),
value=value,
is_anomaly=False,
anomaly_score=0.0,
anomaly_type="none",
details={"reason": "insufficient_data"}
)
# 使用滑动窗口
window = self.history[-self.window_size:]
mean = np.mean(window[:-1]) # 不包含当前值
std = np.std(window[:-1])
if std == 0:
z_score = 0
else:
z_score = (value - mean) / std
is_anomaly = abs(z_score) > self.threshold_sigma
return AnomalyResult(
timestamp=time.time(),
value=value,
is_anomaly=is_anomaly,
anomaly_score=abs(z_score) / self.threshold_sigma,
anomaly_type="point" if is_anomaly else "none",
details={
"z_score": z_score,
"mean": mean,
"std": std,
"threshold": self.threshold_sigma
}
)
def mad_detect(self, value: float) -> AnomalyResult:
"""
MAD (Median Absolute Deviation) 异常检测
对离群值更鲁棒
Args:
value: 当前值
Returns:
AnomalyResult: 检测结果
"""
import time
self.history.append(value)
if len(self.history) < self.window_size:
return AnomalyResult(
timestamp=time.time(),
value=value,
is_anomaly=False,
anomaly_score=0.0,
anomaly_type="none",
details={"reason": "insufficient_data"}
)
window = self.history[-self.window_size:]
median = np.median(window[:-1])
        mad = np.median(np.abs(np.array(window[:-1]) - median))
# 修正系数 (正态分布)
mad_corrected = mad * 1.4826
if mad_corrected == 0:
modified_z = 0
else:
modified_z = (value - median) / mad_corrected
is_anomaly = abs(modified_z) > self.threshold_sigma
return AnomalyResult(
timestamp=time.time(),
value=value,
is_anomaly=is_anomaly,
anomaly_score=abs(modified_z) / self.threshold_sigma,
anomaly_type="point" if is_anomaly else "none",
details={
"modified_z_score": modified_z,
"median": median,
"mad": mad_corrected,
"threshold": self.threshold_sigma
}
)
def ewma_detect(self, value: float, alpha: float = 0.3) -> AnomalyResult:
"""
EWMA (Exponentially Weighted Moving Average) 异常检测
Args:
value: 当前值
alpha: 平滑系数
Returns:
AnomalyResult: 检测结果
"""
import time
self.history.append(value)
if len(self.history) < 2:
return AnomalyResult(
timestamp=time.time(),
value=value,
is_anomaly=False,
anomaly_score=0.0,
anomaly_type="none",
details={"reason": "insufficient_data"}
)
        # 计算截至上一个观测值的 EWMA
        ewma = [self.history[0]]
        for v in self.history[1:-1]:
            ewma.append(alpha * v + (1 - alpha) * ewma[-1])
        # 用截至上一点的 EWMA 作为当前值的一步预测
        predicted = ewma[-1]
        # 一步预测残差: history[i] 与截至 i-1 的 EWMA 之差
        residuals = np.array(self.history[1:-1]) - np.array(ewma[:-1])
        residual_std = np.std(residuals) if len(residuals) > 0 else 1.0
# 当前残差
current_residual = value - predicted
if residual_std == 0:
z_score = 0
else:
z_score = current_residual / residual_std
is_anomaly = abs(z_score) > self.threshold_sigma
return AnomalyResult(
timestamp=time.time(),
value=value,
is_anomaly=is_anomaly,
anomaly_score=abs(z_score) / self.threshold_sigma,
anomaly_type="point" if is_anomaly else "none",
details={
"predicted": predicted,
"residual": current_residual,
"z_score": z_score
}
)
class SeasonalAnomalyDetector:
"""季节性异常检测器"""
def __init__(self, period: int = 24,
threshold_sigma: float = 3.0):
"""
初始化检测器
Args:
period: 季节周期 (如 24 表示每日周期)
threshold_sigma: 标准差阈值
"""
self.period = period
self.threshold_sigma = threshold_sigma
self.seasonal_profile: Optional[np.ndarray] = None
self.residual_std: float = 1.0
def fit(self, data: np.ndarray):
"""
拟合季节性模型
Args:
data: 历史数据
"""
if len(data) < 2 * self.period:
raise ValueError("数据长度不足以拟合季节性模型")
# 季节分解
series = pd.Series(data)
decomposition = seasonal_decompose(
series,
model='additive',
period=self.period,
extrapolate_trend='freq'
)
# 提取季节性 profile
self.seasonal_profile = decomposition.seasonal[:self.period].values
# 计算残差标准差
residuals = decomposition.resid.dropna()
self.residual_std = residuals.std()
def detect(self, value: float, position: int) -> AnomalyResult:
"""
检测异常
Args:
value: 当前值
position: 在周期中的位置
Returns:
AnomalyResult: 检测结果
"""
import time
if self.seasonal_profile is None:
raise ValueError("请先调用 fit() 方法")
# 获取季节性期望值
expected = self.seasonal_profile[position % self.period]
# 计算偏差
deviation = value - expected
z_score = deviation / self.residual_std if self.residual_std > 0 else 0
is_anomaly = abs(z_score) > self.threshold_sigma
return AnomalyResult(
timestamp=time.time(),
value=value,
is_anomaly=is_anomaly,
anomaly_score=abs(z_score) / self.threshold_sigma,
anomaly_type="contextual" if is_anomaly else "none",
details={
"expected": expected,
"deviation": deviation,
"z_score": z_score,
"position_in_cycle": position % self.period
}
)
class ChangePointDetector:
"""变点检测器"""
def __init__(self, min_segment_length: int = 10):
"""
初始化检测器
Args:
min_segment_length: 最小分段长度
"""
self.min_segment_length = min_segment_length
def detect_cusum(self, data: np.ndarray,
threshold: float = 5.0) -> List[int]:
"""
CUSUM (Cumulative Sum) 变点检测
Args:
data: 时序数据
threshold: 检测阈值
Returns:
List: 变点位置列表
"""
mean = np.mean(data)
std = np.std(data)
if std == 0:
return []
normalized = (data - mean) / std
# 正向和负向 CUSUM
pos_cusum = np.zeros(len(data))
neg_cusum = np.zeros(len(data))
change_points = []
for i in range(1, len(data)):
pos_cusum[i] = max(0, pos_cusum[i-1] + normalized[i] - 0.5)
neg_cusum[i] = max(0, neg_cusum[i-1] - normalized[i] - 0.5)
if pos_cusum[i] > threshold or neg_cusum[i] > threshold:
change_points.append(i)
pos_cusum[i] = 0
neg_cusum[i] = 0
return change_points
    def detect_pelt(self, data: np.ndarray,
                    penalty: Optional[float] = None) -> List[int]:
"""
PELT (Pruned Exact Linear Time) 变点检测
Args:
data: 时序数据
penalty: 惩罚参数
Returns:
List: 变点位置列表
"""
import ruptures as rpt
if penalty is None:
penalty = np.log(len(data)) * np.var(data)
# 使用 PELT 算法
model = rpt.Pelt(model="rbf", min_size=self.min_segment_length)
model.fit(data)
change_points = model.predict(pen=penalty)
# 移除最后一个点 (序列结尾)
return change_points[:-1]
def detect_bocpd(self, data: np.ndarray,
hazard: float = 1/200) -> List[Tuple[int, float]]:
"""
BOCPD (Bayesian Online Changepoint Detection)
Args:
data: 时序数据
hazard: 危险率
Returns:
List: (位置, 概率) 列表
"""
n = len(data)
# 运行长度概率
R = np.zeros((n + 1, n + 1))
R[0, 0] = 1
change_points = []
# 观测似然参数
mu0 = 0
kappa0 = 1
alpha0 = 1
beta0 = 1
muT = mu0
kappaT = kappa0
alphaT = alpha0
betaT = beta0
for t in range(n):
x = data[t]
# 预测概率
predprobs = self._student_t_pdf(
x, muT, betaT * (kappaT + 1) / (alphaT * kappaT), 2 * alphaT
)
# 增长概率
R[1:t+2, t+1] = R[:t+1, t] * predprobs * (1 - hazard)
# 变点概率
R[0, t+1] = np.sum(R[:t+1, t] * predprobs * hazard)
# 归一化
R[:t+2, t+1] = R[:t+2, t+1] / R[:t+2, t+1].sum()
# 检测变点
if R[0, t+1] > 0.5:
change_points.append((t, R[0, t+1]))
            # 更新充分统计量 (注意: 先用更新前的 muT/kappaT 计算 betaT)
            betaT = betaT + kappaT * (x - muT) ** 2 / (2 * (kappaT + 1))
            muT = (kappaT * muT + x) / (kappaT + 1)
            kappaT = kappaT + 1
            alphaT = alphaT + 0.5
return change_points
def _student_t_pdf(self, x: float, mu: float,
sigma: float, nu: float) -> float:
"""Student-t 分布概率密度"""
return stats.t.pdf(x, nu, loc=mu, scale=np.sqrt(sigma))
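下面给出上述统计检测器的一个最小用法示意。示例数据为人工构造的含突刺正弦序列,窗口大小、注入位置等参数均为演示用的假设值,并非固定约定:
"""
统计检测器用法示意 (示例数据为人工构造)
"""
import numpy as np

if __name__ == "__main__":
    np.random.seed(0)
    # 构造带噪声的正弦序列,并在第 200 个点注入一个尖刺
    data = np.sin(np.linspace(0, 20, 300)) + np.random.normal(0, 0.1, 300)
    data[200] += 5.0

    # 注意: 每个 *_detect 方法都会把当前值写入 history,
    # 因此一个检测器实例只应固定使用一种检测方法
    detector = StatisticalAnomalyDetector(window_size=50, threshold_sigma=3.0)
    for i, v in enumerate(data):
        result = detector.z_score_detect(float(v))
        if result.is_anomaly:
            print(f"index={i}, value={v:.2f}, score={result.anomaly_score:.2f}")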
1.3 机器学习方法
"""
基于机器学习的时序异常检测
"""
import numpy as np
from typing import List, Tuple, Optional
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.svm import OneClassSVM
import torch
import torch.nn as nn
class IsolationForestDetector:
"""孤立森林异常检测"""
def __init__(self, n_estimators: int = 100,
contamination: float = 0.1,
window_size: int = 10):
"""
初始化检测器
Args:
n_estimators: 树的数量
contamination: 异常比例
window_size: 特征窗口大小
"""
self.n_estimators = n_estimators
self.contamination = contamination
self.window_size = window_size
self.model = None
self.history: List[float] = []
def _extract_features(self, data: List[float]) -> np.ndarray:
"""提取时序特征"""
features = []
for i in range(self.window_size, len(data)):
window = data[i-self.window_size:i]
feat = [
np.mean(window), # 均值
np.std(window), # 标准差
np.min(window), # 最小值
np.max(window), # 最大值
np.percentile(window, 25), # 25分位数
np.percentile(window, 75), # 75分位数
data[i] - window[-1], # 差分
(data[i] - np.mean(window)) / (np.std(window) + 1e-10) # z-score
]
features.append(feat)
return np.array(features)
def fit(self, data: List[float]):
"""拟合模型"""
self.history = list(data)
X = self._extract_features(self.history)
self.model = IsolationForest(
n_estimators=self.n_estimators,
contamination=self.contamination,
random_state=42
)
self.model.fit(X)
def detect(self, value: float) -> Tuple[bool, float]:
"""
检测异常
Args:
value: 当前值
Returns:
(是否异常, 异常分数)
"""
self.history.append(value)
if len(self.history) < self.window_size + 1:
return False, 0.0
X = self._extract_features(self.history[-self.window_size-1:])
prediction = self.model.predict(X[-1:])
score = self.model.score_samples(X[-1:])
is_anomaly = prediction[0] == -1
return is_anomaly, float(-score[0])
class LOFDetector:
"""Local Outlier Factor 异常检测"""
def __init__(self, n_neighbors: int = 20,
window_size: int = 10):
"""
初始化检测器
Args:
n_neighbors: 近邻数量
window_size: 特征窗口大小
"""
self.n_neighbors = n_neighbors
self.window_size = window_size
self.model = None
self.reference_data: Optional[np.ndarray] = None
def fit(self, data: List[float]):
"""拟合模型"""
features = self._extract_features(data)
self.reference_data = features
self.model = LocalOutlierFactor(
n_neighbors=self.n_neighbors,
novelty=True
)
self.model.fit(features)
def _extract_features(self, data: List[float]) -> np.ndarray:
"""提取时序特征"""
features = []
for i in range(self.window_size, len(data)):
window = data[i-self.window_size:i]
feat = [
np.mean(window),
np.std(window),
data[i],
data[i] - window[-1]
]
features.append(feat)
return np.array(features)
def detect(self, window: List[float]) -> Tuple[bool, float]:
"""检测异常"""
if len(window) < self.window_size + 1:
return False, 0.0
X = self._extract_features(window)
prediction = self.model.predict(X[-1:])
score = self.model.score_samples(X[-1:])
return prediction[0] == -1, float(-score[0])
class LSTMAutoencoder(nn.Module):
"""LSTM 自编码器"""
def __init__(self, input_dim: int, hidden_dim: int,
latent_dim: int, num_layers: int = 2):
super().__init__()
self.input_dim = input_dim
self.hidden_dim = hidden_dim
self.latent_dim = latent_dim
self.num_layers = num_layers
# 编码器
self.encoder_lstm = nn.LSTM(
input_dim, hidden_dim, num_layers,
batch_first=True, dropout=0.2
)
self.encoder_fc = nn.Linear(hidden_dim, latent_dim)
# 解码器
self.decoder_fc = nn.Linear(latent_dim, hidden_dim)
self.decoder_lstm = nn.LSTM(
hidden_dim, hidden_dim, num_layers,
batch_first=True, dropout=0.2
)
self.output_fc = nn.Linear(hidden_dim, input_dim)
def encode(self, x):
"""编码"""
lstm_out, _ = self.encoder_lstm(x)
latent = self.encoder_fc(lstm_out[:, -1, :])
return latent
def decode(self, latent, seq_len):
"""解码"""
hidden = self.decoder_fc(latent)
hidden = hidden.unsqueeze(1).repeat(1, seq_len, 1)
lstm_out, _ = self.decoder_lstm(hidden)
output = self.output_fc(lstm_out)
return output
def forward(self, x):
"""前向传播"""
latent = self.encode(x)
reconstructed = self.decode(latent, x.size(1))
return reconstructed
class LSTMAutoencoderDetector:
"""基于 LSTM 自编码器的异常检测"""
def __init__(self, sequence_length: int = 50,
hidden_dim: int = 64,
latent_dim: int = 16,
threshold_percentile: float = 95):
"""
初始化检测器
Args:
sequence_length: 序列长度
hidden_dim: 隐藏层维度
latent_dim: 潜在空间维度
threshold_percentile: 阈值百分位数
"""
self.sequence_length = sequence_length
self.hidden_dim = hidden_dim
self.latent_dim = latent_dim
self.threshold_percentile = threshold_percentile
self.model = LSTMAutoencoder(1, hidden_dim, latent_dim)
self.threshold = None
self.scaler_mean = 0
self.scaler_std = 1
def fit(self, data: np.ndarray, epochs: int = 100,
batch_size: int = 32, lr: float = 0.001):
"""训练模型"""
# 数据标准化
self.scaler_mean = np.mean(data)
self.scaler_std = np.std(data)
normalized = (data - self.scaler_mean) / (self.scaler_std + 1e-10)
# 创建序列
sequences = self._create_sequences(normalized)
X = torch.FloatTensor(sequences).unsqueeze(-1)
# 训练
optimizer = torch.optim.Adam(self.model.parameters(), lr=lr)
criterion = nn.MSELoss()
self.model.train()
for epoch in range(epochs):
total_loss = 0
for i in range(0, len(X), batch_size):
batch = X[i:i+batch_size]
optimizer.zero_grad()
reconstructed = self.model(batch)
loss = criterion(reconstructed, batch)
loss.backward()
optimizer.step()
total_loss += loss.item()
# 计算阈值
self.model.eval()
with torch.no_grad():
reconstructed = self.model(X)
reconstruction_errors = torch.mean((X - reconstructed) ** 2, dim=(1, 2))
self.threshold = np.percentile(
reconstruction_errors.numpy(),
self.threshold_percentile
)
def _create_sequences(self, data: np.ndarray) -> np.ndarray:
"""创建序列"""
sequences = []
for i in range(len(data) - self.sequence_length + 1):
sequences.append(data[i:i+self.sequence_length])
return np.array(sequences)
def detect(self, sequence: np.ndarray) -> Tuple[bool, float]:
"""检测异常"""
# 标准化
normalized = (sequence - self.scaler_mean) / (self.scaler_std + 1e-10)
X = torch.FloatTensor(normalized).unsqueeze(0).unsqueeze(-1)
self.model.eval()
with torch.no_grad():
reconstructed = self.model(X)
error = torch.mean((X - reconstructed) ** 2).item()
is_anomaly = error > self.threshold
anomaly_score = error / self.threshold
return is_anomaly, anomaly_score
def detect_in_series(self, data: np.ndarray) -> List[Tuple[int, bool, float]]:
"""在整个序列中检测异常"""
results = []
for i in range(len(data) - self.sequence_length + 1):
sequence = data[i:i+self.sequence_length]
is_anomaly, score = self.detect(sequence)
results.append((i + self.sequence_length - 1, is_anomaly, score))
return results
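以下是 IsolationForestDetector 的一个简单用法示意。训练数据与注入的离群值均为模拟假设,实际能否命中取决于 contamination 等参数设置:
"""
孤立森林检测器用法示意 (数据为模拟)
"""
import numpy as np

if __name__ == "__main__":
    rng = np.random.RandomState(42)
    # 用 1000 个正常点拟合模型
    train = list(10 + rng.normal(0, 1, 1000))
    detector = IsolationForestDetector(n_estimators=100,
                                       contamination=0.01,
                                       window_size=10)
    detector.fit(train)

    # 逐点检测: 25.0 为人为注入的离群值
    for v in [10.2, 9.8, 25.0, 10.1]:
        is_anomaly, score = detector.detect(v)
        print(f"value={v}, anomaly={is_anomaly}, score={score:.3f}")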
2. 日志异常检测
2.1 日志解析
"""
日志解析与模板提取
"""
import re
from typing import List, Dict, Tuple, Optional
from collections import defaultdict
import hashlib
class DrainLogParser:
"""Drain 日志解析器"""
def __init__(self, depth: int = 4,
sim_th: float = 0.4,
max_children: int = 100):
"""
初始化解析器
Args:
depth: 解析树深度
sim_th: 相似度阈值
max_children: 最大子节点数
"""
self.depth = depth
self.sim_th = sim_th
self.max_children = max_children
# 解析树根节点
self.root_node = Node()
self.log_clusters: Dict[str, LogCluster] = {}
# 正则预处理
self.regex_patterns = [
(r'\d{4}-\d{2}-\d{2}[ T]\d{2}:\d{2}:\d{2}', '<TIMESTAMP>'),
(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', '<IP>'),
(r':\d+', ':<PORT>'),
(r'/[a-zA-Z0-9_\-/]+', '<PATH>'),
(r'\b[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\b', '<UUID>'),
(r'\b\d+\b', '<NUM>'),
]
def preprocess(self, log_line: str) -> List[str]:
"""预处理日志行"""
# 应用正则替换
for pattern, replacement in self.regex_patterns:
log_line = re.sub(pattern, replacement, log_line)
# 分词
tokens = log_line.strip().split()
return tokens
def parse(self, log_line: str) -> Tuple[str, str]:
"""
解析日志行
Args:
log_line: 原始日志行
Returns:
(模板 ID, 模板)
"""
tokens = self.preprocess(log_line)
if not tokens:
return "empty", ""
# 在解析树中查找匹配的簇
cluster = self._tree_search(tokens)
        if cluster is None:
            # 创建新簇 (构造时已计入当前日志,无需再调用 update)
            cluster = LogCluster(tokens)
            self._add_cluster(cluster, tokens)
        else:
            # 更新已有簇的模板与计数
            cluster.update(tokens)
        return cluster.cluster_id, cluster.get_template()
def _tree_search(self, tokens: List[str]) -> Optional['LogCluster']:
"""在解析树中搜索匹配的簇"""
# 第一层: 日志长度
token_count = len(tokens)
if token_count not in self.root_node.children:
return None
parent_node = self.root_node.children[token_count]
# 中间层: 前几个 token
current_depth = 1
for token in tokens:
if current_depth >= self.depth:
break
if token in parent_node.children:
parent_node = parent_node.children[token]
elif '<*>' in parent_node.children:
parent_node = parent_node.children['<*>']
else:
return None
current_depth += 1
# 叶子节点: 查找最匹配的簇
return self._fast_match(parent_node.clusters, tokens)
def _fast_match(self, clusters: List['LogCluster'],
tokens: List[str]) -> Optional['LogCluster']:
"""快速匹配"""
best_cluster = None
best_sim = -1
for cluster in clusters:
sim = cluster.similarity(tokens)
if sim > self.sim_th and sim > best_sim:
best_sim = sim
best_cluster = cluster
return best_cluster
def _add_cluster(self, cluster: 'LogCluster', tokens: List[str]):
"""添加簇到解析树"""
token_count = len(tokens)
# 第一层
if token_count not in self.root_node.children:
self.root_node.children[token_count] = Node()
parent_node = self.root_node.children[token_count]
# 中间层
current_depth = 1
for token in tokens:
if current_depth >= self.depth:
break
# 检查是否为变量
if self._is_variable(token):
token = '<*>'
if token not in parent_node.children:
if len(parent_node.children) >= self.max_children:
token = '<*>'
if token not in parent_node.children:
parent_node.children[token] = Node()
else:
parent_node.children[token] = Node()
parent_node = parent_node.children[token]
current_depth += 1
# 添加到叶子节点
parent_node.clusters.append(cluster)
self.log_clusters[cluster.cluster_id] = cluster
def _is_variable(self, token: str) -> bool:
"""判断 token 是否为变量"""
variable_patterns = ['<', '>', '*', '=']
for p in variable_patterns:
if p in token:
return True
# 包含数字的 token 可能是变量
if re.search(r'\d', token):
return True
return False
def get_all_templates(self) -> Dict[str, str]:
"""获取所有模板"""
return {
cluster_id: cluster.get_template()
for cluster_id, cluster in self.log_clusters.items()
}
class Node:
"""解析树节点"""
def __init__(self):
self.children: Dict[str, 'Node'] = {}
self.clusters: List['LogCluster'] = []
class LogCluster:
"""日志簇"""
def __init__(self, tokens: List[str]):
self.cluster_id = self._generate_id(tokens)
self.template_tokens = list(tokens)
self.count = 1
def _generate_id(self, tokens: List[str]) -> str:
"""生成簇 ID"""
template = " ".join(tokens)
return hashlib.md5(template.encode()).hexdigest()[:8]
def similarity(self, tokens: List[str]) -> float:
"""计算相似度"""
if len(tokens) != len(self.template_tokens):
return 0
match_count = sum(
1 for t1, t2 in zip(tokens, self.template_tokens)
if t1 == t2 or t2 == '<*>'
)
return match_count / len(tokens)
def update(self, tokens: List[str]):
"""更新模板"""
self.count += 1
# 合并模板: 不匹配的位置变为通配符
for i, (t1, t2) in enumerate(zip(tokens, self.template_tokens)):
if t1 != t2:
self.template_tokens[i] = '<*>'
def get_template(self) -> str:
"""获取模板"""
return " ".join(self.template_tokens)
class LogSequenceBuilder:
"""日志序列构建器"""
def __init__(self, parser: DrainLogParser,
window_size: int = 100):
"""
初始化构建器
Args:
parser: 日志解析器
window_size: 序列窗口大小
"""
self.parser = parser
self.window_size = window_size
self.template_vocab: Dict[str, int] = {}
self.next_id = 0
def build_sequences(self, logs: List[str]) -> List[List[int]]:
"""构建日志序列"""
# 解析所有日志
template_ids = []
for log in logs:
template_id, _ = self.parser.parse(log)
if template_id not in self.template_vocab:
self.template_vocab[template_id] = self.next_id
self.next_id += 1
template_ids.append(self.template_vocab[template_id])
# 构建序列
sequences = []
for i in range(len(template_ids) - self.window_size + 1):
sequences.append(template_ids[i:i+self.window_size])
return sequences
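下面用几条虚构的日志演示 DrainLogParser 的解析效果,日志内容与参数均为示例:
"""
Drain 日志解析用法示意 (日志内容为虚构)
"""
if __name__ == "__main__":
    parser = DrainLogParser(depth=4, sim_th=0.4)
    logs = [
        "2024-01-01 10:00:01 Connection from 10.0.0.1:51234 established",
        "2024-01-01 10:00:02 Connection from 10.0.0.2:51235 established",
        "2024-01-01 10:00:03 Failed to open file /var/log/app.log",
    ]
    for line in logs:
        cluster_id, template = parser.parse(line)
        print(cluster_id, "->", template)

    # 预处理后前两条日志的 token 完全一致,应归入同一模板; 第三条长度不同,形成新模板
    print(parser.get_all_templates())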
2.2 日志异常检测模型
"""
日志异常检测模型
"""
import numpy as np
import torch
import torch.nn as nn
from typing import List, Dict, Tuple, Optional, Any
from collections import Counter
class LogAnomaly:
"""基于计数的日志异常检测"""
def __init__(self, threshold_percentile: float = 95):
"""
初始化检测器
Args:
threshold_percentile: 阈值百分位数
"""
self.threshold_percentile = threshold_percentile
self.template_counts: Counter = Counter()
self.total_count = 0
self.thresholds: Dict[str, float] = {}
def fit(self, template_sequences: List[List[str]]):
"""训练模型"""
# 统计模板出现次数
for sequence in template_sequences:
self.template_counts.update(sequence)
self.total_count += len(sequence)
# 计算每个模板的正常频率范围
sequence_template_counts = []
for sequence in template_sequences:
counts = Counter(sequence)
sequence_template_counts.append(counts)
# 计算每个模板的阈值
all_templates = set(self.template_counts.keys())
for template in all_templates:
counts = [sc.get(template, 0) for sc in sequence_template_counts]
self.thresholds[template] = np.percentile(counts, self.threshold_percentile)
    def detect(self, sequence: List[str]) -> Tuple[bool, Dict[str, Dict[str, Any]]]:
"""
检测异常
Args:
sequence: 模板序列
Returns:
(是否异常, 异常详情)
"""
counts = Counter(sequence)
anomalies = {}
for template, count in counts.items():
if template in self.thresholds:
if count > self.thresholds[template]:
anomalies[template] = {
"count": count,
"threshold": self.thresholds[template],
"type": "excess"
}
else:
# 新模板
anomalies[template] = {
"count": count,
"threshold": 0,
"type": "new_template"
}
is_anomaly = len(anomalies) > 0
return is_anomaly, anomalies
class DeepLogModel(nn.Module):
"""DeepLog LSTM 模型"""
def __init__(self, vocab_size: int, embedding_dim: int = 64,
hidden_dim: int = 128, num_layers: int = 2):
super().__init__()
self.embedding = nn.Embedding(vocab_size, embedding_dim)
self.lstm = nn.LSTM(
embedding_dim, hidden_dim, num_layers,
batch_first=True, dropout=0.2
)
self.fc = nn.Linear(hidden_dim, vocab_size)
def forward(self, x):
embedded = self.embedding(x)
lstm_out, _ = self.lstm(embedded)
output = self.fc(lstm_out[:, -1, :])
return output
class DeepLogDetector:
"""DeepLog 日志异常检测"""
def __init__(self, vocab_size: int,
window_size: int = 10,
top_k: int = 9):
"""
初始化检测器
Args:
vocab_size: 词表大小
window_size: 窗口大小
top_k: Top-K 候选
"""
self.vocab_size = vocab_size
self.window_size = window_size
self.top_k = top_k
self.model = DeepLogModel(vocab_size)
def fit(self, sequences: List[List[int]],
epochs: int = 50, batch_size: int = 64, lr: float = 0.001):
"""训练模型"""
# 创建训练数据
X, y = self._create_training_data(sequences)
X = torch.LongTensor(X)
y = torch.LongTensor(y)
optimizer = torch.optim.Adam(self.model.parameters(), lr=lr)
criterion = nn.CrossEntropyLoss()
self.model.train()
for epoch in range(epochs):
total_loss = 0
for i in range(0, len(X), batch_size):
batch_X = X[i:i+batch_size]
batch_y = y[i:i+batch_size]
optimizer.zero_grad()
outputs = self.model(batch_X)
loss = criterion(outputs, batch_y)
loss.backward()
optimizer.step()
total_loss += loss.item()
if (epoch + 1) % 10 == 0:
print(f"Epoch {epoch+1}, Loss: {total_loss/len(X):.4f}")
def _create_training_data(self, sequences: List[List[int]]) -> Tuple[np.ndarray, np.ndarray]:
"""创建训练数据"""
X, y = [], []
for seq in sequences:
for i in range(len(seq) - self.window_size):
X.append(seq[i:i+self.window_size])
y.append(seq[i+self.window_size])
return np.array(X), np.array(y)
def detect(self, sequence: List[int]) -> List[Tuple[int, bool, float]]:
"""
检测异常
Args:
sequence: 日志 ID 序列
Returns:
List[(位置, 是否异常, 异常分数)]
"""
results = []
self.model.eval()
with torch.no_grad():
for i in range(len(sequence) - self.window_size):
window = sequence[i:i+self.window_size]
actual = sequence[i+self.window_size]
X = torch.LongTensor([window])
output = self.model(X)
probs = torch.softmax(output, dim=1)
# 获取 Top-K 预测
top_k_probs, top_k_indices = torch.topk(probs, self.top_k)
is_anomaly = actual not in top_k_indices[0].tolist()
anomaly_score = 1 - probs[0, actual].item()
results.append((i + self.window_size, is_anomaly, anomaly_score))
return results
class LogBERTDetector:
"""基于 BERT 的日志异常检测"""
def __init__(self, model_name: str = "bert-base-uncased",
max_length: int = 128):
"""
初始化检测器
Args:
model_name: BERT 模型名称
max_length: 最大序列长度
"""
from transformers import BertTokenizer, BertForSequenceClassification
self.tokenizer = BertTokenizer.from_pretrained(model_name)
self.model = BertForSequenceClassification.from_pretrained(
model_name,
num_labels=2 # normal, anomaly
)
self.max_length = max_length
def fit(self, logs: List[str], labels: List[int],
epochs: int = 3, batch_size: int = 16, lr: float = 2e-5):
"""微调模型"""
from torch.utils.data import DataLoader, TensorDataset
# 编码
encodings = self.tokenizer(
logs,
truncation=True,
padding=True,
max_length=self.max_length,
return_tensors="pt"
)
dataset = TensorDataset(
encodings['input_ids'],
encodings['attention_mask'],
torch.LongTensor(labels)
)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
optimizer = torch.optim.AdamW(self.model.parameters(), lr=lr)
self.model.train()
for epoch in range(epochs):
total_loss = 0
for batch in dataloader:
input_ids, attention_mask, batch_labels = batch
optimizer.zero_grad()
outputs = self.model(
input_ids=input_ids,
attention_mask=attention_mask,
labels=batch_labels
)
loss = outputs.loss
loss.backward()
optimizer.step()
total_loss += loss.item()
print(f"Epoch {epoch+1}, Loss: {total_loss/len(dataloader):.4f}")
def detect(self, logs: List[str]) -> List[Tuple[bool, float]]:
"""检测异常"""
self.model.eval()
results = []
with torch.no_grad():
for log in logs:
encoding = self.tokenizer(
log,
truncation=True,
padding=True,
max_length=self.max_length,
return_tensors="pt"
)
outputs = self.model(**encoding)
probs = torch.softmax(outputs.logits, dim=1)
is_anomaly = probs[0, 1] > 0.5
anomaly_score = probs[0, 1].item()
results.append((bool(is_anomaly), anomaly_score))
return results
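DeepLogDetector 的一个最小训练与检测示意。模板 ID 序列为人工构造的周期模式,能否命中注入的异常取决于训练是否收敛,这里只演示接口用法:
"""
DeepLog 用法示意 (序列为人工构造的周期性模板 ID 流)
"""
if __name__ == "__main__":
    vocab_size = 5
    # 正常模式: 模板 0,1,2,3,4 循环出现
    normal_seq = [i % vocab_size for i in range(500)]

    detector = DeepLogDetector(vocab_size=vocab_size, window_size=10, top_k=2)
    detector.fit([normal_seq], epochs=20)

    # 在测试序列第 30 个位置注入一个乱序模板
    test_seq = [i % vocab_size for i in range(100)]
    test_seq[30] = (test_seq[30] + 2) % vocab_size
    for position, is_anomaly, score in detector.detect(test_seq):
        if is_anomaly:
            print(f"position={position}, score={score:.3f}")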
3. 多维异常检测
3.1 多维数据建模
"""
多维异常检测
"""
import numpy as np
from typing import List, Dict, Tuple, Optional
from sklearn.covariance import EllipticEnvelope
from sklearn.decomposition import PCA
import torch
import torch.nn as nn
class MultivariateAnomalyDetector:
"""多维异常检测器"""
def __init__(self, contamination: float = 0.1):
"""
初始化检测器
Args:
contamination: 异常比例
"""
self.contamination = contamination
self.model = None
self.feature_names: List[str] = []
def fit(self, X: np.ndarray, feature_names: List[str]):
"""拟合模型"""
self.feature_names = feature_names
# 使用椭圆包络
self.model = EllipticEnvelope(
contamination=self.contamination,
random_state=42
)
self.model.fit(X)
def detect(self, x: np.ndarray) -> Tuple[bool, float, Dict[str, float]]:
"""
检测异常
Args:
x: 多维数据点
Returns:
(是否异常, 异常分数, 特征贡献)
"""
if x.ndim == 1:
x = x.reshape(1, -1)
prediction = self.model.predict(x)
score = self.model.score_samples(x)
is_anomaly = prediction[0] == -1
anomaly_score = -score[0]
# 计算特征贡献
contributions = self._compute_contributions(x[0])
return is_anomaly, anomaly_score, contributions
def _compute_contributions(self, x: np.ndarray) -> Dict[str, float]:
"""计算各特征对异常的贡献"""
mean = self.model.location_
precision = self.model.precision_
# 马氏距离分解
diff = x - mean
contributions = {}
for i, name in enumerate(self.feature_names):
# 单特征贡献
contrib = abs(diff[i]) * np.sqrt(precision[i, i])
contributions[name] = float(contrib)
# 归一化
total = sum(contributions.values())
if total > 0:
contributions = {k: v/total for k, v in contributions.items()}
return contributions
class PCABasedDetector:
"""基于 PCA 的异常检测"""
def __init__(self, n_components: float = 0.95,
threshold_percentile: float = 95):
"""
初始化检测器
Args:
n_components: 保留的方差比例
threshold_percentile: 异常阈值百分位
"""
self.n_components = n_components
self.threshold_percentile = threshold_percentile
self.pca = None
self.threshold = None
def fit(self, X: np.ndarray):
"""拟合模型"""
self.pca = PCA(n_components=self.n_components)
self.pca.fit(X)
# 计算重构误差
X_transformed = self.pca.transform(X)
X_reconstructed = self.pca.inverse_transform(X_transformed)
reconstruction_errors = np.sum((X - X_reconstructed) ** 2, axis=1)
self.threshold = np.percentile(reconstruction_errors, self.threshold_percentile)
def detect(self, x: np.ndarray) -> Tuple[bool, float, np.ndarray]:
"""
检测异常
Args:
x: 数据点
Returns:
(是否异常, 重构误差, 重构数据)
"""
if x.ndim == 1:
x = x.reshape(1, -1)
x_transformed = self.pca.transform(x)
x_reconstructed = self.pca.inverse_transform(x_transformed)
error = np.sum((x - x_reconstructed) ** 2)
is_anomaly = error > self.threshold
return is_anomaly, error, x_reconstructed[0]
class VAEAnomalyDetector(nn.Module):
"""VAE 多维异常检测"""
def __init__(self, input_dim: int, hidden_dims: List[int],
latent_dim: int):
super().__init__()
self.input_dim = input_dim
self.latent_dim = latent_dim
# 编码器
encoder_layers = []
prev_dim = input_dim
for hidden_dim in hidden_dims:
encoder_layers.extend([
nn.Linear(prev_dim, hidden_dim),
nn.ReLU(),
nn.BatchNorm1d(hidden_dim)
])
prev_dim = hidden_dim
self.encoder = nn.Sequential(*encoder_layers)
self.fc_mu = nn.Linear(prev_dim, latent_dim)
self.fc_var = nn.Linear(prev_dim, latent_dim)
# 解码器
decoder_layers = []
prev_dim = latent_dim
for hidden_dim in reversed(hidden_dims):
decoder_layers.extend([
nn.Linear(prev_dim, hidden_dim),
nn.ReLU(),
nn.BatchNorm1d(hidden_dim)
])
prev_dim = hidden_dim
decoder_layers.append(nn.Linear(prev_dim, input_dim))
self.decoder = nn.Sequential(*decoder_layers)
def encode(self, x):
h = self.encoder(x)
return self.fc_mu(h), self.fc_var(h)
def reparameterize(self, mu, log_var):
std = torch.exp(0.5 * log_var)
eps = torch.randn_like(std)
return mu + eps * std
def decode(self, z):
return self.decoder(z)
def forward(self, x):
mu, log_var = self.encode(x)
z = self.reparameterize(mu, log_var)
return self.decode(z), mu, log_var
class VAEDetector:
"""VAE 异常检测封装"""
def __init__(self, input_dim: int,
hidden_dims: List[int] = [128, 64],
latent_dim: int = 16,
threshold_percentile: float = 95):
self.model = VAEAnomalyDetector(input_dim, hidden_dims, latent_dim)
self.threshold_percentile = threshold_percentile
self.threshold = None
def fit(self, X: np.ndarray, epochs: int = 100,
batch_size: int = 64, lr: float = 0.001):
"""训练模型"""
X_tensor = torch.FloatTensor(X)
optimizer = torch.optim.Adam(self.model.parameters(), lr=lr)
self.model.train()
for epoch in range(epochs):
total_loss = 0
for i in range(0, len(X_tensor), batch_size):
batch = X_tensor[i:i+batch_size]
optimizer.zero_grad()
recon, mu, log_var = self.model(batch)
# 重构损失
recon_loss = nn.MSELoss()(recon, batch)
# KL 散度
kl_loss = -0.5 * torch.sum(1 + log_var - mu.pow(2) - log_var.exp())
loss = recon_loss + 0.001 * kl_loss
loss.backward()
optimizer.step()
total_loss += loss.item()
# 计算阈值
self.model.eval()
with torch.no_grad():
recon, _, _ = self.model(X_tensor)
errors = torch.sum((X_tensor - recon) ** 2, dim=1).numpy()
self.threshold = np.percentile(errors, self.threshold_percentile)
def detect(self, x: np.ndarray) -> Tuple[bool, float]:
"""检测异常"""
self.model.eval()
x_tensor = torch.FloatTensor(x.reshape(1, -1))
with torch.no_grad():
recon, _, _ = self.model(x_tensor)
error = torch.sum((x_tensor - recon) ** 2).item()
is_anomaly = error > self.threshold
return is_anomaly, error / self.threshold
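多维检测器的用法示意。三个指标及其相关关系均为模拟数据,用于说明"单指标看似正常、组合关系异常"的场景:
"""
多维异常检测用法示意 (指标数据为模拟)
"""
import numpy as np

if __name__ == "__main__":
    rng = np.random.RandomState(0)
    n = 2000
    cpu = rng.normal(50, 5, n)                    # CPU 使用率
    latency = cpu * 2 + rng.normal(0, 3, n)       # 延迟与 CPU 正相关
    qps = rng.normal(1000, 50, n)                 # QPS 独立波动
    X = np.column_stack([cpu, latency, qps])

    detector = MultivariateAnomalyDetector(contamination=0.01)
    detector.fit(X, feature_names=["cpu", "latency", "qps"])

    # cpu 偏高但 latency 反而偏低,单指标都在 2 个标准差内,却违反了两者的正相关关系
    point = np.array([60.0, 80.0, 1000.0])
    is_anomaly, score, contributions = detector.detect(point)
    print(is_anomaly, round(float(score), 2), contributions)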
3.2 关联异常检测
"""
关联异常检测 - 检测指标间的异常关联
"""
import numpy as np
from typing import Dict, List, Tuple, Optional, Any
from scipy.stats import pearsonr, spearmanr
from sklearn.covariance import GraphicalLasso
import networkx as nx
class CorrelationAnomalyDetector:
"""相关性异常检测"""
def __init__(self, window_size: int = 100,
correlation_method: str = "pearson"):
"""
初始化检测器
Args:
window_size: 窗口大小
correlation_method: 相关性方法 (pearson, spearman)
"""
self.window_size = window_size
self.correlation_method = correlation_method
self.baseline_correlation: Optional[np.ndarray] = None
self.feature_names: List[str] = []
def fit(self, X: np.ndarray, feature_names: List[str]):
"""建立基线相关性"""
self.feature_names = feature_names
self.baseline_correlation = self._compute_correlation(X)
def _compute_correlation(self, X: np.ndarray) -> np.ndarray:
"""计算相关性矩阵"""
n_features = X.shape[1]
corr_matrix = np.zeros((n_features, n_features))
for i in range(n_features):
for j in range(n_features):
if self.correlation_method == "pearson":
corr, _ = pearsonr(X[:, i], X[:, j])
else:
corr, _ = spearmanr(X[:, i], X[:, j])
corr_matrix[i, j] = corr
return corr_matrix
def detect(self, X: np.ndarray,
threshold: float = 0.3) -> Dict[str, Any]:
"""
检测相关性异常
Args:
X: 当前窗口数据
threshold: 相关性变化阈值
Returns:
Dict: 异常信息
"""
current_correlation = self._compute_correlation(X)
# 计算相关性变化
correlation_change = np.abs(current_correlation - self.baseline_correlation)
# 找出显著变化的特征对
anomalous_pairs = []
for i in range(len(self.feature_names)):
for j in range(i + 1, len(self.feature_names)):
if correlation_change[i, j] > threshold:
anomalous_pairs.append({
"feature_1": self.feature_names[i],
"feature_2": self.feature_names[j],
"baseline_corr": self.baseline_correlation[i, j],
"current_corr": current_correlation[i, j],
"change": correlation_change[i, j]
})
return {
"is_anomaly": len(anomalous_pairs) > 0,
"anomalous_pairs": anomalous_pairs,
"max_change": float(correlation_change.max()),
"correlation_matrix": current_correlation.tolist()
}
class GraphAnomalyDetector:
"""基于图的异常检测"""
def __init__(self, alpha: float = 0.01):
"""
初始化检测器
Args:
alpha: Graphical Lasso 正则化参数
"""
self.alpha = alpha
self.baseline_graph = None
self.glasso = None
self.feature_names: List[str] = []
def fit(self, X: np.ndarray, feature_names: List[str]):
"""建立基线图"""
self.feature_names = feature_names
# 使用 Graphical Lasso 学习稀疏精度矩阵
self.glasso = GraphicalLasso(alpha=self.alpha)
self.glasso.fit(X)
# 构建基线图
self.baseline_graph = self._precision_to_graph(self.glasso.precision_)
def _precision_to_graph(self, precision: np.ndarray,
threshold: float = 0.1) -> nx.Graph:
"""从精度矩阵构建图"""
G = nx.Graph()
# 添加节点
for name in self.feature_names:
G.add_node(name)
# 添加边 (基于偏相关)
n = len(self.feature_names)
for i in range(n):
for j in range(i + 1, n):
# 偏相关
partial_corr = -precision[i, j] / np.sqrt(precision[i, i] * precision[j, j])
if abs(partial_corr) > threshold:
G.add_edge(
self.feature_names[i],
self.feature_names[j],
weight=abs(partial_corr)
)
return G
def detect(self, X: np.ndarray) -> Dict[str, Any]:
"""检测图结构异常"""
# 学习当前图
current_glasso = GraphicalLasso(alpha=self.alpha)
current_glasso.fit(X)
current_graph = self._precision_to_graph(current_glasso.precision_)
# 比较图结构
baseline_edges = set(self.baseline_graph.edges())
current_edges = set(current_graph.edges())
new_edges = current_edges - baseline_edges
removed_edges = baseline_edges - current_edges
# 计算图相似度
jaccard = len(baseline_edges & current_edges) / len(baseline_edges | current_edges)
return {
"is_anomaly": len(new_edges) > 0 or len(removed_edges) > 0,
"new_edges": list(new_edges),
"removed_edges": list(removed_edges),
"jaccard_similarity": jaccard,
"baseline_edge_count": len(baseline_edges),
"current_edge_count": len(current_edges)
}
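相关性异常检测的用法示意。两个指标的基线相关关系与当前窗口数据均为模拟,threshold 取默认的 0.3:
"""
相关性异常检测用法示意 (指标数据为模拟)
"""
import numpy as np

if __name__ == "__main__":
    rng = np.random.RandomState(1)
    n = 500
    qps = rng.normal(1000, 100, n)
    latency = 0.05 * qps + rng.normal(0, 2, n)    # 基线: 延迟随 QPS 线性变化
    X_baseline = np.column_stack([qps, latency])

    detector = CorrelationAnomalyDetector(correlation_method="pearson")
    detector.fit(X_baseline, feature_names=["qps", "latency"])

    # 当前窗口: 延迟与 QPS 脱钩 (例如下游依赖故障导致延迟独立波动)
    qps_now = rng.normal(1000, 100, 200)
    latency_now = rng.normal(80, 10, 200)
    result = detector.detect(np.column_stack([qps_now, latency_now]), threshold=0.3)
    print(result["is_anomaly"])
    for pair in result["anomalous_pairs"]:
        print(pair["feature_1"], pair["feature_2"],
              round(pair["baseline_corr"], 2), "->", round(pair["current_corr"], 2))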
4. 实时异常检测系统
4.1 流式处理架构
"""
实时异常检测系统
"""
import asyncio
from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass
from collections import deque
import time
@dataclass
class AnomalyAlert:
"""异常告警"""
alert_id: str
metric_name: str
resource: str
timestamp: float
value: float
anomaly_score: float
anomaly_type: str
details: Dict[str, Any]
class StreamingAnomalyDetector:
"""流式异常检测器"""
def __init__(self, detectors: Dict[str, Any],
window_size: int = 100,
alert_callback: Optional[Callable] = None):
"""
初始化流式检测器
Args:
detectors: 检测器字典 {指标名: 检测器}
window_size: 滑动窗口大小
alert_callback: 告警回调函数
"""
self.detectors = detectors
self.window_size = window_size
self.alert_callback = alert_callback
# 数据缓冲
self.buffers: Dict[str, deque] = {}
for metric in detectors:
self.buffers[metric] = deque(maxlen=window_size)
# 告警抑制
self.alert_cooldown: Dict[str, float] = {}
self.cooldown_seconds = 60
async def process(self, metric_name: str, resource: str,
timestamp: float, value: float) -> Optional[AnomalyAlert]:
"""
处理数据点
Args:
metric_name: 指标名称
resource: 资源标识
timestamp: 时间戳
value: 指标值
Returns:
Optional[AnomalyAlert]: 告警 (如果检测到异常)
"""
key = f"{metric_name}:{resource}"
# 添加到缓冲
if metric_name not in self.buffers:
self.buffers[metric_name] = deque(maxlen=self.window_size)
self.buffers[metric_name].append((timestamp, value))
# 获取检测器
detector = self.detectors.get(metric_name)
if detector is None:
return None
# 检测异常
result = detector.detect(value)
if result.is_anomaly:
# 检查告警抑制
if key in self.alert_cooldown:
if time.time() - self.alert_cooldown[key] < self.cooldown_seconds:
return None
# 生成告警
alert = AnomalyAlert(
alert_id=f"alert-{int(timestamp)}",
metric_name=metric_name,
resource=resource,
timestamp=timestamp,
value=value,
anomaly_score=result.anomaly_score,
anomaly_type=result.anomaly_type,
details=result.details
)
# 更新抑制时间
self.alert_cooldown[key] = time.time()
# 回调
if self.alert_callback:
await self.alert_callback(alert)
return alert
return None
async def process_batch(self, data_points: List[Dict[str, Any]]) -> List[AnomalyAlert]:
"""批量处理数据点"""
alerts = []
for dp in data_points:
alert = await self.process(
dp["metric_name"],
dp["resource"],
dp["timestamp"],
dp["value"]
)
if alert:
alerts.append(alert)
return alerts
class AnomalyDetectionService:
"""异常检测服务"""
def __init__(self):
self.detectors: Dict[str, StreamingAnomalyDetector] = {}
self.alert_handlers: List[Callable] = []
def register_detector(self, name: str, detector: StreamingAnomalyDetector):
"""注册检测器"""
self.detectors[name] = detector
def add_alert_handler(self, handler: Callable):
"""添加告警处理器"""
self.alert_handlers.append(handler)
async def start(self, kafka_consumer):
"""启动服务"""
async for message in kafka_consumer:
data = message.value
# 路由到对应检测器
detector_name = data.get("detector", "default")
detector = self.detectors.get(detector_name)
if detector:
alert = await detector.process(
data["metric_name"],
data["resource"],
data["timestamp"],
data["value"]
)
if alert:
# 触发告警处理
for handler in self.alert_handlers:
await handler(alert)
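下面把 1.2 节的统计检测器接入流式检测器做一个端到端示意。StreamingAnomalyDetector 要求检测器提供统一的 detect(value) 接口并返回 AnomalyResult,因此这里用一个假设性的适配器 ZScoreAdapter 做包装;适配器名称与模拟数据均为示例,并假设 StatisticalAnomalyDetector、AnomalyResult 与本节代码在同一作用域可用:
"""
流式异常检测用法示意 (ZScoreAdapter 为示例适配器,数据为模拟)
"""
import asyncio
import numpy as np

class ZScoreAdapter:
    """把 StatisticalAnomalyDetector 包装成统一的 detect(value) 接口"""
    def __init__(self, inner: "StatisticalAnomalyDetector"):
        self.inner = inner

    def detect(self, value: float) -> "AnomalyResult":
        return self.inner.z_score_detect(value)

async def print_alert(alert: AnomalyAlert):
    print(f"[ALERT] {alert.metric_name}@{alert.resource} "
          f"value={alert.value:.1f} score={alert.anomaly_score:.2f}")

async def main():
    streaming = StreamingAnomalyDetector(
        detectors={"cpu_usage": ZScoreAdapter(StatisticalAnomalyDetector(window_size=60))},
        alert_callback=print_alert
    )
    rng = np.random.RandomState(7)
    values = 50 + rng.normal(0, 2, 300)
    values[250] = 95.0                      # 注入异常
    for i, v in enumerate(values):
        await streaming.process("cpu_usage", "host-1", float(i), float(v))

if __name__ == "__main__":
    asyncio.run(main())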
总结
本章详细介绍了 AIOps 中的异常检测算法:
- 时序异常检测:统计方法 (Z-Score, MAD, EWMA)、机器学习方法 (孤立森林, LOF, LSTM 自编码器)
- 日志异常检测:日志解析 (Drain)、DeepLog、LogBERT
- 多维异常检测:多变量检测、关联异常、图异常检测
- 实时检测系统:流式处理架构设计
关键要点:
- 选择合适的检测方法取决于数据特性和场景需求
- 结合多种检测方法可以提高检测准确率 (见下方的投票集成示意)
- 实时检测需要考虑性能和延迟
- 告警抑制和聚合是减少告警噪声的关键
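针对"结合多种检测方法"这一点,下面给出一个简单的多数投票集成示意。VotingEnsembleDetector 是本章之外的假设性封装,并假设 1.2 节的 StatisticalAnomalyDetector 可用;三个实例相互独立、各自维护自己的 history:
"""
多检测器投票集成示意 (VotingEnsembleDetector 为假设性封装)
"""
from typing import Callable, List, Tuple

class VotingEnsembleDetector:
    """对多个单值检测函数做多数投票"""
    def __init__(self, detect_fns: List[Callable], min_votes: int = 2):
        self.detect_fns = detect_fns
        self.min_votes = min_votes

    def detect(self, value: float) -> Tuple[bool, float]:
        # 每个检测函数返回 AnomalyResult,统计判定为异常的票数
        results = [fn(value) for fn in self.detect_fns]
        votes = sum(1 for r in results if r.is_anomaly)
        score = max(r.anomaly_score for r in results)
        return votes >= self.min_votes, score

# 三个独立实例分别承载三种统计方法,避免共享 history 导致重复写入
ensemble = VotingEnsembleDetector(
    detect_fns=[
        StatisticalAnomalyDetector().z_score_detect,
        StatisticalAnomalyDetector().mad_detect,
        StatisticalAnomalyDetector().ewma_detect,
    ],
    min_votes=2
)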
下一章将探讨根因分析算法,包括拓扑分析、时序关联和因果推断方法。