第2章:API网关实现
Kong实战
Kong架构
架构图:
┌──────────────────────────────────────┐
│ Kong Architecture │
│ │
│ ┌────────────────────────────────┐ │
│ │ Admin API (8001) │ │ 管理接口
│ └────────────────────────────────┘ │
│ │
│ ┌────────────────────────────────┐ │
│ │ Proxy (8000/8443) │ │ 代理流量
│ │ │ │
│ │ ┌──────────┐ ┌──────────┐ │ │
│ │ │ Plugins │ │ Plugins │ │ │
│ │ └──────────┘ └──────────┘ │ │
│ └────────────────────────────────┘ │
│ │
│ ┌────────────────────────────────┐ │
│ │ Database (PostgreSQL/ │ │ 存储配置
│ │ Cassandra/DBless) │ │
│ └────────────────────────────────┘ │
└──────────────────────────────────────┘
核心概念:
- Service:上游服务抽象
- Route:路由规则(如何匹配请求)
- Upstream:负载均衡目标
- Plugin:功能插件
- Consumer:API消费者
安装Kong
Docker方式:
# Installs Kong with a PostgreSQL backing store using plain `docker run` commands.
# 1. Create a dedicated Docker network so the containers can reach each other by name
docker network create kong-net
# 2. Start the PostgreSQL database (user, database and password are all "kong")
docker run -d --name kong-database \
--network=kong-net \
-e "POSTGRES_USER=kong" \
-e "POSTGRES_DB=kong" \
-e "POSTGRES_PASSWORD=kong" \
postgres:13
# 3. Initialise the database schema with a throw-away (--rm) Kong container
# NOTE(review): PostgreSQL may still be starting when this runs; retry if the
# bootstrap cannot connect — confirm for your environment.
docker run --rm \
--network=kong-net \
-e "KONG_DATABASE=postgres" \
-e "KONG_PG_HOST=kong-database" \
-e "KONG_PG_PASSWORD=kong" \
kong:latest kong migrations bootstrap
# 4. Start Kong: logs go to stdout/stderr, the Admin API listens on all
#    interfaces (8001), and the proxy/admin ports are published to the host
docker run -d --name kong \
--network=kong-net \
-e "KONG_DATABASE=postgres" \
-e "KONG_PG_HOST=kong-database" \
-e "KONG_PG_PASSWORD=kong" \
-e "KONG_PROXY_ACCESS_LOG=/dev/stdout" \
-e "KONG_ADMIN_ACCESS_LOG=/dev/stdout" \
-e "KONG_PROXY_ERROR_LOG=/dev/stderr" \
-e "KONG_ADMIN_ERROR_LOG=/dev/stderr" \
-e "KONG_ADMIN_LISTEN=0.0.0.0:8001" \
-p 8000:8000 \
-p 8443:8443 \
-p 8001:8001 \
-p 8444:8444 \
kong:latest
# 5. Verify: query the Admin API root endpoint
curl -i http://localhost:8001/
DBless模式(推荐):
# kong.yml
# Declarative configuration for Kong's DB-less mode.
# Nesting is significant in YAML: routes and plugins listed under a service
# belong to that service.
_format_version: "2.1"

services:
  # User service: rate limited to 100 requests/minute and protected by API key.
  - name: user-service
    url: http://user-service:8080
    routes:
      - name: user-route
        paths:
          - /api/users
    plugins:
      - name: rate-limiting
        config:
          minute: 100
          policy: local
      - name: key-auth

  # Order service: protected by the JWT plugin.
  # NOTE(review): no consumer below carries JWT credentials, so every request
  # to this service is expected to be rejected until one is added — confirm.
  - name: order-service
    url: http://order-service:8081
    routes:
      - name: order-route
        paths:
          - /api/orders
    plugins:
      - name: jwt

consumers:
  # Consumer whose API key satisfies the key-auth plugin on user-service.
  - username: test-user
    keyauth_credentials:
      - key: test-api-key
# Start Kong in DB-less mode: configuration is loaded from the mounted kong.yml
# instead of a database.
# KONG_ADMIN_LISTEN is set explicitly because Kong's default admin listener is
# bound to 127.0.0.1 inside the container, which makes the published port 8001
# unreachable from the host. Error logs are routed to stderr so that
# `docker logs` shows configuration-parse failures.
docker run -d --name kong-dbless \
  -v "$(pwd)/kong.yml:/kong.yml" \
  -e "KONG_DATABASE=off" \
  -e "KONG_DECLARATIVE_CONFIG=/kong.yml" \
  -e "KONG_PROXY_ACCESS_LOG=/dev/stdout" \
  -e "KONG_ADMIN_ACCESS_LOG=/dev/stdout" \
  -e "KONG_PROXY_ERROR_LOG=/dev/stderr" \
  -e "KONG_ADMIN_ERROR_LOG=/dev/stderr" \
  -e "KONG_ADMIN_LISTEN=0.0.0.0:8001" \
  -p 8000:8000 \
  -p 8001:8001 \
  kong:latest
Kong配置示例
1. 创建Service和Route
# Create a Service that points at the upstream user-service
curl -i -X POST http://localhost:8001/services \
--data "name=user-service" \
--data "url=http://user-service:8080"
# Create a Route on that Service matching GET/POST requests under /api/users
curl -i -X POST http://localhost:8001/services/user-service/routes \
--data "name=user-route" \
--data "paths[]=/api/users" \
--data "methods[]=GET" \
--data "methods[]=POST"
# Test: send a request through the proxy port (8000)
curl -i http://localhost:8000/api/users
2. 添加限流插件
# Attach the rate-limiting plugin to the Service (100 requests per minute,
# counters kept in local node memory)
curl -i -X POST http://localhost:8001/services/user-service/plugins \
--data "name=rate-limiting" \
--data "config.minute=100" \
--data "config.policy=local"
# Test the limit by sending 110 requests in a row
for i in {1..110}; do
curl -i http://localhost:8000/api/users
done
# Requests beyond the limit are answered with 429 (sample response below)
HTTP/1.1 429 Too Many Requests
{
"message": "API rate limit exceeded"
}
3. 添加JWT鉴权
# 1. Create a Consumer
curl -i -X POST http://localhost:8001/consumers \
--data "username=testuser"
# 2. Create a JWT credential (key + secret) for the Consumer
curl -i -X POST http://localhost:8001/consumers/testuser/jwt \
--data "key=myapikey" \
--data "secret=mysecret"
# Sample response:
{
"key": "myapikey",
"secret": "mysecret",
"id": "xxx"
}
# 3. Enable the JWT plugin on the Service
curl -i -X POST http://localhost:8001/services/user-service/plugins \
--data "name=jwt"
# 4. Generate a JWT token (with an online tool or in code)
# Header:
{
"alg": "HS256",
"typ": "JWT"
}
# Payload ("iss" carries the credential key created in step 2):
{
"iss": "myapikey",
"exp": 1735660800
}
# Secret: mysecret
# 5. Test without a token (expected to fail)
curl -i http://localhost:8000/api/users
# Returns 401 Unauthorized
# 6. Test with a token (expected to succeed)
curl -i http://localhost:8000/api/users \
-H "Authorization: Bearer <your-jwt-token>"
4. 添加API Key鉴权
# NOTE(review): if the JWT plugin from the previous section is still enabled on
# user-service, requests presumably need to satisfy both plugins — confirm, or
# remove the JWT plugin before running step 5.
# 1. Create a Consumer
curl -i -X POST http://localhost:8001/consumers \
--data "username=apiuser"
# 2. Create an API key for the Consumer
curl -i -X POST http://localhost:8001/consumers/apiuser/key-auth \
--data "key=my-api-key-12345"
# 3. Enable the key-auth plugin on the Route
curl -i -X POST http://localhost:8001/routes/user-route/plugins \
--data "name=key-auth"
# 4. Test without an API key (expected to fail)
curl -i http://localhost:8000/api/users
# Returns 401
# 5. Test with the API key in the "apikey" header (expected to succeed)
curl -i http://localhost:8000/api/users \
-H "apikey: my-api-key-12345"
5. 负载均衡配置
# 1. Create an Upstream (a virtual hostname that load-balances over targets)
curl -i -X POST http://localhost:8001/upstreams \
--data "name=user-service-upstream"
# 2. Add Targets (backend instances); the third one gets half the weight
curl -i -X POST http://localhost:8001/upstreams/user-service-upstream/targets \
--data "target=user-service-1:8080" \
--data "weight=100"
curl -i -X POST http://localhost:8001/upstreams/user-service-upstream/targets \
--data "target=user-service-2:8080" \
--data "weight=100"
curl -i -X POST http://localhost:8001/upstreams/user-service-upstream/targets \
--data "target=user-service-3:8080" \
--data "weight=50"
# 3. Point the Service's host at the Upstream name
curl -i -X PATCH http://localhost:8001/services/user-service \
--data "host=user-service-upstream"
# Load-balancing algorithm (default: weighted round-robin)
# Other algorithms: least-connections, consistent-hashing
Kong插件开发
自定义限流插件(Lua):
-- custom-rate-limiting.lua
-- Fixed-window rate limiter for Kong backed by a Redis counter.
--
-- The handler is a plain table: `kong.plugins.base_plugin` was removed in
-- Kong 3.x, and handlers no longer need to inherit from it.
local redis = require "resty.redis"

local CustomRateLimitingHandler = {
  VERSION = "1.0.0",
  PRIORITY = 900,
}

-- Returns the connection to the cosocket pool; falls back to closing it.
local function release(red)
  local ok, err = red:set_keepalive(10000, 100)
  if not ok then
    kong.log.warn("Failed to set Redis keepalive: ", err)
    red:close()
  end
end

--- Access-phase handler: counts the request and rejects it with 429 once the
-- per-client counter exceeds `conf.limit` within `conf.window_size` seconds.
-- Fails open (lets the request through) when Redis is unavailable.
-- @param conf plugin configuration (limit, window_size, redis_host, redis_port)
function CustomRateLimitingHandler:access(conf)
  -- 1. Client identifier (forwarded IP)
  local identifier = kong.client.get_forwarded_ip()

  -- 2. Connect to Redis
  local red = redis:new()
  red:set_timeout(1000)
  local ok, err = red:connect(conf.redis_host, conf.redis_port)
  if not ok then
    kong.log.err("Failed to connect to Redis: ", err)
    return
  end

  -- 3. Increment the counter; a failed INCR yields a non-number, which must
  --    not reach the numeric comparison below.
  local key = "rate_limit:" .. identifier
  local count, incr_err = red:incr(key)
  if type(count) ~= "number" then
    kong.log.err("Failed to increment rate limit counter: ", incr_err)
    red:close()
    return
  end

  if count == 1 then
    -- First request in the window: start the expiry clock
    local expire_ok, expire_err = red:expire(key, conf.window_size)
    if not expire_ok then
      kong.log.err("Failed to set rate limit expiry: ", expire_err)
    end
  end

  -- 4. Release the connection before any exit so it never leaks
  release(red)

  -- 5. Reject when over the limit
  if count > conf.limit then
    return kong.response.exit(429, {
      message = "API rate limit exceeded",
      limit = conf.limit,
      window = conf.window_size
    })
  end

  -- 6. Informational response headers
  kong.response.set_header("X-RateLimit-Limit", conf.limit)
  kong.response.set_header("X-RateLimit-Remaining", conf.limit - count)
end

return CustomRateLimitingHandler
插件Schema配置:
-- schema.lua
-- Configuration schema for the custom-rate-limiting plugin.
-- Counters and the port are integers with range checks, so that values such
-- as limit = 0, window_size = -1 or a fractional port are rejected when the
-- plugin is configured instead of failing per request.
return {
  name = "custom-rate-limiting",
  fields = {
    { config = {
        type = "record",
        fields = {
          -- Maximum number of requests allowed per window
          { limit = {
              type = "integer",
              required = true,
              default = 100,
              gt = 0
          }},
          -- Window length in seconds (used as the Redis key TTL)
          { window_size = {
              type = "integer",
              required = true,
              default = 60,
              gt = 0
          }},
          { redis_host = {
              type = "string",
              required = true,
              default = "localhost"
          }},
          { redis_port = {
              type = "integer",
              required = true,
              default = 6379,
              between = { 1, 65535 }
          }}
        }
    }}
  }
}
Nginx网关
OpenResty + Lua
架构:
┌──────────────────────────────────────┐
│ Nginx + OpenResty │
│ │
│ ┌────────────────────────────────┐ │
│ │ Lua Code │ │ 业务逻辑
│ │ - 路由 │ │
│ │ - 鉴权 │ │
│ │ - 限流 │ │
│ └────────────────────────────────┘ │
│ │
│ ┌────────────────────────────────┐ │
│ │ Nginx Core │ │ HTTP处理
│ └────────────────────────────────┘ │
│ │
│ ┌────────────────────────────────┐ │
│ │ Shared Dict / Redis │ │ 共享存储
│ └────────────────────────────────┘ │
└──────────────────────────────────────┘
安装OpenResty
# macOS
brew install openresty
# Ubuntu
# NOTE(review): presumably requires the official OpenResty APT repository to be
# added first — confirm against the OpenResty installation docs.
apt-get install openresty
# Start OpenResty using the current directory as prefix and ./nginx.conf as config
openresty -p `pwd` -c nginx.conf
Nginx配置示例
完整网关配置:
# nginx.conf
# Gateway configuration: JWT auth + rate limiting in Lua, then reverse proxy.
worker_processes auto;
error_log logs/error.log warn;

events {
    worker_connections 1024;
}

http {
    # Lua module search path
    lua_package_path "/usr/local/openresty/lualib/?.lua;;";

    # Shared memory zones (rate limiting, JWT cache)
    lua_shared_dict rate_limit 10m;
    lua_shared_dict jwt_cache 10m;

    # Upstream service definitions
    upstream user_service {
        server 127.0.0.1:8080;
        server 127.0.0.1:8081;
        keepalive 32;
    }

    upstream order_service {
        server 127.0.0.1:9080;
        server 127.0.0.1:9081;
        keepalive 32;
    }

    server {
        listen 80;
        server_name localhost;

        # Route: User Service
        location /api/users {
            # $user_id must be declared before Lua can assign ngx.var.user_id
            # and before proxy_set_header can reference it.
            set $user_id "";

            # Only ONE access_by_lua* directive is allowed per location
            # (a second one is rejected as "duplicate"), so auth and rate
            # limiting run from a single block. loadfile()+call is used rather
            # than dofile() so the scripts can still yield (ngx.sleep/ngx.exit).
            # For production, turn the scripts into modules to avoid
            # recompiling them on every request.
            access_by_lua_block {
                local prefix = ngx.config.prefix()
                assert(loadfile(prefix .. "lua/auth.lua"))()
                assert(loadfile(prefix .. "lua/rate_limit.lua"))()
            }

            # Proxy; HTTP/1.1 with an empty Connection header is required for
            # the upstream "keepalive" pool to be used.
            proxy_pass http://user_service;
            proxy_http_version 1.1;
            proxy_set_header Connection "";
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            # Pass the authenticated user to the backend
            proxy_set_header X-User-ID $user_id;
        }

        # Route: Order Service
        location /api/orders {
            set $user_id "";

            access_by_lua_block {
                local prefix = ngx.config.prefix()
                assert(loadfile(prefix .. "lua/auth.lua"))()
                assert(loadfile(prefix .. "lua/rate_limit.lua"))()
            }

            proxy_pass http://order_service;
            proxy_http_version 1.1;
            proxy_set_header Connection "";
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-User-ID $user_id;
        }

        # Health check
        location /health {
            access_log off;
            return 200 "OK";
        }
    }
}
Lua脚本实现
1. JWT鉴权(auth.lua):
-- lua/auth.lua
-- Access-phase JWT authentication.
-- Validates the "Authorization: Bearer <token>" header, stores the user id in
-- the nginx variable $user_id (which the location must declare with `set`),
-- and caches successful verifications in the jwt_cache shared dict.
local jwt = require "resty.jwt"
local cjson = require "cjson"

-- JWT secret.
-- NOTE(security): hard-coded for the tutorial; load it from configuration or
-- the environment in a real deployment.
local jwt_secret = "your-secret-key"

-- Upper bound (seconds) for how long a verified token stays cached
local CACHE_TTL = 300

-- Sends a 401 JSON response and terminates the request.
local function reject(message, reason)
  ngx.status = 401
  ngx.header["Content-Type"] = "application/json"
  ngx.say(cjson.encode({
    code = 401,
    message = message,
    reason = reason
  }))
  return ngx.exit(401)
end

-- Fetch the header
local auth_header = ngx.var.http_authorization
if not auth_header then
  return reject("Missing authorization header")
end

-- Extract the token; anything that is not "Bearer <token>" is rejected
-- instead of blindly cutting off the first seven characters.
local token = auth_header:match("^[Bb]earer%s+(%S+)%s*$")
if not token then
  return reject("Invalid authorization header")
end

-- Serve from cache when this exact token was verified recently
local jwt_cache = ngx.shared.jwt_cache
if jwt_cache then
  local cached_user_id = jwt_cache:get(token)
  if cached_user_id then
    ngx.var.user_id = cached_user_id
    return
  end
end

-- Verify the token
local jwt_obj = jwt:verify(jwt_secret, token)
if not jwt_obj.verified then
  return reject("Invalid token", jwt_obj.reason)
end

-- Extract user information
local payload = jwt_obj.payload
if type(payload) ~= "table" or payload.user_id == nil then
  return reject("Invalid token", "missing user_id claim")
end
local user_id = tostring(payload.user_id)
ngx.var.user_id = user_id

-- Cache the result, never beyond the token's own expiry
if jwt_cache then
  local ttl = CACHE_TTL
  if type(payload.exp) == "number" then
    ttl = math.min(ttl, payload.exp - ngx.time())
  end
  if ttl > 0 then
    jwt_cache:set(token, user_id, ttl)
  end
end
2. 限流(rate_limit.lua):
-- lua/rate_limit.lua
-- Access-phase request rate limiting built on resty.limit.req.
local limit_req = require "resty.limit.req"
-- cjson is needed for the 429 body; without this require the rejection path
-- would fail on an undefined global.
local cjson = require "cjson"

-- Create the limiter (100 requests/second, burst 200) on the "rate_limit" shared dict
local limiter, err = limit_req.new("rate_limit", 100, 200)
if not limiter then
  ngx.log(ngx.ERR, "Failed to create limiter: ", err)
  return
end

-- Client identifier: authenticated user id when present, otherwise the IP.
-- An empty string is truthy in Lua, so it is checked explicitly; otherwise
-- every unauthenticated client would share the single key "".
local user_id = ngx.var.user_id
local key
if user_id and user_id ~= "" then
  key = user_id
else
  key = ngx.var.remote_addr
end

-- Rate limit check
local delay, limit_err = limiter:incoming(key, true)
if not delay then
  if limit_err == "rejected" then
    ngx.status = 429
    ngx.say(cjson.encode({
      code = 429,
      message = "Too many requests"
    }))
    return ngx.exit(429)
  end
  -- Any other error: log it and let the request through
  ngx.log(ngx.ERR, "Failed to limit request: ", limit_err)
  return
end

-- Within the burst: delay the request to smooth the rate
if delay >= 0.001 then
  ngx.sleep(delay)
end
3. 动态路由(router.lua):
-- lua/router.lua
local cjson = require "cjson"
local redis = require "resty.redis"
-- 连接Redis获取路由配置
local red = redis:new()
red:set_timeout(1000)
local ok, err = red:connect("127.0.0.1", 6379)
if not ok then
ngx.log(ngx.ERR, "Failed to connect to Redis: ", err)
return
end
-- 获取请求路径
local path = ngx.var.uri
-- 从Redis获取路由规则
local route_key = "route:" .. path
local upstream, err = red:get(route_key)
if upstream == ngx.null then
ngx.status = 404
ngx.say(cjson.encode({
code = 404,
message = "Route not found"
}))
ngx.exit(404)
end
-- 设置上游服务
ngx.var.upstream = upstream
red:close()
使用动态路由:
location / {
    # $upstream must be declared here: nginx refuses to load a configuration
    # that references an unknown variable, and Lua can only assign to
    # ngx.var.* variables that already exist.
    set $upstream "";
    access_by_lua_file lua/router.lua;
    # Use the upstream name chosen by Lua
    proxy_pass http://$upstream;
}
在Redis中配置路由:
# Register routing rules: key "route:<request path>" -> upstream name.
# router.lua looks up the exact request URI, so only these exact paths match.
# NOTE(review): "product_service" has no matching upstream block in the
# nginx.conf shown earlier — confirm it is defined elsewhere.
redis-cli SET "route:/api/users" "user_service"
redis-cli SET "route:/api/orders" "order_service"
redis-cli SET "route:/api/products" "product_service"
自研网关(Go)
架构设计
┌──────────────────────────────────────┐
│ API Gateway (Go) │
│ │
│ ┌────────────────────────────────┐ │
│ │ HTTP Server │ │
│ └────────────────────────────────┘ │
│ ↓ │
│ ┌────────────────────────────────┐ │
│ │ Middleware Chain │ │
│ │ - Logger │ │
│ │ - Auth │ │
│ │ - Rate Limiter │ │
│ │ - Circuit Breaker │ │
│ └────────────────────────────────┘ │
│ ↓ │
│ ┌────────────────────────────────┐ │
│ │ Router │ │
│ └────────────────────────────────┘ │
│ ↓ │
│ ┌────────────────────────────────┐ │
│ │ Reverse Proxy │ │
│ │ + Load Balancer │ │
│ └────────────────────────────────┘ │
└──────────────────────────────────────┘
完整实现
main.go:
package main
import (
"log"
"net/http"
"time"
)
// main wires up the gateway's route table and serves it on port 8000.
func main() {
	gateway := NewGateway()

	// Route table: path pattern, upstream base URL, allowed methods.
	routeTable := []struct {
		pattern  string
		upstream string
		methods  []string
	}{
		{"/api/users/*", "http://localhost:8080", []string{"GET", "POST"}},
		{"/api/orders/*", "http://localhost:8081", []string{"GET", "POST", "PUT"}},
		{"/api/products/*", "http://localhost:8082", []string{"GET"}},
	}
	for _, entry := range routeTable {
		gateway.AddRoute(entry.pattern, entry.upstream, entry.methods)
	}

	// HTTP server with 10-second read and write timeouts.
	const ioTimeout = 10 * time.Second
	server := &http.Server{
		Addr:         ":8000",
		Handler:      gateway,
		ReadTimeout:  ioTimeout,
		WriteTimeout: ioTimeout,
	}

	log.Println("API Gateway listening on :8000")
	err := server.ListenAndServe()
	if err != nil {
		log.Fatal(err)
	}
}
gateway.go:
package main
import (
	"net/http"
	"sync"
	"time"
)
// Gateway is the API gateway: it owns the router, the default middlewares'
// collaborators (rate limiter, circuit breaker), the load balancer, and any
// user-registered middlewares. It implements http.Handler.
type Gateway struct {
	router         *Router
	middlewares    []Middleware
	loadBalancer   *LoadBalancer
	rateLimiter    *RateLimiter
	circuitBreaker *CircuitBreaker
	mu             sync.RWMutex
}

// NewGateway builds a gateway with an empty route table, a 100 QPS per-client
// rate limiter and a circuit breaker that opens after 5 failures for 10 seconds.
func NewGateway() *Gateway {
	return &Gateway{
		router:         NewRouter(),
		middlewares:    []Middleware{},
		loadBalancer:   NewLoadBalancer(),
		rateLimiter:    NewRateLimiter(100), // 100 QPS
		circuitBreaker: NewCircuitBreaker(5, 10*time.Second),
	}
}

// AddRoute registers a route that forwards requests matching path and one of
// methods to upstream.
func (g *Gateway) AddRoute(path, upstream string, methods []string) {
	route := &Route{
		Path:     path,
		Upstream: upstream,
		Methods:  methods,
	}
	g.router.AddRoute(route)
}

// Use appends a user middleware. User middlewares run after the default
// middlewares and before the reverse proxy, in registration order.
func (g *Gateway) Use(middleware Middleware) {
	g.middlewares = append(g.middlewares, middleware)
}

// ServeHTTP implements http.Handler by running the request through the
// middleware chain.
func (g *Gateway) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	handler := g.buildHandler()
	handler.ServeHTTP(w, r)
}

// buildHandler assembles the handler chain around the reverse proxy.
//
// Each wrapper becomes the new outermost layer, so wrappers are applied from
// the innermost to the outermost. The resulting execution order is:
//
//	logger -> auth -> rate limiter -> circuit breaker -> user middlewares -> proxy
//
// Auth must run before the rate limiter because the limiter keys on the
// X-User-ID header that auth sets, and the logger must be outermost so that
// rejected requests (401/429/503) are logged as well.
func (g *Gateway) buildHandler() http.Handler {
	// Final handler: reverse proxy. Declared as http.Handler so that the
	// middleware results (also http.Handler) can be assigned back to it.
	var handler http.Handler = http.HandlerFunc(g.proxyHandler)

	// Wrap user middlewares in reverse so the first registered runs first.
	for i := len(g.middlewares) - 1; i >= 0; i-- {
		handler = g.middlewares[i](handler)
	}

	// Default middlewares, innermost first.
	handler = g.circuitBreakerMiddleware(handler)
	handler = g.rateLimitMiddleware(handler)
	handler = g.authMiddleware(handler)
	handler = g.loggerMiddleware(handler)
	return handler
}

// proxyHandler is the terminal handler: it matches a route, picks a backend
// through the load balancer and proxies the request to it. Unmatched requests
// get a 404.
func (g *Gateway) proxyHandler(w http.ResponseWriter, r *http.Request) {
	// 1. Route matching
	route := g.router.Match(r.Method, r.URL.Path)
	if route == nil {
		http.Error(w, "Route not found", http.StatusNotFound)
		return
	}

	// 2. Pick a backend instance (load balancing)
	backend := g.loadBalancer.Select(route.Upstream)

	// 3. Reverse proxy
	proxy := NewReverseProxy(backend)
	proxy.ServeHTTP(w, r)
}
router.go:
package main
import (
"path"
"strings"
)
// Route binds a path pattern and a set of HTTP methods to an upstream.
type Route struct {
	Path     string   // exact path, "prefix/*" wildcard, or "{param}" segments
	Upstream string   // upstream identifier handed to the load balancer
	Methods  []string // allowed HTTP methods
}

// Router holds the routes in registration order; the first match wins.
type Router struct {
	routes []*Route
}

// NewRouter returns an empty router.
func NewRouter() *Router {
	return &Router{
		routes: []*Route{},
	}
}

// AddRoute appends route to the routing table.
func (r *Router) AddRoute(route *Route) {
	r.routes = append(r.routes, route)
}

// Match returns the first registered route that allows method and whose
// pattern matches urlPath, or nil when there is none.
func (r *Router) Match(method, urlPath string) *Route {
	for _, route := range r.routes {
		// Check the HTTP method
		if !contains(route.Methods, method) {
			continue
		}
		// Check the path
		if r.pathMatch(route.Path, urlPath) {
			return route
		}
	}
	return nil
}

// pathMatch reports whether urlPath matches pattern. Supported patterns:
//   - exact match
//   - prefix wildcard: "/api/users/*" matches "/api/users" and everything
//     below "/api/users/", but not siblings such as "/api/usersevil"
//   - parameter segments: "/api/users/{id}"
//
// Apart from the exact comparison, matching is done on the normalised path
// (path.Clean), so "/api/users/../orders" is matched as "/api/orders".
func (r *Router) pathMatch(pattern, urlPath string) bool {
	// Exact match on the raw path
	if pattern == urlPath {
		return true
	}

	// Resolve "." / ".." segments and duplicate slashes before matching.
	cleaned := path.Clean("/" + urlPath)

	// Prefix match on a segment boundary
	if strings.HasSuffix(pattern, "/*") {
		prefix := strings.TrimSuffix(pattern, "/*")
		return cleaned == prefix || strings.HasPrefix(cleaned, prefix+"/")
	}

	// Parameter match (/api/users/{id}): same number of segments, and each
	// segment is either equal or a "{...}" placeholder in the pattern.
	patternParts := strings.Split(pattern, "/")
	pathParts := strings.Split(cleaned, "/")
	if len(patternParts) != len(pathParts) {
		return false
	}
	for i := range patternParts {
		if patternParts[i] != pathParts[i] &&
			!strings.HasPrefix(patternParts[i], "{") {
			return false
		}
	}
	return true
}

// contains reports whether item is an element of slice.
func contains(slice []string, item string) bool {
	for _, s := range slice {
		if s == item {
			return true
		}
	}
	return false
}
middleware.go:
package main
import (
"log"
"net/http"
"strings"
"time"
)
// Middleware wraps an http.Handler and returns the handler that should run in
// its place.
type Middleware func(http.Handler) http.Handler
// loggerMiddleware logs method, path, remote address, recorded status code and
// elapsed time for every request that passes through next.
func (g *Gateway) loggerMiddleware(next http.Handler) http.Handler {
	logged := func(w http.ResponseWriter, r *http.Request) {
		began := time.Now()

		// Wrap the writer so the status code written downstream is captured.
		recorder := &loggingResponseWriter{ResponseWriter: w}
		next.ServeHTTP(recorder, r)

		log.Printf(
			"[%s] %s %s - %d (%s)",
			r.Method, r.URL.Path, r.RemoteAddr, recorder.statusCode, time.Since(began),
		)
	}
	return http.HandlerFunc(logged)
}
type loggingResponseWriter struct {
http.ResponseWriter
statusCode int
}
func (lrw *loggingResponseWriter) WriteHeader(code int) {
lrw.statusCode = code
lrw.ResponseWriter.WriteHeader(code)
}
// authMiddleware authenticates requests with a JWT passed as
// "Authorization: Bearer <token>". On success the user id from the token is
// exposed to later handlers and to the backend in the X-User-ID header.
// Requests to /health skip authentication.
func (g *Gateway) authMiddleware(next http.Handler) http.Handler {
	const bearerPrefix = "Bearer "

	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// X-User-ID is an identity assertion made by the gateway; a value
		// supplied by the client must never be trusted or forwarded.
		r.Header.Del("X-User-ID")

		// Skip the health check
		if r.URL.Path == "/health" {
			next.ServeHTTP(w, r)
			return
		}

		// Read the header
		authHeader := r.Header.Get("Authorization")
		if authHeader == "" {
			http.Error(w, "Missing authorization header", http.StatusUnauthorized)
			return
		}

		// Require the Bearer scheme instead of treating any header value as a token
		if !strings.HasPrefix(authHeader, bearerPrefix) {
			http.Error(w, "Invalid authorization header", http.StatusUnauthorized)
			return
		}

		// Validate the token
		token := strings.TrimSpace(strings.TrimPrefix(authHeader, bearerPrefix))
		userID, err := ValidateJWT(token)
		if err != nil {
			http.Error(w, "Invalid token", http.StatusUnauthorized)
			return
		}

		// Expose the authenticated user in a header
		r.Header.Set("X-User-ID", userID)
		next.ServeHTTP(w, r)
	})
}
// rateLimitMiddleware rejects requests with 429 once the caller exceeds its
// rate limit. Callers are identified by the X-User-ID header when present,
// otherwise by client IP address.
func (g *Gateway) rateLimitMiddleware(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// Client identifier
		userID := r.Header.Get("X-User-ID")
		if userID == "" {
			// RemoteAddr is "IP:port"; the port changes per connection, so it
			// is stripped to make all connections of one client share a limiter.
			userID = clientIP(r.RemoteAddr)
		}

		// Rate limit check
		if !g.rateLimiter.Allow(userID) {
			http.Error(w, "Too many requests", http.StatusTooManyRequests)
			return
		}
		next.ServeHTTP(w, r)
	})
}

// clientIP returns the host part of an "IP:port" or "[IPv6]:port" address.
// Input that does not look like host:port is returned unchanged.
func clientIP(remoteAddr string) string {
	if strings.HasPrefix(remoteAddr, "[") {
		if end := strings.IndexByte(remoteAddr, ']'); end > 0 {
			return remoteAddr[1:end]
		}
		return remoteAddr
	}
	// Exactly one colon means IPv4/hostname plus port; more would be a bare
	// IPv6 address, which must not be cut.
	if strings.Count(remoteAddr, ":") == 1 {
		return remoteAddr[:strings.IndexByte(remoteAddr, ':')]
	}
	return remoteAddr
}
// circuitBreakerMiddleware short-circuits requests with 503 while the breaker
// disallows traffic; otherwise it runs next and reports the outcome to the
// breaker (a recorded status of 500 or above counts as a failure).
func (g *Gateway) circuitBreakerMiddleware(next http.Handler) http.Handler {
	guarded := func(w http.ResponseWriter, r *http.Request) {
		// Breaker state check
		if allowed := g.circuitBreaker.Allow(); !allowed {
			http.Error(w, "Service temporarily unavailable", http.StatusServiceUnavailable)
			return
		}

		// Capture the status code written downstream
		recorder := &loggingResponseWriter{ResponseWriter: w}
		next.ServeHTTP(recorder, r)

		// Report success or failure
		if failed := recorder.statusCode >= 500; failed {
			g.circuitBreaker.RecordFailure()
			return
		}
		g.circuitBreaker.RecordSuccess()
	}
	return http.HandlerFunc(guarded)
}
rate_limiter.go:
package main
import (
"sync"
"time"
"golang.org/x/time/rate"
)
// limiterIdleTTL is how long a key may stay unused before its limiter is
// dropped. It is far longer than the time a bucket needs to refill, so a
// dropped limiter is equivalent to a fresh one.
const limiterIdleTTL = time.Minute

// RateLimiter applies an independent token-bucket limit (qps, burst 2*qps) to
// every key. Limiters of idle keys are evicted so the map cannot grow without
// bound when keys are client addresses.
type RateLimiter struct {
	limiters  map[string]*rate.Limiter
	lastSeen  map[string]time.Time // last Allow call per key
	lastSweep time.Time            // last eviction pass
	mu        sync.RWMutex
	qps       int
}

// NewRateLimiter returns a limiter allowing qps requests per second per key.
func NewRateLimiter(qps int) *RateLimiter {
	return &RateLimiter{
		limiters:  make(map[string]*rate.Limiter),
		lastSeen:  make(map[string]time.Time),
		lastSweep: time.Now(),
		qps:       qps,
	}
}

// Allow reports whether a request for key may proceed now.
func (rl *RateLimiter) Allow(key string) bool {
	now := time.Now()

	// Lookup and creation happen under one lock. Checking under a read lock
	// and inserting under a separate write lock lets two goroutines both
	// create a limiter for the same key, with one overwriting the other.
	rl.mu.Lock()
	if rl.limiters == nil {
		rl.limiters = make(map[string]*rate.Limiter)
	}
	if rl.lastSeen == nil {
		rl.lastSeen = make(map[string]time.Time)
	}
	limiter, exists := rl.limiters[key]
	if !exists {
		limiter = rate.NewLimiter(rate.Limit(rl.qps), rl.qps*2) // burst = qps*2
		rl.limiters[key] = limiter
	}
	rl.lastSeen[key] = now

	// Periodically drop limiters that have been idle for a full TTL.
	if now.Sub(rl.lastSweep) >= limiterIdleTTL {
		for k, seen := range rl.lastSeen {
			if now.Sub(seen) >= limiterIdleTTL {
				delete(rl.lastSeen, k)
				delete(rl.limiters, k)
			}
		}
		rl.lastSweep = now
	}
	rl.mu.Unlock()

	return limiter.Allow()
}
circuit_breaker.go:
package main
import (
"sync"
"time"
)
type CircuitState int
const (
StateClosed CircuitState = iota
StateOpen
StateHalfOpen
)
type CircuitBreaker struct {
maxFailures int
timeout time.Duration
state CircuitState
failures int
lastFailTime time.Time
mu sync.Mutex
}
func NewCircuitBreaker(maxFailures int, timeout time.Duration) *CircuitBreaker {
return &CircuitBreaker{
maxFailures: maxFailures,
timeout: timeout,
state: StateClosed,
}
}
func (cb *CircuitBreaker) Allow() bool {
cb.mu.Lock()
defer cb.mu.Unlock()
switch cb.state {
case StateClosed:
return true
case StateOpen:
// 检查是否可以尝试恢复
if time.Since(cb.lastFailTime) > cb.timeout {
cb.state = StateHalfOpen
return true
}
return false
case StateHalfOpen:
return true
default:
return false
}
}
func (cb *CircuitBreaker) RecordSuccess() {
cb.mu.Lock()
defer cb.mu.Unlock()
if cb.state == StateHalfOpen {
cb.state = StateClosed
cb.failures = 0
}
}
func (cb *CircuitBreaker) RecordFailure() {
cb.mu.Lock()
defer cb.mu.Unlock()
cb.failures++
cb.lastFailTime = time.Now()
if cb.failures >= cb.maxFailures {
cb.state = StateOpen
}
}
load_balancer.go:
package main
import (
"sync"
)
// LoadBalancer distributes requests for an upstream over its backend
// instances using round-robin.
type LoadBalancer struct {
	backends map[string][]string // upstream -> backend base URLs
	indexes  map[string]int      // upstream -> next backend index
	mu       sync.RWMutex
}

// NewLoadBalancer returns a load balancer pre-populated with the demo backend
// set for "http://localhost:8080". Use SetBackends to configure real pools.
func NewLoadBalancer() *LoadBalancer {
	lb := &LoadBalancer{
		backends: make(map[string][]string),
		indexes:  make(map[string]int),
	}
	// Demo backend instances.
	// NOTE(review): ports 8081 and 8082 are also the order and product
	// upstreams registered in main.go — confirm this pool is intended.
	lb.backends["http://localhost:8080"] = []string{
		"http://localhost:8080",
		"http://localhost:8081",
		"http://localhost:8082",
	}
	return lb
}

// SetBackends replaces the backend pool of upstream and restarts its
// round-robin sequence. The slice is copied, so later changes by the caller
// do not affect the balancer.
func (lb *LoadBalancer) SetBackends(upstream string, backends []string) {
	lb.mu.Lock()
	defer lb.mu.Unlock()
	lb.backends[upstream] = append([]string(nil), backends...)
	lb.indexes[upstream] = 0
}

// Select returns the next backend for upstream in round-robin order. When no
// backends are configured for upstream, upstream itself is returned.
func (lb *LoadBalancer) Select(upstream string) string {
	lb.mu.Lock()
	defer lb.mu.Unlock()
	backends, exists := lb.backends[upstream]
	if !exists || len(backends) == 0 {
		return upstream
	}
	// The modulo keeps the index valid even if the pool has shrunk.
	index := lb.indexes[upstream] % len(backends)
	backend := backends[index]
	// Advance the index
	lb.indexes[upstream] = (index + 1) % len(backends)
	return backend
}
proxy.go:
package main
import (
"io"
"net/http"
"net/url"
)
func NewReverseProxy(targetURL string) http.Handler {
target, _ := url.Parse(targetURL)
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// 1. 修改请求
r.URL.Scheme = target.Scheme
r.URL.Host = target.Host
r.Host = target.Host
// 2. 创建新请求
proxyReq, err := http.NewRequest(r.Method, r.URL.String(), r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
// 3. 复制Header
for key, values := range r.Header {
for _, value := range values {
proxyReq.Header.Add(key, value)
}
}
// 4. 发送请求
client := &http.Client{Timeout: 10 * time.Second}
resp, err := client.Do(proxyReq)
if err != nil {
http.Error(w, err.Error(), http.StatusBadGateway)
return
}
defer resp.Body.Close()
// 5. 复制响应Header
for key, values := range resp.Header {
for _, value := range values {
w.Header().Add(key, value)
}
}
// 6. 写入状态码
w.WriteHeader(resp.StatusCode)
// 7. 复制响应Body
io.Copy(w, resp.Body)
})
}
jwt.go:
package main
import (
"errors"
"time"
"github.com/golang-jwt/jwt/v5"
)
// jwtSecret is the HMAC key used to sign and verify tokens.
// NOTE(review): hard-coded placeholder — presumably meant to be loaded from
// configuration or the environment in a real deployment.
var jwtSecret = []byte("your-secret-key")
// ValidateJWT verifies an HMAC-signed token and returns its "user_id" claim.
// It returns an error when the token cannot be parsed or verified, is
// expired, or lacks a numeric "exp" or a string "user_id" claim.
func ValidateJWT(tokenString string) (string, error) {
	token, err := jwt.Parse(tokenString, func(token *jwt.Token) (interface{}, error) {
		// Accept only HMAC algorithms
		if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok {
			return nil, errors.New("invalid signing method")
		}
		return jwtSecret, nil
	})
	if err != nil {
		return "", err
	}

	claims, ok := token.Claims.(jwt.MapClaims)
	if !ok || !token.Valid {
		return "", errors.New("invalid token")
	}

	// Claims come from the caller's token, so their presence and types are
	// checked; an unchecked type assertion would panic on a token without
	// "exp" or "user_id".
	exp, ok := claims["exp"].(float64)
	if !ok {
		return "", errors.New("missing or invalid exp claim")
	}
	// Check the expiry
	if time.Now().Unix() > int64(exp) {
		return "", errors.New("token expired")
	}

	userID, ok := claims["user_id"].(string)
	if !ok {
		return "", errors.New("missing or invalid user_id claim")
	}
	return userID, nil
}
// GenerateJWT issues an HS256 token for userID that expires 24 hours after
// creation. The claims are user_id, exp and iat.
func GenerateJWT(userID string) (string, error) {
	claims := jwt.MapClaims{}
	claims["user_id"] = userID
	claims["exp"] = time.Now().Add(24 * time.Hour).Unix()
	claims["iat"] = time.Now().Unix()

	unsigned := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)
	signed, err := unsigned.SignedString(jwtSecret)
	return signed, err
}
性能对比
测试环境
硬件:
- CPU: 8核
- 内存: 16GB
- 网络: 1Gbps
工具:
- wrk(HTTP压测工具)
后端服务:
- 简单HTTP服务,返回固定JSON
测试结果
1. Kong:
wrk -t8 -c100 -d30s http://localhost:8000/api/users
Requests/sec: 28,453
Latency (avg): 3.51ms
Latency (P99): 12.34ms
2. Nginx + OpenResty:
wrk -t8 -c100 -d30s http://localhost:8000/api/users
Requests/sec: 65,234
Latency (avg): 1.53ms
Latency (P99): 5.67ms
3. 自研网关(Go):
wrk -t8 -c100 -d30s http://localhost:8000/api/users
Requests/sec: 42,156
Latency (avg): 2.37ms
Latency (P99): 8.91ms
结论:
- Nginx + OpenResty性能最高(C语言实现)
- 自研Go网关性能中等(易于开发和维护)
- Kong性能较低(但功能最丰富)
面试问答
如何实现API网关的限流功能?
答案:
限流算法:
1. 令牌桶(Token Bucket)
type TokenBucket struct {
capacity int // 桶容量
tokens int // 当前令牌数
rate int // 生成速率(每秒)
lastTime time.Time
mu sync.Mutex
}
func NewTokenBucket(capacity, rate int) *TokenBucket {
return &TokenBucket{
capacity: capacity,
tokens: capacity,
rate: rate,
lastTime: time.Now(),
}
}
func (tb *TokenBucket) Allow() bool {
tb.mu.Lock()
defer tb.mu.Unlock()
// 1. 计算新生成的令牌
now := time.Now()
elapsed := now.Sub(tb.lastTime).Seconds()
newTokens := int(elapsed * float64(tb.rate))
// 2. 更新令牌数(不超过容量)
tb.tokens = min(tb.capacity, tb.tokens+newTokens)
tb.lastTime = now
// 3. 消耗令牌
if tb.tokens > 0 {
tb.tokens--
return true
}
return false
}
2. 漏桶(Leaky Bucket)
type LeakyBucket struct {
capacity int
queue chan struct{}
rate time.Duration
}
func NewLeakyBucket(capacity int, rate time.Duration) *LeakyBucket {
lb := &LeakyBucket{
capacity: capacity,
queue: make(chan struct{}, capacity),
rate: rate,
}
// 启动漏水协程
go lb.leak()
return lb
}
func (lb *LeakyBucket) leak() {
ticker := time.NewTicker(lb.rate)
defer ticker.Stop()
for range ticker.C {
select {
case <-lb.queue:
// 漏掉一个请求
default:
}
}
}
func (lb *LeakyBucket) Allow() bool {
select {
case lb.queue <- struct{}{}:
return true
default:
return false
}
}
3. 滑动窗口
type SlidingWindow struct {
limit int
windowSize time.Duration
requests []time.Time
mu sync.Mutex
}
func NewSlidingWindow(limit int, windowSize time.Duration) *SlidingWindow {
return &SlidingWindow{
limit: limit,
windowSize: windowSize,
requests: []time.Time{},
}
}
func (sw *SlidingWindow) Allow() bool {
sw.mu.Lock()
defer sw.mu.Unlock()
now := time.Now()
// 1. 移除过期请求
cutoff := now.Add(-sw.windowSize)
validRequests := []time.Time{}
for _, req := range sw.requests {
if req.After(cutoff) {
validRequests = append(validRequests, req)
}
}
sw.requests = validRequests
// 2. 检查是否超过限制
if len(sw.requests) >= sw.limit {
return false
}
// 3. 记录请求
sw.requests = append(sw.requests, now)
return true
}
选择建议:
令牌桶:适合突发流量(允许burst)
漏桶:平滑流量(严格控制速率)
滑动窗口:精确统计(资源消耗较高)
API网关如何实现负载均衡?
答案:
负载均衡算法:
1. 轮询(Round Robin)
// RoundRobinBalancer hands out backends in rotation.
type RoundRobinBalancer struct {
	backends []string
	index    int
	mu       sync.Mutex
}

// Select returns the next backend in rotation, or "" when no backends are
// configured (instead of panicking on the empty slice).
func (rb *RoundRobinBalancer) Select() string {
	rb.mu.Lock()
	defer rb.mu.Unlock()
	if len(rb.backends) == 0 {
		return ""
	}
	// The modulo keeps the index valid even if the slice has shrunk.
	rb.index %= len(rb.backends)
	backend := rb.backends[rb.index]
	rb.index = (rb.index + 1) % len(rb.backends)
	return backend
}
2. 加权轮询(Weighted Round Robin)
// WeightedBackend is a backend URL with its relative weight.
type WeightedBackend struct {
	URL    string
	Weight int
}

// WeightedRoundRobinBalancer implements smooth weighted round-robin: on every
// selection each backend's current weight grows by its configured weight, the
// backend with the highest current weight is picked, and the total weight is
// subtracted from it.
type WeightedRoundRobinBalancer struct {
	backends       []*WeightedBackend
	currentWeights []int
	mu             sync.Mutex
}

// Select returns the URL of the next backend, or "" when no backends are
// configured.
func (wrb *WeightedRoundRobinBalancer) Select() string {
	wrb.mu.Lock()
	defer wrb.mu.Unlock()
	if len(wrb.backends) == 0 {
		return ""
	}
	// The type has no constructor, so the running weights are sized lazily;
	// indexing an unsized slice would panic.
	if len(wrb.currentWeights) != len(wrb.backends) {
		wrb.currentWeights = make([]int, len(wrb.backends))
	}

	maxWeightIndex := 0
	totalWeight := 0
	for i, backend := range wrb.backends {
		wrb.currentWeights[i] += backend.Weight
		totalWeight += backend.Weight
		if wrb.currentWeights[i] > wrb.currentWeights[maxWeightIndex] {
			maxWeightIndex = i
		}
	}
	wrb.currentWeights[maxWeightIndex] -= totalWeight
	return wrb.backends[maxWeightIndex].URL
}
3. 最少连接(Least Connections)
// LeastConnectionsBalancer picks the backend with the fewest in-flight
// connections. Callers must pair every Select with a Release.
type LeastConnectionsBalancer struct {
	backends    []string
	connections map[string]int
	mu          sync.Mutex
}

// Select returns the backend with the fewest active connections (the first
// one on ties) and counts a new connection against it. It returns "" when no
// backends are configured.
func (lcb *LeastConnectionsBalancer) Select() string {
	lcb.mu.Lock()
	defer lcb.mu.Unlock()
	if len(lcb.backends) == 0 {
		return ""
	}
	// The type has no constructor; writing to a nil map would panic.
	if lcb.connections == nil {
		lcb.connections = make(map[string]int)
	}

	selected := lcb.backends[0]
	minConnections := lcb.connections[selected]
	for _, backend := range lcb.backends[1:] {
		if connections := lcb.connections[backend]; connections < minConnections {
			minConnections = connections
			selected = backend
		}
	}
	lcb.connections[selected]++
	return selected
}

// Release marks one connection to backend as finished. Releasing a backend
// with no counted connections is a no-op, so counts never go negative.
func (lcb *LeastConnectionsBalancer) Release(backend string) {
	lcb.mu.Lock()
	defer lcb.mu.Unlock()
	if lcb.connections[backend] > 0 {
		lcb.connections[backend]--
	}
}
4. 一致性哈希(Consistent Hashing)
// ConsistentHashBalancer maps keys to backends through a hash ring, so a given
// key is looked up on the same ring built from the configured backends.
type ConsistentHashBalancer struct {
	ring     *hashring.HashRing
	backends []string
}

// NewConsistentHashBalancer builds the hash ring over backends.
func NewConsistentHashBalancer(backends []string) *ConsistentHashBalancer {
	balancer := &ConsistentHashBalancer{backends: backends}
	balancer.ring = hashring.New(backends)
	return balancer
}

// Select returns the ring node for key. The ring's "found" flag is ignored,
// so whatever node value the ring reports is returned as-is.
func (chb *ConsistentHashBalancer) Select(key string) string {
	node, _ := chb.ring.GetNode(key)
	return node
}
如何实现API网关的熔断功能?
答案:
熔断器实现:
type CircuitBreaker struct {
maxFailures int // 失败次数阈值
timeout time.Duration // 熔断超时时间
state CircuitState // 当前状态
failures int // 失败次数
lastFailTime time.Time // 最后失败时间
mu sync.Mutex
}
type CircuitState int
const (
StateClosed CircuitState = iota // 关闭(正常)
StateOpen // 打开(熔断)
StateHalfOpen // 半开(尝试恢复)
)
func (cb *CircuitBreaker) Call(fn func() error) error {
// 1. 检查是否允许请求
if !cb.Allow() {
return errors.New("circuit breaker is open")
}
// 2. 执行请求
err := fn()
// 3. 记录结果
if err != nil {
cb.RecordFailure()
return err
}
cb.RecordSuccess()
return nil
}
func (cb *CircuitBreaker) Allow() bool {
cb.mu.Lock()
defer cb.mu.Unlock()
switch cb.state {
case StateClosed:
return true
case StateOpen:
// 检查是否可以尝试恢复
if time.Since(cb.lastFailTime) > cb.timeout {
cb.state = StateHalfOpen
return true
}
return false
case StateHalfOpen:
return true
default:
return false
}
}
func (cb *CircuitBreaker) RecordSuccess() {
cb.mu.Lock()
defer cb.mu.Unlock()
if cb.state == StateHalfOpen {
cb.state = StateClosed
cb.failures = 0
}
}
func (cb *CircuitBreaker) RecordFailure() {
cb.mu.Lock()
defer cb.mu.Unlock()
cb.failures++
cb.lastFailTime = time.Now()
if cb.failures >= cb.maxFailures {
cb.state = StateOpen
}
}
使用示例:
// Usage fragment (belongs inside a function that returns the fetched data):
// open the breaker after 5 failures and keep it open for 10 seconds.
cb := NewCircuitBreaker(5, 10*time.Second)
// Run the backend call under the breaker; transport errors and 5xx responses
// are both reported as failures.
err := cb.Call(func() error {
resp, err := http.Get("http://backend-service/api/data")
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode >= 500 {
return errors.New("server error")
}
return nil
})
if err != nil {
// Fallback: serve cached data when the call failed or the breaker is open
return getDataFromCache()
}