05 - AIOps Platform in Practice
Overview
This chapter consolidates the theory and techniques from the preceding chapters into a complete, hands-on AIOps platform blueprint, covering platform architecture design, core module implementation, system integration, and best practices.
1. Platform Architecture Design
1.1 Overall Architecture
┌─────────────────────────────────────────────────────────────────┐
│                    AIOps Platform Architecture                  │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │                      Frontend Layer                      │   │
│  │  Dashboard │ Alert Center │ Analysis Console │ Config    │   │
│  └──────────────────────┬──────────────────────────────────┘   │
│                         │ REST/WebSocket                        │
│  ┌──────────────────────┴──────────────────────────────────┐   │
│  │                    API Gateway Layer                     │   │
│  │  AuthN/AuthZ │ Rate Limiting │ Routing │ Load Balancing  │   │
│  └──────────────────────┬──────────────────────────────────┘   │
│                         │                                       │
│  ┌──────────────────────┴──────────────────────────────────┐   │
│  │                 Application Service Layer                │   │
│  │                                                          │   │
│  │  ┌───────────┐   ┌───────────┐   ┌───────────┐           │   │
│  │  │  Anomaly  │   │ Root Cause│   │   Alert   │           │   │
│  │  │ Detection │   │  Analysis │   │ Management│           │   │
│  │  └───────────┘   └───────────┘   └───────────┘           │   │
│  │  ┌───────────┐   ┌───────────┐   ┌───────────┐           │   │
│  │  │ Capacity  │   │   Auto-   │   │ Knowledge │           │   │
│  │  │ Forecast  │   │Remediation│   │   Base    │           │   │
│  │  └───────────┘   └───────────┘   └───────────┘           │   │
│  └──────────────────────┬──────────────────────────────────┘   │
│                         │                                       │
│  ┌──────────────────────┴──────────────────────────────────┐   │
│  │                      AI Engine Layer                     │   │
│  │  Model Serving │ Training │ Feature Eng │ Model Registry │   │
│  └──────────────────────┬──────────────────────────────────┘   │
│                         │                                       │
│  ┌──────────────────────┴──────────────────────────────────┐   │
│  │                   Data Processing Layer                  │   │
│  │  Stream Processing (Flink) │ Batch (Spark) │ ETL         │   │
│  └──────────────────────┬──────────────────────────────────┘   │
│                         │                                       │
│  ┌──────────────────────┴──────────────────────────────────┐   │
│  │                     Data Storage Layer                   │   │
│  │  TSDB │ Log Store │ Graph DB │ Object Store │ Cache      │   │
│  │  (VictoriaMetrics)  (ES)  (Neo4j)  (MinIO)  (Redis)      │   │
│  └──────────────────────┬──────────────────────────────────┘   │
│                         │                                       │
│  ┌──────────────────────┴──────────────────────────────────┐   │
│  │                   Data Collection Layer                  │   │
│  │  Prometheus │ Fluentd │ Jaeger │ Kafka                   │   │
│  └──────────────────────────────────────────────────────────┘  │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
1.2 Kubernetes Deployment Architecture
# AIOps platform Kubernetes deployment
apiVersion: v1
kind: Namespace
metadata:
  name: aiops-platform
---
# Configuration management
apiVersion: v1
kind: ConfigMap
metadata:
  name: aiops-config
  namespace: aiops-platform
data:
  config.yaml: |
    database:
      timeseries:
        type: victoriametrics
        endpoints:
          - http://victoria-metrics:8428
      logs:
        type: elasticsearch
        endpoints:
          - http://elasticsearch:9200
      graph:
        type: neo4j
        endpoint: bolt://neo4j:7687
    kafka:
      brokers:
        - kafka:9092
      topics:
        metrics: aiops-metrics
        logs: aiops-logs
        alerts: aiops-alerts
    ai:
      model_registry: http://mlflow:5000
      inference_endpoint: http://triton:8000
    features:
      anomaly_detection: true
      root_cause_analysis: true
      auto_remediation: true
      capacity_planning: true
---
# Anomaly detection service
apiVersion: apps/v1
kind: Deployment
metadata:
  name: anomaly-detection
  namespace: aiops-platform
spec:
  replicas: 3
  selector:
    matchLabels:
      app: anomaly-detection
  template:
    metadata:
      labels:
        app: anomaly-detection
    spec:
      containers:
        - name: anomaly-detection
          image: aiops/anomaly-detection:v1.0
          ports:
            - containerPort: 8080
          resources:
            requests:
              memory: "512Mi"
              cpu: "500m"
            limits:
              memory: "2Gi"
              cpu: "2000m"
          env:
            - name: CONFIG_PATH
              value: /config/config.yaml
          volumeMounts:
            - name: config
              mountPath: /config
      volumes:
        - name: config
          configMap:
            name: aiops-config
---
# Root cause analysis service
apiVersion: apps/v1
kind: Deployment
metadata:
  name: rca-service
  namespace: aiops-platform
spec:
  replicas: 2
  selector:
    matchLabels:
      app: rca-service
  template:
    metadata:
      labels:
        app: rca-service
    spec:
      containers:
        - name: rca-service
          image: aiops/rca-service:v1.0
          ports:
            - containerPort: 8080
          resources:
            requests:
              memory: "1Gi"
              cpu: "1000m"
            limits:
              memory: "4Gi"
              cpu: "4000m"
---
# Alert management service
apiVersion: apps/v1
kind: Deployment
metadata:
  name: alert-manager
  namespace: aiops-platform
spec:
  replicas: 2
  selector:
    matchLabels:
      app: alert-manager
  template:
    metadata:
      labels:
        app: alert-manager
    spec:
      containers:
        - name: alert-manager
          image: aiops/alert-manager:v1.0
          ports:
            - containerPort: 8080
---
# API gateway
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: aiops-ingress
  namespace: aiops-platform
  annotations:
    nginx.ingress.kubernetes.io/rewrite-target: /
spec:
  rules:
    - host: aiops.example.com
      http:
        paths:
          - path: /api/anomaly
            pathType: Prefix
            backend:
              service:
                name: anomaly-detection
                port:
                  number: 8080
          - path: /api/rca
            pathType: Prefix
            backend:
              service:
                name: rca-service
                port:
                  number: 8080
          - path: /api/alerts
            pathType: Prefix
            backend:
              service:
                name: alert-manager
                port:
                  number: 8080
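Note that the Ingress above routes to Services named anomaly-detection, rca-service, and alert-manager, but no Service objects are declared in the manifest. A minimal sketch of the missing Service manifests, assuming names and ports that match the Deployments above:

# Sketch of the Service objects backing the Ingress routes above;
# names and ports are assumed to match the Deployments.
apiVersion: v1
kind: Service
metadata:
  name: anomaly-detection
  namespace: aiops-platform
spec:
  selector:
    app: anomaly-detection
  ports:
    - port: 8080
      targetPort: 8080
---
apiVersion: v1
kind: Service
metadata:
  name: rca-service
  namespace: aiops-platform
spec:
  selector:
    app: rca-service
  ports:
    - port: 8080
      targetPort: 8080
---
apiVersion: v1
kind: Service
metadata:
  name: alert-manager
  namespace: aiops-platform
spec:
  selector:
    app: alert-manager
  ports:
    - port: 8080
      targetPort: 8080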
2. Core Module Implementation
2.1 Unified Data Collection
"""
统一数据采集模块
"""
import asyncio
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, field
from abc import ABC, abstractmethod
from aiokafka import AIOKafkaProducer
import json
import time
@dataclass
class DataPoint:
"""数据点"""
source: str
data_type: str # metric, log, trace, event
timestamp: float
resource: str
data: Dict[str, Any]
tags: Dict[str, str] = field(default_factory=dict)
class DataCollector(ABC):
"""数据采集器基类"""
@abstractmethod
async def collect(self) -> List[DataPoint]:
"""采集数据"""
pass
@abstractmethod
async def start(self):
"""启动采集"""
pass
@abstractmethod
async def stop(self):
"""停止采集"""
pass
class PrometheusCollector(DataCollector):
"""Prometheus 指标采集器"""
def __init__(self, prometheus_url: str,
scrape_interval: int = 15):
self.prometheus_url = prometheus_url
self.scrape_interval = scrape_interval
self.running = False
async def collect(self) -> List[DataPoint]:
"""采集 Prometheus 指标"""
import aiohttp
async with aiohttp.ClientSession() as session:
# 查询所有活跃的指标
url = f"{self.prometheus_url}/api/v1/query"
queries = [
"up",
"node_cpu_seconds_total",
"node_memory_MemAvailable_bytes",
"container_cpu_usage_seconds_total",
"container_memory_usage_bytes"
]
data_points = []
for query in queries:
async with session.get(url, params={"query": query}) as resp:
if resp.status == 200:
result = await resp.json()
for item in result.get("data", {}).get("result", []):
metric = item["metric"]
value = item["value"]
data_points.append(DataPoint(
source="prometheus",
data_type="metric",
timestamp=float(value[0]),
resource=metric.get("instance", "unknown"),
data={
"metric_name": metric.get("__name__", query),
"value": float(value[1])
},
tags=metric
))
return data_points
async def start(self):
"""启动采集循环"""
self.running = True
while self.running:
await self.collect()
await asyncio.sleep(self.scrape_interval)
async def stop(self):
"""停止采集"""
self.running = False
class ElasticsearchCollector(DataCollector):
"""Elasticsearch 日志采集器"""
def __init__(self, es_url: str,
index_pattern: str = "logs-*",
poll_interval: int = 5):
self.es_url = es_url
self.index_pattern = index_pattern
self.poll_interval = poll_interval
self.last_timestamp = None
self.running = False
async def collect(self) -> List[DataPoint]:
"""采集日志"""
import aiohttp
query = {
"query": {
"range": {
"@timestamp": {
"gte": self.last_timestamp or "now-5m",
"lt": "now"
}
}
},
"size": 1000,
"sort": [{"@timestamp": "asc"}]
}
async with aiohttp.ClientSession() as session:
url = f"{self.es_url}/{self.index_pattern}/_search"
async with session.post(url, json=query) as resp:
if resp.status != 200:
return []
result = await resp.json()
data_points = []
for hit in result.get("hits", {}).get("hits", []):
source = hit["_source"]
data_points.append(DataPoint(
source="elasticsearch",
data_type="log",
timestamp=source.get("@timestamp", time.time()),
resource=source.get("host", {}).get("name", "unknown"),
data={
"message": source.get("message", ""),
"level": source.get("level", "INFO"),
"service": source.get("service", {}).get("name", "")
},
tags={
"index": hit["_index"],
"log_type": source.get("type", "application")
}
))
if data_points:
self.last_timestamp = data_points[-1].timestamp
return data_points
async def start(self):
self.running = True
while self.running:
await self.collect()
await asyncio.sleep(self.poll_interval)
async def stop(self):
self.running = False
class UnifiedDataPipeline:
"""统一数据管道"""
def __init__(self, kafka_brokers: List[str],
output_topic: str = "aiops-raw-data"):
self.kafka_brokers = kafka_brokers
self.output_topic = output_topic
self.collectors: List[DataCollector] = []
self.producer: Optional[AIOKafkaProducer] = None
def add_collector(self, collector: DataCollector):
"""添加采集器"""
self.collectors.append(collector)
async def start(self):
"""启动管道"""
# 初始化 Kafka 生产者
self.producer = AIOKafkaProducer(
bootstrap_servers=",".join(self.kafka_brokers),
value_serializer=lambda v: json.dumps(v).encode()
)
await self.producer.start()
# 启动所有采集器
tasks = []
for collector in self.collectors:
task = asyncio.create_task(self._run_collector(collector))
tasks.append(task)
await asyncio.gather(*tasks)
async def _run_collector(self, collector: DataCollector):
"""运行单个采集器"""
while True:
try:
data_points = await collector.collect()
for dp in data_points:
await self._send_to_kafka(dp)
except Exception as e:
print(f"采集错误: {e}")
await asyncio.sleep(1)
async def _send_to_kafka(self, data_point: DataPoint):
"""发送到 Kafka"""
message = {
"source": data_point.source,
"data_type": data_point.data_type,
"timestamp": data_point.timestamp,
"resource": data_point.resource,
"data": data_point.data,
"tags": data_point.tags
}
await self.producer.send_and_wait(self.output_topic, message)
async def stop(self):
"""停止管道"""
for collector in self.collectors:
await collector.stop()
if self.producer:
await self.producer.stop()
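A minimal sketch of how the pieces above wire together; the Prometheus, Elasticsearch, and Kafka endpoints are illustrative assumptions:

# Minimal usage sketch: wire both collectors into the pipeline.
async def main():
    pipeline = UnifiedDataPipeline(kafka_brokers=["kafka:9092"])
    pipeline.add_collector(PrometheusCollector("http://prometheus:9090"))
    pipeline.add_collector(ElasticsearchCollector("http://elasticsearch:9200"))
    try:
        await pipeline.start()   # runs until cancelled
    finally:
        await pipeline.stop()

# asyncio.run(main())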
2.2 Real-Time Stream Processing
"""
实时流处理模块 (基于 Flink Python API)
"""
from typing import Dict, List, Any
import json
class StreamProcessor:
"""流处理器 (简化实现)"""
def __init__(self):
self.processors = []
def add_processor(self, name: str, func):
"""添加处理函数"""
self.processors.append((name, func))
async def process(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""处理单条数据"""
result = data.copy()
for name, func in self.processors:
try:
result = func(result)
except Exception as e:
result["_processing_errors"] = result.get("_processing_errors", [])
result["_processing_errors"].append({
"processor": name,
"error": str(e)
})
return result
def feature_extraction_processor(data: Dict[str, Any]) -> Dict[str, Any]:
"""特征提取处理器"""
if data.get("data_type") == "metric":
# 提取指标特征
value = data.get("data", {}).get("value", 0)
data["features"] = {
"value": value,
"is_high": value > 80,
"is_low": value < 20
}
elif data.get("data_type") == "log":
# 提取日志特征
message = data.get("data", {}).get("message", "")
data["features"] = {
"length": len(message),
"has_error": "error" in message.lower(),
"has_exception": "exception" in message.lower(),
"has_warning": "warning" in message.lower()
}
return data
def anomaly_scoring_processor(data: Dict[str, Any]) -> Dict[str, Any]:
"""异常评分处理器"""
features = data.get("features", {})
# 简单的规则评分
score = 0
if data.get("data_type") == "metric":
if features.get("is_high"):
score += 0.5
if features.get("is_low"):
score += 0.3
elif data.get("data_type") == "log":
if features.get("has_error"):
score += 0.7
if features.get("has_exception"):
score += 0.8
if features.get("has_warning"):
score += 0.3
data["anomaly_score"] = min(score, 1.0)
data["is_anomaly"] = score > 0.5
return data
def enrichment_processor(data: Dict[str, Any]) -> Dict[str, Any]:
"""数据富化处理器"""
resource = data.get("resource", "")
# 添加资源元数据 (实际实现会查询 CMDB)
data["resource_metadata"] = {
"type": "pod" if "pod" in resource else "host",
"namespace": resource.split("/")[0] if "/" in resource else "default",
"cluster": "production"
}
return data
# Flink 作业定义 (伪代码)
"""
# flink_job.py (需要 PyFlink 环境)
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer
from pyflink.common.serialization import SimpleStringSchema
def create_flink_job():
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)
# Kafka Source
kafka_consumer = FlinkKafkaConsumer(
topics='aiops-raw-data',
deserialization_schema=SimpleStringSchema(),
properties={
'bootstrap.servers': 'kafka:9092',
'group.id': 'aiops-stream-processor'
}
)
# 处理流
stream = env.add_source(kafka_consumer)
processed = stream \
.map(lambda x: json.loads(x)) \
.map(feature_extraction_processor) \
.map(anomaly_scoring_processor) \
.map(enrichment_processor)
# 分流
anomalies = processed.filter(lambda x: x.get('is_anomaly', False))
normal = processed.filter(lambda x: not x.get('is_anomaly', False))
# 输出
anomalies.add_sink(FlinkKafkaProducer(
'aiops-anomalies',
SimpleStringSchema(),
{'bootstrap.servers': 'kafka:9092'}
))
env.execute('AIOps Stream Processing')
"""
2.3 API Service Implementation
"""
AIOps API 服务
"""
from fastapi import FastAPI, HTTPException, BackgroundTasks, Depends
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Dict, List, Any, Optional
from datetime import datetime
import asyncio
app = FastAPI(
title="AIOps Platform API",
description="智能运维平台 API",
version="1.0.0"
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# ==================== 数据模型 ====================
class AnomalyQuery(BaseModel):
resource: Optional[str] = None
metric: Optional[str] = None
start_time: Optional[datetime] = None
end_time: Optional[datetime] = None
severity: Optional[str] = None
limit: int = 100
class RCARequest(BaseModel):
incident_id: Optional[str] = None
anomaly_ids: Optional[List[str]] = None
time_window_minutes: int = 30
class AlertRule(BaseModel):
name: str
condition: Dict[str, Any]
severity: str
notification_channels: List[str]
enabled: bool = True
class RemediationRequest(BaseModel):
incident_id: str
action: str
parameters: Dict[str, Any] = {}
dry_run: bool = False
class CapacityQuery(BaseModel):
resources: List[str]
metric: str
forecast_hours: int = 24
# ==================== 依赖注入 ====================
async def get_anomaly_service():
"""获取异常检测服务"""
# 实际实现会返回服务实例
return {}
async def get_rca_service():
"""获取根因分析服务"""
return {}
async def get_alert_service():
"""获取告警服务"""
return {}
# ==================== API 端点 ====================
@app.get("/api/v1/health")
async def health_check():
"""健康检查"""
return {
"status": "healthy",
"timestamp": datetime.now().isoformat(),
"version": "1.0.0"
}
# 异常检测 API
@app.post("/api/v1/anomalies/detect")
async def detect_anomalies(
data: Dict[str, Any],
background_tasks: BackgroundTasks
):
"""实时异常检测"""
# 调用异常检测服务
result = {
"request_id": "req-123",
"is_anomaly": False,
"anomaly_score": 0.2,
"detection_time_ms": 15
}
return result
@app.get("/api/v1/anomalies")
async def list_anomalies(
query: AnomalyQuery = Depends()
):
"""查询异常列表"""
return {
"total": 10,
"anomalies": [
{
"anomaly_id": "anom-001",
"resource": "api-server-pod-1",
"metric": "cpu_usage",
"value": 95.5,
"threshold": 80,
"severity": "high",
"detected_at": datetime.now().isoformat()
}
]
}
@app.get("/api/v1/anomalies/{anomaly_id}")
async def get_anomaly_detail(anomaly_id: str):
"""获取异常详情"""
return {
"anomaly_id": anomaly_id,
"resource": "api-server-pod-1",
"metric": "cpu_usage",
"timeline": [],
"related_metrics": [],
"suggested_causes": []
}
# 根因分析 API
@app.post("/api/v1/rca/analyze")
async def analyze_root_cause(request: RCARequest):
"""执行根因分析"""
return {
"analysis_id": "rca-001",
"status": "completed",
"root_causes": [
{
"rank": 1,
"resource": "database-primary",
"issue": "Connection pool exhausted",
"confidence": 0.85,
"evidence": [
"DB connection count spiked",
"Multiple timeout errors"
]
}
],
"impact_analysis": {
"affected_services": ["api-server", "user-service"],
"estimated_users_affected": 1500
},
"recommended_actions": [
"增加数据库连接池大小",
"检查慢查询"
]
}
@app.get("/api/v1/rca/{analysis_id}")
async def get_rca_result(analysis_id: str):
"""获取根因分析结果"""
return {"analysis_id": analysis_id, "status": "completed"}
# 告警管理 API
@app.get("/api/v1/alerts")
async def list_alerts(
status: Optional[str] = None,
severity: Optional[str] = None,
limit: int = 100
):
"""查询告警列表"""
return {
"total": 5,
"alerts": []
}
@app.post("/api/v1/alerts/{alert_id}/acknowledge")
async def acknowledge_alert(alert_id: str, comment: Optional[str] = None):
"""确认告警"""
return {"alert_id": alert_id, "status": "acknowledged"}
@app.post("/api/v1/alerts/{alert_id}/resolve")
async def resolve_alert(alert_id: str, resolution: str):
"""解决告警"""
return {"alert_id": alert_id, "status": "resolved"}
@app.post("/api/v1/alert-rules")
async def create_alert_rule(rule: AlertRule):
"""创建告警规则"""
return {"rule_id": "rule-001", **rule.dict()}
@app.get("/api/v1/alert-rules")
async def list_alert_rules():
"""列出告警规则"""
return {"rules": []}
# 自动修复 API
@app.post("/api/v1/remediation/execute")
async def execute_remediation(request: RemediationRequest):
"""执行自动修复"""
if request.dry_run:
return {
"status": "dry_run",
"would_execute": request.action,
"parameters": request.parameters
}
return {
"execution_id": "exec-001",
"status": "running",
"action": request.action
}
@app.get("/api/v1/remediation/{execution_id}")
async def get_remediation_status(execution_id: str):
"""获取修复状态"""
return {
"execution_id": execution_id,
"status": "completed",
"result": "success"
}
# 容量规划 API
@app.post("/api/v1/capacity/forecast")
async def forecast_capacity(query: CapacityQuery):
"""容量预测"""
return {
"forecasts": [
{
"resource": "api-server",
"metric": "cpu_usage",
"predictions": [
{"timestamp": datetime.now().isoformat(), "value": 65, "lower": 60, "upper": 70}
],
"trend": "increasing",
"saturation_time_hours": 48
}
]
}
@app.get("/api/v1/capacity/recommendations")
async def get_capacity_recommendations():
"""获取容量建议"""
return {
"recommendations": [
{
"resource": "api-server",
"action": "scale_up",
"reason": "预测 48 小时内达到容量上限",
"suggested_replicas": 5
}
]
}
# 拓扑与依赖 API
@app.get("/api/v1/topology")
async def get_topology():
"""获取服务拓扑"""
return {
"nodes": [],
"edges": []
}
@app.get("/api/v1/topology/{resource_id}/dependencies")
async def get_dependencies(resource_id: str, direction: str = "both"):
"""获取资源依赖"""
return {
"resource_id": resource_id,
"upstream": [],
"downstream": []
}
# 知识库 API
@app.get("/api/v1/knowledge/runbooks")
async def list_runbooks():
"""列出运维手册"""
return {"runbooks": []}
@app.get("/api/v1/knowledge/search")
async def search_knowledge(query: str):
"""搜索知识库"""
return {"results": []}
# WebSocket 实时推送
from fastapi import WebSocket, WebSocketDisconnect
class ConnectionManager:
def __init__(self):
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def broadcast(self, message: dict):
for connection in self.active_connections:
await connection.send_json(message)
manager = ConnectionManager()
@app.websocket("/ws/alerts")
async def websocket_alerts(websocket: WebSocket):
"""告警实时推送"""
await manager.connect(websocket)
try:
while True:
# 等待并推送新告警
await asyncio.sleep(1)
except WebSocketDisconnect:
manager.disconnect(websocket)
@app.websocket("/ws/metrics/{resource_id}")
async def websocket_metrics(websocket: WebSocket, resource_id: str):
"""指标实时推送"""
await websocket.accept()
try:
while True:
# 推送实时指标
await websocket.send_json({
"resource_id": resource_id,
"timestamp": datetime.now().isoformat(),
"metrics": {
"cpu": 45.5,
"memory": 60.2
}
})
await asyncio.sleep(1)
except WebSocketDisconnect:
pass
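A quick smoke test against the API, assuming the app above is saved as main.py and served locally (the module name and port are assumptions):

# Hypothetical smoke test against a local instance started with:
#   uvicorn main:app --port 8000
import asyncio
import aiohttp

async def smoke_test():
    async with aiohttp.ClientSession() as session:
        async with session.get("http://localhost:8000/api/v1/health") as resp:
            print(await resp.json())  # {"status": "healthy", ...}
        payload = {"resource": "api-server-pod-1", "metric": "cpu_usage", "value": 95.5}
        async with session.post("http://localhost:8000/api/v1/anomalies/detect",
                                json=payload) as resp:
            print(await resp.json())  # {"is_anomaly": false, ...}

asyncio.run(smoke_test())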
3. System Integration
3.1 Monitoring System Integration
"""
监控系统集成
"""
from typing import Dict, List, Any, Optional
from abc import ABC, abstractmethod
import aiohttp
class MonitoringIntegration(ABC):
"""监控集成基类"""
@abstractmethod
async def get_metrics(self, query: str) -> List[Dict]:
pass
@abstractmethod
async def create_alert_rule(self, rule: Dict) -> str:
pass
class PrometheusIntegration(MonitoringIntegration):
"""Prometheus 集成"""
def __init__(self, url: str):
self.url = url
async def get_metrics(self, query: str,
start: Optional[float] = None,
end: Optional[float] = None,
step: str = "1m") -> List[Dict]:
"""查询 Prometheus 指标"""
async with aiohttp.ClientSession() as session:
if start and end:
# 范围查询
url = f"{self.url}/api/v1/query_range"
params = {
"query": query,
"start": start,
"end": end,
"step": step
}
else:
# 即时查询
url = f"{self.url}/api/v1/query"
params = {"query": query}
async with session.get(url, params=params) as resp:
data = await resp.json()
return data.get("data", {}).get("result", [])
async def create_alert_rule(self, rule: Dict) -> str:
"""创建告警规则 (通过 Alertmanager)"""
# 实际需要写入 Prometheus 规则文件或使用 Thanos Ruler
return "rule-created"
class GrafanaIntegration:
"""Grafana 集成"""
def __init__(self, url: str, api_key: str):
self.url = url
self.headers = {"Authorization": f"Bearer {api_key}"}
async def create_dashboard(self, dashboard: Dict) -> str:
"""创建仪表盘"""
async with aiohttp.ClientSession() as session:
url = f"{self.url}/api/dashboards/db"
async with session.post(url, json=dashboard,
headers=self.headers) as resp:
result = await resp.json()
return result.get("uid", "")
async def add_annotation(self, dashboard_id: str,
text: str, tags: List[str],
time_from: int, time_to: int):
"""添加注释"""
async with aiohttp.ClientSession() as session:
url = f"{self.url}/api/annotations"
annotation = {
"dashboardId": dashboard_id,
"text": text,
"tags": tags,
"time": time_from,
"timeEnd": time_to
}
await session.post(url, json=annotation, headers=self.headers)
class AlertManagerIntegration:
"""Alertmanager 集成"""
def __init__(self, url: str):
self.url = url
async def get_alerts(self, active: bool = True) -> List[Dict]:
"""获取告警"""
async with aiohttp.ClientSession() as session:
url = f"{self.url}/api/v2/alerts"
params = {"active": str(active).lower()}
async with session.get(url, params=params) as resp:
return await resp.json()
async def silence_alert(self, matchers: List[Dict],
starts_at: str, ends_at: str,
comment: str, created_by: str) -> str:
"""静默告警"""
async with aiohttp.ClientSession() as session:
url = f"{self.url}/api/v2/silences"
silence = {
"matchers": matchers,
"startsAt": starts_at,
"endsAt": ends_at,
"comment": comment,
"createdBy": created_by
}
async with session.post(url, json=silence) as resp:
result = await resp.json()
return result.get("silenceID", "")
3.2 Ticketing System Integration
"""
工单系统集成
"""
from typing import Dict, List, Any, Optional
from abc import ABC, abstractmethod
import aiohttp
class TicketingIntegration(ABC):
"""工单系统集成基类"""
@abstractmethod
async def create_ticket(self, ticket: Dict) -> str:
pass
@abstractmethod
async def update_ticket(self, ticket_id: str, updates: Dict):
pass
@abstractmethod
async def close_ticket(self, ticket_id: str, resolution: str):
pass
class JiraIntegration(TicketingIntegration):
"""Jira 集成"""
def __init__(self, url: str, username: str, api_token: str):
self.url = url
self.auth = aiohttp.BasicAuth(username, api_token)
async def create_ticket(self, ticket: Dict) -> str:
"""创建 Jira Issue"""
async with aiohttp.ClientSession() as session:
url = f"{self.url}/rest/api/3/issue"
issue = {
"fields": {
"project": {"key": ticket.get("project", "OPS")},
"summary": ticket["title"],
"description": {
"type": "doc",
"version": 1,
"content": [
{
"type": "paragraph",
"content": [
{"type": "text", "text": ticket["description"]}
]
}
]
},
"issuetype": {"name": ticket.get("type", "Bug")},
"priority": {"name": ticket.get("priority", "Medium")}
}
}
async with session.post(url, json=issue, auth=self.auth) as resp:
result = await resp.json()
return result.get("key", "")
async def update_ticket(self, ticket_id: str, updates: Dict):
"""更新 Issue"""
async with aiohttp.ClientSession() as session:
url = f"{self.url}/rest/api/3/issue/{ticket_id}"
await session.put(url, json={"fields": updates}, auth=self.auth)
async def close_ticket(self, ticket_id: str, resolution: str):
"""关闭 Issue"""
async with aiohttp.ClientSession() as session:
# 获取可用的转换
url = f"{self.url}/rest/api/3/issue/{ticket_id}/transitions"
async with session.get(url, auth=self.auth) as resp:
transitions = await resp.json()
# 找到 "Done" 转换
done_transition = None
for t in transitions.get("transitions", []):
if t["name"].lower() in ["done", "closed", "resolved"]:
done_transition = t["id"]
break
if done_transition:
await session.post(url, json={
"transition": {"id": done_transition},
"update": {
"comment": [
{
"add": {
"body": {
"type": "doc",
"version": 1,
"content": [
{
"type": "paragraph",
"content": [
{"type": "text", "text": resolution}
]
}
]
}
}
}
]
}
}, auth=self.auth)
class ServiceNowIntegration(TicketingIntegration):
"""ServiceNow 集成"""
def __init__(self, instance: str, username: str, password: str):
self.base_url = f"https://{instance}.service-now.com/api/now"
self.auth = aiohttp.BasicAuth(username, password)
async def create_ticket(self, ticket: Dict) -> str:
"""创建 Incident"""
async with aiohttp.ClientSession() as session:
url = f"{self.base_url}/table/incident"
incident = {
"short_description": ticket["title"],
"description": ticket["description"],
"urgency": ticket.get("urgency", "2"),
"impact": ticket.get("impact", "2"),
"category": ticket.get("category", "software")
}
async with session.post(url, json=incident, auth=self.auth) as resp:
result = await resp.json()
return result.get("result", {}).get("number", "")
async def update_ticket(self, ticket_id: str, updates: Dict):
"""更新 Incident"""
async with aiohttp.ClientSession() as session:
url = f"{self.base_url}/table/incident/{ticket_id}"
await session.patch(url, json=updates, auth=self.auth)
async def close_ticket(self, ticket_id: str, resolution: str):
"""关闭 Incident"""
await self.update_ticket(ticket_id, {
"state": "6", # Resolved
"close_code": "Solved",
"close_notes": resolution
})
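A minimal sketch tying ticketing to the RCA output from section 2.3; the Jira URL, credentials, and field values are illustrative assumptions:

# Minimal usage sketch: open (and later close) a Jira ticket for an incident.
import asyncio

async def open_incident_ticket():
    jira = JiraIntegration(
        url="https://example.atlassian.net",
        username="bot@example.com",
        api_token="***"
    )
    key = await jira.create_ticket({
        "project": "OPS",
        "type": "Bug",
        "priority": "High",
        "title": "[AIOps] Connection pool exhausted on database-primary",
        "description": "RCA rca-001 (confidence 0.85): DB connection count spiked; "
                       "multiple timeout errors observed."
    })
    print(f"Created {key}")
    await jira.close_ticket(key, "Connection pool size increased; metrics back to baseline.")

asyncio.run(open_incident_ticket())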
3.3 Notification System Integration
"""
通知系统集成
"""
from typing import Dict, List, Any
from abc import ABC, abstractmethod
import aiohttp
class NotificationChannel(ABC):
"""通知渠道基类"""
@abstractmethod
async def send(self, message: Dict) -> bool:
pass
class SlackNotification(NotificationChannel):
"""Slack 通知"""
def __init__(self, webhook_url: str):
self.webhook_url = webhook_url
async def send(self, message: Dict) -> bool:
"""发送 Slack 消息"""
async with aiohttp.ClientSession() as session:
# 构建 Slack 消息
slack_message = {
"text": message.get("title", "AIOps 通知"),
"attachments": [
{
"color": self._severity_to_color(message.get("severity", "info")),
"title": message.get("title"),
"text": message.get("description"),
"fields": [
{"title": "资源", "value": message.get("resource", "N/A"), "short": True},
{"title": "严重程度", "value": message.get("severity", "N/A"), "short": True}
],
"footer": "AIOps Platform",
"ts": message.get("timestamp")
}
]
}
async with session.post(self.webhook_url, json=slack_message) as resp:
return resp.status == 200
def _severity_to_color(self, severity: str) -> str:
colors = {
"critical": "#ff0000",
"high": "#ff6600",
"medium": "#ffcc00",
"low": "#00ff00",
"info": "#0066ff"
}
return colors.get(severity.lower(), "#808080")
class DingTalkNotification(NotificationChannel):
"""钉钉通知"""
def __init__(self, webhook_url: str, secret: str = None):
self.webhook_url = webhook_url
self.secret = secret
async def send(self, message: Dict) -> bool:
"""发送钉钉消息"""
import hashlib
import hmac
import base64
import urllib.parse
import time
url = self.webhook_url
# 签名
if self.secret:
timestamp = str(round(time.time() * 1000))
string_to_sign = f"{timestamp}\n{self.secret}"
hmac_code = hmac.new(
self.secret.encode(),
string_to_sign.encode(),
digestmod=hashlib.sha256
).digest()
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
url = f"{url}×tamp={timestamp}&sign={sign}"
# 构建消息
dingtalk_message = {
"msgtype": "markdown",
"markdown": {
"title": message.get("title", "AIOps 通知"),
"text": f"""### {message.get('title')}
> **资源**: {message.get('resource', 'N/A')}
> **严重程度**: {message.get('severity', 'N/A')}
{message.get('description', '')}
"""
}
}
async with aiohttp.ClientSession() as session:
async with session.post(url, json=dingtalk_message) as resp:
result = await resp.json()
return result.get("errcode") == 0
class EmailNotification(NotificationChannel):
"""邮件通知"""
def __init__(self, smtp_host: str, smtp_port: int,
username: str, password: str,
from_addr: str):
self.smtp_host = smtp_host
self.smtp_port = smtp_port
self.username = username
self.password = password
self.from_addr = from_addr
async def send(self, message: Dict) -> bool:
"""发送邮件"""
import aiosmtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
msg = MIMEMultipart('alternative')
msg['Subject'] = f"[AIOps] {message.get('title', 'Alert')}"
msg['From'] = self.from_addr
msg['To'] = ", ".join(message.get('recipients', []))
# HTML 内容
html = f"""
<html>
<body>
<h2>{message.get('title')}</h2>
<table border="1" cellpadding="5">
<tr><td><b>资源</b></td><td>{message.get('resource', 'N/A')}</td></tr>
<tr><td><b>严重程度</b></td><td>{message.get('severity', 'N/A')}</td></tr>
<tr><td><b>时间</b></td><td>{message.get('timestamp', 'N/A')}</td></tr>
</table>
<p>{message.get('description', '')}</p>
</body>
</html>
"""
msg.attach(MIMEText(html, 'html'))
try:
await aiosmtplib.send(
msg,
hostname=self.smtp_host,
port=self.smtp_port,
username=self.username,
password=self.password,
use_tls=True
)
return True
except Exception:
return False
class NotificationService:
"""通知服务"""
def __init__(self):
self.channels: Dict[str, NotificationChannel] = {}
def add_channel(self, name: str, channel: NotificationChannel):
"""添加通知渠道"""
self.channels[name] = channel
async def notify(self, message: Dict, channels: List[str] = None):
"""发送通知"""
if channels is None:
channels = list(self.channels.keys())
results = {}
for channel_name in channels:
channel = self.channels.get(channel_name)
if channel:
success = await channel.send(message)
results[channel_name] = success
return results
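Wiring it together, a minimal sketch that registers two channels and fans out one alert (the webhook URLs are placeholders):

# Minimal wiring sketch for the NotificationService above.
import asyncio

async def notify_example():
    svc = NotificationService()
    svc.add_channel("slack", SlackNotification(
        "https://hooks.slack.com/services/T000/B000/XXXX"))
    svc.add_channel("dingtalk", DingTalkNotification(
        "https://oapi.dingtalk.com/robot/send?access_token=XXXX", secret="SEC..."))

    results = await svc.notify({
        "title": "High CPU on api-server-pod-1",
        "description": "cpu_usage at 95.5% for 5 minutes",
        "resource": "api-server-pod-1",
        "severity": "high"
    }, channels=["slack", "dingtalk"])
    print(results)  # e.g. {"slack": True, "dingtalk": True}

asyncio.run(notify_example())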
4. Best Practices
4.1 Operations Workflow Automation
# Operations automation workflow configuration
workflows:
  # Alert response workflow
  alert_response:
    trigger:
      type: alert
      conditions:
        severity: [critical, high]
    steps:
      - name: enrich_alert
        action: fetch_context
        params:
          include:
            - recent_changes
            - related_alerts
            - resource_health
      - name: analyze_root_cause
        action: run_rca
        params:
          time_window: 30m
          methods: [topology, temporal]
      - name: create_incident
        action: create_ticket
        params:
          system: jira
          project: OPS
          type: Incident
          priority: "{{ alert.severity }}"
      - name: notify_team
        action: send_notification
        params:
          channels: [slack, pagerduty]
          escalation_policy: default
      - name: attempt_auto_remediation
        condition: "{{ rca.confidence > 0.8 }}"
        action: execute_remediation
        params:
          max_attempts: 1
          rollback_on_failure: true

  # Capacity alerting workflow
  capacity_alert:
    trigger:
      type: schedule
      cron: "0 */6 * * *"  # every 6 hours
    steps:
      - name: forecast_capacity
        action: run_capacity_forecast
        params:
          horizon: 7d
          resources: all
      - name: check_thresholds
        action: evaluate_thresholds
        params:
          warning: 70%
          critical: 85%
      - name: generate_recommendations
        condition: "{{ forecast.saturation_time < 7d }}"
        action: create_recommendations
        params:
          include_cost_analysis: true
      - name: create_capacity_ticket
        condition: "{{ forecast.saturation_time < 3d }}"
        action: create_ticket
        params:
          system: jira
          project: CAP
          type: Task

  # Deployment monitoring workflow
  deployment_monitoring:
    trigger:
      type: event
      source: kubernetes
      event_type: deployment
    steps:
      - name: track_deployment
        action: create_deployment_marker
        params:
          dashboards: [production-overview]
      - name: monitor_metrics
        action: start_monitoring
        params:
          duration: 30m
          metrics:
            - error_rate
            - latency_p99
            - cpu_usage
      - name: detect_regression
        action: compare_baselines
        params:
          baseline_window: 1h
          threshold_multiplier: 1.5
      - name: auto_rollback
        condition: "{{ regression.detected and regression.severity == 'critical' }}"
        action: rollback_deployment
        params:
          notify: true
          ticket: create
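The {{ ... }} conditions above imply a small template-expression evaluator inside the workflow engine. One possible sketch, assuming context keys that match the config (rca.confidence, regression.detected, and so on) and handling only numeric and boolean comparisons, not duration literals like 7d:

# Sketch of evaluating "{{ ... }}" step conditions against a context dict.
import re

def evaluate_condition(condition: str, context: dict) -> bool:
    """Resolve dotted names against the context, then evaluate the expression."""
    if not condition:
        return True  # steps without a condition always run
    expr = condition.strip().removeprefix("{{").removesuffix("}}").strip()

    # Replace dotted references like rca.confidence with their literal values
    def resolve(match):
        value = context
        for part in match.group(0).split("."):
            value = value[part]
        return repr(value)

    expr = re.sub(r"[a-z_]+(?:\.[a-z_]+)+", resolve, expr)
    # Only literals and operators remain; evaluate without builtins
    return bool(eval(expr, {"__builtins__": {}}, {}))

ctx = {"rca": {"confidence": 0.85}}
print(evaluate_condition("{{ rca.confidence > 0.8 }}", ctx))  # True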
4.2 Model Operations Best Practices
"""
AIOps 模型运维最佳实践
"""
model_ops_config = {
# 模型监控
"monitoring": {
"metrics": [
"prediction_latency",
"prediction_accuracy",
"feature_drift",
"concept_drift",
"model_staleness"
],
"alerting": {
"accuracy_drop": {
"threshold": 0.1,
"window": "1h",
"action": "notify_ml_team"
},
"latency_spike": {
"threshold": "100ms",
"action": "scale_inference"
}
}
},
# 模型更新策略
"update_strategy": {
"anomaly_detection": {
"retrain_schedule": "weekly",
"online_learning": True,
"validation_required": True,
"canary_rollout": True
},
"root_cause": {
"retrain_schedule": "monthly",
"requires_labeled_data": True,
"human_review": True
},
"capacity_forecast": {
"retrain_schedule": "daily",
"incremental_training": True,
"auto_deploy": True
}
},
# 特征管理
"feature_management": {
"feature_store": "feast",
"version_control": True,
"lineage_tracking": True,
"freshness_requirements": {
"real_time": "< 1s",
"near_real_time": "< 1m",
"batch": "< 1h"
}
},
# 模型版本控制
"version_control": {
"registry": "mlflow",
"artifact_storage": "s3://aiops-models",
"metadata": [
"training_data_version",
"feature_versions",
"hyperparameters",
"performance_metrics"
]
}
}
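As one concrete way to implement the feature_drift metric above, a Population Stability Index (PSI) check between a training-time baseline and recent production values can be used; the 0.2 alert threshold below is a common rule of thumb, not something mandated by the config:

# Sketch of a feature drift check via the Population Stability Index.
import numpy as np

def psi(baseline: np.ndarray, current: np.ndarray, bins: int = 10) -> float:
    """PSI over quantile bins of the baseline distribution."""
    edges = np.quantile(baseline, np.linspace(0, 1, bins + 1))
    edges[0], edges[-1] = -np.inf, np.inf          # cover the full range
    b_frac = np.histogram(baseline, edges)[0] / len(baseline)
    c_frac = np.histogram(current, edges)[0] / len(current)
    b_frac = np.clip(b_frac, 1e-6, None)           # avoid log(0)
    c_frac = np.clip(c_frac, 1e-6, None)
    return float(np.sum((c_frac - b_frac) * np.log(c_frac / b_frac)))

rng = np.random.default_rng(0)
baseline = rng.normal(50, 10, 10_000)              # training-time distribution
drifted = rng.normal(58, 10, 10_000)               # production has shifted
score = psi(baseline, drifted)
print(f"PSI = {score:.3f}, drift = {score > 0.2}")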
4.3 Security and Compliance
# Security and compliance configuration
security:
  authentication:
    method: oidc
    provider: keycloak
    required_claims:
      - email
      - groups

  authorization:
    rbac:
      enabled: true
      roles:
        - name: viewer
          permissions:
            - read:alerts
            - read:dashboards
            - read:reports
        - name: operator
          permissions:
            - read:*
            - write:alerts
            - execute:remediation
            - acknowledge:incidents
        - name: admin
          permissions:
            - "*"

  audit:
    enabled: true
    events:
      - type: auth
        level: all
      - type: config_change
        level: all
      - type: remediation
        level: all
      - type: data_access
        level: sensitive

  data_protection:
    encryption:
      at_rest: aes-256-gcm
      in_transit: tls-1.3
    pii_handling:
      detection: enabled
      masking: enabled
      retention: 90d
    data_retention:
      metrics: 90d
      logs: 30d
      traces: 7d
      alerts: 365d

compliance:
  frameworks:
    - soc2
    - iso27001
    - gdpr
  controls:
    access_review:
      frequency: quarterly
      automated: true
    change_management:
      approval_required: true
      rollback_capability: required
    incident_response:
      sla:
        critical: 15m
        high: 1h
        medium: 4h
        low: 24h
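The RBAC permissions above follow an action:scope pattern with * wildcards. A hypothetical helper showing the matching semantics this implies:

# Hypothetical permission matcher for the "action:scope" strings above;
# "*" alone grants everything, and either side may be a wildcard.
def has_permission(granted: list[str], required: str) -> bool:
    req_action, _, req_scope = required.partition(":")
    for perm in granted:
        if perm == "*":
            return True
        action, _, scope = perm.partition(":")
        if action in ("*", req_action) and scope in ("*", req_scope):
            return True
    return False

operator = ["read:*", "write:alerts", "execute:remediation", "acknowledge:incidents"]
print(has_permission(operator, "read:dashboards"))      # True  (via read:*)
print(has_permission(operator, "execute:remediation"))  # True
print(has_permission(operator, "write:dashboards"))     # False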
5. Platform Evolution Roadmap
5.1 Maturity Evolution
┌─────────────────────────────────────────────────────────────────┐
│                  AIOps Platform Evolution Roadmap               │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Phase 1: Foundation (months 3-6)                               │
│  ├─ Unified data collection platform                            │
│  ├─ Basic anomaly detection (thresholds + statistics)           │
│  ├─ Alert aggregation and noise reduction                       │
│  └─ Dashboards and visualization                                │
│                                                                 │
│  Phase 2: Intelligence (months 6-12)                            │
│  ├─ ML anomaly detection (time series + logs)                   │
│  ├─ Basic root cause analysis                                   │
│  ├─ Capacity forecasting                                        │
│  └─ Knowledge base build-out                                    │
│                                                                 │
│  Phase 3: Automation (months 12-18)                             │
│  ├─ Auto-remediation engine                                     │
│  ├─ Intelligent scaling                                         │
│  ├─ Predictive maintenance                                      │
│  └─ ChatOps integration                                         │
│                                                                 │
│  Phase 4: Advanced (months 18-24)                               │
│  ├─ Causal-inference RCA                                        │
│  ├─ RL-based auto-tuning                                        │
│  ├─ Full-stack observability                                    │
│  └─ Self-adaptive learning systems                              │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
5.2 Key Metrics
"""
AIOps 平台关键指标
"""
kpis = {
"运维效率": {
"MTTR": {
"定义": "平均修复时间",
"目标": "< 30分钟",
"当前": "2小时"
},
"MTTD": {
"定义": "平均检测时间",
"目标": "< 5分钟",
"当前": "15分钟"
},
"incident_reduction": {
"定义": "事件减少率",
"目标": "> 50%",
"当前": "30%"
}
},
"AI 效果": {
"anomaly_precision": {
"定义": "异常检测精确率",
"目标": "> 90%",
"当前": "85%"
},
"rca_accuracy": {
"定义": "根因分析准确率",
"目标": "> 80%",
"当前": "70%"
},
"auto_remediation_success": {
"定义": "自动修复成功率",
"目标": "> 95%",
"当前": "80%"
}
},
"业务价值": {
"downtime_reduction": {
"定义": "停机时间减少",
"目标": "> 70%",
"当前": "40%"
},
"cost_savings": {
"定义": "成本节省",
"目标": "> 30%",
"当前": "15%"
},
"engineer_productivity": {
"定义": "工程师效率提升",
"目标": "> 50%",
"当前": "25%"
}
}
}
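As a sketch of how two of these KPIs might be computed from incident records (the field names started_at, detected_at, and resolved_at are assumptions):

# Sketch: MTTD and MTTR in minutes from a list of incident records.
from datetime import datetime

def mean_minutes(incidents, start_field, end_field):
    deltas = [
        (inc[end_field] - inc[start_field]).total_seconds() / 60
        for inc in incidents
        if inc.get(start_field) and inc.get(end_field)
    ]
    return sum(deltas) / len(deltas) if deltas else 0.0

incidents = [
    {"started_at": datetime(2024, 1, 1, 10, 0),
     "detected_at": datetime(2024, 1, 1, 10, 12),
     "resolved_at": datetime(2024, 1, 1, 11, 30)},
    {"started_at": datetime(2024, 1, 2, 2, 0),
     "detected_at": datetime(2024, 1, 2, 2, 5),
     "resolved_at": datetime(2024, 1, 2, 3, 0)},
]
print("MTTD:", mean_minutes(incidents, "started_at", "detected_at"), "min")  # 8.5
print("MTTR:", mean_minutes(incidents, "started_at", "resolved_at"), "min")  # 75.0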
Summary
This chapter completed a full walkthrough of building an AIOps platform in practice:
- Platform architecture: layered architecture design and a Kubernetes deployment plan
- Core modules: data collection, stream processing, and API service implementation
- System integration: hooking into monitoring, ticketing, and notification systems
- Best practices: operations workflow automation, model operations, and security and compliance
Key takeaways:
- AIOps is an incremental, evolutionary journey
- Data quality is the foundation of AI effectiveness
- Human-machine collaboration is more realistic than full automation
- Continuous feedback and optimization are essential
With the material in this chapter, readers should be able to design and implement an enterprise-grade AIOps platform that improves operational efficiency and system reliability.