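04-Intelligent Operations Decision-Making

Overview

Intelligent operations decision-making is the advanced stage of AIOps. It covers auto-remediation, capacity forecasting, elastic scaling, and cost optimization. This chapter looks at how AI techniques can drive these operational decisions.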

1. Auto-Remediation System

1.1 Auto-Remediation Framework

┌─────────────────────────────────────────────────────────────────────┐
│                    Auto-Remediation Framework                       │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌───────────────────────────────────────────────────────────────┐  │
│  │                   Trigger layer                               │  │
│  │  Alert │ Anomaly detection │ Threshold │ Prediction           │  │
│  └──────────────────────┬────────────────────────────────────────┘  │
│                         │                                           │
│  ┌───────────────────────────────────────────────────────────────┐  │
│  │                   Decision layer                              │  │
│  │                                                               │  │
│  │  Classification      Remediation choice   Risk assessment     │  │
│  │  ├─ Resource issue   ├─ Rule matching     ├─ Impact analysis  │  │
│  │  ├─ Config issue     ├─ ML recommendation ├─ Rollback ability │  │
│  │  ├─ Code issue       ├─ History learning  ├─ SLA impact       │  │
│  │  └─ Dependency issue └─ Expert knowledge  └─ Cost assessment  │  │
│  └──────────────────────┬────────────────────────────────────────┘  │
│                         │                                           │
│  ┌───────────────────────────────────────────────────────────────┐  │
│  │                   Execution layer                             │  │
│  │                                                               │  │
│  │  Pre-checks          Execute fix          Verify result       │  │
│  │  ├─ Permission check ├─ Script execution  ├─ Health check     │  │
│  │  ├─ State check      ├─ API call          ├─ Metric validation│  │
│  │  └─ Resource check   └─ K8s operation     └─ Log validation   │  │
│  └──────────────────────┬────────────────────────────────────────┘  │
│                         │                                           │
│  ┌───────────────────────────────────────────────────────────────┐  │
│  │                   Feedback layer                              │  │
│  │  Run records │ Evaluation │ Knowledge capture │ Model update  │  │
│  └───────────────────────────────────────────────────────────────┘  │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
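
The risk-assessment column of the decision layer lists four inputs: impact, rollback ability, SLA impact, and cost. One way to fold them into a single risk level is sketched below. The `RiskFactors` fields, the weights, and the cut-offs are all hypothetical and chosen only for illustration; a real system would calibrate them against incident history.

```python
from dataclasses import dataclass
from enum import Enum


class RiskLevel(Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"


@dataclass
class RiskFactors:
    """Inputs named after the diagram's risk-assessment column (hypothetical)."""
    blast_radius: float  # impact analysis: fraction of traffic affected, 0..1
    can_rollback: bool   # rollback ability
    sla_critical: bool   # SLA impact: does the target back an SLA-bound service?
    cost_delta: float    # cost assessment: relative cost increase, 0..1


def assess_risk(f: RiskFactors) -> RiskLevel:
    """Combine the four factors into one level. Weights are illustrative only."""
    score = 0.5 * f.blast_radius + 0.2 * min(f.cost_delta, 1.0)
    if not f.can_rollback:
        score += 0.2  # irreversible actions are riskier
    if f.sla_critical:
        score += 0.1
    if score >= 0.75:
        return RiskLevel.CRITICAL
    if score >= 0.5:
        return RiskLevel.HIGH
    if score >= 0.25:
        return RiskLevel.MEDIUM
    return RiskLevel.LOW


# Restarting one Pod behind a load balancer: small impact, reversible.
print(assess_risk(RiskFactors(0.05, True, False, 0.0)))
# Draining a node that serves all traffic for an SLA-bound service.
print(assess_risk(RiskFactors(1.0, False, True, 0.5)))
```

The resulting level can then decide whether an action runs automatically or waits for approval.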

1.2 Auto-Remediation Engine

"""
Auto-remediation engine implementation
"""

from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional, Callable
from enum import Enum
from abc import ABC, abstractmethod
import asyncio
import time
import uuid


class RemediationStatus(Enum):
    """Remediation status"""
    PENDING = "pending"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    ROLLED_BACK = "rolled_back"
    SKIPPED = "skipped"


class RiskLevel(Enum):
    """Risk level"""
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"


@dataclass
class RemediationAction:
    """Remediation action"""
    action_id: str
    name: str
    description: str
    action_type: str  # restart_pod, scale_deployment, rollback_deployment, cordon_node, drain_node
    target_resource: str
    parameters: Dict[str, Any] = field(default_factory=dict)
    risk_level: RiskLevel = RiskLevel.LOW
    estimated_duration: int = 60  # seconds
    rollback_action: Optional['RemediationAction'] = None


@dataclass
class RemediationResult:
    """Remediation result"""
    action_id: str
    status: RemediationStatus
    start_time: float
    end_time: Optional[float] = None
    duration_seconds: Optional[float] = None
    output: str = ""
    error: Optional[str] = None
    metrics_before: Dict[str, float] = field(default_factory=dict)
    metrics_after: Dict[str, float] = field(default_factory=dict)


class RemediationExecutor(ABC):
    """Base class for remediation executors"""

    @abstractmethod
    async def execute(self, action: RemediationAction) -> RemediationResult:
        """Execute the remediation"""
        pass

    @abstractmethod
    async def validate(self, action: RemediationAction, result: RemediationResult) -> bool:
        """Validate the remediation result"""
        pass


class KubernetesRemediationExecutor(RemediationExecutor):
    """Kubernetes remediation executor"""

    def __init__(self, k8s_client):
        # k8s_client must expose both CoreV1Api methods (delete_namespaced_pod,
        # patch_node, list_namespaced_pod) and AppsV1Api methods
        # (patch_namespaced_deployment_scale, read_namespaced_deployment),
        # for example through a thin facade over the two client objects.
        self.k8s_client = k8s_client

    async def execute(self, action: RemediationAction) -> RemediationResult:
        """Execute a Kubernetes remediation action"""
        start_time = time.time()
        result = RemediationResult(
            action_id=action.action_id,
            status=RemediationStatus.RUNNING,
            start_time=start_time
        )

        try:
            if action.action_type == "restart_pod":
                await self._restart_pod(action)
                result.output = f"Pod {action.target_resource} restarted"

            elif action.action_type == "scale_deployment":
                # Template rendering may deliver the value as a string
                replicas = int(action.parameters.get("replicas", 1))
                await self._scale_deployment(action.target_resource, replicas)
                result.output = f"Deployment scaled to {replicas} replicas"

            elif action.action_type == "rollback_deployment":
                revision = action.parameters.get("revision")
                await self._rollback_deployment(action.target_resource, revision)
                result.output = f"Deployment rolled back to revision {revision or 'previous'}"

            elif action.action_type == "cordon_node":
                await self._cordon_node(action.target_resource)
                result.output = f"Node {action.target_resource} cordoned"

            elif action.action_type == "drain_node":
                await self._drain_node(action.target_resource)
                result.output = f"Node {action.target_resource} drained"

            else:
                # Without this branch an unknown action type would be reported as a success
                raise ValueError(f"Unsupported action type: {action.action_type}")

            result.status = RemediationStatus.SUCCESS

        except Exception as e:
            result.status = RemediationStatus.FAILED
            result.error = str(e)

        result.end_time = time.time()
        result.duration_seconds = result.end_time - start_time
        return result

    async def _restart_pod(self, action: RemediationAction):
        """Restart a Pod"""
        namespace = action.parameters.get("namespace", "default")
        pod_name = action.target_resource

        # Delete the Pod; its controller (e.g. a Deployment) recreates it
        await asyncio.to_thread(
            self.k8s_client.delete_namespaced_pod,
            name=pod_name,
            namespace=namespace
        )

    async def _scale_deployment(self, deployment_name: str, replicas: int):
        """Scale a Deployment out or in"""
        namespace = "default"  # TODO: parse from deployment_name (hard-coded for now)

        # Update the replica count
        body = {"spec": {"replicas": replicas}}
        await asyncio.to_thread(
            self.k8s_client.patch_namespaced_deployment_scale,
            name=deployment_name,
            namespace=namespace,
            body=body
        )

    async def _rollback_deployment(self, deployment_name: str, revision: Optional[int]):
        """Roll back a Deployment"""
        namespace = "default"

        # Shell out to `kubectl rollout undo`
        import subprocess
        cmd = ["kubectl", "rollout", "undo", f"deployment/{deployment_name}",
               "-n", namespace]
        if revision:
            cmd.extend(["--to-revision", str(revision)])

        await asyncio.to_thread(subprocess.run, cmd, check=True)

    async def _cordon_node(self, node_name: str):
        """Cordon a node (mark it unschedulable)"""
        body = {"spec": {"unschedulable": True}}
        await asyncio.to_thread(
            self.k8s_client.patch_node,
            name=node_name,
            body=body
        )

    async def _drain_node(self, node_name: str):
        """Drain a node"""
        import subprocess
        cmd = ["kubectl", "drain", node_name, "--ignore-daemonsets",
               "--delete-emptydir-data", "--force"]
        await asyncio.to_thread(subprocess.run, cmd, check=True)

    async def validate(self, action: RemediationAction, result: RemediationResult) -> bool:
        """Validate the remediation result"""
        if result.status != RemediationStatus.SUCCESS:
            return False

        # Wait for the resource to settle
        await asyncio.sleep(30)

        try:
            if action.action_type == "restart_pod":
                # Check that the Pods are Running. An empty label_selector
                # matches every Pod in the namespace.
                namespace = action.parameters.get("namespace", "default")
                pods = await asyncio.to_thread(
                    self.k8s_client.list_namespaced_pod,
                    namespace=namespace,
                    label_selector=action.parameters.get("label_selector", "")
                )
                # all() is True for an empty list, so require at least one Pod
                return bool(pods.items) and all(
                    p.status.phase == "Running" for p in pods.items
                )

            elif action.action_type == "scale_deployment":
                # Check the replica count
                namespace = "default"
                deployment = await asyncio.to_thread(
                    self.k8s_client.read_namespaced_deployment,
                    name=action.target_resource,
                    namespace=namespace
                )
                # ready_replicas is None while no replica is ready
                ready = deployment.status.ready_replicas or 0
                return ready == int(action.parameters.get("replicas", 1))

            return True

        except Exception:
            return False


class AutoRemediationEngine:
    """Auto-remediation engine"""

    def __init__(self):
        self.executors: Dict[str, RemediationExecutor] = {}
        self.remediation_rules: List['RemediationRule'] = []
        self.history: List[RemediationResult] = []
        self.approval_required_levels = {RiskLevel.HIGH, RiskLevel.CRITICAL}

    def register_executor(self, resource_type: str, executor: RemediationExecutor):
        """Register an executor"""
        self.executors[resource_type] = executor

    def add_rule(self, rule: 'RemediationRule'):
        """Add a remediation rule"""
        self.remediation_rules.append(rule)

    async def handle_incident(self, incident: Dict[str, Any]) -> List[RemediationResult]:
        """
        Handle an incident

        Args:
            incident: incident details

        Returns:
            List[RemediationResult]: remediation results
        """
        results = []

        # 1. Match remediation rules
        matched_rules = self._match_rules(incident)

        for rule in matched_rules:
            # 2. Generate remediation actions
            actions = rule.generate_actions(incident)

            for action in actions:
                # 3. Risk assessment
                if action.risk_level in self.approval_required_levels:
                    # Manual approval is required
                    approved = await self._request_approval(action, incident)
                    if not approved:
                        results.append(RemediationResult(
                            action_id=action.action_id,
                            status=RemediationStatus.SKIPPED,
                            start_time=time.time(),
                            output="Manual approval required; skipped"
                        ))
                        continue

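                # 4. Execute the remediation
                result = await self._execute_action(action)
                results.append(result)

                # 5. Validate the result
                if result.status == RemediationStatus.SUCCESS:
                    executor = self._get_executor(action)
                    if executor and not await executor.validate(action, result):
                        # The action ran but did not fix the problem
                        result.status = RemediationStatus.FAILED
                        result.error = "Validation failed after execution"
                        # Try to roll back
                        if action.rollback_action:
                            rollback_result = await self._execute_action(action.rollback_action)
                            results.append(rollback_result)
                            if rollback_result.status == RemediationStatus.SUCCESS:
                                result.status = RemediationStatus.ROLLED_BACK

                # 6. Record history
                self.history.append(result)

        return results

    def _match_rules(self, incident: Dict[str, Any]) -> List['RemediationRule']:
        """Return the rules that match the incident"""
        matched = []
        for rule in self.remediation_rules:
            if rule.matches(incident):
                matched.append(rule)
        return matched

    async def _execute_action(self, action: RemediationAction) -> RemediationResult:
        """Execute one remediation action"""
        executor = self._get_executor(action)
        if executor is None:
            return RemediationResult(
                action_id=action.action_id,
                status=RemediationStatus.FAILED,
                start_time=time.time(),
                error="No suitable executor found"
            )

        return await executor.execute(action)

    # Action types handled by the Kubernetes executor
    KUBERNETES_ACTION_TYPES = {
        "restart_pod", "scale_deployment", "rollback_deployment",
        "cordon_node", "drain_node",
    }

    def _get_executor(self, action: RemediationAction) -> Optional[RemediationExecutor]:
        """Pick an executor from the action type.

        Matching on the resource name is unreliable: node names and most Pod
        names contain neither "pod" nor "deployment".
        """
        if action.action_type in self.KUBERNETES_ACTION_TYPES:
            return self.executors.get("kubernetes")
        return None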

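    async def _request_approval(self, action: RemediationAction,
                                  incident: Dict[str, Any]) -> bool:
        """Request manual approval"""
        # A real implementation would send an approval request and wait for it.
        # Placeholder: only MEDIUM-risk actions are approved. Because this method
        # is called only for HIGH and CRITICAL actions, it always returns False
        # here, so those actions are skipped.
        return action.risk_level == RiskLevel.MEDIUM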


@dataclass
class RemediationRule:
    """Remediation rule"""
    rule_id: str
    name: str
    description: str
    conditions: Dict[str, Any]
    actions_template: List[Dict[str, Any]]
    priority: int = 0

    def matches(self, incident: Dict[str, Any]) -> bool:
        """Check whether the incident satisfies every condition"""
        for key, expected in self.conditions.items():
            actual = incident.get(key)

            if isinstance(expected, list):
                # A list means "any of these values"
                if actual not in expected:
                    return False
            elif isinstance(expected, str) and expected.startswith("regex:"):
                # A "regex:" prefix matches the stringified value against a pattern
                import re
                pattern = expected[6:]
                if not re.match(pattern, str(actual)):
                    return False
            else:
                if actual != expected:
                    return False

        return True

    def generate_actions(self, incident: Dict[str, Any]) -> List[RemediationAction]:
        """Generate remediation actions"""
        actions = []

        for template in self.actions_template:
            # Substitute template variables
            action_dict = self._render_template(template, incident)

            action = RemediationAction(
                action_id=str(uuid.uuid4()),
                name=action_dict["name"],
                description=action_dict.get("description", ""),
                action_type=action_dict["action_type"],
                target_resource=action_dict["target_resource"],
                parameters=action_dict.get("parameters", {}),
                risk_level=RiskLevel(action_dict.get("risk_level", "low"))
            )
            actions.append(action)

        return actions

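    def _render_template(self, template: Any, incident: Dict) -> Any:
        """Render ${variable} placeholders in a nested template"""
        import re

        # Walk the structure directly; a JSON round trip breaks when a value
        # contains quotes or backslashes.
        if isinstance(template, dict):
            return {k: self._render_template(v, incident) for k, v in template.items()}
        if isinstance(template, list):
            return [self._render_template(v, incident) for v in template]
        if not isinstance(template, str):
            return template

        # A string that is exactly one placeholder keeps the value's type
        # (e.g. an int replica count); unknown variables are left as-is.
        whole = re.fullmatch(r'\$\{(\w+)\}', template)
        if whole:
            return incident.get(whole.group(1), template)

        def replace_var(match):
            return str(incident.get(match.group(1), match.group(0)))

        return re.sub(r'\$\{(\w+)\}', replace_var, template)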
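
The engine's `_request_approval` is only a placeholder. A minimal sketch of how an approval wait could work with plain `asyncio` follows. `ApprovalGate`, `request`, and `resolve` are hypothetical names, not part of the engine above, and the sketch assumes that a timeout should count as a rejection so that a high-risk action never runs unattended.

```python
import asyncio
from typing import Dict


class ApprovalGate:
    """Holds pending approval requests until a human (or a timeout) resolves them."""

    def __init__(self) -> None:
        self._pending: Dict[str, asyncio.Future] = {}

    async def request(self, action_id: str, timeout_seconds: float) -> bool:
        """Wait for a decision; treat a timeout as a rejection (fail closed)."""
        future = asyncio.get_running_loop().create_future()
        self._pending[action_id] = future
        try:
            return await asyncio.wait_for(future, timeout=timeout_seconds)
        except asyncio.TimeoutError:
            return False
        finally:
            self._pending.pop(action_id, None)

    def resolve(self, action_id: str, approved: bool) -> None:
        """Called by the approval channel (chat bot, ticket webhook, ...)."""
        future = self._pending.get(action_id)
        if future is not None and not future.done():
            future.set_result(approved)


async def demo() -> tuple:
    gate = ApprovalGate()
    # Case 1: an operator approves shortly after the request is filed.
    waiter = asyncio.create_task(gate.request("drain-node-1", timeout_seconds=1.0))
    await asyncio.sleep(0)  # let the request register itself
    gate.resolve("drain-node-1", approved=True)
    approved = await waiter
    # Case 2: nobody answers, so the request times out and is rejected.
    timed_out = await gate.request("drain-node-2", timeout_seconds=0.01)
    return approved, timed_out


print(asyncio.run(demo()))  # (True, False)
```

In the engine, `_request_approval` could delegate to `gate.request(action.action_id, ...)` while the notification channel calls `resolve` when an operator responds.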

1.3 Common Remediation Scenarios

"""
Common auto-remediation scenarios
"""

# Remediation rule for Pods in a crash loop
pod_crashloop_rule = RemediationRule(
    rule_id="pod-crashloop-001",
    name="Pod CrashLoop remediation",
    description="Automatically restart Pods stuck in a crash loop",
    conditions={
        "alert_name": "PodCrashLooping",
        "severity": ["high", "critical"]
    },
    actions_template=[
        {
            "name": "Restart Pod",
            "action_type": "restart_pod",
            "target_resource": "${pod_name}",
            "parameters": {
                "namespace": "${namespace}"
            },
            "risk_level": "low"
        }
    ],
    priority=1
)

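# Scale-out rule for memory pressure
memory_scale_rule = RemediationRule(
    rule_id="memory-scale-001",
    name="Scale out on high memory usage",
    description="Scale out automatically when memory usage crosses the threshold",
    conditions={
        "alert_name": "HighMemoryUsage",
        # 80-100%; the pattern matches integer values only
        "metric_value": "regex:^(8[0-9]|9[0-9]|100)$"
    },
    actions_template=[
        {
            "name": "Add replicas",
            "action_type": "scale_deployment",
            "target_resource": "${deployment_name}",
            "parameters": {
                # The renderer substitutes plain ${name} variables and does not
                # evaluate expressions, so the incident must carry a precomputed
                # target_replicas (for example current_replicas + 2).
                "replicas": "${target_replicas}"
            },
            "risk_level": "medium"
        }
    ],
    priority=2
)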

# Deployment rollback rule
deployment_rollback_rule = RemediationRule(
    rule_id="rollback-001",
    name="Deployment rollback",
    description="Roll back automatically when a new version misbehaves",
    conditions={
        "alert_name": ["DeploymentFailed", "HighErrorRate"],
        "time_since_deploy": "regex:^([0-9]|[1-5][0-9])$"  # within 60 minutes (0-59)
    },
    actions_template=[
        {
            "name": "Roll back to the previous version",
            "action_type": "rollback_deployment",
            "target_resource": "${deployment_name}",
            "parameters": {
                "revision": None  # None rolls back to the previous revision
            },
            "risk_level": "medium"
        }
    ],
    priority=3
)

# Node failure handling rule
node_failure_rule = RemediationRule(
    rule_id="node-failure-001",
    name="Node failure handling",
    description="Automatically cordon and drain a failed node",
    conditions={
        "alert_name": "NodeNotReady",
        "duration_minutes": "regex:^([5-9]|[1-9][0-9]+)$"  # 5 minutes or longer
    },
    actions_template=[
        {
            "name": "Cordon node",
            "action_type": "cordon_node",
            "target_resource": "${node_name}",
            "risk_level": "medium"
        },
        {
            "name": "Drain node",
            "action_type": "drain_node",
            "target_resource": "${node_name}",
            "risk_level": "high"
        }
    ],
    priority=4
)
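
The rules above encode numeric ranges as regular expressions, which only works when the metric arrives as an integer: a value such as 85.3 does not match `^(8[0-9]|9[0-9]|100)$`. The sketch below shows one possible alternative using comparison prefixes. The `gte:`, `lte:`, `gt:`, and `lt:` prefixes are hypothetical extensions in the spirit of the existing `regex:` prefix, not something the engine above supports.

```python
import operator
import re
from typing import Any, Dict

# Hypothetical "op:value" prefixes, modelled on the "regex:" prefix used above.
_COMPARATORS = {
    "gte:": operator.ge,
    "lte:": operator.le,
    "gt:": operator.gt,
    "lt:": operator.lt,
}


def condition_matches(expected: Any, actual: Any) -> bool:
    """Match one condition: list membership, regex, numeric comparison, or equality."""
    if isinstance(expected, list):
        return actual in expected
    if isinstance(expected, str):
        if expected.startswith("regex:"):
            return re.match(expected[len("regex:"):], str(actual)) is not None
        for prefix, compare in _COMPARATORS.items():
            if expected.startswith(prefix):
                try:
                    return compare(float(actual), float(expected[len(prefix):]))
                except (TypeError, ValueError):
                    return False  # a missing or non-numeric field never matches
    return actual == expected


def rule_matches(conditions: Dict[str, Any], incident: Dict[str, Any]) -> bool:
    """A rule matches only when every condition matches."""
    return all(condition_matches(exp, incident.get(key))
               for key, exp in conditions.items())


incident = {"alert_name": "HighMemoryUsage", "metric_value": 85.3}

regex_rule = {"alert_name": "HighMemoryUsage",
              "metric_value": "regex:^(8[0-9]|9[0-9]|100)$"}
range_rule = {"alert_name": "HighMemoryUsage", "metric_value": "gte:80"}

print(rule_matches(regex_rule, incident))  # False: "85.3" is not an integer string
print(rule_matches(range_rule, incident))  # True
```

Numeric comparison also avoids hand-writing a new pattern every time a threshold changes.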

2. Capacity Forecasting

2.1 Time-Series Forecasting Models

"""
Capacity forecasting models
"""

import time
import numpy as np
import pandas as pd
from typing import Any, Dict, List, Tuple, Optional
from dataclasses import dataclass
from sklearn.preprocessing import StandardScaler
import torch
import torch.nn as nn


@dataclass
class CapacityForecast:
    """Capacity forecast result"""
    resource: str
    metric: str
    forecast_horizon: int
    timestamps: List[float]
    predictions: List[float]
    lower_bound: List[float]
    upper_bound: List[float]
    confidence: float
    trend: str  # increasing, decreasing, stable
    saturation_time: Optional[float] = None  # estimated hours until saturation


class ProphetForecaster:
    """Prophet-based capacity forecaster"""

    def __init__(self, seasonality_mode: str = 'multiplicative'):
        """
        Initialize the forecaster

        Args:
            seasonality_mode: seasonality mode (additive, multiplicative)
        """
        self.seasonality_mode = seasonality_mode
        self.model = None

    def fit(self, timestamps: np.ndarray, values: np.ndarray):
        """Train the model"""
        from prophet import Prophet

        # Prepare the data
        df = pd.DataFrame({
            'ds': pd.to_datetime(timestamps, unit='s'),
            'y': values
        })

        self.model = Prophet(
            seasonality_mode=self.seasonality_mode,
            yearly_seasonality=True,
            weekly_seasonality=True,
            daily_seasonality=True,
            # Prophet's default interval is 80%; request 95% to match the
            # confidence reported by predict()
            interval_width=0.95
        )
        self.model.fit(df)

    def predict(self, horizon_hours: int = 24) -> CapacityForecast:
        """Forecast"""
        # Generate future time points (hourly steps; pandas >= 2.2 prefers
        # the lowercase alias 'h')
        future = self.model.make_future_dataframe(periods=horizon_hours, freq='H')
        forecast = self.model.predict(future)

        # Keep only the forecast part
        future_forecast = forecast.tail(horizon_hours)

        return CapacityForecast(
            resource="",
            metric="",
            forecast_horizon=horizon_hours,
            # Convert nanoseconds to epoch seconds and return a plain list
            timestamps=(future_forecast['ds'].astype(np.int64) // 10**9).tolist(),
            predictions=future_forecast['yhat'].tolist(),
            lower_bound=future_forecast['yhat_lower'].tolist(),
            upper_bound=future_forecast['yhat_upper'].tolist(),
            confidence=0.95,
            trend=self._determine_trend(future_forecast['yhat'].values)
        )

    def _determine_trend(self, values: np.ndarray) -> str:
        """Classify the trend"""
        # The thresholds are in metric units per step, so tune them to the
        # metric's scale
        slope = np.polyfit(range(len(values)), values, 1)[0]
        if slope > 0.1:
            return "increasing"
        elif slope < -0.1:
            return "decreasing"
        return "stable"


class LSTMForecaster(nn.Module):
    """LSTM capacity forecasting model"""

    def __init__(self, input_size: int = 1, hidden_size: int = 64,
                 num_layers: int = 2, output_size: int = 24):
        super().__init__()

        self.hidden_size = hidden_size
        self.num_layers = num_layers

        self.lstm = nn.LSTM(input_size, hidden_size, num_layers,
                           batch_first=True, dropout=0.2)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        lstm_out, _ = self.lstm(x)
        output = self.fc(lstm_out[:, -1, :])
        return output


class DeepCapacityForecaster:
    """Deep-learning capacity forecaster"""

    def __init__(self, sequence_length: int = 168,  # 7 days of hourly data
                 forecast_horizon: int = 24):
        """
        Initialize the forecaster

        Args:
            sequence_length: input sequence length
            forecast_horizon: number of steps to forecast
        """
        self.sequence_length = sequence_length
        self.forecast_horizon = forecast_horizon
        self.model = LSTMForecaster(output_size=forecast_horizon)
        self.scaler = StandardScaler()

    def fit(self, values: np.ndarray, epochs: int = 100,
            batch_size: int = 32, lr: float = 0.001):
        """Train the model"""
        # Standardize
        values_scaled = self.scaler.fit_transform(values.reshape(-1, 1)).flatten()

        # Build the training windows
        X, y = self._create_sequences(values_scaled)
        X = torch.FloatTensor(X).unsqueeze(-1)
        y = torch.FloatTensor(y)

        optimizer = torch.optim.Adam(self.model.parameters(), lr=lr)
        criterion = nn.MSELoss()

        self.model.train()
        for epoch in range(epochs):
            total_loss = 0
            for i in range(0, len(X), batch_size):
                batch_X = X[i:i+batch_size]
                batch_y = y[i:i+batch_size]

                optimizer.zero_grad()
                outputs = self.model(batch_X)
                loss = criterion(outputs, batch_y)
                loss.backward()
                optimizer.step()

                total_loss += loss.item()

    def _create_sequences(self, data: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """Build sliding windows: sequence_length inputs, forecast_horizon targets"""
        X, y = [], []
        for i in range(len(data) - self.sequence_length - self.forecast_horizon + 1):
            X.append(data[i:i+self.sequence_length])
            y.append(data[i+self.sequence_length:i+self.sequence_length+self.forecast_horizon])
        return np.array(X), np.array(y)

    def predict(self, recent_values: np.ndarray,
                num_samples: int = 100) -> CapacityForecast:
        """
        Forecast with an uncertainty estimate

        Args:
            recent_values: the most recent values
            num_samples: number of MC Dropout samples

        Returns:
            CapacityForecast: forecast result
        """
        # Standardize
        values_scaled = self.scaler.transform(recent_values.reshape(-1, 1)).flatten()

        # Keep the last sequence_length points
        if len(values_scaled) > self.sequence_length:
            values_scaled = values_scaled[-self.sequence_length:]

        X = torch.FloatTensor(values_scaled).unsqueeze(0).unsqueeze(-1)

        # MC Dropout for uncertainty estimation
        self.model.train()  # keep Dropout active
        predictions = []

        with torch.no_grad():
            for _ in range(num_samples):
                pred = self.model(X)
                predictions.append(pred.numpy()[0])

        predictions = np.array(predictions)

        # Undo the standardization
        mean_pred = self.scaler.inverse_transform(
            predictions.mean(axis=0).reshape(-1, 1)
        ).flatten()
        std_pred = predictions.std(axis=0) * self.scaler.scale_[0]

        # 95% interval under a normal approximation
        lower = mean_pred - 1.96 * std_pred
        upper = mean_pred + 1.96 * std_pred

        return CapacityForecast(
            resource="",
            metric="",
            forecast_horizon=self.forecast_horizon,
            timestamps=[],
            predictions=mean_pred.tolist(),
            lower_bound=lower.tolist(),
            upper_bound=upper.tolist(),
            confidence=0.95,
            trend=self._determine_trend(mean_pred)
        )

    def _determine_trend(self, values: np.ndarray) -> str:
        """Classify the trend"""
        slope = np.polyfit(range(len(values)), values, 1)[0]
        if slope > 0.1:
            return "increasing"
        elif slope < -0.1:
            return "decreasing"
        return "stable"


class CapacityPlanningService:
    """Capacity planning service"""

    def __init__(self):
        self.forecasters: Dict[str, DeepCapacityForecaster] = {}
        self.thresholds: Dict[str, float] = {}

    def add_resource(self, resource_id: str, metric: str,
                     threshold: float = 0.8):
        """Register a resource metric for monitoring"""
        key = f"{resource_id}:{metric}"
        self.forecasters[key] = DeepCapacityForecaster()
        self.thresholds[key] = threshold

    def train(self, resource_id: str, metric: str,
              historical_data: np.ndarray):
        """Train the forecasting model"""
        key = f"{resource_id}:{metric}"
        if key in self.forecasters:
            self.forecasters[key].fit(historical_data)

    def predict_capacity(self, resource_id: str, metric: str,
                         recent_data: np.ndarray) -> CapacityForecast:
        """Forecast capacity"""
        key = f"{resource_id}:{metric}"
        forecaster = self.forecasters.get(key)

        if forecaster is None:
            raise ValueError(f"Resource {key} is not registered")

        forecast = forecaster.predict(recent_data)
        forecast.resource = resource_id
        forecast.metric = metric

        # Estimate the time to saturation
        threshold = self.thresholds.get(key, 0.8)
        forecast.saturation_time = self._estimate_saturation_time(
            forecast.predictions,
            threshold
        )

        return forecast

    def _estimate_saturation_time(self, predictions: List[float],
                                   threshold: float) -> Optional[float]:
        """Estimate the time to saturation"""
        for i, pred in enumerate(predictions):
            if pred >= threshold:
                return i  # hours ahead (index into the hourly forecast)
        return None

    def generate_capacity_report(self, forecasts: List[CapacityForecast]) -> Dict[str, Any]:
        """Generate a capacity report"""
        report = {
            "generated_at": time.time(),
            "resources": [],
            "alerts": [],
            "recommendations": []
        }

        for forecast in forecasts:
            resource_report = {
                "resource": forecast.resource,
                "metric": forecast.metric,
                "current_trend": forecast.trend,
                "saturation_time_hours": forecast.saturation_time,
                "predictions_24h": {
                    "min": min(forecast.predictions),
                    "max": max(forecast.predictions),
                    "avg": float(np.mean(forecast.predictions))
                }
            }
            report["resources"].append(resource_report)

            # Raise alerts. Compare against None explicitly: a saturation time
            # of 0 (already saturated) is falsy and would otherwise be ignored.
            if forecast.saturation_time is not None and forecast.saturation_time < 24:
                report["alerts"].append({
                    "resource": forecast.resource,
                    "severity": "high" if forecast.saturation_time < 6 else "medium",
                    "message": f"{forecast.metric} of {forecast.resource} is forecast "
                               f"to saturate within {forecast.saturation_time} hours"
                })

            # Add recommendations
            if forecast.trend == "increasing":
                report["recommendations"].append({
                    "resource": forecast.resource,
                    "action": "scale_up",
                    "reason": f"{forecast.metric} is trending upward"
                })

        return report
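
`_estimate_saturation_time` looks only at the point forecast, even though both forecasters also return an upper bound. A small sketch of how that bound could give an earlier, more conservative warning follows. `saturation_window` and `first_crossing` are hypothetical helpers, and the sample numbers are made up for illustration.

```python
from typing import List, Optional, Tuple


def first_crossing(series: List[float], threshold: float) -> Optional[int]:
    """Index (hours ahead) of the first value at or above the threshold, else None."""
    for hour, value in enumerate(series):
        if value >= threshold:
            return hour
    return None


def saturation_window(predictions: List[float], upper_bound: List[float],
                      threshold: float) -> Tuple[Optional[int], Optional[int]]:
    """Return (earliest, expected) saturation hours.

    earliest: when the upper confidence bound first crosses the threshold.
    expected: when the point forecast first crosses it.
    """
    return first_crossing(upper_bound, threshold), first_crossing(predictions, threshold)


# Illustrative hourly utilization forecast with its upper bound.
predictions = [0.62, 0.66, 0.71, 0.75, 0.79, 0.83]
upper_bound = [0.70, 0.74, 0.81, 0.85, 0.90, 0.95]

earliest, expected = saturation_window(predictions, upper_bound, threshold=0.8)
print(earliest, expected)  # 2 5
```

Alerting on the earliest hour trades more false alarms for more lead time; which one to alert on depends on how long scaling the resource takes.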

3. Elastic Scaling

3.1 Intelligent HPA

"""
Intelligent elastic scaling
"""

import numpy as np
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from enum import Enum


class ScalingDirection(Enum):
    """Scaling direction"""
    UP = "up"
    DOWN = "down"
    NONE = "none"


@dataclass
class ScalingDecision:
    """Scaling decision"""
    resource: str
    direction: ScalingDirection
    current_replicas: int
    target_replicas: int
    reason: str
    confidence: float
    metrics: Dict[str, float]


class PredictiveHPA:
    """Predictive HPA"""

    def __init__(self, forecaster: DeepCapacityForecaster,
                 target_utilization: float = 0.7,
                 min_replicas: int = 1,
                 max_replicas: int = 100,
                 scale_up_cooldown: int = 180,
                 scale_down_cooldown: int = 300):
        """
        Initialize the predictive HPA

        Args:
            forecaster: capacity forecaster
            target_utilization: target utilization
            min_replicas: minimum replica count
            max_replicas: maximum replica count
            scale_up_cooldown: scale-up cooldown (seconds)
            scale_down_cooldown: scale-down cooldown (seconds)
        """
        self.forecaster = forecaster
        self.target_utilization = target_utilization
        self.min_replicas = min_replicas
        self.max_replicas = max_replicas
        self.scale_up_cooldown = scale_up_cooldown
        self.scale_down_cooldown = scale_down_cooldown

        self.last_scale_time: Dict[str, float] = {}
        self.last_scale_direction: Dict[str, ScalingDirection] = {}

    def evaluate(self, resource: str,
                 current_metrics: Dict[str, float],
                 historical_metrics: np.ndarray,
                 current_replicas: int) -> ScalingDecision:
        """
        Evaluate whether scaling is needed

        Args:
            resource: resource name
            current_metrics: current metrics
            historical_metrics: historical metrics
            current_replicas: current replica count

        Returns:
            ScalingDecision: scaling decision
        """
        import time

        # 1. Forecast the future load
        forecast = self.forecaster.predict(historical_metrics)
        predicted_peak = max(forecast.predictions)

        # 2. Compute the required replica count from the predicted peak and the
        # target utilization. This assumes the forecast is total load expressed
        # in units of one replica's capacity; if it is per-replica utilization,
        # multiply by current_replicas first.
        required_capacity = predicted_peak / self.target_utilization

        target_replicas = int(np.ceil(required_capacity))
        target_replicas = max(self.min_replicas, min(self.max_replicas, target_replicas))

        # 3. Decide the scaling direction
        if target_replicas > current_replicas:
            direction = ScalingDirection.UP
            reason = f"Predicted peak {predicted_peak:.2f} needs more capacity"
        elif target_replicas < current_replicas:
            direction = ScalingDirection.DOWN
            reason = "Predicted load is falling; replicas can be reduced"
        else:
            direction = ScalingDirection.NONE
            reason = "Current capacity fits the predicted load"

        # 4. Enforce the cooldown, but only when a scaling action is proposed
        if direction != ScalingDirection.NONE:
            last_time = self.last_scale_time.get(resource, 0)
            cooldown = (self.scale_up_cooldown if direction == ScalingDirection.UP
                        else self.scale_down_cooldown)
            if time.time() - last_time < cooldown:
                direction = ScalingDirection.NONE
                target_replicas = current_replicas
                reason = "Within the cooldown window; scaling skipped"

        # 5. Update state
        if direction != ScalingDirection.NONE:
            self.last_scale_time[resource] = time.time()
            self.last_scale_direction[resource] = direction

        return ScalingDecision(
            resource=resource,
            direction=direction,
            current_replicas=current_replicas,
            target_replicas=target_replicas,
            reason=reason,
            confidence=forecast.confidence,
            metrics={
                "predicted_peak": predicted_peak,
                "target_utilization": self.target_utilization,
                "current_utilization": current_metrics.get("cpu_utilization", 0)
            }
        )


class MultiMetricHPA:
    """多指标 HPA"""

    def __init__(self, metrics_config: List[Dict[str, Any]]):
        """
        初始化多指标 HPA

        Args:
            metrics_config: 指标配置列表
        """
        self.metrics_config = metrics_config
        self.weights = {m["name"]: m.get("weight", 1.0) for m in metrics_config}
        self.targets = {m["name"]: m["target"] for m in metrics_config}

    def evaluate(self, resource: str,
                 current_metrics: Dict[str, float],
                 current_replicas: int) -> ScalingDecision:
        """
        Evaluate scaling from multiple metrics.

        Args:
            resource: Resource name.
            current_metrics: Current metric values.
            current_replicas: Current replica count.

        Returns:
            ScalingDecision: The scaling decision.
        """
        # Compute the desired replica count suggested by each metric
        replica_suggestions = []
        total_weight = 0

        for metric_config in self.metrics_config:
            metric_name = metric_config["name"]
            target = metric_config["target"]
            weight = metric_config.get("weight", 1.0)

            current_value = current_metrics.get(metric_name)
            # Skip missing metrics and configurations that would divide by zero
            if current_value is None or target <= 0 or weight <= 0:
                continue

            # Desired replicas for this metric: current_replicas * (value / target)
            ratio = current_value / target
            suggested = int(np.ceil(current_replicas * ratio))

            replica_suggestions.append((suggested, weight))
            total_weight += weight

        if not replica_suggestions:
            return ScalingDecision(
                resource=resource,
                direction=ScalingDirection.NONE,
                current_replicas=current_replicas,
                target_replicas=current_replicas,
                reason="无可用指标",
                confidence=0,
                metrics=current_metrics
            )

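        # Weighted average of the per-metric suggestions, never below one replica
        # (a target of 0 could not recover, since 0 * ratio stays 0)
        weighted_sum = sum(s * w for s, w in replica_suggestions)
        target_replicas = max(1, int(np.ceil(weighted_sum / total_weight)))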

        # 确定方向
        if target_replicas > current_replicas:
            direction = ScalingDirection.UP
        elif target_replicas < current_replicas:
            direction = ScalingDirection.DOWN
        else:
            direction = ScalingDirection.NONE

        return ScalingDecision(
            resource=resource,
            direction=direction,
            current_replicas=current_replicas,
            target_replicas=target_replicas,
            reason=f"基于 {len(replica_suggestions)} 个指标的加权决策",
            confidence=0.8,
            metrics=current_metrics
        )


class RLBasedAutoscaler:
    """基于强化学习的自动伸缩"""

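    def __init__(self, state_dim: int, action_dim: int,
                 hidden_dim: int = 128):
        """
        Initialize the RL autoscaler.

        Args:
            state_dim: State dimension.
            action_dim: Action dimension (number of scaling levels).
            hidden_dim: Hidden layer width.
        """
        self.state_dim = state_dim
        self.action_dim = action_dim

        # Online DQN network
        self.q_network = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, action_dim)
        )

        # Target network with the same architecture
        self.target_network = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, action_dim)
        )
        # Start the target network from the online network's weights
        self.target_network.load_state_dict(self.q_network.state_dict())

        self.optimizer = torch.optim.Adam(self.q_network.parameters())
        self.memory = []      # replay buffer: (state, action, reward, next_state, done)
        self.gamma = 0.99     # discount factor
        self.epsilon = 0.1    # exploration rate for epsilon-greedy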

    def get_state(self, metrics: Dict[str, float],
                  current_replicas: int) -> np.ndarray:
        """Build the state vector (6 features, so state_dim must be 6)."""
        return np.array([
            metrics.get("cpu_utilization", 0),
            metrics.get("memory_utilization", 0),
            metrics.get("request_rate", 0),
            metrics.get("latency_p99", 0),
            current_replicas,
            metrics.get("pending_requests", 0)
        ], dtype=np.float32)

    def select_action(self, state: np.ndarray,
                      epsilon: Optional[float] = None) -> int:
        """
        Select an action with an epsilon-greedy policy.

        Action space:
        0: no change
        1: +1 replica
        2: +2 replicas
        3: -1 replica
        4: -2 replicas
        """
        if epsilon is None:
            epsilon = self.epsilon

        if np.random.random() < epsilon:
            return np.random.randint(self.action_dim)

        state_tensor = torch.FloatTensor(state).unsqueeze(0)
        with torch.no_grad():
            q_values = self.q_network(state_tensor)
        return q_values.argmax().item()

    def action_to_delta(self, action: int) -> int:
        """将动作转换为副本变化量"""
        action_map = {0: 0, 1: 1, 2: 2, 3: -1, 4: -2}
        return action_map.get(action, 0)

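    def calculate_reward(self, metrics: Dict[str, float],
                         action: int) -> float:
        """
        Compute the reward.

        Reward design (every term is <= 0, so the best achievable reward is 0):
        - Staying at the target utilization -> no utilization penalty
        - Aggressive scaling -> negative reward
        - SLA violation -> large negative reward
        """
        cpu_util = metrics.get("cpu_utilization", 0)
        latency = metrics.get("latency_p99", 0)

        # Utilization term (target 70%; cpu_utilization is a fraction in [0, 1])
        util_reward = -abs(cpu_util - 0.7) * 10

        # Latency penalty: -5 per 100 ms of p99 latency above the threshold
        latency_threshold = 500  # ms
        latency_penalty = max(0, (latency - latency_threshold) / 100) * -5

        # Scaling cost: -0.5 per replica added or removed
        delta = abs(self.action_to_delta(action))
        scale_cost = -delta * 0.5

        return util_reward + latency_penalty + scale_cost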

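    def train(self, batch_size: int = 32):
        """Run one DQN update on a random minibatch from the replay buffer."""
        if len(self.memory) < batch_size:
            return

        # Sample a minibatch of (state, action, reward, next_state, done) tuples
        indices = np.random.choice(len(self.memory), batch_size, replace=False)
        batch = [self.memory[i] for i in indices]

        # Stack with NumPy first; building tensors from lists of arrays is slow
        states = torch.as_tensor(np.array([t[0] for t in batch]), dtype=torch.float32)
        actions = torch.as_tensor([t[1] for t in batch], dtype=torch.int64)
        rewards = torch.as_tensor([t[2] for t in batch], dtype=torch.float32)
        next_states = torch.as_tensor(np.array([t[3] for t in batch]), dtype=torch.float32)
        dones = torch.as_tensor([t[4] for t in batch], dtype=torch.float32)

        # Q(s, a) for the actions actually taken; squeeze(1) keeps the batch dimension
        current_q = self.q_network(states).gather(1, actions.unsqueeze(1)).squeeze(1)

        # Bootstrapped target from the target network
        with torch.no_grad():
            next_q = self.target_network(next_states).max(1)[0]
            target_q = rewards + self.gamma * next_q * (1 - dones)

        # Gradient step on the TD error
        loss = nn.MSELoss()(current_q, target_q)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()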

    def update_target_network(self):
        """更新目标网络"""
        self.target_network.load_state_dict(self.q_network.state_dict())
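
The per-metric rule used by the multi-metric scaler above, ceil(current_replicas * current_value / target), is the same ratio rule the Kubernetes HPA applies. The two differ in how they combine metrics: the listing above takes a weighted average, whereas the Kubernetes HPA takes the largest suggestion. The following is a minimal sketch of that difference; the helper names and the sample numbers are hypothetical and not part of the original code.

```python
import math


def desired_replicas(current_replicas, current_value, target_value):
    """Ratio rule: ceil(current_replicas * current_value / target_value)."""
    return math.ceil(current_replicas * current_value / target_value)


def combine_weighted(suggestions):
    """Weighted average of (replicas, weight) pairs, rounded up."""
    total_weight = sum(weight for _, weight in suggestions)
    return math.ceil(sum(r * weight for r, weight in suggestions) / total_weight)


def combine_max(suggestions):
    """Largest suggestion wins, so no single metric is left under-provisioned."""
    return max(r for r, _ in suggestions)


current = 4
cpu = desired_replicas(current, 90, 60)      # CPU at 90% against a 60% target
qps = desired_replicas(current, 500, 1000)   # 500 req/s against a 1000 req/s target

suggestions = [(cpu, 1.0), (qps, 1.0)]
weighted = combine_weighted(suggestions)
largest = combine_max(suggestions)

print(cpu, qps, weighted, largest)
```

With these numbers the CPU metric asks for 6 replicas and the request-rate metric for 2. The weighted average stays at 4 even though CPU is well above its target, while the max rule scales to 6. Averaging smooths noisy metrics but can mask a saturated one, so which combination is appropriate depends on the workload.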

4. Cost Optimization

4.1 Resource Cost Analysis

"""
Resource cost analysis and optimization
"""

from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional
import numpy as np


@dataclass
class ResourceCost:
    """资源成本"""
    resource_id: str
    resource_type: str
    hourly_cost: float
    monthly_cost: float
    utilization: float
    waste_percentage: float
    optimization_potential: float


@dataclass
class CostOptimizationRecommendation:
    """成本优化建议"""
    recommendation_id: str
    resource_id: str
    action: str
    estimated_savings_monthly: float
    risk_level: str
    implementation_effort: str
    description: str


class CostAnalyzer:
    """成本分析器"""

    def __init__(self, pricing: Dict[str, float]):
        """
        初始化成本分析器

        Args:
            pricing: 定价信息 {资源类型: 每小时成本}
        """
        self.pricing = pricing

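    def analyze_resource(self, resource_id: str,
                         resource_type: str,
                         metrics: Dict[str, float],
                         instance_count: int = 1) -> ResourceCost:
        """
        Analyze the cost of a single resource.

        Args:
            resource_id: Resource ID.
            resource_type: Resource type (key into the pricing table).
            metrics: Usage metrics; utilizations are fractions in [0, 1].
            instance_count: Number of instances.

        Returns:
            ResourceCost: Cost analysis result.
        """
        hourly_rate = self.pricing.get(resource_type, 0)
        hourly_cost = hourly_rate * instance_count
        monthly_cost = hourly_cost * 24 * 30  # approximates a month as 30 days

        # Utilization: simple mean of CPU and memory utilization
        cpu_util = metrics.get("cpu_utilization", 0)
        mem_util = metrics.get("memory_utilization", 0)
        avg_utilization = (cpu_util + mem_util) / 2

        # Wasted share of capacity (a fraction in [0, 1], despite the field name)
        waste_percentage = max(0, 1 - avg_utilization)

        # Optimization potential: share of cost that could be saved,
        # conservatively taken as 80% of the waste
        optimization_potential = waste_percentage * 0.8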

        return ResourceCost(
            resource_id=resource_id,
            resource_type=resource_type,
            hourly_cost=hourly_cost,
            monthly_cost=monthly_cost,
            utilization=avg_utilization,
            waste_percentage=waste_percentage,
            optimization_potential=optimization_potential
        )

    def analyze_cluster(self, resources: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        分析集群成本

        Args:
            resources: 资源列表

        Returns:
            Dict: 集群成本分析
        """
        total_cost = 0
        total_waste = 0
        resource_costs = []

        for resource in resources:
            cost = self.analyze_resource(
                resource["id"],
                resource["type"],
                resource["metrics"],
                resource.get("count", 1)
            )
            resource_costs.append(cost)
            total_cost += cost.monthly_cost
            total_waste += cost.monthly_cost * cost.waste_percentage

        return {
            "total_monthly_cost": total_cost,
            "total_waste": total_waste,
            "waste_percentage": total_waste / total_cost if total_cost > 0 else 0,
            "resource_breakdown": resource_costs,
            "top_wasters": sorted(
                resource_costs,
                key=lambda x: x.monthly_cost * x.waste_percentage,
                reverse=True
            )[:5]
        }


class CostOptimizer:
    """成本优化器"""

    def __init__(self, analyzer: CostAnalyzer):
        self.analyzer = analyzer

    def generate_recommendations(self,
                                  cluster_analysis: Dict[str, Any]) -> List[CostOptimizationRecommendation]:
        """
        Generate optimization recommendations.

        Args:
            cluster_analysis: Cluster analysis result from CostAnalyzer.analyze_cluster.

        Returns:
            List: Recommendations sorted by estimated monthly savings.
        """
        recommendations = []
        rec_id = 0

        # Savings below are independent heuristics; they are not additive
        # when several recommendations target the same resource.
        for cost in cluster_analysis.get("resource_breakdown", []):
            # Right-sizing
            if cost.utilization < 0.3:
                rec_id += 1
                savings = cost.monthly_cost * 0.5
                recommendations.append(CostOptimizationRecommendation(
                    recommendation_id=f"rec-{rec_id}",
                    resource_id=cost.resource_id,
                    action="rightsize_down",
                    estimated_savings_monthly=savings,
                    risk_level="low",
                    implementation_effort="medium",
                    description=f"Utilization is only {cost.utilization:.1%}; consider a smaller instance size or fewer replicas"
                ))

            # Spot / preemptible instances
            if cost.resource_type in ["vm", "ec2", "gce"]:
                rec_id += 1
                savings = cost.monthly_cost * 0.6
                recommendations.append(CostOptimizationRecommendation(
                    recommendation_id=f"rec-{rec_id}",
                    resource_id=cost.resource_id,
                    action="use_spot_instances",
                    estimated_savings_monthly=savings,
                    risk_level="medium",
                    implementation_effort="high",
                    description="Consider Spot/preemptible instances to reduce cost"
                ))

            # Reserved instances
            if cost.utilization > 0.7 and cost.monthly_cost > 100:
                rec_id += 1
                savings = cost.monthly_cost * 0.3
                recommendations.append(CostOptimizationRecommendation(
                    recommendation_id=f"rec-{rec_id}",
                    resource_id=cost.resource_id,
                    action="reserved_instance",
                    estimated_savings_monthly=savings,
                    risk_level="low",
                    implementation_effort="low",
                    description="Highly utilized resource; consider purchasing reserved instances"
                ))

        # Sort by estimated savings, largest first
        recommendations.sort(key=lambda x: x.estimated_savings_monthly, reverse=True)

        return recommendations

    def optimize_scaling_policy(self,
                                 historical_usage: np.ndarray,
                                 current_replicas: int) -> Dict[str, Any]:
        """
        优化伸缩策略

        Args:
            historical_usage: 历史使用数据
            current_replicas: 当前副本数

        Returns:
            Dict: 优化后的伸缩策略
        """
        # 分析使用模式
        usage_percentiles = {
            "p50": np.percentile(historical_usage, 50),
            "p90": np.percentile(historical_usage, 90),
            "p99": np.percentile(historical_usage, 99),
            "max": np.max(historical_usage)
        }

        # 计算最优副本数范围
        min_replicas = max(1, int(usage_percentiles["p50"] / 0.8))
        max_replicas = int(usage_percentiles["p99"] / 0.7) + 1

        # 建议的伸缩策略
        policy = {
            "min_replicas": min_replicas,
            "max_replicas": max_replicas,
            "target_utilization": 0.7,
            "scale_up_threshold": 0.8,
            "scale_down_threshold": 0.5,
            "scale_up_cooldown": 180,
            "scale_down_cooldown": 300,
            "usage_analysis": usage_percentiles,
            "current_vs_optimal": {
                "current_replicas": current_replicas,
                "suggested_replicas": int((min_replicas + max_replicas) / 2),
                "over_provisioned": current_replicas > max_replicas
            }
        }

        return policy


class FinOpsIntegration:
    """FinOps 集成"""

    def __init__(self):
        self.budgets: Dict[str, float] = {}
        self.alerts: List[Dict[str, Any]] = []

    def set_budget(self, team: str, monthly_budget: float):
        """设置预算"""
        self.budgets[team] = monthly_budget

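    def check_budget(self, team: str, current_spend: float) -> Dict[str, Any]:
        """
        Check a team's spend against its budget.

        Args:
            team: Team name.
            current_spend: Current spend.

        Returns:
            Dict: Budget status.
        """
        # Teams without a configured budget are treated as unlimited
        budget = self.budgets.get(team, float('inf'))
        if budget > 0:
            percentage_used = current_spend / budget
        else:
            # A zero budget is exceeded by any spend
            percentage_used = float('inf') if current_spend > 0 else 0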

        status = {
            "team": team,
            "budget": budget,
            "current_spend": current_spend,
            "remaining": budget - current_spend,
            "percentage_used": percentage_used,
            "status": "ok"
        }

        if percentage_used >= 1.0:
            status["status"] = "exceeded"
            self.alerts.append({
                "team": team,
                "type": "budget_exceeded",
                "message": f"团队 {team} 已超出预算"
            })
        elif percentage_used >= 0.9:
            status["status"] = "warning"
            self.alerts.append({
                "team": team,
                "type": "budget_warning",
                "message": f"团队 {team} 预算使用已达 {percentage_used:.0%}"
            })

        return status

    def generate_cost_report(self, team_costs: Dict[str, float]) -> Dict[str, Any]:
        """
        生成成本报告

        Args:
            team_costs: 各团队成本

        Returns:
            Dict: 成本报告
        """
        total_cost = sum(team_costs.values())

        report = {
            "period": "monthly",
            "total_cost": total_cost,
            "by_team": [],
            "budget_status": [],
            "recommendations": []
        }

        for team, cost in team_costs.items():
            budget_status = self.check_budget(team, cost)
            report["by_team"].append({
                "team": team,
                "cost": cost,
                "percentage_of_total": cost / total_cost if total_cost > 0 else 0
            })
            report["budget_status"].append(budget_status)

        return report
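
As a worked example of the arithmetic in the cost analyzer above, the sketch below recomputes monthly cost, average utilization, and wasted spend for a single resource. It is standalone and does not use the classes from the listing; the function name, the 0.5 per hour rate, and the utilization figures are hypothetical.

```python
HOURS_PER_MONTH = 24 * 30  # the same 30-day month the analyzer assumes


def monthly_cost_and_waste(hourly_rate, instance_count, cpu_util, mem_util):
    """Return (monthly_cost, utilization, waste_fraction, wasted_cost)."""
    monthly_cost = hourly_rate * instance_count * HOURS_PER_MONTH
    utilization = (cpu_util + mem_util) / 2          # mean of CPU and memory
    waste_fraction = max(0.0, 1.0 - utilization)     # unused share of capacity
    return monthly_cost, utilization, waste_fraction, monthly_cost * waste_fraction


# Four instances at a hypothetical 0.5 per hour, both utilizations at 25%
cost, util, waste_fraction, wasted = monthly_cost_and_waste(0.5, 4, 0.25, 0.25)

# Utilization below 0.3 triggers the right-sizing rule, which estimates 50% savings
rightsize_savings = cost * 0.5 if util < 0.3 else 0.0

print(cost, util, waste_fraction, wasted, rightsize_savings)
```

Here the resource costs 1440 per month, of which 1080 pays for idle capacity, and the right-sizing heuristic estimates savings of 720. The gap between the two figures reflects that the 50% factor is a fixed rule of thumb rather than a function of the measured waste.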

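5. Integrated Decision System

5.1 Decision Engine

"""
Integrated operations decision engine
"""

import asyncio
import uuid
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List, Any, Optional

import numpy as np

# AutoRemediationEngine, CapacityPlanningService, PredictiveHPA, ScalingDirection
# and CostOptimizer are assumed to be the classes defined in the earlier
# sections of this chapter.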


class DecisionType(Enum):
    """决策类型"""
    REMEDIATION = "remediation"
    SCALING = "scaling"
    COST_OPTIMIZATION = "cost_optimization"
    CAPACITY_PLANNING = "capacity_planning"


@dataclass
class OpsDecision:
    """运维决策"""
    decision_id: str
    decision_type: DecisionType
    priority: int
    resource: str
    action: str
    parameters: Dict[str, Any]
    confidence: float
    estimated_impact: str
    requires_approval: bool
    auto_execute: bool


class IntelligentOpsEngine:
    """智能运维引擎"""

    def __init__(self):
        # 子系统
        self.remediation_engine: Optional[AutoRemediationEngine] = None
        self.capacity_planner: Optional[CapacityPlanningService] = None
        self.autoscaler: Optional[PredictiveHPA] = None
        self.cost_optimizer: Optional[CostOptimizer] = None

        # 决策队列
        self.pending_decisions: List[OpsDecision] = []
        self.executed_decisions: List[OpsDecision] = []

        # 配置
        self.auto_execute_threshold = 0.8  # 置信度阈值

    def register_remediation_engine(self, engine: AutoRemediationEngine):
        """注册修复引擎"""
        self.remediation_engine = engine

    def register_capacity_planner(self, planner: CapacityPlanningService):
        """注册容量规划器"""
        self.capacity_planner = planner

    def register_autoscaler(self, scaler: PredictiveHPA):
        """注册自动伸缩器"""
        self.autoscaler = scaler

    def register_cost_optimizer(self, optimizer: CostOptimizer):
        """注册成本优化器"""
        self.cost_optimizer = optimizer

    async def process_event(self, event: Dict[str, Any]) -> List[OpsDecision]:
        """
        处理事件并生成决策

        Args:
            event: 事件数据

        Returns:
            List[OpsDecision]: 决策列表
        """
        decisions = []

        event_type = event.get("type")

        if event_type == "incident":
            # 生成修复决策
            if self.remediation_engine:
                results = await self.remediation_engine.handle_incident(event)
                for result in results:
                    decision = OpsDecision(
                        decision_id=result.action_id,
                        decision_type=DecisionType.REMEDIATION,
                        priority=1,
                        resource=event.get("resource", ""),
                        action="remediate",
                        parameters={"result": result},
                        confidence=0.9,
                        estimated_impact="恢复服务",
                        requires_approval=False,
                        auto_execute=True
                    )
                    decisions.append(decision)

        elif event_type == "capacity_alert":
            # 生成容量决策
            if self.capacity_planner and self.autoscaler:
                resource = event.get("resource")
                metrics = event.get("metrics", {})
                historical = event.get("historical_data", np.array([]))

                scaling_decision = self.autoscaler.evaluate(
                    resource,
                    metrics,
                    historical,
                    event.get("current_replicas", 1)
                )

                if scaling_decision.direction != ScalingDirection.NONE:
                    decision = OpsDecision(
                        decision_id=str(uuid.uuid4()),
                        decision_type=DecisionType.SCALING,
                        priority=2,
                        resource=resource,
                        action=f"scale_{scaling_decision.direction.value}",
                        parameters={
                            "current": scaling_decision.current_replicas,
                            "target": scaling_decision.target_replicas
                        },
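                        confidence=scaling_decision.confidence,
                        estimated_impact=scaling_decision.reason,
                        # Anything below the auto-execute threshold goes to approval,
                        # so no decision is left without an execution path
                        requires_approval=scaling_decision.confidence < self.auto_execute_threshold,
                        auto_execute=scaling_decision.confidence >= self.auto_execute_threshold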
                    )
                    decisions.append(decision)

        elif event_type == "cost_review":
            # 生成成本优化决策
            if self.cost_optimizer:
                recommendations = self.cost_optimizer.generate_recommendations(
                    event.get("cluster_analysis", {})
                )

                for rec in recommendations[:3]:  # 取前3个建议
                    decision = OpsDecision(
                        decision_id=rec.recommendation_id,
                        decision_type=DecisionType.COST_OPTIMIZATION,
                        priority=3,
                        resource=rec.resource_id,
                        action=rec.action,
                        parameters={
                            "savings": rec.estimated_savings_monthly,
                            "description": rec.description
                        },
                        confidence=0.8,
                        estimated_impact=f"节省 ${rec.estimated_savings_monthly:.0f}/月",
                        requires_approval=True,
                        auto_execute=False
                    )
                    decisions.append(decision)

        # 排序决策
        decisions.sort(key=lambda d: d.priority)

        # 添加到队列
        self.pending_decisions.extend(decisions)

        return decisions

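    async def execute_decision(self, decision: OpsDecision) -> Dict[str, Any]:
        """
        Execute a decision.

        Args:
            decision: The decision to execute.

        Returns:
            Dict: Execution result.
        """
        result = {
            "decision_id": decision.decision_id,
            "status": "pending",
            "message": ""
        }

        try:
            if decision.decision_type == DecisionType.REMEDIATION:
                # The remediation already ran inside handle_incident
                result["status"] = "success"
                result["message"] = "Remediation executed"

            elif decision.decision_type == DecisionType.SCALING:
                # Placeholder: a real implementation would call the Kubernetes API here
                result["status"] = "success"
                result["message"] = f"Scaled {decision.resource} to {decision.parameters['target']} replicas"

            elif decision.decision_type == DecisionType.COST_OPTIMIZATION:
                # Cost optimizations are carried out manually
                result["status"] = "pending_manual"
                result["message"] = "Cost optimization recommendations must be applied manually"

            else:
                # For example CAPACITY_PLANNING, which has no executor here
                result["status"] = "unsupported"
                result["message"] = f"No executor for {decision.decision_type.value}"

            # Record the decision as processed
            self.executed_decisions.append(decision)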

        except Exception as e:
            result["status"] = "failed"
            result["message"] = str(e)

        return result

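    async def run(self):
        """Run the decision engine loop."""
        while True:
            # Iterate over a snapshot: removing items from the list being
            # iterated would skip the element after each removal
            for decision in list(self.pending_decisions):
                if decision.auto_execute:
                    await self.execute_decision(decision)
                    self.pending_decisions.remove(decision)

            await asyncio.sleep(10)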
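
The engine above routes decisions by confidence: those at or above auto_execute_threshold (0.8) run automatically, and the rest wait for approval. The sketch below isolates that routing and shows one way to drain a pending queue without mutating it during iteration. The Decision class and the route and drain helpers are hypothetical simplifications, not the engine's actual types.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class Decision:
    """Hypothetical stand-in for OpsDecision with only the fields used here."""
    name: str
    confidence: float


def route(decision: Decision, auto_threshold: float = 0.8) -> str:
    """Give every decision exactly one path: 'auto' or 'approval'."""
    return "auto" if decision.confidence >= auto_threshold else "approval"


def drain(pending: List[Decision],
          auto_threshold: float = 0.8) -> Tuple[List[Decision], List[Decision]]:
    """Split the queue into (executed, waiting) by building new lists."""
    executed, waiting = [], []
    for decision in pending:
        if route(decision, auto_threshold) == "auto":
            executed.append(decision)
        else:
            waiting.append(decision)
    return executed, waiting


pending = [
    Decision("scale-a", 0.90),
    Decision("scale-b", 0.75),
    Decision("scale-c", 0.85),
]
executed, waiting = drain(pending)

print([d.name for d in executed], [d.name for d in waiting])
```

Using a single threshold for both paths means a decision with confidence 0.75 lands in the approval queue instead of sitting in the pending list with neither flag set. Building new lists rather than calling remove inside the loop also ensures that adjacent auto-executable decisions are all processed in the same pass.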

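Summary

This chapter covered the core capabilities of intelligent operations decision-making:

  1. Auto-remediation: rule engine, executors, verification
  2. Capacity forecasting: Prophet, LSTM, uncertainty estimation
  3. Elastic scaling: predictive HPA, multi-metric scaling, reinforcement learning
  4. Cost optimization: resource analysis, optimization recommendations, FinOps

Key takeaways:

  • Auto-remediation needs thorough risk assessment and a rollback mechanism
  • Capacity forecasting should account for seasonality and trend changes
  • Elastic scaling has to balance responsiveness against stability
  • Cost optimization is a continuous process that needs organizational buy-in

The next chapter covers the AIOps platform in practice, including platform architecture, integration options, and best practices.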
