Monitoring and Alerting System
1. Monitoring Architecture
┌──────────────────────────────────────────────────────────┐
│ Application-layer monitoring                             │
│ Business metrics, API QPS, order TPS, trade volume, users│
└────────────────────┬─────────────────────────────────────┘
                     │
┌────────────────────┴─────────────────────────────────────┐
│ Infrastructure monitoring                                │
│ CPU, memory, disk, network, DB connection pool, Redis    │
└────────────────────┬─────────────────────────────────────┘
                     │
┌────────────────────┴─────────────────────────────────────┐
│ Log monitoring                                           │
│ Error logs, access logs, audit logs, slow-query logs     │
└────────────────────┬─────────────────────────────────────┘
                     │
┌────────────────────┴─────────────────────────────────────┐
│ Distributed tracing                                      │
│ Call chains, performance bottlenecks, dependencies       │
└──────────────────────────────────────────────────────────┘
2. Prometheus Monitoring
2.1 Metric Types
Counter - monotonically increasing
import (
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
)

var (
    // Total number of orders
    orderTotal = promauto.NewCounterVec(
        prometheus.CounterOpts{
            Name: "exchange_orders_total",
            Help: "Total number of orders",
        },
        []string{"symbol", "side", "type"},
    )

    // Total number of API requests
    apiRequestsTotal = promauto.NewCounterVec(
        prometheus.CounterOpts{
            Name: "exchange_api_requests_total",
            Help: "Total number of API requests",
        },
        []string{"method", "endpoint", "status"},
    )
)

// Usage example
func PlaceOrder(order *Order) error {
    // Business logic
    // ...

    // Record the metric
    orderTotal.WithLabelValues(order.Symbol, order.Side, order.Type).Inc()
    return nil
}
Gauge - can go up and down
var (
    // Number of currently active orders
    activeOrders = promauto.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: "exchange_active_orders",
            Help: "Number of active orders",
        },
        []string{"symbol"},
    )

    // Number of online users
    onlineUsers = promauto.NewGauge(
        prometheus.GaugeOpts{
            Name: "exchange_online_users",
            Help: "Number of online users",
        },
    )

    // Database connection pool stats
    dbConnections = promauto.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: "exchange_db_connections",
            Help: "Database connection pool stats",
        },
        []string{"state"}, // active, idle, waiting
    )
)

// Usage examples
func UpdateActiveOrders(symbol string, count int) {
    activeOrders.WithLabelValues(symbol).Set(float64(count))
}

func UpdateDBConnectionStats(stats *sql.DBStats) {
    dbConnections.WithLabelValues("active").Set(float64(stats.InUse))
    dbConnections.WithLabelValues("idle").Set(float64(stats.Idle))
    // Note: WaitCount is cumulative since process start, not a point-in-time value
    dbConnections.WithLabelValues("waiting").Set(float64(stats.WaitCount))
}
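These gauges only stay accurate if something refreshes them periodically. A minimal sketch of a background goroutine that snapshots sql.DB.Stats() on a ticker, assuming the database/sql and time imports of this package; the helper name StartDBStatsCollector and the 10-second interval are assumptions, not part of the original code:
// StartDBStatsCollector periodically copies connection-pool statistics
// from database/sql into the Prometheus gauges defined above.
func StartDBStatsCollector(db *sql.DB, interval time.Duration) {
    go func() {
        ticker := time.NewTicker(interval)
        defer ticker.Stop()
        for range ticker.C {
            stats := db.Stats() // snapshot of the current pool state
            UpdateDBConnectionStats(&stats)
        }
    }()
}

// Example: StartDBStatsCollector(db, 10*time.Second)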
Histogram - distribution statistics
var (
    // API response time
    apiDuration = promauto.NewHistogramVec(
        prometheus.HistogramOpts{
            Name:    "exchange_api_duration_seconds",
            Help:    "API request duration",
            Buckets: prometheus.DefBuckets, // [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10]
        },
        []string{"endpoint"},
    )

    // Order processing latency
    orderProcessingDuration = promauto.NewHistogram(
        prometheus.HistogramOpts{
            Name:    "exchange_order_processing_seconds",
            Help:    "Order processing duration",
            Buckets: []float64{0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1},
        },
    )
)

// Usage example
func APIHandler(w http.ResponseWriter, r *http.Request) {
    start := time.Now()
    defer func() {
        duration := time.Since(start).Seconds()
        apiDuration.WithLabelValues(r.URL.Path).Observe(duration)
    }()

    // Handle the request
    // ...
}
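In practice it is usually more convenient to instrument every endpoint through one middleware than to repeat the timing code in each handler. A minimal sketch that records both apiDuration and apiRequestsTotal, assuming the net/http, time and strconv imports; the statusRecorder wrapper is an assumption added for illustration:
// statusRecorder captures the status code written by the wrapped handler.
type statusRecorder struct {
    http.ResponseWriter
    status int
}

func (sr *statusRecorder) WriteHeader(code int) {
    sr.status = code
    sr.ResponseWriter.WriteHeader(code)
}

// MetricsMiddleware records request count and latency for every request.
func MetricsMiddleware(next http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        start := time.Now()
        rec := &statusRecorder{ResponseWriter: w, status: http.StatusOK}

        next.ServeHTTP(rec, r)

        // Consider normalizing the path to a route template to limit label cardinality.
        endpoint := r.URL.Path
        apiDuration.WithLabelValues(endpoint).Observe(time.Since(start).Seconds())
        apiRequestsTotal.WithLabelValues(r.Method, endpoint, strconv.Itoa(rec.status)).Inc()
    })
}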
Summary - quantile statistics
var (
    // Matching engine latency
    matchingLatency = promauto.NewSummaryVec(
        prometheus.SummaryOpts{
            Name:       "exchange_matching_latency_seconds",
            Help:       "Matching engine latency",
            Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, // P50, P90, P99
        },
        []string{"symbol"},
    )
)

func ProcessOrder(symbol string) {
    start := time.Now()
    defer func() {
        latency := time.Since(start).Seconds()
        matchingLatency.WithLabelValues(symbol).Observe(latency)
    }()

    // Matching logic
    // ...
}
2.2 Custom Business Metrics
package metrics

import (
    "database/sql"
    "time"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
)

var (
    // Trading volume
    tradeVolume = promauto.NewCounterVec(
        prometheus.CounterOpts{
            Name: "exchange_trade_volume",
            Help: "Trading volume in quote currency",
        },
        []string{"symbol"},
    )

    // Fee revenue
    feeRevenue = promauto.NewCounterVec(
        prometheus.CounterOpts{
            Name: "exchange_fee_revenue",
            Help: "Fee revenue in USDT",
        },
        []string{"currency"},
    )

    // Deposits and withdrawals
    deposits = promauto.NewCounterVec(
        prometheus.CounterOpts{
            Name: "exchange_deposits_total",
            Help: "Total deposits",
        },
        []string{"currency", "status"},
    )

    withdrawals = promauto.NewCounterVec(
        prometheus.CounterOpts{
            Name: "exchange_withdrawals_total",
            Help: "Total withdrawals",
        },
        []string{"currency", "status"},
    )

    // Total balances
    totalBalance = promauto.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: "exchange_total_balance",
            Help: "Total balance in USDT equivalent",
        },
        []string{"currency"},
    )
)

type MetricsCollector struct {
    db *sql.DB
}

func NewMetricsCollector(db *sql.DB) *MetricsCollector {
    mc := &MetricsCollector{db: db}
    // Start periodic collection
    go mc.collectPeriodically()
    return mc
}

func (mc *MetricsCollector) collectPeriodically() {
    ticker := time.NewTicker(10 * time.Second)
    defer ticker.Stop()

    for range ticker.C {
        mc.collectTotalBalance()
        mc.collectActiveOrders()
    }
}

func (mc *MetricsCollector) collectTotalBalance() {
    rows, err := mc.db.Query(`
        SELECT currency, SUM(available + frozen) AS total
        FROM account_balances
        GROUP BY currency
    `)
    if err != nil {
        return
    }
    defer rows.Close()

    for rows.Next() {
        var currency string
        var total float64
        rows.Scan(&currency, &total)
        totalBalance.WithLabelValues(currency).Set(total)
    }
}

func (mc *MetricsCollector) collectActiveOrders() {
    rows, err := mc.db.Query(`
        SELECT symbol, COUNT(*) AS count
        FROM orders
        WHERE status IN ('pending', 'partial')
        GROUP BY symbol
    `)
    if err != nil {
        return
    }
    defer rows.Close()

    for rows.Next() {
        var symbol string
        var count int
        rows.Scan(&symbol, &count)
        activeOrders.WithLabelValues(symbol).Set(float64(count))
    }
}

// Record a trade
func (mc *MetricsCollector) RecordTrade(trade *Trade) {
    // Trading volume
    tradeVolume.WithLabelValues(trade.Symbol).Add(trade.Amount)
    // Fee revenue
    feeRevenue.WithLabelValues("USDT").Add(trade.BuyFee + trade.SellFee)
}

// Record a deposit
func (mc *MetricsCollector) RecordDeposit(currency, status string, amount float64) {
    deposits.WithLabelValues(currency, status).Inc()
}

// Record a withdrawal
func (mc *MetricsCollector) RecordWithdrawal(currency, status string, amount float64) {
    withdrawals.WithLabelValues(currency, status).Inc()
}
2.3 Exposing Metrics
package main

import (
    "log"
    "net/http"

    "github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
    // Initialize the metrics collector; db is assumed to be an already-opened
    // *sql.DB (e.g. via sql.Open), omitted here for brevity.
    collector := NewMetricsCollector(db)
    _ = collector

    // Expose the /metrics endpoint
    http.Handle("/metrics", promhttp.Handler())

    log.Println("Metrics server listening on :9090")
    log.Fatal(http.ListenAndServe(":9090", nil))
}
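In a real deployment the /metrics endpoint usually lives inside the API or matching process rather than a standalone binary. A minimal sketch that serves it from a dedicated ServeMux and port so scrape traffic never shares a mux with business endpoints; the helper name and port are assumptions:
// StartMetricsServer exposes /metrics on its own listener, separate from the API mux.
func StartMetricsServer(addr string) {
    mux := http.NewServeMux()
    mux.Handle("/metrics", promhttp.Handler())

    go func() {
        if err := http.ListenAndServe(addr, mux); err != nil {
            log.Printf("metrics server stopped: %v", err)
        }
    }()
}

// Example: StartMetricsServer(":9090") called from the API's main().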
2.4 Prometheus Configuration
# prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

# Alert rule files
rule_files:
  - 'alerts.yml'

# Scrape configuration
scrape_configs:
  # Exchange services
  - job_name: 'exchange-api'
    static_configs:
      - targets: ['api-01:9090', 'api-02:9090', 'api-03:9090']

  - job_name: 'exchange-matching'
    static_configs:
      - targets: ['matching-01:9090', 'matching-02:9090']

  # Infrastructure
  - job_name: 'node-exporter'
    static_configs:
      - targets: ['node-01:9100', 'node-02:9100']

  - job_name: 'mysql-exporter'
    static_configs:
      - targets: ['mysql-exporter:9104']

  - job_name: 'redis-exporter'
    static_configs:
      - targets: ['redis-exporter:9121']

# Alertmanager
alerting:
  alertmanagers:
    - static_configs:
        - targets: ['alertmanager:9093']
3. Alert Rules
3.1 Alert Rule Configuration
# alerts.yml
groups:
  - name: exchange_alerts
    interval: 30s
    rules:
      # API availability
      - alert: APIHighErrorRate
        expr: |
          sum(rate(exchange_api_requests_total{status=~"5.."}[5m])) /
          sum(rate(exchange_api_requests_total[5m])) > 0.05
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "API error rate > 5%"
          description: "API error rate is {{ $value | humanizePercentage }}"

      # API latency
      - alert: APIHighLatency
        expr: |
          histogram_quantile(0.99,
            rate(exchange_api_duration_seconds_bucket[5m])
          ) > 1
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "API P99 latency > 1s"
          description: "API P99 latency is {{ $value }}s"

      # Matching engine latency
      - alert: MatchingEngineSlowdown
        expr: |
          exchange_matching_latency_seconds{quantile="0.99"} > 0.01
        for: 3m
        labels:
          severity: critical
        annotations:
          summary: "Matching engine P99 latency > 10ms"
          description: "Symbol {{ $labels.symbol }} matching latency is {{ $value }}s"

      # Database connection pool
      - alert: DatabaseConnectionPoolExhausted
        expr: exchange_db_connections{state="waiting"} > 10
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "Database connection pool exhausted"
          description: "{{ $value }} connections waiting"

      # Disk space
      - alert: DiskSpaceLow
        expr: |
          (node_filesystem_avail_bytes{mountpoint="/"} /
           node_filesystem_size_bytes{mountpoint="/"}) < 0.1
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Disk space < 10%"
          description: "Disk space on {{ $labels.instance }} is {{ $value | humanizePercentage }}"

      # Memory usage
      - alert: HighMemoryUsage
        expr: |
          (1 - (node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes)) > 0.9
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Memory usage > 90%"
          description: "Memory usage on {{ $labels.instance }} is {{ $value | humanizePercentage }}"

      # CPU usage
      - alert: HighCPUUsage
        expr: |
          100 - (avg by (instance) (rate(node_cpu_seconds_total{mode="idle"}[5m])) * 100) > 80
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "CPU usage > 80%"
          description: "CPU usage on {{ $labels.instance }} is {{ $value }}%"

      # Redis down
      - alert: RedisDown
        expr: redis_up == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Redis is down"
          description: "Redis instance {{ $labels.instance }} is down"

      # Kafka consumer lag
      - alert: KafkaConsumerLag
        expr: kafka_consumer_lag > 10000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Kafka consumer lag > 10k"
          description: "Consumer {{ $labels.consumer_group }} lag is {{ $value }}"

      # Abnormal trading volume
      - alert: AbnormalTradingVolume
        expr: |
          rate(exchange_trade_volume[5m]) >
          (avg_over_time(rate(exchange_trade_volume[1h])[1d:5m]) * 3)
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Abnormal trading volume detected"
          description: "Trading volume for {{ $labels.symbol }} is {{ $value }} (3x normal)"
3.2 Alertmanager Configuration
# alertmanager.yml
global:
  resolve_timeout: 5m

# Routing rules
route:
  group_by: ['alertname', 'cluster']
  group_wait: 10s
  group_interval: 10s
  repeat_interval: 12h
  receiver: 'default'

  routes:
    # Critical alerts: notify immediately
    - match:
        severity: critical
      receiver: 'critical-alerts'
      continue: true

    # Warning alerts: batched notification
    - match:
        severity: warning
      receiver: 'warning-alerts'
      group_wait: 30s
      group_interval: 5m

# Receivers
receivers:
  - name: 'default'
    webhook_configs:
      - url: 'http://alerting-service:8080/webhook'

  - name: 'critical-alerts'
    # Phone and SMS alerts (via webhook gateways)
    webhook_configs:
      - url: 'http://phone-alert-service:8080/call'
        send_resolved: false
      - url: 'http://sms-service:8080/send'
    # WeChat Work
    wechat_configs:
      - corp_id: 'your_corp_id'
        to_party: '1'
        agent_id: 'your_agent_id'
        api_secret: 'your_api_secret'

  - name: 'warning-alerts'
    # Email alerts
    email_configs:
      - to: 'ops@exchange.com'
        from: 'alert@exchange.com'
        smarthost: 'smtp.gmail.com:587'
        auth_username: 'alert@exchange.com'
        auth_password: 'your_password'

# Inhibition rules
inhibit_rules:
  # If a critical alert is firing, suppress matching warnings
  - source_match:
      severity: 'critical'
    target_match:
      severity: 'warning'
    equal: ['alertname', 'instance']
4. Logging System
4.1 Structured Logging
package logger

import (
    "go.uber.org/zap"
    "go.uber.org/zap/zapcore"
)

var log *zap.Logger

func InitLogger() {
    config := zap.NewProductionConfig()

    // Log level
    config.Level = zap.NewAtomicLevelAt(zapcore.InfoLevel)

    // Log format
    config.Encoding = "json"

    // Output paths
    config.OutputPaths = []string{
        "stdout",
        "/var/log/exchange/app.log",
    }

    // zap's own internal errors go to a separate destination
    config.ErrorOutputPaths = []string{
        "stderr",
        "/var/log/exchange/error.log",
    }

    log, _ = config.Build()
}

func Info(msg string, fields ...zap.Field) {
    log.Info(msg, fields...)
}

func Error(msg string, fields ...zap.Field) {
    log.Error(msg, fields...)
}

func Fatal(msg string, fields ...zap.Field) {
    log.Fatal(msg, fields...)
}

// Usage example (from a business package)
func PlaceOrder(order *Order) error {
    logger.Info("order placed",
        zap.String("order_id", order.OrderID),
        zap.Int64("user_id", order.UserID),
        zap.String("symbol", order.Symbol),
        zap.String("side", order.Side),
        zap.Float64("price", order.Price),
        zap.Float64("quantity", order.Quantity),
    )

    // Business logic
    // ...
    return nil
}
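The same structured logger can also produce the access log mentioned in the architecture overview. A minimal sketch of an HTTP middleware that emits one JSON log line per request, assuming the net/http, time and zap imports; the field names and the millisecond unit are assumptions:
// AccessLogMiddleware writes one structured log entry per HTTP request.
func AccessLogMiddleware(next http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        start := time.Now()
        next.ServeHTTP(w, r)

        logger.Info("http request",
            zap.String("method", r.Method),
            zap.String("path", r.URL.Path),
            zap.String("remote_addr", r.RemoteAddr),
            zap.Int64("duration_ms", time.Since(start).Milliseconds()),
        )
    })
}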
4.2 Log Collection with ELK
Filebeat configuration (log collection):
# filebeat.yml
filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /var/log/exchange/*.log
    fields:
      service: exchange-api
      env: production
    json.keys_under_root: true
    json.add_error_key: true

output.elasticsearch:
  hosts: ["elasticsearch:9200"]
  index: "exchange-logs-%{+yyyy.MM.dd}"

setup.template.name: "exchange"
setup.template.pattern: "exchange-*"
Logstash configuration (log processing):
# logstash.conf
input {
  beats {
    port => 5044
  }
}

filter {
  # Parse JSON logs
  json {
    source => "message"
  }

  # Set the target index in metadata
  mutate {
    add_field => {
      "[@metadata][index]" => "exchange-logs-%{+YYYY.MM.dd}"
    }
  }

  # GeoIP lookup
  geoip {
    source => "ip"
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "%{[@metadata][index]}"
  }
}
Kibana query examples:
# Find error logs
level:error AND service:exchange-api

# Find a specific user's actions
user_id:1001 AND action:place_order

# Find slow requests
duration:>1000 AND endpoint:/api/orders

# Errors per minute is an aggregation (e.g. a date-histogram visualization over level:error) rather than a query string
5. Distributed Tracing
5.1 Jaeger Integration
package tracing

import (
    "io"

    "github.com/opentracing/opentracing-go"
    "github.com/uber/jaeger-client-go"
    "github.com/uber/jaeger-client-go/config"
)

func InitTracing(serviceName string) (opentracing.Tracer, io.Closer, error) {
    cfg := &config.Configuration{
        ServiceName: serviceName,

        // Sampling strategy
        Sampler: &config.SamplerConfig{
            Type:  "probabilistic",
            Param: 0.1, // sample 10% of requests
        },

        // Reporter configuration
        Reporter: &config.ReporterConfig{
            LogSpans:           true,
            LocalAgentHostPort: "jaeger-agent:6831",
        },
    }

    tracer, closer, err := cfg.NewTracer(config.Logger(jaeger.StdLogger))
    if err != nil {
        return nil, nil, err
    }

    opentracing.SetGlobalTracer(tracer)
    return tracer, closer, nil
}
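The returned closer should live for the whole process so that buffered spans are flushed on shutdown. A minimal sketch of wiring this into main, assuming the log import; the "exchange-api" service name is just an example:
func main() {
    tracer, closer, err := tracing.InitTracing("exchange-api")
    if err != nil {
        log.Fatalf("init tracing: %v", err)
    }
    defer closer.Close() // flush buffered spans on shutdown
    _ = tracer           // the global tracer has already been set by InitTracing

    // ... register handlers and start the HTTP server
}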
Usage example:
func PlaceOrderHandler(w http.ResponseWriter, r *http.Request) {
    // 1. Extract the span context from the incoming HTTP request
    spanCtx, _ := opentracing.GlobalTracer().Extract(
        opentracing.HTTPHeaders,
        opentracing.HTTPHeadersCarrier(r.Header),
    )

    // 2. Start a new span
    span := opentracing.GlobalTracer().StartSpan(
        "place_order",
        opentracing.ChildOf(spanCtx),
    )
    defer span.Finish()

    // 3. Add tags
    span.SetTag("user_id", getUserID(r))
    span.SetTag("symbol", r.FormValue("symbol"))

    // 4. Call downstream services, propagating the span context
    ctx := opentracing.ContextWithSpan(r.Context(), span)
    err := placeOrder(ctx, order) // order is parsed from the request (omitted)
    if err != nil {
        // Record the error
        span.SetTag("error", true)
        span.LogKV("error", err.Error())
    }
}

func placeOrder(ctx context.Context, order *Order) error {
    // Child span: balance check
    checkSpan, _ := opentracing.StartSpanFromContext(ctx, "check_balance")
    // Check the balance
    // ...
    checkSpan.Finish() // finish as soon as the check completes so its duration is accurate

    // Child span: matching engine call
    matchSpan, _ := opentracing.StartSpanFromContext(ctx, "matching_engine")
    defer matchSpan.Finish()
    // Matching logic
    // ...
    return nil
}
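The Extract call above has a mirror image on the client side of a cross-service call. A hedged sketch of how a downstream HTTP request could propagate the trace by injecting the span context into the outgoing headers; the wallet-service URL and function name are made-up examples:
func callWalletService(ctx context.Context, orderID string) error {
    span, _ := opentracing.StartSpanFromContext(ctx, "wallet_service_call")
    defer span.Finish()

    req, err := http.NewRequest("POST", "http://wallet-service/freeze", nil)
    if err != nil {
        return err
    }

    // Inject the current span context into the outgoing request headers,
    // so the wallet service can Extract it and continue the trace.
    opentracing.GlobalTracer().Inject(
        span.Context(),
        opentracing.HTTPHeaders,
        opentracing.HTTPHeadersCarrier(req.Header),
    )

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        span.SetTag("error", true)
        return err
    }
    defer resp.Body.Close()
    return nil
}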
6. Business Monitoring Dashboards
6.1 Grafana Dashboard Configuration
{
"dashboard": {
"title": "Exchange Overview",
"panels": [
{
"title": "API QPS",
"targets": [
{
"expr": "sum(rate(exchange_api_requests_total[1m])) by (endpoint)"
}
]
},
{
"title": "API Latency (P99)",
"targets": [
{
"expr": "histogram_quantile(0.99, rate(exchange_api_duration_seconds_bucket[5m]))"
}
]
},
{
"title": "Orders Per Second",
"targets": [
{
"expr": "sum(rate(exchange_orders_total[1m])) by (symbol)"
}
]
},
{
"title": "Active Orders",
"targets": [
{
"expr": "sum(exchange_active_orders) by (symbol)"
}
]
},
{
"title": "Trading Volume (24h)",
"targets": [
{
"expr": "sum(increase(exchange_trade_volume[24h])) by (symbol)"
}
]
},
{
"title": "Fee Revenue (24h)",
"targets": [
{
"expr": "sum(increase(exchange_fee_revenue[24h]))"
}
]
},
{
"title": "Online Users",
"targets": [
{
"expr": "exchange_online_users"
}
]
},
{
"title": "Database Connections",
"targets": [
{
"expr": "exchange_db_connections"
}
]
}
]
}
}
6.2 PromQL Query Examples
# Total API requests (by endpoint)
sum(rate(exchange_api_requests_total[5m])) by (endpoint)

# API error rate
sum(rate(exchange_api_requests_total{status=~"5.."}[5m])) /
sum(rate(exchange_api_requests_total[5m]))

# Order-processing latency P99
histogram_quantile(0.99, rate(exchange_order_processing_seconds_bucket[5m]))

# 24h trading volume (by symbol)
sum(increase(exchange_trade_volume[24h])) by (symbol)

# Database connection-pool utilisation: active / (active + idle)
# (ignoring(state) is needed so the two differently-labelled series can be matched)
exchange_db_connections{state="active"}
  / ignoring(state)
sum without (state) (exchange_db_connections{state=~"active|idle"})

# CPU usage
100 - (avg(rate(node_cpu_seconds_total{mode="idle"}[5m])) * 100)

# Memory usage
(1 - (node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes)) * 100

# Kafka consume rate
rate(kafka_consumer_messages_consumed_total[5m])
7. Alert Handling Flow
7.1 Alert Webhook Handler
package alerting

import (
    "encoding/json"
    "fmt"
    "log"
    "net/http"
)

type AlertWebhook struct {
    notifier *Notifier
}

type Alert struct {
    Status      string            `json:"status"`
    Labels      map[string]string `json:"labels"`
    Annotations map[string]string `json:"annotations"`
    StartsAt    string            `json:"startsAt"`
    EndsAt      string            `json:"endsAt"`
}

type AlertMessage struct {
    Version  string  `json:"version"`
    GroupKey string  `json:"groupKey"`
    Status   string  `json:"status"`
    Alerts   []Alert `json:"alerts"`
}

func (aw *AlertWebhook) HandleWebhook(w http.ResponseWriter, r *http.Request) {
    var msg AlertMessage
    err := json.NewDecoder(r.Body).Decode(&msg)
    if err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }

    for _, alert := range msg.Alerts {
        // Choose notification channels based on severity
        severity := alert.Labels["severity"]
        switch severity {
        case "critical":
            // Phone + SMS + WeChat Work
            aw.notifier.SendPhoneCall(alert)
            aw.notifier.SendSMS(alert)
            aw.notifier.SendWechat(alert)
        case "warning":
            // Email + WeChat Work
            aw.notifier.SendEmail(alert)
            aw.notifier.SendWechat(alert)
        default:
            // WeChat Work only
            aw.notifier.SendWechat(alert)
        }

        // Persist the alert to the database
        aw.saveAlert(alert)
    }

    w.WriteHeader(http.StatusOK)
}

type Notifier struct {
    // Notification channel configuration
}

func (n *Notifier) SendPhoneCall(alert Alert) {
    // Call the phone-alert API
    log.Printf("Calling ops team: %s", alert.Annotations["summary"])
}

func (n *Notifier) SendSMS(alert Alert) {
    // Call the SMS API
    log.Printf("SMS sent: %s", alert.Annotations["summary"])
}

func (n *Notifier) SendWechat(alert Alert) {
    // Call the WeChat Work API
    message := fmt.Sprintf(
        "[%s]\nAlert: %s\nDetails: %s\nTime: %s",
        alert.Labels["severity"],
        alert.Annotations["summary"],
        alert.Annotations["description"],
        alert.StartsAt,
    )
    // Send the WeChat Work message
    log.Printf("Wechat message: %s", message)
}

func (n *Notifier) SendEmail(alert Alert) {
    // Send the email
    subject := fmt.Sprintf("[%s] %s", alert.Labels["severity"], alert.Annotations["summary"])
    body := alert.Annotations["description"]
    log.Printf("Email sent: %s\n%s", subject, body)
}
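To close the loop with the alertmanager.yml above, the handler has to be mounted at the URL the 'default' receiver posts to. A minimal sketch, written as if it lived alongside the alerting package (across package boundaries a constructor would be needed, since the notifier field is unexported); the listen address mirrors the 'http://alerting-service:8080/webhook' receiver and is otherwise an assumption:
func main() {
    webhook := &AlertWebhook{notifier: &Notifier{}}

    // Mount the handler at the path the Alertmanager 'default' receiver posts to.
    http.HandleFunc("/webhook", webhook.HandleWebhook)

    log.Println("Alert webhook listening on :8080")
    log.Fatal(http.ListenAndServe(":8080", nil))
}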
Summary
This chapter covered the monitoring and alerting system:
- Prometheus monitoring: metric types (Counter/Gauge/Histogram/Summary) and custom business metrics
- Alert rules: API availability, latency, resource usage, anomaly detection
- Alertmanager: alert routing, tiered notification, inhibition rules
- Logging: structured logs, collection and analysis with ELK
- Tracing: Jaeger integration, distributed call chains
- Dashboards: Grafana visualization, PromQL queries
- Alert handling: webhook processing, multi-channel notification
The next chapter covers security protection, attacks, and defenses.