04-智能运维决策
概述
智能运维决策是 AIOps 的高级阶段,包括自动修复、容量预测、弹性伸缩和成本优化等能力。本章探讨如何利用 AI 技术实现智能化的运维决策。
1. 自动修复系统
1.1 自动修复框架
┌─────────────────────────────────────────────────────────────────┐
│                          自动修复框架                           │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   ┌─────────────────────────────────────────────────────────┐   │
│   │                         触发层                          │   │
│   │     告警触发  │  异常检测  │  阈值触发  │  预测触发     │   │
│   └────────────────────────────┬────────────────────────────┘   │
│                                │                                │
│   ┌─────────────────────────────────────────────────────────┐   │
│   │                         决策层                          │   │
│   │                                                         │   │
│   │   问题分类          修复选择          风险评估          │   │
│   │   ├─ 资源问题       ├─ 规则匹配       ├─ 影响分析       │   │
│   │   ├─ 配置问题       ├─ ML推荐         ├─ 回滚能力       │   │
│   │   ├─ 代码问题       ├─ 历史学习       ├─ SLA影响        │   │
│   │   └─ 依赖问题       └─ 专家知识       └─ 成本评估       │   │
│   └────────────────────────────┬────────────────────────────┘   │
│                                │                                │
│   ┌─────────────────────────────────────────────────────────┐   │
│   │                         执行层                          │   │
│   │                                                         │   │
│   │   预检查            执行修复          验证结果          │   │
│   │   ├─ 权限检查       ├─ 脚本执行       ├─ 健康检查       │   │
│   │   ├─ 状态检查       ├─ API调用        ├─ 指标验证       │   │
│   │   └─ 资源检查       └─ K8s操作        └─ 日志验证       │   │
│   └────────────────────────────┬────────────────────────────┘   │
│                                │                                │
│   ┌─────────────────────────────────────────────────────────┐   │
│   │                         反馈层                          │   │
│   │     执行记录  │  效果评估  │  知识沉淀  │  模型更新     │   │
│   └─────────────────────────────────────────────────────────┘   │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
1.2 自动修复引擎
"""
自动修复引擎实现
"""
from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional, Callable
from enum import Enum
from abc import ABC, abstractmethod
import asyncio
import time
import uuid
class RemediationStatus(Enum):
    """Lifecycle status of a single remediation action (stored on RemediationResult.status)."""
    PENDING = "pending"
    RUNNING = "running"  # executors set this when they start executing an action
    SUCCESS = "success"
    FAILED = "failed"
    ROLLED_BACK = "rolled_back"
    SKIPPED = "skipped"  # e.g. the action required approval that was not granted
class RiskLevel(Enum):
    """Risk level of a remediation action.

    Values are the lowercase strings used in rule templates
    (``"risk_level": "low"`` etc.), which are converted with ``RiskLevel(value)``.
    """
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"
@dataclass
class RemediationAction:
    """A single remediation step to be carried out by an executor."""
    action_id: str
    name: str
    description: str
    # Interpreted by the executor, e.g. "restart_pod", "scale_deployment",
    # "rollback_deployment", "cordon_node", "drain_node" for Kubernetes.
    action_type: str
    # Name of the object acted on (pod, deployment or node name).
    target_resource: str
    # Action-specific arguments such as "namespace", "replicas", "revision".
    parameters: Dict[str, Any] = field(default_factory=dict)
    risk_level: RiskLevel = RiskLevel.LOW
    estimated_duration: int = 60  # seconds
    # Optional compensating action to run if post-execution validation fails.
    rollback_action: Optional['RemediationAction'] = None
@dataclass
class RemediationResult:
    """Outcome record for one executed (or skipped) remediation action."""
    action_id: str  # matches RemediationAction.action_id
    status: RemediationStatus
    start_time: float  # Unix timestamp (time.time())
    end_time: Optional[float] = None
    duration_seconds: Optional[float] = None
    output: str = ""  # human-readable summary of what was done
    error: Optional[str] = None  # error message when the action failed
    # NOTE(review): metrics_before / metrics_after are not populated by the
    # executors in this file; presumably filled by a metrics collector.
    metrics_before: Dict[str, float] = field(default_factory=dict)
    metrics_after: Dict[str, float] = field(default_factory=dict)
class RemediationExecutor(ABC):
    """Abstract interface every remediation backend must implement.

    Concrete executors know how to perform actions against one kind of
    infrastructure (e.g. Kubernetes) and how to check that the action worked.
    """

    @abstractmethod
    async def execute(self, action: RemediationAction) -> RemediationResult:
        """Carry out ``action`` and return a result describing the outcome."""
        ...

    @abstractmethod
    async def validate(self, action: RemediationAction, result: RemediationResult) -> bool:
        """Return True if the system looks healthy after ``action`` produced ``result``."""
        ...
class KubernetesRemediationExecutor(RemediationExecutor):
    """Remediation executor backed by the Kubernetes API and ``kubectl``.

    Supported ``action.action_type`` values: ``restart_pod``,
    ``scale_deployment``, ``rollback_deployment``, ``cordon_node`` and
    ``drain_node``. Namespaced actions read ``action.parameters["namespace"]``
    (default ``"default"``). Any other action type is reported as FAILED
    instead of being silently treated as a success.
    """

    # Seconds to wait before validating, giving the cluster time to converge.
    validation_wait_seconds: float = 30

    def __init__(self, k8s_client):
        # NOTE(review): the calls below (delete_namespaced_pod, patch_node,
        # patch_namespaced_deployment_scale, read_namespaced_deployment, ...)
        # suggest kubernetes-client API objects; one object must expose all
        # of them — confirm with the caller.
        self.k8s_client = k8s_client

    async def execute(self, action: RemediationAction) -> RemediationResult:
        """Execute a Kubernetes remediation action.

        Returns a RemediationResult with status SUCCESS, or FAILED and
        ``error`` set when the call raised or the action type is unknown.
        """
        start_time = time.time()
        result = RemediationResult(
            action_id=action.action_id,
            status=RemediationStatus.RUNNING,
            start_time=start_time
        )
        namespace = action.parameters.get("namespace", "default")
        try:
            if action.action_type == "restart_pod":
                await self._restart_pod(action)
                result.output = f"Pod {action.target_resource} 重启成功"
            elif action.action_type == "scale_deployment":
                # Template rendering may yield "5" instead of 5; the API needs an int.
                replicas = int(action.parameters.get("replicas", 1))
                await self._scale_deployment(action.target_resource, replicas, namespace)
                result.output = f"Deployment 扩容到 {replicas} 副本"
            elif action.action_type == "rollback_deployment":
                revision = action.parameters.get("revision")
                await self._rollback_deployment(action.target_resource, revision, namespace)
                result.output = f"Deployment 回滚到版本 {revision}"
            elif action.action_type == "cordon_node":
                await self._cordon_node(action.target_resource)
                result.output = f"Node {action.target_resource} 已隔离"
            elif action.action_type == "drain_node":
                await self._drain_node(action.target_resource)
                result.output = f"Node {action.target_resource} 已排空"
            else:
                # Previously fell through and reported SUCCESS without doing anything.
                raise ValueError(f"不支持的修复动作类型: {action.action_type}")
            result.status = RemediationStatus.SUCCESS
        except Exception as e:
            result.status = RemediationStatus.FAILED
            result.error = str(e)

        result.end_time = time.time()
        result.duration_seconds = result.end_time - start_time
        return result

    async def _restart_pod(self, action: RemediationAction):
        """Delete the pod so that its controller (e.g. a Deployment) recreates it."""
        namespace = action.parameters.get("namespace", "default")
        await asyncio.to_thread(
            self.k8s_client.delete_namespaced_pod,
            name=action.target_resource,
            namespace=namespace
        )

    async def _scale_deployment(self, deployment_name: str, replicas: int,
                                namespace: str = "default"):
        """Patch the deployment's scale subresource to ``replicas``."""
        body = {"spec": {"replicas": replicas}}
        await asyncio.to_thread(
            self.k8s_client.patch_namespaced_deployment_scale,
            name=deployment_name,
            namespace=namespace,
            body=body
        )

    async def _rollback_deployment(self, deployment_name: str, revision: Optional[int],
                                   namespace: str = "default"):
        """Run ``kubectl rollout undo``; without a revision kubectl reverts to the previous one."""
        cmd = ["kubectl", "rollout", "undo", f"deployment/{deployment_name}",
               "-n", namespace]
        if revision:
            cmd.extend(["--to-revision", str(revision)])
        await self._run_kubectl(cmd)

    async def _cordon_node(self, node_name: str):
        """Mark the node unschedulable."""
        body = {"spec": {"unschedulable": True}}
        await asyncio.to_thread(
            self.k8s_client.patch_node,
            name=node_name,
            body=body
        )

    async def _drain_node(self, node_name: str):
        """Evict workloads from the node with ``kubectl drain`` (forced)."""
        cmd = ["kubectl", "drain", node_name, "--ignore-daemonsets",
               "--delete-emptydir-data", "--force"]
        await self._run_kubectl(cmd)

    @staticmethod
    async def _run_kubectl(cmd: List[str]) -> None:
        """Run a kubectl command in a worker thread, surfacing stderr on failure."""
        import subprocess

        try:
            await asyncio.to_thread(subprocess.run, cmd, check=True,
                                    capture_output=True, text=True)
        except subprocess.CalledProcessError as exc:
            detail = (exc.stderr or "").strip() or f"exit code {exc.returncode}"
            raise RuntimeError(f"{' '.join(cmd)} 失败: {detail}") from exc

    async def validate(self, action: RemediationAction, result: RemediationResult) -> bool:
        """Check the cluster state after a successful action.

        Returns False if the action did not succeed, if the check fails, or if
        the check itself raises (best-effort validation). Action types without
        a specific check are considered valid.
        """
        if result.status != RemediationStatus.SUCCESS:
            return False

        # Give the cluster time to converge before checking.
        await asyncio.sleep(self.validation_wait_seconds)

        namespace = action.parameters.get("namespace", "default")
        try:
            if action.action_type == "restart_pod":
                # NOTE(review): with the default empty label_selector this checks
                # every pod in the namespace, and an empty list counts as healthy.
                pods = await asyncio.to_thread(
                    self.k8s_client.list_namespaced_pod,
                    namespace=namespace,
                    label_selector=action.parameters.get("label_selector", "")
                )
                return all(p.status.phase == "Running" for p in pods.items)

            if action.action_type == "scale_deployment":
                deployment = await asyncio.to_thread(
                    self.k8s_client.read_namespaced_deployment,
                    name=action.target_resource,
                    namespace=namespace
                )
                expected = int(action.parameters.get("replicas", 1))
                # ready_replicas is None when no replica is ready yet.
                return (deployment.status.ready_replicas or 0) == expected

            return True
        except Exception:
            return False
class AutoRemediationEngine:
    """Automatic remediation engine.

    For each incident: match rules -> generate actions -> approval gate for
    risky actions -> execute -> validate -> roll back when validation fails.
    """

    # Action types implemented by the Kubernetes executor. Routing by action
    # type works for real object names such as "web-7d9f" or "node-3", which
    # do not contain the words "pod" or "deployment".
    KUBERNETES_ACTION_TYPES = frozenset({
        "restart_pod", "scale_deployment", "rollback_deployment",
        "cordon_node", "drain_node",
    })

    def __init__(self):
        self.executors: Dict[str, RemediationExecutor] = {}
        self.remediation_rules: List['RemediationRule'] = []
        self.history: List[RemediationResult] = []
        # Actions at these levels go through _request_approval before running.
        self.approval_required_levels = {RiskLevel.HIGH, RiskLevel.CRITICAL}

    def register_executor(self, resource_type: str, executor: RemediationExecutor):
        """Register an executor under a resource-type key (e.g. "kubernetes")."""
        self.executors[resource_type] = executor

    def add_rule(self, rule: 'RemediationRule'):
        """Add a remediation rule."""
        self.remediation_rules.append(rule)

    async def handle_incident(self, incident: Dict[str, Any]) -> List[RemediationResult]:
        """Handle one incident.

        Args:
            incident: incident fields used for rule matching and templating.

        Returns:
            List[RemediationResult]: one result per action, in execution order,
            followed directly by the rollback result when a rollback ran.
        """
        results = []
        for rule in self._match_rules(incident):
            for action in rule.generate_actions(incident):
                if action.risk_level in self.approval_required_levels:
                    approved = await self._request_approval(action, incident)
                    if not approved:
                        results.append(RemediationResult(
                            action_id=action.action_id,
                            status=RemediationStatus.SKIPPED,
                            start_time=time.time(),
                            output="需要人工审批,已跳过"
                        ))
                        continue

                result = await self._execute_action(action)
                rollback_result = None
                if result.status == RemediationStatus.SUCCESS:
                    rollback_result = await self._verify_and_rollback(action, result)

                results.append(result)
                self.history.append(result)
                if rollback_result is not None:
                    results.append(rollback_result)
                    self.history.append(rollback_result)
        return results

    async def _verify_and_rollback(self, action: RemediationAction,
                                   result: RemediationResult) -> Optional[RemediationResult]:
        """Validate a successful action and, if validation fails, run its rollback.

        Updates ``result`` in place: FAILED when validation fails, ROLLED_BACK
        when the rollback then succeeds. Returns the rollback result, if any.
        """
        executor = self._get_executor(action.target_resource, action.action_type)
        if executor is None or await executor.validate(action, result):
            return None

        # The action ran but did not fix the problem; do not report SUCCESS.
        result.status = RemediationStatus.FAILED
        result.error = "修复后验证失败"
        if action.rollback_action is None:
            return None

        rollback_result = await self._execute_action(action.rollback_action)
        if rollback_result.status == RemediationStatus.SUCCESS:
            result.status = RemediationStatus.ROLLED_BACK
        return rollback_result

    def _match_rules(self, incident: Dict[str, Any]) -> List['RemediationRule']:
        """Return the rules whose conditions match the incident, in registration order."""
        return [rule for rule in self.remediation_rules if rule.matches(incident)]

    async def _execute_action(self, action: RemediationAction) -> RemediationResult:
        """Execute an action via its executor, or return FAILED if none applies."""
        executor = self._get_executor(action.target_resource, action.action_type)
        if executor is None:
            return RemediationResult(
                action_id=action.action_id,
                status=RemediationStatus.FAILED,
                start_time=time.time(),
                error="未找到合适的执行器"
            )
        return await executor.execute(action)

    def _get_executor(self, resource: str,
                      action_type: Optional[str] = None) -> Optional[RemediationExecutor]:
        """Select the executor for an action.

        Routes by ``action_type`` first. Falls back to the legacy heuristic of
        looking for "pod"/"deployment" in the resource name.
        """
        if action_type in self.KUBERNETES_ACTION_TYPES:
            return self.executors.get("kubernetes")
        lowered = resource.lower()
        if "pod" in lowered or "deployment" in lowered:
            return self.executors.get("kubernetes")
        return None

    async def _request_approval(self, action: RemediationAction,
                                incident: Dict[str, Any]) -> bool:
        """Request human approval for a risky action.

        No approval channel is wired up yet, so this fails closed: every action
        that requires approval is denied and reported as SKIPPED. (The previous
        code approved only MEDIUM, which never reached here with the default
        approval levels, and would have bypassed approval if MEDIUM were added.)
        """
        return False
@dataclass
class RemediationRule:
    """Declarative remediation rule.

    A rule matches an incident dict through ``conditions``. When it matches,
    each entry of ``actions_template`` becomes a ``RemediationAction`` after
    ``${var}`` placeholders are substituted with incident values.

    Condition semantics, per key:
    - list value: the incident value must be one of the items;
    - ``"regex:<pattern>"``: ``re.match`` against ``str(incident value)``;
      a missing (None) incident value never matches;
    - anything else: plain equality.
    """
    rule_id: str
    name: str
    description: str
    conditions: Dict[str, Any]
    actions_template: List[Dict[str, Any]]
    priority: int = 0  # stored on the rule; not consulted by matches()/generate_actions()

    def matches(self, incident: Dict[str, Any]) -> bool:
        """Return True when every condition is satisfied by ``incident``."""
        import re

        for key, expected in self.conditions.items():
            actual = incident.get(key)
            if isinstance(expected, list):
                if actual not in expected:
                    return False
            elif isinstance(expected, str) and expected.startswith("regex:"):
                # str(None) == "None" could accidentally satisfy a loose pattern.
                if actual is None or not re.match(expected[len("regex:"):], str(actual)):
                    return False
            elif actual != expected:
                return False
        return True

    def generate_actions(self, incident: Dict[str, Any]) -> List['RemediationAction']:
        """Instantiate one RemediationAction per template entry, with a fresh UUID."""
        actions = []
        for template in self.actions_template:
            action_dict = self._render_template(template, incident)
            actions.append(RemediationAction(
                action_id=str(uuid.uuid4()),
                name=action_dict["name"],
                description=action_dict.get("description", ""),
                action_type=action_dict["action_type"],
                target_resource=action_dict["target_resource"],
                parameters=action_dict.get("parameters", {}),
                risk_level=RiskLevel(action_dict.get("risk_level", "low"))
            ))
        return actions

    def _render_template(self, template: Dict, incident: Dict) -> Dict:
        """Substitute ``${var}`` placeholders in ``template`` with incident values.

        Walks dicts and lists structurally instead of round-tripping through
        JSON text, so incident values containing quotes cannot break or inject
        into the template.
        - A string that is exactly one placeholder keeps the value's type
          (e.g. ``"${current_replicas}"`` -> ``3``, not ``"3"``).
        - ``${var + N}`` / ``${var - N}`` apply an integer offset to a numeric
          value (used by the memory scale-out rule).
        - Placeholders embedded in longer strings are substituted with
          ``str(value)``.
        - Unknown variables, or offsets on non-numeric values, leave the
          placeholder text untouched.
        """
        import re

        placeholder = re.compile(r'\$\{\s*(\w+)\s*(?:([+-])\s*(\d+)\s*)?\}')

        def resolve(match):
            name, op, operand = match.groups()
            if name not in incident:
                return match.group(0)
            value = incident[name]
            if op is None:
                return value
            if isinstance(value, bool) or not isinstance(value, (int, float)):
                try:
                    value = int(str(value))
                except ValueError:
                    try:
                        value = float(str(value))
                    except ValueError:
                        return match.group(0)
            offset = int(operand)
            return value + offset if op == "+" else value - offset

        def render_text(text):
            return placeholder.sub(lambda m: str(resolve(m)), text)

        def render(node):
            if isinstance(node, dict):
                return {
                    (render_text(k) if isinstance(k, str) else k): render(v)
                    for k, v in node.items()
                }
            if isinstance(node, (list, tuple)):
                return [render(item) for item in node]
            if isinstance(node, str):
                whole = placeholder.fullmatch(node)
                if whole is not None:
                    return resolve(whole)
                return render_text(node)
            return node

        return render(template)
1.3 常见修复场景
"""
常见自动修复场景
"""
# Pod CrashLoop remediation rule: restart the pod (low risk, so it runs
# without the approval gate). Matches PodCrashLooping alerts with high or
# critical severity; ${pod_name} / ${namespace} come from the incident.
pod_crashloop_rule = RemediationRule(
    rule_id="pod-crashloop-001",
    name="Pod CrashLoop 修复",
    description="自动重启崩溃循环的 Pod",
    conditions={
        "alert_name": "PodCrashLooping",
        "severity": ["high", "critical"]
    },
    actions_template=[
        {
            "name": "重启 Pod",
            "action_type": "restart_pod",
            "target_resource": "${pod_name}",
            "parameters": {
                "namespace": "${namespace}"
            },
            "risk_level": "low"
        }
    ],
    priority=1
)
# Scale-out rule for high memory usage.
memory_scale_rule = RemediationRule(
    rule_id="memory-scale-001",
    name="内存不足自动扩容",
    description="当内存使用超过阈值时自动扩容",
    conditions={
        "alert_name": "HighMemoryUsage",
        # 80-100 (%). NOTE(review): the regex only matches integer strings;
        # a fractional metric_value such as 85.5 will not match.
        "metric_value": "regex:^(8[0-9]|9[0-9]|100)$"
    },
    actions_template=[
        {
            "name": "增加副本数",
            "action_type": "scale_deployment",
            "target_resource": "${deployment_name}",
            "parameters": {
                # NOTE(review): "${current_replicas + 2}" is not a plain
                # ${name} placeholder; it is only substituted if the template
                # renderer supports "+ N" offsets. Verify against
                # RemediationRule._render_template before relying on this rule.
                "replicas": "${current_replicas + 2}"
            },
            "risk_level": "medium"
        }
    ],
    priority=2
)
# Deployment rollback rule: revert when a recent deploy fails or error rate spikes.
deployment_rollback_rule = RemediationRule(
    rule_id="rollback-001",
    name="部署回滚",
    description="当新版本出现问题时自动回滚",
    conditions={
        "alert_name": ["DeploymentFailed", "HighErrorRate"],
        # Matches 0-59, i.e. within the first 60 (presumably minutes) after the deploy.
        "time_since_deploy": "regex:^([0-9]|[1-5][0-9])$"
    },
    actions_template=[
        {
            "name": "回滚到上一版本",
            "action_type": "rollback_deployment",
            "target_resource": "${deployment_name}",
            "parameters": {
                # No revision: `kubectl rollout undo` reverts to the previous revision.
                "revision": None
            },
            "risk_level": "medium"
        }
    ],
    priority=3
)
# Node failure rule: cordon, then drain, a node that stays NotReady.
# Drain is HIGH risk, so with the default approval levels it goes through
# the approval gate; cordon (MEDIUM) runs automatically.
node_failure_rule = RemediationRule(
    rule_id="node-failure-001",
    name="节点故障处理",
    description="自动隔离和排空故障节点",
    conditions={
        "alert_name": "NodeNotReady",
        # Matches integers >= 5, i.e. NotReady for at least 5 minutes.
        "duration_minutes": "regex:^([5-9]|[1-9][0-9]+)$"
    },
    actions_template=[
        {
            "name": "隔离节点",
            "action_type": "cordon_node",
            "target_resource": "${node_name}",
            "risk_level": "medium"
        },
        {
            "name": "排空节点",
            "action_type": "drain_node",
            "target_resource": "${node_name}",
            "risk_level": "high"
        }
    ],
    priority=4
)
2. 容量预测
2.1 时序预测模型
"""
容量预测模型
"""
import numpy as np
import pandas as pd
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass
from sklearn.preprocessing import StandardScaler
import torch
import torch.nn as nn
@dataclass
class CapacityForecast:
    """Capacity forecast for one resource metric."""
    resource: str
    metric: str
    forecast_horizon: int  # number of predicted steps (hourly in the forecasters here)
    # Unix seconds per predicted step; DeepCapacityForecaster leaves this empty.
    timestamps: List[float]
    predictions: List[float]
    lower_bound: List[float]  # lower edge of the confidence interval
    upper_bound: List[float]  # upper edge of the confidence interval
    confidence: float  # nominal interval coverage, e.g. 0.95
    trend: str  # increasing, decreasing, stable
    # Index of the first step whose prediction reaches the threshold (set by
    # CapacityPlanningService); None if never reached within the horizon.
    saturation_time: Optional[float] = None
class ProphetForecaster:
    """Capacity forecaster based on Prophet (imported lazily in fit())."""

    def __init__(self, seasonality_mode: str = 'multiplicative'):
        """Create the forecaster.

        Args:
            seasonality_mode: Prophet seasonality mode ("additive" or "multiplicative").
        """
        self.seasonality_mode = seasonality_mode
        self.model = None

    def fit(self, timestamps: np.ndarray, values: np.ndarray):
        """Fit a Prophet model on (Unix-second timestamps, values)."""
        from prophet import Prophet

        df = pd.DataFrame({
            'ds': pd.to_datetime(timestamps, unit='s'),
            'y': values
        })
        self.model = Prophet(
            seasonality_mode=self.seasonality_mode,
            yearly_seasonality=True,
            weekly_seasonality=True,
            daily_seasonality=True
        )
        self.model.fit(df)

    def predict(self, horizon_hours: int = 24) -> 'CapacityForecast':
        """Forecast the next ``horizon_hours`` hourly points.

        Raises:
            RuntimeError: if called before fit().
        """
        if self.model is None:
            raise RuntimeError("模型尚未训练,请先调用 fit()")

        future = self.model.make_future_dataframe(periods=horizon_hours, freq='H')
        forecast = self.model.predict(future)
        # The future frame includes history; keep only the new horizon rows.
        future_forecast = forecast.tail(horizon_hours)

        # datetime64[ns] -> Unix seconds, as a plain list (the dataclass declares
        # List[float]; a pandas Series leaked through before).
        timestamps = (future_forecast['ds'].astype(np.int64) // 10**9).astype(float).tolist()

        return CapacityForecast(
            resource="",
            metric="",
            forecast_horizon=horizon_hours,
            timestamps=timestamps,
            predictions=future_forecast['yhat'].tolist(),
            lower_bound=future_forecast['yhat_lower'].tolist(),
            upper_bound=future_forecast['yhat_upper'].tolist(),
            confidence=0.95,
            trend=self._determine_trend(future_forecast['yhat'].values)
        )

    def _determine_trend(self, values: np.ndarray) -> str:
        """Classify the linear slope per step: >0.1 increasing, <-0.1 decreasing, else stable.

        The thresholds are absolute, so they depend on the metric's scale.
        Fewer than two points carry no trend and are reported as stable.
        """
        if len(values) < 2:
            return "stable"
        slope = np.polyfit(range(len(values)), values, 1)[0]
        if slope > 0.1:
            return "increasing"
        if slope < -0.1:
            return "decreasing"
        return "stable"
class LSTMForecaster(nn.Module):
    """Multi-step forecaster: stacked LSTM encoder plus a linear head.

    The final time step's hidden output is projected to ``output_size``
    values, one per future step. Inputs are batch-first.
    """

    def __init__(self, input_size: int = 1, hidden_size: int = 64,
                 num_layers: int = 2, output_size: int = 24):
        super().__init__()
        self.hidden_size, self.num_layers = hidden_size, num_layers

        self.lstm = nn.LSTM(input_size, hidden_size, num_layers,
                            batch_first=True, dropout=0.2)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        sequence_output = self.lstm(x)[0]
        last_step = sequence_output[:, -1, :]
        return self.fc(last_step)
class DeepCapacityForecaster:
    """LSTM capacity forecaster with MC-Dropout uncertainty estimates."""

    def __init__(self, sequence_length: int = 168,  # 7 days of hourly points
                 forecast_horizon: int = 24):
        """Create the forecaster.

        Args:
            sequence_length: number of past points fed to the model.
            forecast_horizon: number of future points predicted at once.
        """
        self.sequence_length = sequence_length
        self.forecast_horizon = forecast_horizon
        self.model = LSTMForecaster(output_size=forecast_horizon)
        self.scaler = StandardScaler()

    def fit(self, values: np.ndarray, epochs: int = 100,
            batch_size: int = 32, lr: float = 0.001):
        """Train on a 1-D history of values.

        Raises:
            ValueError: if fewer than ``sequence_length + forecast_horizon``
                points are given (no training window could be built, and the
                model would otherwise silently stay untrained).
        """
        values = np.asarray(values, dtype=float).ravel()
        min_points = self.sequence_length + self.forecast_horizon
        if len(values) < min_points:
            raise ValueError(
                f"训练数据不足: 至少需要 {min_points} 个点, 实际 {len(values)} 个"
            )

        values_scaled = self.scaler.fit_transform(values.reshape(-1, 1)).flatten()
        X, y = self._create_sequences(values_scaled)
        X = torch.FloatTensor(X).unsqueeze(-1)  # (windows, sequence_length, 1)
        y = torch.FloatTensor(y)                # (windows, forecast_horizon)

        optimizer = torch.optim.Adam(self.model.parameters(), lr=lr)
        criterion = nn.MSELoss()

        self.model.train()
        for _ in range(epochs):
            for i in range(0, len(X), batch_size):
                optimizer.zero_grad()
                loss = criterion(self.model(X[i:i + batch_size]), y[i:i + batch_size])
                loss.backward()
                optimizer.step()
        self.model.eval()

    def _create_sequences(self, data: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """Slide a window over ``data``: inputs of sequence_length, targets of forecast_horizon."""
        X, y = [], []
        for i in range(len(data) - self.sequence_length - self.forecast_horizon + 1):
            X.append(data[i:i + self.sequence_length])
            y.append(data[i + self.sequence_length:i + self.sequence_length + self.forecast_horizon])
        return np.array(X), np.array(y)

    def predict(self, recent_values: np.ndarray,
                num_samples: int = 100) -> 'CapacityForecast':
        """Forecast with uncertainty via MC Dropout.

        Args:
            recent_values: most recent observations; only the last
                ``sequence_length`` points are used (shorter input is passed as-is).
            num_samples: number of stochastic forward passes.

        Returns:
            CapacityForecast: mean prediction with a +/-1.96 sigma band.
        """
        recent = np.asarray(recent_values, dtype=float).reshape(-1, 1)
        values_scaled = self.scaler.transform(recent).flatten()[-self.sequence_length:]
        X = torch.FloatTensor(values_scaled).unsqueeze(0).unsqueeze(-1)

        # Train mode keeps dropout active for sampling (nn.LSTM applies its
        # dropout between stacked layers). Always restore eval mode afterwards.
        self.model.train()
        try:
            with torch.no_grad():
                samples = np.array([self.model(X).numpy()[0] for _ in range(num_samples)])
        finally:
            self.model.eval()

        mean_pred = self.scaler.inverse_transform(
            samples.mean(axis=0).reshape(-1, 1)
        ).flatten()
        # StandardScaler is affine, so a std scales by scale_ alone.
        std_pred = samples.std(axis=0) * self.scaler.scale_[0]

        lower = mean_pred - 1.96 * std_pred
        upper = mean_pred + 1.96 * std_pred

        return CapacityForecast(
            resource="",
            metric="",
            forecast_horizon=self.forecast_horizon,
            timestamps=[],
            predictions=mean_pred.tolist(),
            lower_bound=lower.tolist(),
            upper_bound=upper.tolist(),
            confidence=0.95,
            trend=self._determine_trend(mean_pred)
        )

    def _determine_trend(self, values: np.ndarray) -> str:
        """Classify the linear slope per step: >0.1 increasing, <-0.1 decreasing, else stable."""
        if len(values) < 2:
            return "stable"
        slope = np.polyfit(range(len(values)), values, 1)[0]
        if slope > 0.1:
            return "increasing"
        if slope < -0.1:
            return "decreasing"
        return "stable"
class CapacityPlanningService:
    """Capacity planning: per-resource forecasters, saturation estimates and reports."""

    def __init__(self):
        self.forecasters: Dict[str, DeepCapacityForecaster] = {}
        self.thresholds: Dict[str, float] = {}

    def add_resource(self, resource_id: str, metric: str,
                     threshold: float = 0.8):
        """Register a resource metric with its saturation threshold."""
        key = f"{resource_id}:{metric}"
        self.forecasters[key] = DeepCapacityForecaster()
        self.thresholds[key] = threshold

    def train(self, resource_id: str, metric: str,
              historical_data: np.ndarray):
        """Train the forecaster for a registered metric (unregistered keys are ignored)."""
        key = f"{resource_id}:{metric}"
        if key in self.forecasters:
            self.forecasters[key].fit(historical_data)

    def predict_capacity(self, resource_id: str, metric: str,
                         recent_data: np.ndarray) -> 'CapacityForecast':
        """Forecast a registered metric and annotate it with a saturation estimate.

        Raises:
            ValueError: if the resource metric was not registered.
        """
        key = f"{resource_id}:{metric}"
        forecaster = self.forecasters.get(key)
        if forecaster is None:
            raise ValueError(f"资源 {key} 未注册")

        forecast = forecaster.predict(recent_data)
        forecast.resource = resource_id
        forecast.metric = metric
        forecast.saturation_time = self._estimate_saturation_time(
            forecast.predictions,
            self.thresholds.get(key, 0.8)
        )
        return forecast

    def _estimate_saturation_time(self, predictions: List[float],
                                  threshold: float) -> Optional[float]:
        """Return the index (hours for hourly steps) of the first prediction >= threshold, else None."""
        return next((i for i, pred in enumerate(predictions) if pred >= threshold), None)

    def generate_capacity_report(self, forecasts: List['CapacityForecast']) -> Dict[str, Any]:
        """Build a report with per-resource stats, saturation alerts and scale-up hints."""
        import time  # this section's own import block does not import time

        report = {
            "generated_at": time.time(),
            "resources": [],
            "alerts": [],
            "recommendations": []
        }

        for forecast in forecasts:
            preds = forecast.predictions
            if preds:
                stats = {"min": min(preds), "max": max(preds), "avg": float(np.mean(preds))}
            else:
                stats = {"min": None, "max": None, "avg": None}
            report["resources"].append({
                "resource": forecast.resource,
                "metric": forecast.metric,
                "current_trend": forecast.trend,
                "saturation_time_hours": forecast.saturation_time,
                "predictions_24h": stats
            })

            # `is not None`: a saturation index of 0 (already saturated in the
            # first step) is the most urgent case and must not be dropped as falsy.
            saturation = forecast.saturation_time
            if saturation is not None and saturation < 24:
                report["alerts"].append({
                    "resource": forecast.resource,
                    "severity": "high" if saturation < 6 else "medium",
                    "message": f"{forecast.resource} 的 {forecast.metric} "
                               f"预计在 {saturation} 小时内达到饱和"
                })

            if forecast.trend == "increasing":
                report["recommendations"].append({
                    "resource": forecast.resource,
                    "action": "scale_up",
                    "reason": f"{forecast.metric} 呈上升趋势"
                })

        return report
3. 弹性伸缩
3.1 智能 HPA
"""
智能弹性伸缩
"""
import numpy as np
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from enum import Enum
class ScalingDirection(Enum):
    """Direction of a scaling decision."""
    UP = "up"      # add replicas
    DOWN = "down"  # remove replicas
    NONE = "none"  # keep the current replica count
@dataclass
class ScalingDecision:
    """Result of an autoscaler evaluation for one resource."""
    resource: str
    direction: ScalingDirection
    current_replicas: int
    target_replicas: int  # equals current_replicas when direction is NONE
    reason: str  # human-readable explanation
    confidence: float
    metrics: Dict[str, float]  # inputs/derived values the decision was based on
class PredictiveHPA:
    """Predictive HPA: sizes replicas for the forecast peak instead of current load."""

    def __init__(self, forecaster: DeepCapacityForecaster,
                 target_utilization: float = 0.7,
                 min_replicas: int = 1,
                 max_replicas: int = 100,
                 scale_up_cooldown: int = 180,
                 scale_down_cooldown: int = 300):
        """Create the autoscaler.

        Args:
            forecaster: capacity forecaster used to predict future load.
            target_utilization: desired utilization per replica.
            min_replicas: lower bound on the recommended replica count.
            max_replicas: upper bound on the recommended replica count.
            scale_up_cooldown: minimum seconds since the last scaling before scaling up.
            scale_down_cooldown: minimum seconds since the last scaling before scaling down.
        """
        self.forecaster = forecaster
        self.target_utilization = target_utilization
        self.min_replicas = min_replicas
        self.max_replicas = max_replicas
        self.scale_up_cooldown = scale_up_cooldown
        self.scale_down_cooldown = scale_down_cooldown

        self.last_scale_time: Dict[str, float] = {}
        self.last_scale_direction: Dict[str, ScalingDirection] = {}

    def evaluate(self, resource: str,
                 current_metrics: Dict[str, float],
                 historical_metrics: np.ndarray,
                 current_replicas: int) -> ScalingDecision:
        """Decide how many replicas ``resource`` should run.

        Args:
            resource: resource name (also the cooldown-tracking key).
            current_metrics: current metrics; only "cpu_utilization" is reported back.
            historical_metrics: history passed to the forecaster.
            current_replicas: current replica count.

        Returns:
            ScalingDecision: the decision. When a scaling decision falls inside
            its cooldown, it is downgraded to NONE. When a scaling decision is
            emitted, the cooldown clock is reset for this resource.
        """
        import time

        forecast = self.forecaster.predict(historical_metrics)
        predicted_peak = max(forecast.predictions)

        # NOTE(review): this treats the forecast as load in "replica units"
        # (one replica at 100% == 1.0). If the forecaster predicts per-pod
        # utilization instead, the usual formula is
        # ceil(current_replicas * predicted_peak / target_utilization) — confirm.
        target_replicas = int(np.ceil(predicted_peak / self.target_utilization))
        target_replicas = max(self.min_replicas, min(self.max_replicas, target_replicas))

        if target_replicas > current_replicas:
            direction = ScalingDirection.UP
            reason = f"预测峰值 {predicted_peak:.2f} 需要更多容量"
        elif target_replicas < current_replicas:
            direction = ScalingDirection.DOWN
            reason = "预测负载下降,可以减少副本"
        else:
            direction = ScalingDirection.NONE
            reason = "当前容量适合预测负载"

        # Cooldown only gates an actual scaling action. Previously a NONE
        # decision was also rewritten to report "in cooldown".
        if direction is not ScalingDirection.NONE:
            now = time.time()
            cooldown = (self.scale_up_cooldown if direction is ScalingDirection.UP
                        else self.scale_down_cooldown)
            if now - self.last_scale_time.get(resource, 0) < cooldown:
                direction = ScalingDirection.NONE
                target_replicas = current_replicas
                reason = "冷却期内,跳过伸缩"
            else:
                self.last_scale_time[resource] = now
                self.last_scale_direction[resource] = direction

        return ScalingDecision(
            resource=resource,
            direction=direction,
            current_replicas=current_replicas,
            target_replicas=target_replicas,
            reason=reason,
            confidence=forecast.confidence,
            metrics={
                "predicted_peak": predicted_peak,
                "target_utilization": self.target_utilization,
                "current_utilization": current_metrics.get("cpu_utilization", 0)
            }
        )
class MultiMetricHPA:
    """Multi-metric HPA: weighted average of per-metric replica suggestions."""

    def __init__(self, metrics_config: List[Dict[str, Any]],
                 min_replicas: int = 0,
                 max_replicas: Optional[int] = None):
        """Create the autoscaler.

        Args:
            metrics_config: entries like ``{"name": str, "target": float, "weight": float}``
                (weight defaults to 1.0).
            min_replicas: lower bound on the target (the default of 0 means no lower clamp).
            max_replicas: optional upper bound on the target.

        Raises:
            ValueError: if a metric has a non-positive target (it would divide by zero).
        """
        for m in metrics_config:
            if m["target"] <= 0:
                raise ValueError(f"指标 {m['name']} 的 target 必须为正数")
        self.metrics_config = metrics_config
        self.weights = {m["name"]: m.get("weight", 1.0) for m in metrics_config}
        self.targets = {m["name"]: m["target"] for m in metrics_config}
        self.min_replicas = min_replicas
        self.max_replicas = max_replicas

    def evaluate(self, resource: str,
                 current_metrics: Dict[str, float],
                 current_replicas: int) -> 'ScalingDecision':
        """Evaluate scaling from several metrics.

        Each metric suggests ``ceil(current_replicas * value / target)``. The
        suggestions are averaged by weight and rounded up. Metrics missing from
        ``current_metrics`` are ignored.
        """
        suggestions = []
        for metric_config in self.metrics_config:
            current_value = current_metrics.get(metric_config["name"])
            if current_value is None:
                continue
            ratio = current_value / metric_config["target"]
            suggestions.append((int(np.ceil(current_replicas * ratio)),
                                metric_config.get("weight", 1.0)))

        total_weight = sum(weight for _, weight in suggestions)
        if not suggestions or total_weight <= 0:
            # Also covers all-zero weights, which used to raise ZeroDivisionError.
            return ScalingDecision(
                resource=resource,
                direction=ScalingDirection.NONE,
                current_replicas=current_replicas,
                target_replicas=current_replicas,
                reason="无可用指标",
                confidence=0,
                metrics=current_metrics
            )

        weighted_sum = sum(s * w for s, w in suggestions)
        target_replicas = int(np.ceil(weighted_sum / total_weight))
        target_replicas = max(self.min_replicas, target_replicas)
        if self.max_replicas is not None:
            target_replicas = min(self.max_replicas, target_replicas)

        if target_replicas > current_replicas:
            direction = ScalingDirection.UP
        elif target_replicas < current_replicas:
            direction = ScalingDirection.DOWN
        else:
            direction = ScalingDirection.NONE

        return ScalingDecision(
            resource=resource,
            direction=direction,
            current_replicas=current_replicas,
            target_replicas=target_replicas,
            reason=f"基于 {len(suggestions)} 个指标的加权决策",
            confidence=0.8,
            metrics=current_metrics
        )
class RLBasedAutoscaler:
    """DQN-based autoscaler (Q network + periodically synced target network)."""

    def __init__(self, state_dim: int, action_dim: int,
                 hidden_dim: int = 128):
        """Create the agent.

        Args:
            state_dim: size of the state vector (get_state() builds 6 features).
            action_dim: number of discrete scaling actions (see select_action).
            hidden_dim: width of the hidden layers.
        """
        self.state_dim = state_dim
        self.action_dim = action_dim

        self.q_network = self._build_network(state_dim, action_dim, hidden_dim)
        self.target_network = self._build_network(state_dim, action_dim, hidden_dim)
        # DQN requires the target network to start as a copy of the online
        # network; otherwise early bootstrap targets come from unrelated random weights.
        self.target_network.load_state_dict(self.q_network.state_dict())

        self.optimizer = torch.optim.Adam(self.q_network.parameters())
        # Replay buffer of (state, action, reward, next_state, done) tuples,
        # appended to by the caller.
        self.memory = []
        self.gamma = 0.99
        self.epsilon = 0.1

    @staticmethod
    def _build_network(state_dim: int, action_dim: int, hidden_dim: int) -> nn.Sequential:
        """Two-hidden-layer MLP mapping a state to one Q value per action."""
        return nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, action_dim)
        )

    def get_state(self, metrics: Dict[str, float],
                  current_replicas: int) -> np.ndarray:
        """Build the 6-feature state vector (missing metrics default to 0)."""
        return np.array([
            metrics.get("cpu_utilization", 0),
            metrics.get("memory_utilization", 0),
            metrics.get("request_rate", 0),
            metrics.get("latency_p99", 0),
            current_replicas,
            metrics.get("pending_requests", 0)
        ])

    def select_action(self, state: np.ndarray,
                      epsilon: Optional[float] = None) -> int:
        """Choose an action epsilon-greedily.

        Action space (see action_to_delta):
            0: no change
            1: +1 replica
            2: +2 replicas
            3: -1 replica
            4: -2 replicas
        """
        if epsilon is None:
            epsilon = self.epsilon
        if np.random.random() < epsilon:
            return np.random.randint(self.action_dim)
        state_tensor = torch.FloatTensor(state).unsqueeze(0)
        with torch.no_grad():
            q_values = self.q_network(state_tensor)
        return q_values.argmax().item()

    def action_to_delta(self, action: int) -> int:
        """Map an action index to a replica delta (unknown actions -> 0)."""
        action_map = {0: 0, 1: 1, 2: 2, 3: -1, 4: -2}
        return action_map.get(action, 0)

    def calculate_reward(self, metrics: Dict[str, float],
                         action: int) -> float:
        """Compute the reward.

        - Closeness to 70% CPU utilization: up to 0, down to -10 per unit deviation.
        - p99 latency above 500 (presumably ms): -5 per 100 over.
        - Scaling cost: -0.5 per replica changed.
        """
        cpu_util = metrics.get("cpu_utilization", 0)
        latency = metrics.get("latency_p99", 0)

        util_reward = -abs(cpu_util - 0.7) * 10

        latency_threshold = 500
        latency_penalty = max(0, (latency - latency_threshold) / 100) * -5

        delta = abs(self.action_to_delta(action))
        scale_cost = -delta * 0.5

        return util_reward + latency_penalty + scale_cost

    def train(self, batch_size: int = 32):
        """Run one DQN update on a random minibatch (no-op until memory has batch_size items)."""
        if len(self.memory) < batch_size:
            return

        indices = np.random.choice(len(self.memory), batch_size, replace=False)
        batch = [self.memory[i] for i in indices]
        states, actions, rewards, next_states, dones = zip(*batch)

        # Stack through numpy first: building a tensor from a list of ndarrays is very slow.
        states = torch.as_tensor(np.array(states), dtype=torch.float32)
        actions = torch.as_tensor(np.asarray(actions), dtype=torch.long)
        rewards = torch.as_tensor(np.asarray(rewards), dtype=torch.float32)
        next_states = torch.as_tensor(np.array(next_states), dtype=torch.float32)
        dones = torch.as_tensor(np.asarray(dones), dtype=torch.float32)

        # squeeze(1), not squeeze(): keeps shape (batch,) even when batch_size == 1.
        current_q = self.q_network(states).gather(1, actions.unsqueeze(1)).squeeze(1)

        with torch.no_grad():
            next_q = self.target_network(next_states).max(1)[0]
            target_q = rewards + self.gamma * next_q * (1 - dones)

        loss = nn.MSELoss()(current_q, target_q)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def update_target_network(self):
        """Copy the online network's weights into the target network."""
        self.target_network.load_state_dict(self.q_network.state_dict())
4. 成本优化
4.1 资源成本分析
"""
资源成本分析与优化
"""
from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional
import numpy as np
@dataclass
class ResourceCost:
    """Cost and efficiency snapshot for one resource (produced by CostAnalyzer)."""
    resource_id: str
    resource_type: str  # key into the analyzer's pricing table
    hourly_cost: float  # hourly rate * instance count
    monthly_cost: float  # hourly_cost * 24 * 30
    utilization: float  # mean of CPU and memory utilization (fractions assumed)
    waste_percentage: float  # max(0, 1 - utilization), stored as a fraction
    optimization_potential: float  # waste_percentage * 0.8 (conservative)
@dataclass
class CostOptimizationRecommendation:
    """One cost-saving recommendation (produced by CostOptimizer)."""
    recommendation_id: str  # "rec-<n>"
    resource_id: str
    action: str  # e.g. "rightsize_down", "use_spot_instances", "reserved_instance"
    estimated_savings_monthly: float
    risk_level: str  # plain string ("low"/"medium"), not the RiskLevel enum
    implementation_effort: str  # "low" / "medium" / "high"
    description: str
class CostAnalyzer:
    """Computes per-resource and cluster-wide cost and waste figures."""

    def __init__(self, pricing: Dict[str, float]):
        """Create the analyzer.

        Args:
            pricing: hourly price per resource type; unknown types are priced at 0.
        """
        self.pricing = pricing

    def analyze_resource(self, resource_id: str,
                         resource_type: str,
                         metrics: Dict[str, float],
                         instance_count: int = 1) -> ResourceCost:
        """Cost, utilization and waste for one resource.

        Utilization is the mean of "cpu_utilization" and "memory_utilization"
        (fractions assumed). Waste is the unused share, and the optimization
        potential conservatively assumes 80% of the waste can be recovered.
        Months are 30 days.
        """
        hourly_cost = self.pricing.get(resource_type, 0) * instance_count
        utilization = (metrics.get("cpu_utilization", 0)
                       + metrics.get("memory_utilization", 0)) / 2
        waste = max(0, 1 - utilization)

        return ResourceCost(
            resource_id=resource_id,
            resource_type=resource_type,
            hourly_cost=hourly_cost,
            monthly_cost=hourly_cost * 24 * 30,
            utilization=utilization,
            waste_percentage=waste,
            optimization_potential=waste * 0.8
        )

    def analyze_cluster(self, resources: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Aggregate cost over resource dicts with keys "id", "type", "metrics" and optional "count".

        Returns totals, the overall waste ratio, the per-resource breakdown
        and the five resources wasting the most money per month.
        """
        breakdown = [
            self.analyze_resource(item["id"], item["type"], item["metrics"],
                                  item.get("count", 1))
            for item in resources
        ]

        def wasted_money(cost: ResourceCost) -> float:
            return cost.monthly_cost * cost.waste_percentage

        monthly_total = 0
        wasted_total = 0
        for cost in breakdown:
            monthly_total += cost.monthly_cost
            wasted_total += wasted_money(cost)

        return {
            "total_monthly_cost": monthly_total,
            "total_waste": wasted_total,
            "waste_percentage": wasted_total / monthly_total if monthly_total > 0 else 0,
            "resource_breakdown": breakdown,
            "top_wasters": sorted(breakdown, key=wasted_money, reverse=True)[:5]
        }
class CostOptimizer:
    """Turns cost analysis results into savings recommendations and scaling policies."""

    def __init__(self, analyzer: CostAnalyzer):
        self.analyzer = analyzer

    def generate_recommendations(self,
                                 cluster_analysis: Dict[str, Any]) -> List[CostOptimizationRecommendation]:
        """Produce recommendations for each resource, sorted by estimated savings (highest first).

        Rules:
        - utilization < 30%: right-size down (saves ~50%);
        - VM-like types ("vm", "ec2", "gce"): spot/preemptible instances (~60%);
        - utilization > 70% and monthly cost > 100: reserved instance (~30%).
        A resource may receive several recommendations; savings are not deduplicated.
        """
        recommendations: List[CostOptimizationRecommendation] = []

        def recommend(cost, action, savings_ratio, risk, effort, text):
            recommendations.append(CostOptimizationRecommendation(
                recommendation_id=f"rec-{len(recommendations) + 1}",
                resource_id=cost.resource_id,
                action=action,
                estimated_savings_monthly=cost.monthly_cost * savings_ratio,
                risk_level=risk,
                implementation_effort=effort,
                description=text
            ))

        for cost in cluster_analysis["resource_breakdown"]:
            if cost.utilization < 0.3:
                recommend(cost, "rightsize_down", 0.5, "low", "medium",
                          f"利用率仅 {cost.utilization:.1%},建议缩小规格或减少副本")
            if cost.resource_type in ("vm", "ec2", "gce"):
                recommend(cost, "use_spot_instances", 0.6, "medium", "high",
                          "考虑使用 Spot/抢占式实例节省成本")
            if cost.utilization > 0.7 and cost.monthly_cost > 100:
                recommend(cost, "reserved_instance", 0.3, "low", "low",
                          "高利用率资源,建议购买预留实例")

        return sorted(recommendations,
                      key=lambda rec: rec.estimated_savings_monthly,
                      reverse=True)

    def optimize_scaling_policy(self,
                                historical_usage: np.ndarray,
                                current_replicas: int) -> Dict[str, Any]:
        """Derive an HPA-style policy from historical usage percentiles.

        min_replicas sizes for the median at 80% utilization (at least 1);
        max_replicas sizes for p99 at 70% utilization, plus one.
        """
        p50, p90, p99 = (np.percentile(historical_usage, q) for q in (50, 90, 99))
        peak = np.max(historical_usage)

        lower = max(1, int(p50 / 0.8))
        upper = int(p99 / 0.7) + 1

        return {
            "min_replicas": lower,
            "max_replicas": upper,
            "target_utilization": 0.7,
            "scale_up_threshold": 0.8,
            "scale_down_threshold": 0.5,
            "scale_up_cooldown": 180,
            "scale_down_cooldown": 300,
            "usage_analysis": {"p50": p50, "p90": p90, "p99": p99, "max": peak},
            "current_vs_optimal": {
                "current_replicas": current_replicas,
                "suggested_replicas": int((lower + upper) / 2),
                "over_provisioned": current_replicas > upper
            }
        }
class FinOpsIntegration:
    """FinOps integration: per-team monthly budgets, budget checks and cost reports.

    A team with no configured budget is treated as having an unlimited
    budget (``float('inf')``), so its checks always report ``"ok"``.
    """

    def __init__(self):
        # team -> monthly budget
        self.budgets: Dict[str, float] = {}
        # Alerts appended by check_budget (one per warning/exceeded check; not de-duplicated)
        self.alerts: List[Dict[str, Any]] = []

    def set_budget(self, team: str, monthly_budget: float):
        """Set (or overwrite) a team's monthly budget."""
        self.budgets[team] = monthly_budget

    def check_budget(self, team: str, current_spend: float) -> Dict[str, Any]:
        """
        Check a team's spend against its budget.

        Args:
            team: Team name.
            current_spend: Spend so far in the current period.

        Returns:
            Dict: budget status with keys ``team``, ``budget``, ``current_spend``,
            ``remaining``, ``percentage_used`` and ``status``:
            ``"exceeded"`` when usage >= 100%, ``"warning"`` when >= 90%,
            otherwise ``"ok"``. A warning/exceeded result also appends an
            entry to ``self.alerts``.
        """
        budget = self.budgets.get(team, float('inf'))
        if budget > 0:
            percentage_used = current_spend / budget
        elif current_spend > 0:
            # A zero (or negative) budget allows no spend at all, so any positive
            # spend is over budget. Reporting 0% here would contradict the
            # negative "remaining" value below.
            percentage_used = float('inf')
        else:
            percentage_used = 0

        status = {
            "team": team,
            "budget": budget,
            "current_spend": current_spend,
            "remaining": budget - current_spend,
            "percentage_used": percentage_used,
            "status": "ok"
        }

        if percentage_used >= 1.0:
            status["status"] = "exceeded"
            self.alerts.append({
                "team": team,
                "type": "budget_exceeded",
                "message": f"团队 {team} 已超出预算"
            })
        elif percentage_used >= 0.9:
            status["status"] = "warning"
            self.alerts.append({
                "team": team,
                "type": "budget_warning",
                "message": f"团队 {team} 预算使用已达 {percentage_used:.0%}"
            })

        return status

    def generate_cost_report(self, team_costs: Dict[str, float]) -> Dict[str, Any]:
        """
        Build a monthly cost report.

        Args:
            team_costs: Cost per team.

        Returns:
            Dict: ``total_cost``, a per-team breakdown with each team's share of
            the total, and the ``check_budget`` result for every team. Note that
            calling this runs ``check_budget`` per team, which may append alerts.
        """
        total_cost = sum(team_costs.values())

        report = {
            "period": "monthly",
            "total_cost": total_cost,
            "by_team": [],
            "budget_status": [],
            # Placeholder: not populated by this method.
            "recommendations": []
        }

        for team, cost in team_costs.items():
            budget_status = self.check_budget(team, cost)
            report["by_team"].append({
                "team": team,
                "cost": cost,
                # Guard against division by zero when every team cost is 0
                "percentage_of_total": cost / total_cost if total_cost > 0 else 0
            })
            report["budget_status"].append(budget_status)

        return report
5. 综合决策系统
5.1 决策引擎
"""
综合运维决策引擎
"""
import asyncio
import uuid
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Optional

import numpy as np
class DecisionType(Enum):
    """Kind of operations decision (which subsystem produced it).

    Member order is the enum's iteration order.
    """
    # Produced by IntelligentOpsEngine.process_event for "incident" events
    REMEDIATION = "remediation"
    # Produced by IntelligentOpsEngine.process_event for "capacity_alert" events
    SCALING = "scaling"
    # Produced by IntelligentOpsEngine.process_event for "cost_review" events
    COST_OPTIMIZATION = "cost_optimization"
    # Not produced by IntelligentOpsEngine.process_event in this section,
    # and IntelligentOpsEngine.execute_decision has no branch for it
    CAPACITY_PLANNING = "capacity_planning"
@dataclass
class OpsDecision:
    """A single operations decision queued by the ops engine.

    Field order defines the positional signature of the ``__init__``
    generated by ``@dataclass``.
    """
    decision_id: str  # identifier: a remediation action_id, a uuid4 string, or a cost recommendation_id
    decision_type: DecisionType  # which subsystem produced the decision
    priority: int  # lower value = handled first; process_event sorts decisions ascending by this
    resource: str  # target resource identifier
    action: str  # action name, e.g. "remediate", "scale_<direction>", or a cost recommendation's action
    parameters: Dict[str, Any]  # action-specific payload, e.g. {"current": ..., "target": ...} for scaling
    confidence: float  # confidence score; presumably in [0, 1] — the engine compares it to 0.7 / 0.8
    estimated_impact: str  # human-readable description of the expected impact
    requires_approval: bool  # flags that a human should approve; not consulted by the engine's run() loop
    auto_execute: bool  # the engine's run() loop executes only decisions with this set to True
class IntelligentOpsEngine:
    """Intelligent operations engine.

    Turns incoming events into ``OpsDecision`` objects using whichever
    subsystems have been registered, queues them in ``pending_decisions``
    and, from ``run()``, executes the queued decisions flagged
    ``auto_execute``.

    NOTE(review): ``AutoRemediationEngine``, ``CapacityPlanningService``,
    ``PredictiveHPA``, ``CostOptimizer`` and ``ScalingDirection`` are not
    defined in this snippet; presumably they come from the earlier sections
    and must be in scope here — confirm.
    """

    def __init__(self):
        # Optional subsystems; an event whose subsystem is not registered
        # produces no decisions.
        self.remediation_engine: Optional[AutoRemediationEngine] = None
        self.capacity_planner: Optional[CapacityPlanningService] = None
        self.autoscaler: Optional[PredictiveHPA] = None
        self.cost_optimizer: Optional[CostOptimizer] = None

        # Decision queues
        self.pending_decisions: List[OpsDecision] = []
        self.executed_decisions: List[OpsDecision] = []

        # Minimum confidence for a scaling decision to be auto-executed
        self.auto_execute_threshold = 0.8

    def register_remediation_engine(self, engine: AutoRemediationEngine):
        """Register the auto-remediation engine."""
        self.remediation_engine = engine

    def register_capacity_planner(self, planner: CapacityPlanningService):
        """Register the capacity planner."""
        self.capacity_planner = planner

    def register_autoscaler(self, scaler: PredictiveHPA):
        """Register the autoscaler."""
        self.autoscaler = scaler

    def register_cost_optimizer(self, optimizer: CostOptimizer):
        """Register the cost optimizer."""
        self.cost_optimizer = optimizer

    async def process_event(self, event: Dict[str, Any]) -> List[OpsDecision]:
        """
        Generate decisions for an event and enqueue them.

        Args:
            event: Event payload. ``event["type"]`` selects the handler
                ("incident", "capacity_alert" or "cost_review"); any other
                type produces no decisions.

        Returns:
            List[OpsDecision]: the new decisions sorted by ascending priority.
            They are also appended to ``pending_decisions``.
        """
        event_type = event.get("type")
        if event_type == "incident":
            decisions = await self._remediation_decisions(event)
        elif event_type == "capacity_alert":
            decisions = self._scaling_decisions(event)
        elif event_type == "cost_review":
            decisions = self._cost_decisions(event)
        else:
            decisions = []

        # Lower priority value first
        decisions.sort(key=lambda d: d.priority)
        self.pending_decisions.extend(decisions)
        return decisions

    async def _remediation_decisions(self, event: Dict[str, Any]) -> List[OpsDecision]:
        """Run remediation for an incident and wrap each result as a decision."""
        if not self.remediation_engine:
            return []
        # Remediation is carried out inside handle_incident itself; these
        # decisions only record what was done.
        results = await self.remediation_engine.handle_incident(event)
        return [
            OpsDecision(
                decision_id=result.action_id,
                decision_type=DecisionType.REMEDIATION,
                priority=1,
                resource=event.get("resource", ""),
                action="remediate",
                parameters={"result": result},
                confidence=0.9,
                estimated_impact="恢复服务",
                requires_approval=False,
                auto_execute=True
            )
            for result in results
        ]

    def _scaling_decisions(self, event: Dict[str, Any]) -> List[OpsDecision]:
        """Ask the autoscaler for a scaling decision on a capacity alert."""
        # Both subsystems must be registered, even though only the
        # autoscaler is consulted here.
        if not (self.capacity_planner and self.autoscaler):
            return []

        resource = event.get("resource")
        scaling_decision = self.autoscaler.evaluate(
            resource,
            event.get("metrics", {}),
            event.get("historical_data", np.array([])),
            event.get("current_replicas", 1)
        )
        if scaling_decision.direction == ScalingDirection.NONE:
            return []

        # NOTE(review): with the default threshold of 0.8, a confidence in
        # [0.7, 0.8) is neither auto-executed nor flagged for approval, so
        # such a decision just sits in pending_decisions. Confirm intended.
        return [OpsDecision(
            decision_id=str(uuid.uuid4()),
            decision_type=DecisionType.SCALING,
            priority=2,
            resource=resource,
            action=f"scale_{scaling_decision.direction.value}",
            parameters={
                "current": scaling_decision.current_replicas,
                "target": scaling_decision.target_replicas
            },
            confidence=scaling_decision.confidence,
            estimated_impact=scaling_decision.reason,
            requires_approval=scaling_decision.confidence < 0.7,
            auto_execute=scaling_decision.confidence >= self.auto_execute_threshold
        )]

    def _cost_decisions(self, event: Dict[str, Any]) -> List[OpsDecision]:
        """Turn the top cost-optimization recommendations into decisions."""
        if not self.cost_optimizer:
            return []
        recommendations = self.cost_optimizer.generate_recommendations(
            event.get("cluster_analysis", {})
        )
        # Only the first three recommendations become decisions; they always
        # need human approval and are never auto-executed.
        return [
            OpsDecision(
                decision_id=rec.recommendation_id,
                decision_type=DecisionType.COST_OPTIMIZATION,
                priority=3,
                resource=rec.resource_id,
                action=rec.action,
                parameters={
                    "savings": rec.estimated_savings_monthly,
                    "description": rec.description
                },
                confidence=0.8,
                estimated_impact=f"节省 ${rec.estimated_savings_monthly:.0f}/月",
                requires_approval=True,
                auto_execute=False
            )
            for rec in recommendations[:3]
        ]

    async def execute_decision(self, decision: OpsDecision) -> Dict[str, Any]:
        """
        Execute (or record) a decision.

        Args:
            decision: The decision to execute.

        Returns:
            Dict: ``decision_id``, ``status`` ("success", "pending_manual",
            "failed", or "pending" for a type with no branch below) and a
            ``message``. Exceptions are caught and reported as "failed".
        """
        result = {
            "decision_id": decision.decision_id,
            "status": "pending",
            "message": ""
        }

        try:
            if decision.decision_type == DecisionType.REMEDIATION:
                # Remediation already ran inside handle_incident
                result["status"] = "success"
                result["message"] = "修复已执行"
            elif decision.decision_type == DecisionType.SCALING:
                # A real implementation would call the Kubernetes API here
                result["status"] = "success"
                result["message"] = f"已将 {decision.resource} 伸缩到 {decision.parameters['target']} 副本"
            elif decision.decision_type == DecisionType.COST_OPTIMIZATION:
                # Cost optimizations are carried out manually
                result["status"] = "pending_manual"
                result["message"] = "成本优化建议需要人工执行"
            # NOTE(review): CAPACITY_PLANNING matches no branch, so its status
            # stays "pending" while it is still recorded as executed below.

            self.executed_decisions.append(decision)

        except Exception as e:
            result["status"] = "failed"
            result["message"] = str(e)

        return result

    async def run(self):
        """Loop forever, executing queued auto-execute decisions every 10 seconds."""
        while True:
            # Iterate over a snapshot. Removing from the list being iterated
            # would skip the element right after each removed decision, and
            # process_event may append to the queue while we await below.
            ready = [d for d in self.pending_decisions if d.auto_execute]
            for decision in ready:
                await self.execute_decision(decision)
                self.pending_decisions.remove(decision)

            await asyncio.sleep(10)
总结
本章深入探讨了智能运维决策的核心能力:
- 自动修复:规则引擎、执行器、验证机制
- 容量预测:Prophet、LSTM、不确定性估计
- 弹性伸缩:预测性 HPA、多指标、强化学习
- 成本优化:资源分析、优化建议、FinOps
关键要点:
- 自动修复需要完善的风险评估和回滚机制
- 容量预测应考虑季节性和趋势变化
- 弹性伸缩要平衡响应速度和稳定性
- 成本优化是持续过程,需要组织配合
下一章将探讨 AIOps 平台实战,包括平台架构、集成方案和最佳实践。