
第2章:API网关实现

Kong实战

Kong架构

核心概念:

┌──────────────────────────────────────┐
│           Kong Architecture          │
│                                      │
│  ┌────────────────────────────────┐ │
│  │      Admin API (8001)          │ │ 管理接口
│  └────────────────────────────────┘ │
│                                      │
│  ┌────────────────────────────────┐ │
│  │      Proxy (8000/8443)         │ │ 代理流量
│  │                                 │ │
│  │  ┌──────────┐  ┌──────────┐  │ │
│  │  │ Plugins  │  │ Plugins  │  │ │
│  │  └──────────┘  └──────────┘  │ │
│  └────────────────────────────────┘ │
│                                      │
│  ┌────────────────────────────────┐ │
│  │   Database (PostgreSQL/        │ │ 存储配置
│  │   Cassandra/DBless)            │ │
│  └────────────────────────────────┘ │
└──────────────────────────────────────┘

核心概念:
- Service:上游服务抽象
- Route:路由规则(如何匹配请求)
- Upstream:负载均衡目标
- Plugin:功能插件
- Consumer:API消费者

安装Kong

Docker方式:

# 1. 创建Kong网络
docker network create kong-net

# 2. 启动PostgreSQL数据库
docker run -d --name kong-database \
  --network=kong-net \
  -e "POSTGRES_USER=kong" \
  -e "POSTGRES_DB=kong" \
  -e "POSTGRES_PASSWORD=kong" \
  postgres:13

# 3. 初始化数据库
docker run --rm \
  --network=kong-net \
  -e "KONG_DATABASE=postgres" \
  -e "KONG_PG_HOST=kong-database" \
  -e "KONG_PG_PASSWORD=kong" \
  kong:latest kong migrations bootstrap

# 4. 启动Kong
docker run -d --name kong \
  --network=kong-net \
  -e "KONG_DATABASE=postgres" \
  -e "KONG_PG_HOST=kong-database" \
  -e "KONG_PG_PASSWORD=kong" \
  -e "KONG_PROXY_ACCESS_LOG=/dev/stdout" \
  -e "KONG_ADMIN_ACCESS_LOG=/dev/stdout" \
  -e "KONG_PROXY_ERROR_LOG=/dev/stderr" \
  -e "KONG_ADMIN_ERROR_LOG=/dev/stderr" \
  -e "KONG_ADMIN_LISTEN=0.0.0.0:8001" \
  -p 8000:8000 \
  -p 8443:8443 \
  -p 8001:8001 \
  -p 8444:8444 \
  kong:latest

# 5. 验证
curl -i http://localhost:8001/

DBless模式(推荐):

# kong.yml
_format_version: "2.1"

services:
  - name: user-service
    url: http://user-service:8080
    routes:
      - name: user-route
        paths:
          - /api/users
    plugins:
      - name: rate-limiting
        config:
          minute: 100
          policy: local
      - name: key-auth

  - name: order-service
    url: http://order-service:8081
    routes:
      - name: order-route
        paths:
          - /api/orders
    plugins:
      - name: jwt

consumers:
  - username: test-user
    keyauth_credentials:
      - key: test-api-key

# 启动Kong(DBless)
docker run -d --name kong-dbless \
  -v "$(pwd)/kong.yml:/kong.yml" \
  -e "KONG_DATABASE=off" \
  -e "KONG_DECLARATIVE_CONFIG=/kong.yml" \
  -e "KONG_PROXY_ACCESS_LOG=/dev/stdout" \
  -e "KONG_ADMIN_ACCESS_LOG=/dev/stdout" \
  -p 8000:8000 \
  -p 8001:8001 \
  kong:latest

Kong配置示例

1. 创建Service和Route

# 创建Service
curl -i -X POST http://localhost:8001/services \
  --data "name=user-service" \
  --data "url=http://user-service:8080"

# 创建Route
curl -i -X POST http://localhost:8001/services/user-service/routes \
  --data "name=user-route" \
  --data "paths[]=/api/users" \
  --data "methods[]=GET" \
  --data "methods[]=POST"

# 测试
curl -i http://localhost:8000/api/users

2. 添加限流插件

# 添加限流(每分钟100次)
curl -i -X POST http://localhost:8001/services/user-service/plugins \
  --data "name=rate-limiting" \
  --data "config.minute=100" \
  --data "config.policy=local"

# 测试限流
for i in {1..110}; do
  curl -i http://localhost:8000/api/users
done

# 超过限制会返回429
HTTP/1.1 429 Too Many Requests
{
  "message": "API rate limit exceeded"
}

3. 添加JWT鉴权

# 1. 创建Consumer
curl -i -X POST http://localhost:8001/consumers \
  --data "username=testuser"

# 2. 为Consumer创建JWT凭证
curl -i -X POST http://localhost:8001/consumers/testuser/jwt \
  --data "key=myapikey" \
  --data "secret=mysecret"

# 返回:
{
  "key": "myapikey",
  "secret": "mysecret",
  "id": "xxx"
}

# 3. 为Service添加JWT插件
curl -i -X POST http://localhost:8001/services/user-service/plugins \
  --data "name=jwt"

# 4. 生成JWT Token(使用在线工具或代码,本节末尾附有一个Go生成示例)
# Header:
{
  "alg": "HS256",
  "typ": "JWT"
}

# Payload:
{
  "iss": "myapikey",
  "exp": 1735660800
}

# Secret: mysecret

# 5. 测试(不带Token,失败)
curl -i http://localhost:8000/api/users
# 返回401 Unauthorized

# 6. 测试(带Token,成功)
curl -i http://localhost:8000/api/users \
  -H "Authorization: Bearer <your-jwt-token>"

4. 添加API Key鉴权

# 1. 创建Consumer
curl -i -X POST http://localhost:8001/consumers \
  --data "username=apiuser"

# 2. 为Consumer创建API Key
curl -i -X POST http://localhost:8001/consumers/apiuser/key-auth \
  --data "key=my-api-key-12345"

# 3. 为Route添加key-auth插件
curl -i -X POST http://localhost:8001/routes/user-route/plugins \
  --data "name=key-auth"

# 4. 测试(不带API Key,失败)
curl -i http://localhost:8000/api/users
# 返回401

# 5. 测试(带API Key,成功)
curl -i http://localhost:8000/api/users \
  -H "apikey: my-api-key-12345"

5. 负载均衡配置

# 1. 创建Upstream
curl -i -X POST http://localhost:8001/upstreams \
  --data "name=user-service-upstream"

# 2. 添加Target(后端实例)
curl -i -X POST http://localhost:8001/upstreams/user-service-upstream/targets \
  --data "target=user-service-1:8080" \
  --data "weight=100"

curl -i -X POST http://localhost:8001/upstreams/user-service-upstream/targets \
  --data "target=user-service-2:8080" \
  --data "weight=100"

curl -i -X POST http://localhost:8001/upstreams/user-service-upstream/targets \
  --data "target=user-service-3:8080" \
  --data "weight=50"

# 3. 更新Service指向Upstream
curl -i -X PATCH http://localhost:8001/services/user-service \
  --data "host=user-service-upstream"

# 负载均衡算法(默认:加权轮询)
# 其他算法:least-connections、consistent-hashing

Kong插件开发

自定义限流插件(Lua,基于旧版 BasePlugin 写法,适用于 Kong 2.x 及更早版本;Kong 3.x 起已移除 BasePlugin,新写法是直接返回包含 access 等阶段函数的 handler 表):

-- custom-rate-limiting.lua

local BasePlugin = require "kong.plugins.base_plugin"
local redis = require "resty.redis"

local CustomRateLimitingHandler = BasePlugin:extend()

CustomRateLimitingHandler.VERSION = "1.0.0"
CustomRateLimitingHandler.PRIORITY = 900

function CustomRateLimitingHandler:new()
  CustomRateLimitingHandler.super.new(self, "custom-rate-limiting")
end

function CustomRateLimitingHandler:access(conf)
  CustomRateLimitingHandler.super.access(self)

  -- 1. 获取客户端标识(IP或User ID)
  local identifier = kong.client.get_forwarded_ip()

  -- 2. 连接Redis
  local red = redis:new()
  red:set_timeout(1000)

  local ok, err = red:connect(conf.redis_host, conf.redis_port)
  if not ok then
    kong.log.err("Failed to connect to Redis: ", err)
    return
  end

  -- 3. 检查限流
  local key = "rate_limit:" .. identifier
  local count, err = red:incr(key)

  if count == 1 then
    -- 首次请求,设置过期时间
    red:expire(key, conf.window_size)
  end

  -- 4. 判断是否超过限制
  if count > conf.limit then
    kong.response.exit(429, {
      message = "API rate limit exceeded",
      limit = conf.limit,
      window = conf.window_size
    })
  end

  -- 5. 添加响应头
  kong.response.set_header("X-RateLimit-Limit", conf.limit)
  kong.response.set_header("X-RateLimit-Remaining", conf.limit - count)

  -- 6. 关闭Redis连接
  red:close()
end

return CustomRateLimitingHandler

插件Schema配置:

-- schema.lua

return {
  name = "custom-rate-limiting",
  fields = {
    { config = {
        type = "record",
        fields = {
          { limit = {
              type = "number",
              required = true,
              default = 100
          }},
          { window_size = {
              type = "number",
              required = true,
              default = 60
          }},
          { redis_host = {
              type = "string",
              required = true,
              default = "localhost"
          }},
          { redis_port = {
              type = "number",
              required = true,
              default = 6379
          }}
        }
    }}
  }
}

Nginx网关

OpenResty + Lua

架构:

┌──────────────────────────────────────┐
│         Nginx + OpenResty            │
│                                      │
│  ┌────────────────────────────────┐ │
│  │      Lua Code                  │ │ 业务逻辑
│  │  - 路由                         │ │
│  │  - 鉴权                         │ │
│  │  - 限流                         │ │
│  └────────────────────────────────┘ │
│                                      │
│  ┌────────────────────────────────┐ │
│  │    Nginx Core                  │ │ HTTP处理
│  └────────────────────────────────┘ │
│                                      │
│  ┌────────────────────────────────┐ │
│  │    Shared Dict / Redis         │ │ 共享存储
│  └────────────────────────────────┘ │
└──────────────────────────────────────┘

安装OpenResty

# macOS
brew install openresty

# Ubuntu
apt-get install openresty

# 启动
openresty -p `pwd` -c nginx.conf

Nginx配置示例

完整网关配置:

# nginx.conf

worker_processes auto;
error_log logs/error.log warn;

events {
    worker_connections 1024;
}

http {
    # Lua模块路径
    lua_package_path "/usr/local/openresty/lualib/?.lua;;";

    # 共享内存(用于限流、缓存)
    lua_shared_dict rate_limit 10m;
    lua_shared_dict jwt_cache 10m;

    # 上游服务定义
    upstream user_service {
        server 127.0.0.1:8080;
        server 127.0.0.1:8081;
        keepalive 32;
    }

    upstream order_service {
        server 127.0.0.1:9080;
        server 127.0.0.1:9081;
        keepalive 32;
    }

    server {
        listen 80;
        server_name localhost;

        # 路由:User Service
        location /api/users {
            # $user_id 由auth.lua写入,需先用set声明,否则nginx启动时报unknown variable
            set $user_id "";

            # 鉴权
            access_by_lua_file lua/auth.lua;

            # 限流
            access_by_lua_file lua/rate_limit.lua;

            # 代理
            proxy_pass http://user_service;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

            # 添加用户信息到Header
            proxy_set_header X-User-ID $user_id;
        }

        # 路由:Order Service
        location /api/orders {
            set $user_id "";

            access_by_lua_file lua/auth.lua;
            access_by_lua_file lua/rate_limit.lua;

            proxy_pass http://order_service;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-User-ID $user_id;
        }

        # 健康检查
        location /health {
            access_log off;
            return 200 "OK";
        }
    }
}

Lua脚本实现

1. JWT鉴权(auth.lua):

-- lua/auth.lua

local jwt = require "resty.jwt"
local cjson = require "cjson"

-- JWT密钥
local jwt_secret = "your-secret-key"

-- 获取Token
local auth_header = ngx.var.http_authorization
if not auth_header then
    ngx.status = 401
    ngx.say(cjson.encode({
        code = 401,
        message = "Missing authorization header"
    }))
    ngx.exit(401)
end

-- 提取Token
local token = string.sub(auth_header, 8) -- 去掉 "Bearer "

-- 验证Token
local jwt_obj = jwt:verify(jwt_secret, token)

if not jwt_obj.verified then
    ngx.status = 401
    ngx.say(cjson.encode({
        code = 401,
        message = "Invalid token",
        reason = jwt_obj.reason
    }))
    ngx.exit(401)
end

-- 提取用户信息
local payload = jwt_obj.payload
ngx.var.user_id = payload.user_id

-- 缓存验证结果(可选)
local jwt_cache = ngx.shared.jwt_cache
jwt_cache:set(token, payload.user_id, 300) -- 缓存5分钟

2. 限流(rate_limit.lua):

-- lua/rate_limit.lua

local limit_req = require "resty.limit.req"
local cjson = require "cjson"

-- 创建限流器(每秒100次,burst 200)
local limiter, err = limit_req.new("rate_limit", 100, 200)

if not limiter then
    ngx.log(ngx.ERR, "Failed to create limiter: ", err)
    return
end

-- 获取客户端标识(IP或User ID)
local key = ngx.var.user_id or ngx.var.remote_addr

-- 限流检查
local delay, err = limiter:incoming(key, true)

if not delay then
    if err == "rejected" then
        ngx.status = 429
        ngx.say(cjson.encode({
            code = 429,
            message = "Too many requests"
        }))
        ngx.exit(429)
    end

    ngx.log(ngx.ERR, "Failed to limit request: ", err)
    return
end

-- 如果需要延迟
if delay >= 0.001 then
    ngx.sleep(delay)
end

3. 动态路由(router.lua):

-- lua/router.lua

local cjson = require "cjson"
local redis = require "resty.redis"

-- 连接Redis获取路由配置
local red = redis:new()
red:set_timeout(1000)

local ok, err = red:connect("127.0.0.1", 6379)
if not ok then
    ngx.log(ngx.ERR, "Failed to connect to Redis: ", err)
    return
end

-- 获取请求路径
local path = ngx.var.uri

-- 从Redis获取路由规则
local route_key = "route:" .. path
local upstream, err = red:get(route_key)

if not upstream or upstream == ngx.null then
    ngx.status = 404
    ngx.say(cjson.encode({
        code = 404,
        message = "Route not found"
    }))
    ngx.exit(404)
end

-- 设置上游服务
ngx.var.upstream = upstream

red:close()

使用动态路由:

location / {
    # $upstream 由router.lua写入,需先用set声明,否则nginx启动时报unknown variable
    set $upstream "";

    access_by_lua_file lua/router.lua;

    # 使用Lua设置的upstream变量
    proxy_pass http://$upstream;
}

在Redis中配置路由:

# 设置路由规则
redis-cli SET "route:/api/users" "user_service"
redis-cli SET "route:/api/orders" "order_service"
redis-cli SET "route:/api/products" "product_service"

自研网关(Go)

架构设计

┌──────────────────────────────────────┐
│          API Gateway (Go)            │
│                                      │
│  ┌────────────────────────────────┐ │
│  │      HTTP Server               │ │
│  └────────────────────────────────┘ │
│               ↓                      │
│  ┌────────────────────────────────┐ │
│  │    Middleware Chain            │ │
│  │  - Logger                      │ │
│  │  - Auth                        │ │
│  │  - Rate Limiter                │ │
│  │  - Circuit Breaker             │ │
│  └────────────────────────────────┘ │
│               ↓                      │
│  ┌────────────────────────────────┐ │
│  │      Router                    │ │
│  └────────────────────────────────┘ │
│               ↓                      │
│  ┌────────────────────────────────┐ │
│  │   Reverse Proxy                │ │
│  │   + Load Balancer              │ │
│  └────────────────────────────────┘ │
└──────────────────────────────────────┘

完整实现

main.go:

package main

import (
    "log"
    "net/http"
    "time"
)

func main() {
    // 创建网关
    gateway := NewGateway()

    // 注册路由
    gateway.AddRoute("/api/users/*", "http://localhost:8080", []string{"GET", "POST"})
    gateway.AddRoute("/api/orders/*", "http://localhost:8081", []string{"GET", "POST", "PUT"})
    gateway.AddRoute("/api/products/*", "http://localhost:8082", []string{"GET"})

    // 启动服务器
    server := &http.Server{
        Addr:         ":8000",
        Handler:      gateway,
        ReadTimeout:  10 * time.Second,
        WriteTimeout: 10 * time.Second,
    }

    log.Println("API Gateway listening on :8000")
    if err := server.ListenAndServe(); err != nil {
        log.Fatal(err)
    }
}

gateway.go:

package main

import (
    "net/http"
    "sync"
    "time"
)

// Gateway网关结构
type Gateway struct {
    router       *Router
    middlewares  []Middleware
    loadBalancer *LoadBalancer
    rateLimiter  *RateLimiter
    circuitBreaker *CircuitBreaker
    mu           sync.RWMutex
}

func NewGateway() *Gateway {
    return &Gateway{
        router:       NewRouter(),
        middlewares:  []Middleware{},
        loadBalancer: NewLoadBalancer(),
        rateLimiter:  NewRateLimiter(100), // 100 QPS
        circuitBreaker: NewCircuitBreaker(5, 10*time.Second),
    }
}

// 添加路由
func (g *Gateway) AddRoute(path, upstream string, methods []string) {
    route := &Route{
        Path:     path,
        Upstream: upstream,
        Methods:  methods,
    }
    g.router.AddRoute(route)
}

// 添加中间件
func (g *Gateway) Use(middleware Middleware) {
    g.middlewares = append(g.middlewares, middleware)
}

// 实现http.Handler接口
func (g *Gateway) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    // 1. 执行中间件链
    handler := g.buildHandler()
    handler.ServeHTTP(w, r)
}

// 构建处理器链
func (g *Gateway) buildHandler() http.Handler {
    // 最终处理器:反向代理
    handler := http.HandlerFunc(g.proxyHandler)

    // 逆序添加中间件
    for i := len(g.middlewares) - 1; i >= 0; i-- {
        handler = g.middlewares[i](handler)
    }

    // 添加默认中间件(后包装的先执行,这里的顺序保证执行顺序为
    // Logger -> Auth -> RateLimit -> CircuitBreaker,与架构图一致,
    // 也保证限流时X-User-ID已由鉴权中间件写入)
    handler = g.circuitBreakerMiddleware(handler)
    handler = g.rateLimitMiddleware(handler)
    handler = g.authMiddleware(handler)
    handler = g.loggerMiddleware(handler)

    return handler
}

// 反向代理处理器
func (g *Gateway) proxyHandler(w http.ResponseWriter, r *http.Request) {
    // 1. 路由匹配
    route := g.router.Match(r.Method, r.URL.Path)
    if route == nil {
        http.Error(w, "Route not found", http.StatusNotFound)
        return
    }

    // 2. 选择后端实例(负载均衡)
    backend := g.loadBalancer.Select(route.Upstream)

    // 3. 反向代理
    proxy := NewReverseProxy(backend)
    proxy.ServeHTTP(w, r)
}
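
Use注册的自定义中间件会在默认中间件之后、反向代理之前执行。下面补一个请求ID中间件作为示例(requestIDMiddleware为本文假设的示例,并非上面代码自带):

// request_id.go(示例)
package main

import (
    "net/http"
    "strconv"
    "time"
)

// 为每个请求附加一个简单的请求ID,便于日志关联(示意实现,生产中一般用UUID)
func requestIDMiddleware(next http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        id := strconv.FormatInt(time.Now().UnixNano(), 36)
        r.Header.Set("X-Request-ID", id)
        w.Header().Set("X-Request-ID", id)
        next.ServeHTTP(w, r)
    })
}

// 在main.go中注册:
// gateway.Use(requestIDMiddleware)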

router.go:

package main

import (
    "strings"
)

type Route struct {
    Path     string
    Upstream string
    Methods  []string
}

type Router struct {
    routes []*Route
}

func NewRouter() *Router {
    return &Router{
        routes: []*Route{},
    }
}

func (r *Router) AddRoute(route *Route) {
    r.routes = append(r.routes, route)
}

func (r *Router) Match(method, urlPath string) *Route {
    for _, route := range r.routes {
        // 检查HTTP方法
        if !contains(route.Methods, method) {
            continue
        }

        // 检查路径匹配
        if r.pathMatch(route.Path, urlPath) {
            return route
        }
    }
    return nil
}

// 路径匹配(支持通配符)
func (r *Router) pathMatch(pattern, urlPath string) bool {
    // 精确匹配
    if pattern == urlPath {
        return true
    }

    // 前缀匹配(/api/users/* 匹配 /api/users/123)
    if strings.HasSuffix(pattern, "/*") {
        prefix := strings.TrimSuffix(pattern, "/*")
        return strings.HasPrefix(urlPath, prefix)
    }

    // 参数匹配(/api/users/{id})
    patternParts := strings.Split(pattern, "/")
    pathParts := strings.Split(urlPath, "/")

    if len(patternParts) != len(pathParts) {
        return false
    }

    for i := range patternParts {
        if patternParts[i] != pathParts[i] &&
           !strings.HasPrefix(patternParts[i], "{") {
            return false
        }
    }

    return true
}

func contains(slice []string, item string) bool {
    for _, s := range slice {
        if s == item {
            return true
        }
    }
    return false
}

middleware.go:

package main

import (
    "log"
    "net/http"
    "strings"
    "time"
)

type Middleware func(http.Handler) http.Handler

// 日志中间件
func (g *Gateway) loggerMiddleware(next http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        start := time.Now()

        // 包装ResponseWriter以记录状态码
        lrw := &loggingResponseWriter{ResponseWriter: w, statusCode: http.StatusOK}

        next.ServeHTTP(lrw, r)

        log.Printf("[%s] %s %s - %d (%s)",
            r.Method,
            r.URL.Path,
            r.RemoteAddr,
            lrw.statusCode,
            time.Since(start),
        )
    })
}

type loggingResponseWriter struct {
    http.ResponseWriter
    statusCode int
}

func (lrw *loggingResponseWriter) WriteHeader(code int) {
    lrw.statusCode = code
    lrw.ResponseWriter.WriteHeader(code)
}

// 鉴权中间件(JWT)
func (g *Gateway) authMiddleware(next http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        // 跳过健康检查
        if r.URL.Path == "/health" {
            next.ServeHTTP(w, r)
            return
        }

        // 获取Token
        authHeader := r.Header.Get("Authorization")
        if authHeader == "" {
            http.Error(w, "Missing authorization header", http.StatusUnauthorized)
            return
        }

        // 验证Token
        token := strings.TrimPrefix(authHeader, "Bearer ")
        userID, err := ValidateJWT(token)
        if err != nil {
            http.Error(w, "Invalid token", http.StatusUnauthorized)
            return
        }

        // 添加用户信息到Header
        r.Header.Set("X-User-ID", userID)

        next.ServeHTTP(w, r)
    })
}

// 限流中间件
func (g *Gateway) rateLimitMiddleware(next http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        // 获取客户端标识
        userID := r.Header.Get("X-User-ID")
        if userID == "" {
            userID = r.RemoteAddr
        }

        // 限流检查
        if !g.rateLimiter.Allow(userID) {
            http.Error(w, "Too many requests", http.StatusTooManyRequests)
            return
        }

        next.ServeHTTP(w, r)
    })
}

// 熔断中间件
func (g *Gateway) circuitBreakerMiddleware(next http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        // 检查熔断器状态
        if !g.circuitBreaker.Allow() {
            http.Error(w, "Service temporarily unavailable", http.StatusServiceUnavailable)
            return
        }

        // 包装ResponseWriter以记录结果
        lrw := &loggingResponseWriter{ResponseWriter: w, statusCode: http.StatusOK}
        next.ServeHTTP(lrw, r)

        // 记录成功/失败
        if lrw.statusCode >= 500 {
            g.circuitBreaker.RecordFailure()
        } else {
            g.circuitBreaker.RecordSuccess()
        }
    })
}

rate_limiter.go:

package main

import (
    "sync"

    "golang.org/x/time/rate"
)

type RateLimiter struct {
    limiters map[string]*rate.Limiter
    mu       sync.RWMutex
    qps      int
}

func NewRateLimiter(qps int) *RateLimiter {
    return &RateLimiter{
        limiters: make(map[string]*rate.Limiter),
        qps:      qps,
    }
}

func (rl *RateLimiter) Allow(key string) bool {
    rl.mu.RLock()
    limiter, exists := rl.limiters[key]
    rl.mu.RUnlock()

    if !exists {
        rl.mu.Lock()
        // 双重检查,避免并发下重复创建、互相覆盖
        if limiter, exists = rl.limiters[key]; !exists {
            limiter = rate.NewLimiter(rate.Limit(rl.qps), rl.qps*2) // burst = qps*2
            rl.limiters[key] = limiter
        }
        rl.mu.Unlock()
    }

    return limiter.Allow()
}
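
注意limiters这个map会随Key(用户/IP)数量无限增长,生产环境通常需要过期清理。一个最简化的思路如下(示意:定期整体重建map,用少量精度损失换取实现简单;若加入此方法需额外 import "time"):

// 定期清空限流器map,防止内存无限增长(示意实现)
func (rl *RateLimiter) StartCleanup(interval time.Duration) {
    go func() {
        ticker := time.NewTicker(interval)
        defer ticker.Stop()

        for range ticker.C {
            rl.mu.Lock()
            rl.limiters = make(map[string]*rate.Limiter)
            rl.mu.Unlock()
        }
    }()
}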

circuit_breaker.go:

package main

import (
    "sync"
    "time"
)

type CircuitState int

const (
    StateClosed CircuitState = iota
    StateOpen
    StateHalfOpen
)

type CircuitBreaker struct {
    maxFailures  int
    timeout      time.Duration
    state        CircuitState
    failures     int
    lastFailTime time.Time
    mu           sync.Mutex
}

func NewCircuitBreaker(maxFailures int, timeout time.Duration) *CircuitBreaker {
    return &CircuitBreaker{
        maxFailures: maxFailures,
        timeout:     timeout,
        state:       StateClosed,
    }
}

func (cb *CircuitBreaker) Allow() bool {
    cb.mu.Lock()
    defer cb.mu.Unlock()

    switch cb.state {
    case StateClosed:
        return true

    case StateOpen:
        // 检查是否可以尝试恢复
        if time.Since(cb.lastFailTime) > cb.timeout {
            cb.state = StateHalfOpen
            return true
        }
        return false

    case StateHalfOpen:
        return true

    default:
        return false
    }
}

func (cb *CircuitBreaker) RecordSuccess() {
    cb.mu.Lock()
    defer cb.mu.Unlock()

    if cb.state == StateHalfOpen {
        cb.state = StateClosed
        cb.failures = 0
    }
}

func (cb *CircuitBreaker) RecordFailure() {
    cb.mu.Lock()
    defer cb.mu.Unlock()

    cb.failures++
    cb.lastFailTime = time.Now()

    if cb.failures >= cb.maxFailures {
        cb.state = StateOpen
    }
}

load_balancer.go:

package main

import (
    "sync"
)

type LoadBalancer struct {
    backends map[string][]string
    indexes  map[string]int
    mu       sync.RWMutex
}

func NewLoadBalancer() *LoadBalancer {
    lb := &LoadBalancer{
        backends: make(map[string][]string),
        indexes:  make(map[string]int),
    }

    // 配置后端实例
    lb.backends["http://localhost:8080"] = []string{
        "http://localhost:8080",
        "http://localhost:8081",
        "http://localhost:8082",
    }

    return lb
}

// 轮询算法
func (lb *LoadBalancer) Select(upstream string) string {
    lb.mu.Lock()
    defer lb.mu.Unlock()

    backends, exists := lb.backends[upstream]
    if !exists || len(backends) == 0 {
        return upstream
    }

    index := lb.indexes[upstream]
    backend := backends[index]

    // 更新索引
    lb.indexes[upstream] = (index + 1) % len(backends)

    return backend
}
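
上面的后端列表是写死的,实际场景一般由配置中心或服务注册中心动态下发。可以补一个简单的更新方法(示意):

// 动态设置/更新某个upstream对应的后端列表(示意)
func (lb *LoadBalancer) SetBackends(upstream string, backends []string) {
    lb.mu.Lock()
    defer lb.mu.Unlock()

    lb.backends[upstream] = backends
    lb.indexes[upstream] = 0
}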

proxy.go:

package main

import (
    "io"
    "net/http"
    "net/url"
    "time"
)

func NewReverseProxy(targetURL string) http.Handler {
    target, _ := url.Parse(targetURL)

    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        // 1. 修改请求
        r.URL.Scheme = target.Scheme
        r.URL.Host = target.Host
        r.Host = target.Host

        // 2. 创建新请求
        proxyReq, err := http.NewRequest(r.Method, r.URL.String(), r.Body)
        if err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
        }

        // 3. 复制Header
        for key, values := range r.Header {
            for _, value := range values {
                proxyReq.Header.Add(key, value)
            }
        }

        // 4. 发送请求
        client := &http.Client{Timeout: 10 * time.Second}
        resp, err := client.Do(proxyReq)
        if err != nil {
            http.Error(w, err.Error(), http.StatusBadGateway)
            return
        }
        defer resp.Body.Close()

        // 5. 复制响应Header
        for key, values := range resp.Header {
            for _, value := range values {
                w.Header().Add(key, value)
            }
        }

        // 6. 写入状态码
        w.WriteHeader(resp.StatusCode)

        // 7. 复制响应Body
        io.Copy(w, resp.Body)
    })
}
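
标准库net/http/httputil自带反向代理实现,可以作为上面手写版本的替代(示意,自动处理了请求改写等细节):

package main

import (
    "net/http"
    "net/http/httputil"
    "net/url"
)

// 基于标准库的反向代理(示意)
func NewStdReverseProxy(targetURL string) (http.Handler, error) {
    target, err := url.Parse(targetURL)
    if err != nil {
        return nil, err
    }
    return httputil.NewSingleHostReverseProxy(target), nil
}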

jwt.go:

package main

import (
    "errors"
    "time"

    "github.com/golang-jwt/jwt/v5"
)

var jwtSecret = []byte("your-secret-key")

func ValidateJWT(tokenString string) (string, error) {
    token, err := jwt.Parse(tokenString, func(token *jwt.Token) (interface{}, error) {
        if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok {
            return nil, errors.New("invalid signing method")
        }
        return jwtSecret, nil
    })

    if err != nil {
        return "", err
    }

    if claims, ok := token.Claims.(jwt.MapClaims); ok && token.Valid {
        // 检查过期时间(golang-jwt v5解析时已校验exp,这里做二次防御)
        if exp, ok := claims["exp"].(float64); ok && time.Now().Unix() > int64(exp) {
            return "", errors.New("token expired")
        }

        userID, ok := claims["user_id"].(string)
        if !ok {
            return "", errors.New("missing user_id claim")
        }
        return userID, nil
    }

    return "", errors.New("invalid token")
}

func GenerateJWT(userID string) (string, error) {
    claims := jwt.MapClaims{
        "user_id": userID,
        "exp":     time.Now().Add(24 * time.Hour).Unix(),
        "iat":     time.Now().Unix(),
    }

    token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)
    return token.SignedString(jwtSecret)
}
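
客户端侧的使用示意(假设与上述代码在同一个包,网关监听在:8000;实际中Token通常由独立的认证服务签发):

// 演示:签发Token并携带它访问网关(示意)
// 需要额外 import "net/http" 和 "log"
func callGateway() error {
    token, err := GenerateJWT("user-123")
    if err != nil {
        return err
    }

    req, err := http.NewRequest(http.MethodGet, "http://localhost:8000/api/users/123", nil)
    if err != nil {
        return err
    }
    req.Header.Set("Authorization", "Bearer "+token)

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()

    log.Println("status:", resp.StatusCode)
    return nil
}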

性能对比

测试环境

硬件:
- CPU: 8核
- 内存: 16GB
- 网络: 1Gbps

工具:
- wrk(HTTP压测工具)

后端服务:
- 简单HTTP服务,返回固定JSON

测试结果

1. Kong:
   wrk -t8 -c100 -d30s http://localhost:8000/api/users

   Requests/sec: 28,453
   Latency (avg): 3.51ms
   Latency (P99): 12.34ms

2. Nginx + OpenResty:
   wrk -t8 -c100 -d30s http://localhost:8000/api/users

   Requests/sec: 65,234
   Latency (avg): 1.53ms
   Latency (P99): 5.67ms

3. 自研网关(Go):
   wrk -t8 -c100 -d30s http://localhost:8000/api/users

   Requests/sec: 42,156
   Latency (avg): 2.37ms
   Latency (P99): 8.91ms

结论:
- Nginx + OpenResty性能最高(C语言实现)
- 自研Go网关性能中等(易于开发和维护)
- Kong性能较低(但功能最丰富)

面试问答

如何实现API网关的限流功能?

答案:

限流算法:

1. 令牌桶(Token Bucket)

type TokenBucket struct {
    capacity  int       // 桶容量
    tokens    int       // 当前令牌数
    rate      int       // 生成速率(每秒)
    lastTime  time.Time
    mu        sync.Mutex
}

func NewTokenBucket(capacity, rate int) *TokenBucket {
    return &TokenBucket{
        capacity: capacity,
        tokens:   capacity,
        rate:     rate,
        lastTime: time.Now(),
    }
}

func (tb *TokenBucket) Allow() bool {
    tb.mu.Lock()
    defer tb.mu.Unlock()

    // 1. 计算新生成的令牌
    now := time.Now()
    elapsed := now.Sub(tb.lastTime).Seconds()
    newTokens := int(elapsed * float64(tb.rate))

    // 2. 更新令牌数(不超过容量)
    // 注意:只有真正生成了令牌才推进lastTime,否则高频调用下elapsed总被重置,令牌永远补不进来
    if newTokens > 0 {
        tb.tokens = min(tb.capacity, tb.tokens+newTokens)
        tb.lastTime = now
    }

    // 3. 消耗令牌
    if tb.tokens > 0 {
        tb.tokens--
        return true
    }

    return false
}

2. 漏桶(Leaky Bucket)

type LeakyBucket struct {
    capacity int
    queue    chan struct{}
    rate     time.Duration
}

func NewLeakyBucket(capacity int, rate time.Duration) *LeakyBucket {
    lb := &LeakyBucket{
        capacity: capacity,
        queue:    make(chan struct{}, capacity),
        rate:     rate,
    }

    // 启动漏水协程
    go lb.leak()

    return lb
}

func (lb *LeakyBucket) leak() {
    ticker := time.NewTicker(lb.rate)
    defer ticker.Stop()

    for range ticker.C {
        select {
        case <-lb.queue:
            // 漏掉一个请求
        default:
        }
    }
}

func (lb *LeakyBucket) Allow() bool {
    select {
    case lb.queue <- struct{}{}:
        return true
    default:
        return false
    }
}

3. 滑动窗口

type SlidingWindow struct {
    limit      int
    windowSize time.Duration
    requests   []time.Time
    mu         sync.Mutex
}

func NewSlidingWindow(limit int, windowSize time.Duration) *SlidingWindow {
    return &SlidingWindow{
        limit:      limit,
        windowSize: windowSize,
        requests:   []time.Time{},
    }
}

func (sw *SlidingWindow) Allow() bool {
    sw.mu.Lock()
    defer sw.mu.Unlock()

    now := time.Now()

    // 1. 移除过期请求
    cutoff := now.Add(-sw.windowSize)
    validRequests := []time.Time{}
    for _, req := range sw.requests {
        if req.After(cutoff) {
            validRequests = append(validRequests, req)
        }
    }
    sw.requests = validRequests

    // 2. 检查是否超过限制
    if len(sw.requests) >= sw.limit {
        return false
    }

    // 3. 记录请求
    sw.requests = append(sw.requests, now)
    return true
}

选择建议:

令牌桶:适合突发流量(允许burst)
漏桶:平滑流量(严格控制速率)
滑动窗口:精确统计(资源消耗较高)
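
一个简单的对比用法(示意,复用上文三种实现,假设在同一个包内;Go项目中也常直接使用 golang.org/x/time/rate):

// 需要 import "fmt" 和 "time"
func demoLimiters() {
    tb := NewTokenBucket(200, 100)                 // 容量200,每秒生成100个令牌
    lb := NewLeakyBucket(200, 10*time.Millisecond) // 队列200,每10ms漏出一个请求
    sw := NewSlidingWindow(100, time.Second)       // 每秒最多100个请求

    for i := 0; i < 5; i++ {
        fmt.Println(tb.Allow(), lb.Allow(), sw.Allow())
    }
}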

API网关如何实现负载均衡?

答案:

负载均衡算法:

1. 轮询(Round Robin)

type RoundRobinBalancer struct {
    backends []string
    index    int
    mu       sync.Mutex
}

func (rb *RoundRobinBalancer) Select() string {
    rb.mu.Lock()
    defer rb.mu.Unlock()

    backend := rb.backends[rb.index]
    rb.index = (rb.index + 1) % len(rb.backends)
    return backend
}

2. 加权轮询(Weighted Round Robin)

type WeightedBackend struct {
    URL    string
    Weight int
}

type WeightedRoundRobinBalancer struct {
    backends  []*WeightedBackend
    currentWeights []int
    mu        sync.Mutex
}

func (wrb *WeightedRoundRobinBalancer) Select() string {
    wrb.mu.Lock()
    defer wrb.mu.Unlock()

    maxWeightIndex := 0
    totalWeight := 0

    for i, backend := range wrb.backends {
        wrb.currentWeights[i] += backend.Weight
        totalWeight += backend.Weight

        if wrb.currentWeights[i] > wrb.currentWeights[maxWeightIndex] {
            maxWeightIndex = i
        }
    }

    wrb.currentWeights[maxWeightIndex] -= totalWeight
    return wrb.backends[maxWeightIndex].URL
}

3. 最少连接(Least Connections)

type LeastConnectionsBalancer struct {
    backends    []string
    connections map[string]int
    mu          sync.Mutex
}

func (lcb *LeastConnectionsBalancer) Select() string {
    lcb.mu.Lock()
    defer lcb.mu.Unlock()

    minConnections := int(^uint(0) >> 1) // max int
    selected := ""

    for _, backend := range lcb.backends {
        connections := lcb.connections[backend]
        if connections < minConnections {
            minConnections = connections
            selected = backend
        }
    }

    lcb.connections[selected]++
    return selected
}

func (lcb *LeastConnectionsBalancer) Release(backend string) {
    lcb.mu.Lock()
    defer lcb.mu.Unlock()

    lcb.connections[backend]--
}

4. 一致性哈希(Consistent Hashing)

// 这里假设使用第三方一致性哈希库(如 github.com/serialx/hashring)
type ConsistentHashBalancer struct {
    ring     *hashring.HashRing
    backends []string
}

func NewConsistentHashBalancer(backends []string) *ConsistentHashBalancer {
    ring := hashring.New(backends)
    return &ConsistentHashBalancer{
        ring:     ring,
        backends: backends,
    }
}

func (chb *ConsistentHashBalancer) Select(key string) string {
    backend, _ := chb.ring.GetNode(key)
    return backend
}
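
一致性哈希常用于会话保持或本地缓存亲和:同一个Key(如用户ID)总是落在同一个后端,且节点增减时只影响少量Key。使用示意(假设使用 github.com/serialx/hashring 这类提供 New/GetNode 接口的库):

// 需要 import "fmt"
func demoConsistentHash() {
    backends := []string{
        "http://user-service-1:8080",
        "http://user-service-2:8080",
        "http://user-service-3:8080",
    }
    chb := NewConsistentHashBalancer(backends)

    // 同一个用户ID始终命中同一个后端
    fmt.Println(chb.Select("user-123"))
    fmt.Println(chb.Select("user-123"))
    fmt.Println(chb.Select("user-456"))
}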

如何实现API网关的熔断功能?

答案:

熔断器实现:

type CircuitBreaker struct {
    maxFailures  int           // 失败次数阈值
    timeout      time.Duration // 熔断超时时间
    state        CircuitState  // 当前状态
    failures     int           // 失败次数
    lastFailTime time.Time     // 最后失败时间
    mu           sync.Mutex
}

type CircuitState int

const (
    StateClosed   CircuitState = iota // 关闭(正常)
    StateOpen                          // 打开(熔断)
    StateHalfOpen                      // 半开(尝试恢复)
)

func (cb *CircuitBreaker) Call(fn func() error) error {
    // 1. 检查是否允许请求
    if !cb.Allow() {
        return errors.New("circuit breaker is open")
    }

    // 2. 执行请求
    err := fn()

    // 3. 记录结果
    if err != nil {
        cb.RecordFailure()
        return err
    }

    cb.RecordSuccess()
    return nil
}

func (cb *CircuitBreaker) Allow() bool {
    cb.mu.Lock()
    defer cb.mu.Unlock()

    switch cb.state {
    case StateClosed:
        return true

    case StateOpen:
        // 检查是否可以尝试恢复
        if time.Since(cb.lastFailTime) > cb.timeout {
            cb.state = StateHalfOpen
            return true
        }
        return false

    case StateHalfOpen:
        return true

    default:
        return false
    }
}

func (cb *CircuitBreaker) RecordSuccess() {
    cb.mu.Lock()
    defer cb.mu.Unlock()

    if cb.state == StateHalfOpen {
        cb.state = StateClosed
        cb.failures = 0
    }
}

func (cb *CircuitBreaker) RecordFailure() {
    cb.mu.Lock()
    defer cb.mu.Unlock()

    cb.failures++
    cb.lastFailTime = time.Now()

    if cb.failures >= cb.maxFailures {
        cb.state = StateOpen
    }
}

使用示例:

cb := NewCircuitBreaker(5, 10*time.Second)

err := cb.Call(func() error {
    resp, err := http.Get("http://backend-service/api/data")
    if err != nil {
        return err
    }
    defer resp.Body.Close()

    if resp.StatusCode >= 500 {
        return errors.New("server error")
    }

    return nil
})

if err != nil {
    // 降级处理
    return getDataFromCache()
}
