HiHuo
首页
博客
手册
工具
关于
首页
博客
手册
工具
关于

从零搭建企业级RAG系统:我踩过的10个坑

一、背景与需求

1.1 项目背景

我们构建的企业级RAG(Retrieval-Augmented Generation)系统需要支持:

  • 文档规模:100万+ 企业文档(PDF、Word、Markdown、代码)
  • 并发查询:1000+ QPS
  • 响应时间:P99 < 2s(检索 + 生成)
  • 准确率要求:Top-5召回率 > 85%

1.2 RAG架构概览

1.3 技术栈

组件技术选型版本
向量数据库Milvus2.3.0
Embedding模型BGE-M3-
LLMGPT-4 / Claude-3-
框架LangChain0.1.0
重排序BGE-Reranker-Large-
文档解析Unstructured0.10.0

二、十个核心踩坑经验

坑1:向量数据库选型误区

问题描述

最初选择了Pinecone(云服务),但遇到:

  • 成本高昂:100万向量/月 $70+
  • 延迟不稳定:P99波动在200-800ms
  • 数据主权:企业数据必须内部部署

三大向量数据库对比测试

测试环境:100万条768维向量,Top-10检索

指标Milvus 2.3Qdrant 1.7Weaviate 1.22
写入性能8000 vec/s6000 vec/s5500 vec/s
查询延迟(P50)15ms18ms22ms
查询延迟(P99)45ms65ms85ms
内存占用2.1GB2.8GB3.5GB
磁盘占用1.8GB2.2GB2.6GB
集群支持✅✅✅
混合检索✅✅✅
多租户✅❌✅
云原生✅⚠️⚠️

最终选择:Milvus

理由:

  1. 性能最优(P99延迟低40%)
  2. 云原生架构,K8s部署友好
  3. 多租户隔离(重要)
  4. 活跃的社区和文档

Milvus生产级部署

# Milvus Helm values.yaml
cluster:
  enabled: true

# 独立组件部署
standalone:
  enabled: false

# etcd集群(元数据存储)
etcd:
  replicaCount: 3
  persistence:
    enabled: true
    size: 50Gi
    storageClass: ssd

# MinIO(对象存储 - 存储向量数据)
minio:
  mode: distributed
  replicas: 4
  persistence:
    enabled: true
    size: 500Gi
    storageClass: ssd

# Pulsar(消息队列)
pulsar:
  enabled: false  # 使用Kafka替代

# Kafka配置(推荐使用外部Kafka)
externalKafka:
  enabled: true
  brokers:
    - kafka-broker-1:9092
    - kafka-broker-2:9092
    - kafka-broker-3:9092

# QueryNode(查询节点)
queryNode:
  replicas: 4
  resources:
    requests:
      cpu: "4"
      memory: 16Gi
    limits:
      cpu: "8"
      memory: 32Gi

# DataNode(数据节点)
dataNode:
  replicas: 3
  resources:
    requests:
      cpu: "2"
      memory: 8Gi
    limits:
      cpu: "4"
      memory: 16Gi

# IndexNode(索引节点)
indexNode:
  replicas: 2
  resources:
    requests:
      cpu: "4"
      memory: 16Gi
    limits:
      cpu: "8"
      memory: 32Gi

# Proxy(代理节点)
proxy:
  replicas: 2
  resources:
    requests:
      cpu: "2"
      memory: 4Gi
    limits:
      cpu: "4"
      memory: 8Gi

# 性能优化配置
config:
  common:
    retentionDuration: 86400  # 24小时
  queryNode:
    gracefulTime: 1000
    cacheSize: 16  # GB
  indexNode:
    buildParallel: 4

坑2:Embedding模型选择失误

问题描述

最初使用OpenAI text-embedding-ada-002:

  • 成本问题:100万次调用 $100
  • 延迟问题:API调用P99 300ms+
  • 中文效果:中文文档召回率只有62%

Embedding模型对比测试

测试数据集:10000条中英文混合企业文档QA对(自建)

模型维度召回率@5召回率@10推理速度部署方式
OpenAI Ada-002153668%79%API云端
BGE-Large-zh102482%91%45 docs/s本地
BGE-M3102488%95%38 docs/s本地
M3E-Large102479%88%50 docs/s本地
GTE-Large-zh102481%90%42 docs/s本地

最终选择:BGE-M3(BAAI/bge-m3)

优势:

  1. 多语言支持:中英文效果均佳
  2. 混合检索:同时支持Dense + Sparse + Multi-Vector
  3. 召回率最高:Top-5达到88%
  4. 本地部署:成本可控

BGE-M3部署实践

# embedding_service.py
from FlagEmbedding import BGEM3FlagModel
import torch
from typing import List, Dict
import numpy as np

class EmbeddingService:
    def __init__(
        self,
        model_path: str = "/models/bge-m3",
        device: str = "cuda" if torch.cuda.is_available() else "cpu",
        batch_size: int = 32,
    ):
        self.model = BGEM3FlagModel(
            model_path,
            use_fp16=True,  # 使用半精度加速
            device=device
        )
        self.batch_size = batch_size

    def encode_queries(
        self,
        queries: List[str],
        max_length: int = 512
    ) -> Dict[str, np.ndarray]:
        """编码查询(支持混合检索)"""
        embeddings = self.model.encode(
            queries,
            batch_size=self.batch_size,
            max_length=max_length,
            return_dense=True,   # Dense向量
            return_sparse=True,  # Sparse向量(类BM25)
            return_colbert_vecs=False  # 不使用ColBERT多向量
        )
        return {
            "dense": embeddings['dense_vecs'],
            "sparse": embeddings['lexical_weights']
        }

    def encode_documents(
        self,
        documents: List[str],
        max_length: int = 8192  # M3支持长文本
    ) -> Dict[str, np.ndarray]:
        """编码文档"""
        return self.model.encode(
            documents,
            batch_size=self.batch_size,
            max_length=max_length,
            return_dense=True,
            return_sparse=True,
            return_colbert_vecs=False
        )

# 批量处理优化
def batch_encode(texts: List[str], batch_size: int = 32):
    """分批处理大量文本"""
    all_embeddings = []
    for i in range(0, len(texts), batch_size):
        batch = texts[i:i + batch_size]
        embeddings = service.encode_documents(batch)
        all_embeddings.append(embeddings)
    return all_embeddings

FastAPI服务化

# main.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List
import uvicorn

app = FastAPI()
embedding_service = EmbeddingService()

class EmbedRequest(BaseModel):
    texts: List[str]
    max_length: int = 512

class EmbedResponse(BaseModel):
    embeddings: List[List[float]]
    dim: int

@app.post("/embed/query", response_model=EmbedResponse)
async def embed_query(request: EmbedRequest):
    try:
        result = embedding_service.encode_queries(
            request.texts,
            max_length=request.max_length
        )
        return EmbedResponse(
            embeddings=result["dense"].tolist(),
            dim=result["dense"].shape[1]
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/embed/document", response_model=EmbedResponse)
async def embed_document(request: EmbedRequest):
    try:
        result = embedding_service.encode_documents(
            request.texts,
            max_length=request.max_length
        )
        return EmbedResponse(
            embeddings=result["dense"].tolist(),
            dim=result["dense"].shape[1]
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8080, workers=4)

K8s部署

apiVersion: apps/v1
kind: Deployment
metadata:
  name: embedding-service
spec:
  replicas: 4
  template:
    spec:
      containers:
      - name: embedding
        image: bge-m3-service:v1
        resources:
          requests:
            memory: "8Gi"
            cpu: "4"
          limits:
            memory: "16Gi"
            cpu: "8"
        env:
        - name: BATCH_SIZE
          value: "32"
        - name: MAX_LENGTH
          value: "8192"
        ports:
        - containerPort: 8080
---
apiVersion: v1
kind: Service
metadata:
  name: embedding-service
spec:
  selector:
    app: embedding-service
  ports:
  - port: 8080
    targetPort: 8080

坑3:文档分块策略不当

问题描述

最初使用固定长度分块(512 tokens),导致:

  • 语义割裂:段落中间被切断
  • 上下文丢失:检索到的片段缺乏上下文
  • 召回率低:关键信息跨chunk分布

分块策略对比

策略召回率@5生成质量实现复杂度适用场景
固定长度65%⭐⭐简单不推荐
句子边界72%⭐⭐⭐简单短文档
段落边界78%⭐⭐⭐⭐中等结构化文档
语义分块85%⭐⭐⭐⭐⭐复杂推荐
滑动窗口+重叠81%⭐⭐⭐⭐中等通用

最佳实践:语义分块 + 上下文增强

语义分块实现

from langchain.text_splitter import RecursiveCharacterTextSplitter
from typing import List, Dict
import re

class SemanticChunker:
    def __init__(
        self,
        chunk_size: int = 800,      # 目标chunk大小
        chunk_overlap: int = 200,   # 重叠大小
        min_chunk_size: int = 100   # 最小chunk大小
    ):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.min_chunk_size = min_chunk_size

        # 分隔符优先级:段落 > 句子 > 短语 > 单词
        self.splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            separators=[
                "\n\n",   # 段落
                "\n",     # 行
                "。",     # 中文句号
                ". ",     # 英文句号
                "! ",     # 感叹号
                "? ",     # 问号
                ";",     # 分号
                ",",     # 逗号
                " ",      # 空格
                ""        # 字符
            ],
            length_function=len,
            is_separator_regex=False,
        )

    def chunk_document(
        self,
        text: str,
        metadata: Dict = None
    ) -> List[Dict]:
        """文档分块"""
        chunks = self.splitter.create_documents(
            texts=[text],
            metadatas=[metadata] if metadata else None
        )

        # 过滤过小的chunk
        chunks = [
            chunk for chunk in chunks
            if len(chunk.page_content) >= self.min_chunk_size
        ]

        # 添加上下文信息
        enhanced_chunks = []
        for i, chunk in enumerate(chunks):
            # 前后各取一个chunk作为扩展上下文
            prev_text = chunks[i-1].page_content if i > 0 else ""
            next_text = chunks[i+1].page_content if i < len(chunks) - 1 else ""

            enhanced_chunk = {
                "content": chunk.page_content,
                "metadata": chunk.metadata or {},
                "chunk_id": i,
                "total_chunks": len(chunks),
                # 上下文窗口
                "context_before": prev_text[-100:] if prev_text else "",
                "context_after": next_text[:100] if next_text else "",
            }
            enhanced_chunks.append(enhanced_chunk)

        return enhanced_chunks

# 针对不同文档类型的分块策略
class AdaptiveChunker:
    """自适应分块器"""

    @staticmethod
    def chunk_markdown(text: str) -> List[Dict]:
        """Markdown文档按标题层级分块"""
        chunks = []
        sections = re.split(r'\n(#{1,6}\s+.+)\n', text)

        for i in range(1, len(sections), 2):
            header = sections[i]
            content = sections[i+1] if i+1 < len(sections) else ""

            chunks.append({
                "content": f"{header}\n{content}",
                "metadata": {
                    "type": "markdown",
                    "header": header.strip("#").strip(),
                    "level": header.count("#")
                }
            })
        return chunks

    @staticmethod
    def chunk_code(text: str, language: str) -> List[Dict]:
        """代码文件按函数/类分块"""
        if language == "python":
            # 使用AST解析Python代码
            import ast
            tree = ast.parse(text)
            chunks = []

            for node in ast.walk(tree):
                if isinstance(node, (ast.FunctionDef, ast.ClassDef)):
                    chunk_text = ast.get_source_segment(text, node)
                    chunks.append({
                        "content": chunk_text,
                        "metadata": {
                            "type": "code",
                            "language": "python",
                            "name": node.name,
                            "node_type": type(node).__name__
                        }
                    })
            return chunks
        else:
            # 其他语言使用简单正则
            return []

    @staticmethod
    def chunk_table(text: str) -> List[Dict]:
        """表格数据分块 - 保持行完整性"""
        lines = text.strip().split('\n')
        header = lines[0]

        chunks = []
        chunk_size = 50  # 每chunk包含50行
        for i in range(1, len(lines), chunk_size):
            chunk_lines = [header] + lines[i:i+chunk_size]
            chunks.append({
                "content": "\n".join(chunk_lines),
                "metadata": {
                    "type": "table",
                    "rows": len(chunk_lines) - 1
                }
            })
        return chunks

坑4:索引配置不合理

问题描述

默认使用FLAT索引,100万向量后查询延迟达到2s+,完全无法使用。

Milvus索引类型对比

索引类型查询延迟召回率内存占用构建时间适用场景
FLAT1800ms100%高无<10万向量
IVF_FLAT45ms95-98%中快通用
IVF_SQ835ms93-96%低快内存受限
IVF_PQ25ms90-95%极低中大规模
HNSW15ms98-99%高慢推荐
ANNOY40ms94-97%中快备选

最终选择:HNSW(Hierarchical Navigable Small World)

配置实践:

from pymilvus import Collection, FieldSchema, CollectionSchema, DataType, connections

# 连接Milvus
connections.connect(
    alias="default",
    host="milvus-proxy",
    port="19530"
)

# 定义Collection Schema
fields = [
    FieldSchema(name="id", dtype=DataType.VARCHAR, max_length=100, is_primary=True),
    FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=1024),
    FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=10000),
    FieldSchema(name="metadata", dtype=DataType.JSON),
]

schema = CollectionSchema(
    fields=fields,
    description="Enterprise RAG Collection",
    enable_dynamic_field=True  # 支持动态字段
)

collection = Collection(
    name="enterprise_docs",
    schema=schema,
    using='default',
    shards_num=4  # 4个分片
)

# 创建HNSW索引
index_params = {
    "metric_type": "IP",      # Inner Product(内积)- 适合归一化向量
    "index_type": "HNSW",
    "params": {
        "M": 32,              # 每层最大连接数 (16-64)
        "efConstruction": 200 # 构建时的搜索范围 (100-500)
    }
}

collection.create_index(
    field_name="embedding",
    index_params=index_params
)

# 加载到内存
collection.load()

# 查询参数优化
search_params = {
    "metric_type": "IP",
    "params": {
        "ef": 64  # 查询时的搜索范围 (top_k到500)
    }
}

# 执行检索
results = collection.search(
    data=[query_embedding],
    anns_field="embedding",
    param=search_params,
    limit=10,
    output_fields=["text", "metadata"]
)

索引参数调优指南

# HNSW参数调优脚本
import time
import numpy as np

def tune_hnsw_params(
    collection: Collection,
    test_queries: np.ndarray,  # 测试查询向量
    ground_truth: List[List[str]]  # 真实Top-K结果
):
    """自动调优HNSW参数"""

    # 测试不同的ef值
    ef_values = [16, 32, 64, 128, 256, 512]
    results = []

    for ef in ef_values:
        search_params = {
            "metric_type": "IP",
            "params": {"ef": ef}
        }

        # 测试查询性能
        start_time = time.time()
        search_results = collection.search(
            data=test_queries,
            anns_field="embedding",
            param=search_params,
            limit=10
        )
        latency = (time.time() - start_time) / len(test_queries) * 1000  # ms

        # 计算召回率
        recall = calculate_recall(search_results, ground_truth)

        results.append({
            "ef": ef,
            "latency_ms": latency,
            "recall": recall
        })

        print(f"ef={ef}: latency={latency:.2f}ms, recall={recall:.2%}")

    return results

def calculate_recall(results, ground_truth, k=10):
    """计算Top-K召回率"""
    total_recall = 0
    for i, hits in enumerate(results):
        retrieved = set([hit.id for hit in hits])
        relevant = set(ground_truth[i][:k])
        recall = len(retrieved & relevant) / len(relevant)
        total_recall += recall
    return total_recall / len(results)

调优结果示例:

ef延迟(ms)召回率推荐场景
161292%对延迟极敏感
321596%平衡
641898%生产推荐
1282599%高召回率需求
2564099.5%离线分析

坑5:没有做查询改写和扩展

问题描述

用户查询短且模糊:

  • "去年营收"(缺少公司名、具体时间)
  • "怎么配置"(缺少配置对象)
  • "bug修复"(缺少具体描述)

直接检索效果极差,Top-5召回率仅45%。

查询理解与改写

from typing import List, Dict
import openai

class QueryRewriter:
    """查询改写器"""

    def __init__(self, llm_client):
        self.llm = llm_client

    def rewrite_query(self, query: str, context: Dict = None) -> List[str]:
        """
        生成多个改写版本:
        1. 原始查询
        2. 扩展查询(添加上下文)
        3. 同义改写
        4. 分解子查询
        """
        prompt = f"""
你是一个查询改写专家。请对用户查询进行改写,生成3-5个改写版本。

用户查询:{query}
上下文信息:{context or "无"}

改写要求:
1. 补全缺失的实体和时间信息
2. 使用同义词和相关术语
3. 如果是复合查询,分解为子查询
4. 保持原意,增强检索效果

请以JSON格式返回改写结果:
{{
  "rewrites": ["改写1", "改写2", "改写3", ...]
}}
"""

        response = self.llm.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.3
        )

        import json
        result = json.loads(response.choices[0].message.content)
        return result["rewrites"]

    def expand_with_synonyms(self, query: str) -> List[str]:
        """使用同义词扩展查询"""
        # 简化版:使用词典
        synonym_dict = {
            "营收": ["收入", "销售额", "营业额"],
            "配置": ["设置", "参数", "选项"],
            "bug": ["错误", "问题", "故障", "缺陷"],
        }

        expanded = [query]
        for word, synonyms in synonym_dict.items():
            if word in query:
                for syn in synonyms:
                    expanded.append(query.replace(word, syn))

        return list(set(expanded))  # 去重

    def generate_hypothetical_document(self, query: str) -> str:
        """
        HyDE技术:生成假设性文档
        用LLM生成一个"假想的答案文档",然后检索与之相似的真实文档
        """
        prompt = f"""
假设你需要回答这个问题:{query}

请生成一段详细的回答(200-300字),即使你不确定答案的准确性。
这段回答将用于检索相关文档。

回答:
"""
        response = self.llm.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.7,
            max_tokens=400
        )

        return response.choices[0].message.content

多查询融合检索

class FusionRetriever:
    """融合检索器 - 合并多个查询的检索结果"""

    def __init__(
        self,
        collection: Collection,
        embedding_service: EmbeddingService
    ):
        self.collection = collection
        self.embedding_service = embedding_service

    def reciprocal_rank_fusion(
        self,
        all_results: List[List[Dict]],
        k: int = 60
    ) -> List[Dict]:
        """
        Reciprocal Rank Fusion (RRF)算法
        融合多个检索结果列表
        """
        # 收集所有文档ID及其分数
        doc_scores = {}

        for results in all_results:
            for rank, doc in enumerate(results):
                doc_id = doc['id']
                # RRF公式:1 / (k + rank)
                score = 1.0 / (k + rank + 1)

                if doc_id in doc_scores:
                    doc_scores[doc_id]['score'] += score
                else:
                    doc_scores[doc_id] = {
                        'id': doc_id,
                        'score': score,
                        'doc': doc
                    }

        # 按分数排序
        ranked = sorted(
            doc_scores.values(),
            key=lambda x: x['score'],
            reverse=True
        )

        return [item['doc'] for item in ranked]

    def retrieve_with_fusion(
        self,
        query: str,
        top_k: int = 10,
        rewrite_count: int = 3
    ) -> List[Dict]:
        """使用查询改写+融合检索"""

        # 1. 生成改写查询
        rewriter = QueryRewriter(llm_client)
        queries = [query] + rewriter.rewrite_query(query)[:rewrite_count]

        # 2. 对每个查询进行检索
        all_results = []
        for q in queries:
            # 向量化查询
            q_embedding = self.embedding_service.encode_queries([q])

            # 检索
            results = self.collection.search(
                data=[q_embedding["dense"][0]],
                anns_field="embedding",
                param={"metric_type": "IP", "params": {"ef": 64}},
                limit=top_k * 2  # 每个查询多检索一些
            )
            all_results.append(results[0])

        # 3. 融合结果
        fused_results = self.reciprocal_rank_fusion(all_results, k=60)

        return fused_results[:top_k]

坑6:忽视了重排序Rerank

问题描述

向量检索的Top-10结果中,真正相关的可能只在第5-10位,直接送给LLM会引入噪声。

Rerank模型对比

模型MRR@10延迟(ms)适用场景
无Rerank0.620基准
BM250.685轻量级
BGE-Reranker-Base0.7845平衡
BGE-Reranker-Large0.8485推荐
Cross-Encoder0.82120高精度

选择:BGE-Reranker-Large

Reranker实现

from FlagEmbedding import FlagReranker
from typing import List, Tuple

class RerankerService:
    def __init__(self, model_path: str = "/models/bge-reranker-large"):
        self.reranker = FlagReranker(
            model_path,
            use_fp16=True,
            device="cuda"
        )

    def rerank(
        self,
        query: str,
        documents: List[str],
        top_k: int = 5
    ) -> List[Tuple[int, float]]:
        """
        重排序文档
        返回:[(doc_idx, score), ...]
        """
        # 构造query-doc对
        pairs = [[query, doc] for doc in documents]

        # 计算相关性分数
        scores = self.reranker.compute_score(
            pairs,
            batch_size=32,
            max_length=1024
        )

        # 排序并返回Top-K
        scored_docs = list(enumerate(scores))
        scored_docs.sort(key=lambda x: x[1], reverse=True)

        return scored_docs[:top_k]

# 集成到检索流程
class RAGRetriever:
    def __init__(
        self,
        collection: Collection,
        embedding_service: EmbeddingService,
        reranker: RerankerService
    ):
        self.collection = collection
        self.embedding_service = embedding_service
        self.reranker = reranker

    def retrieve(
        self,
        query: str,
        top_k: int = 5,
        rerank_top_n: int = 20
    ) -> List[Dict]:
        """
        两阶段检索:
        1. 向量检索召回Top-N (N=20)
        2. Rerank精排得到Top-K (K=5)
        """
        # 第一阶段:向量检索
        q_embedding = self.embedding_service.encode_queries([query])
        vector_results = self.collection.search(
            data=[q_embedding["dense"][0]],
            anns_field="embedding",
            param={"metric_type": "IP", "params": {"ef": 64}},
            limit=rerank_top_n,
            output_fields=["text", "metadata"]
        )[0]

        # 提取文档文本
        documents = [hit.entity.get("text") for hit in vector_results]

        # 第二阶段:Rerank
        reranked_indices = self.reranker.rerank(
            query=query,
            documents=documents,
            top_k=top_k
        )

        # 返回重排序后的结果
        final_results = []
        for idx, score in reranked_indices:
            hit = vector_results[idx]
            final_results.append({
                "id": hit.id,
                "text": hit.entity.get("text"),
                "metadata": hit.entity.get("metadata"),
                "vector_score": hit.distance,
                "rerank_score": score,
                "final_rank": len(final_results) + 1
            })

        return final_results

效果提升:

  • MRR@5:0.62 → 0.84(+35%)
  • 生成质量主观评分:3.2 → 4.5(5分制)

坑7:上下文窗口管理不当

问题描述

  • 上下文过长:塞入10个chunk(8000 tokens),LLM成本高、延迟大
  • 上下文过短:只给2个chunk,信息不足导致回答质量差

动态上下文选择

class ContextBuilder:
    """智能上下文构建器"""

    def __init__(
        self,
        max_tokens: int = 4000,  # 最大上下文长度
        min_chunks: int = 2,      # 最少chunk数
        max_chunks: int = 8       # 最多chunk数
    ):
        self.max_tokens = max_tokens
        self.min_chunks = min_chunks
        self.max_chunks = max_chunks

    def build_context(
        self,
        query: str,
        chunks: List[Dict],
        tokenizer
    ) -> str:
        """
        智能构建上下文:
        1. 按相关性分数排序
        2. 贪心选择,直到达到token上限
        3. 添加chunk间的连接信息
        """
        selected_chunks = []
        total_tokens = 0

        # 预留给query和系统提示的token数
        query_tokens = len(tokenizer.encode(query))
        system_tokens = 200  # 估算
        available_tokens = self.max_tokens - query_tokens - system_tokens

        for chunk in chunks[:self.max_chunks]:
            chunk_text = chunk['text']
            chunk_tokens = len(tokenizer.encode(chunk_text))

            # 检查是否超出限制
            if total_tokens + chunk_tokens > available_tokens:
                if len(selected_chunks) >= self.min_chunks:
                    break  # 已有足够chunk,停止添加
                else:
                    # 截断chunk以满足最小数量要求
                    remaining_tokens = available_tokens - total_tokens
                    chunk_text = self.truncate_text(
                        chunk_text,
                        remaining_tokens,
                        tokenizer
                    )
                    chunk_tokens = len(tokenizer.encode(chunk_text))

            selected_chunks.append({
                **chunk,
                'text': chunk_text
            })
            total_tokens += chunk_tokens

        # 格式化上下文
        context = self.format_context(selected_chunks)
        return context

    def format_context(self, chunks: List[Dict]) -> str:
        """格式化上下文,添加元数据"""
        formatted_parts = []

        for i, chunk in enumerate(chunks):
            metadata = chunk.get('metadata', {})
            source = metadata.get('source', '未知来源')
            chunk_id = metadata.get('chunk_id', i)

            part = f"""
[文档片段 {i+1}/{len(chunks)}]
来源:{source}
片段ID:{chunk_id}
相关性得分:{chunk.get('rerank_score', 0):.3f}

内容:
{chunk['text']}

---
"""
            formatted_parts.append(part)

        return "\n".join(formatted_parts)

    def truncate_text(
        self,
        text: str,
        max_tokens: int,
        tokenizer
    ) -> str:
        """截断文本至指定token数"""
        tokens = tokenizer.encode(text)
        if len(tokens) <= max_tokens:
            return text

        # 截断并解码
        truncated_tokens = tokens[:max_tokens]
        truncated_text = tokenizer.decode(truncated_tokens)

        # 尝试在句子边界截断
        sentences = truncated_text.split('。')
        if len(sentences) > 1:
            return '。'.join(sentences[:-1]) + '。'

        return truncated_text

坑8:缺少混合检索

问题描述

纯向量检索对精确匹配(如产品型号、代码函数名)效果差。

例如查询"sklearn.metrics.accuracy_score",向量检索可能返回无关的metrics函数。

混合检索方案

class HybridRetriever:
    """混合检索:向量检索 + BM25"""

    def __init__(
        self,
        collection: Collection,
        embedding_service: EmbeddingService
    ):
        self.collection = collection
        self.embedding_service = embedding_service

        # 初始化BM25
        from rank_bm25 import BM25Okapi
        import jieba

        # 加载文档语料(需要预先构建)
        self.documents = self._load_documents()
        self.tokenized_docs = [
            list(jieba.cut(doc['text']))
            for doc in self.documents
        ]
        self.bm25 = BM25Okapi(self.tokenized_docs)

    def hybrid_search(
        self,
        query: str,
        top_k: int = 10,
        alpha: float = 0.5  # 向量检索权重
    ) -> List[Dict]:
        """
        混合检索
        alpha=1.0: 纯向量检索
        alpha=0.0: 纯BM25
        alpha=0.5: 平衡
        """
        # 1. 向量检索
        q_embedding = self.embedding_service.encode_queries([query])
        vector_results = self.collection.search(
            data=[q_embedding["dense"][0]],
            anns_field="embedding",
            param={"metric_type": "IP", "params": {"ef": 64}},
            limit=top_k * 2
        )[0]

        # 归一化向量分数 (0-1)
        vector_scores = {}
        if len(vector_results) > 0:
            max_score = max(hit.distance for hit in vector_results)
            min_score = min(hit.distance for hit in vector_results)
            score_range = max_score - min_score if max_score != min_score else 1

            for hit in vector_results:
                normalized = (hit.distance - min_score) / score_range
                vector_scores[hit.id] = normalized

        # 2. BM25检索
        import jieba
        query_tokens = list(jieba.cut(query))
        bm25_scores_raw = self.bm25.get_scores(query_tokens)

        # 归一化BM25分数
        max_bm25 = max(bm25_scores_raw) if len(bm25_scores_raw) > 0 else 1
        bm25_scores = {
            doc['id']: score / max_bm25
            for doc, score in zip(self.documents, bm25_scores_raw)
        }

        # 3. 融合分数
        all_doc_ids = set(vector_scores.keys()) | set(bm25_scores.keys())
        hybrid_scores = {}

        for doc_id in all_doc_ids:
            vec_score = vector_scores.get(doc_id, 0)
            bm25_score = bm25_scores.get(doc_id, 0)

            # 加权融合
            hybrid_scores[doc_id] = (
                alpha * vec_score + (1 - alpha) * bm25_score
            )

        # 4. 排序并返回Top-K
        ranked_ids = sorted(
            hybrid_scores.items(),
            key=lambda x: x[1],
            reverse=True
        )[:top_k]

        # 获取文档详情
        results = []
        for doc_id, score in ranked_ids:
            doc = next((d for d in self.documents if d['id'] == doc_id), None)
            if doc:
                results.append({
                    **doc,
                    'hybrid_score': score,
                    'vector_score': vector_scores.get(doc_id, 0),
                    'bm25_score': bm25_scores.get(doc_id, 0)
                })

        return results

效果对比(精确匹配查询):

查询类型纯向量纯BM25混合(α=0.5)
精确匹配68%92%95%
语义查询88%71%91%
综合78%81%93%

坑9:没有处理多模态文档

问题描述

企业文档包含大量表格、图片、公式,纯文本解析丢失了关键信息。

多模态解析方案

from unstructured.partition.auto import partition
from unstructured.staging.base import elements_to_json
from PIL import Image
import pytesseract

class MultimodalDocumentParser:
    """多模态文档解析器"""

    def parse_document(self, file_path: str) -> List[Dict]:
        """解析文档(支持PDF、Word、PPT等)"""

        # 使用Unstructured库自动解析
        elements = partition(filename=file_path)

        chunks = []
        for element in elements:
            element_type = type(element).__name__

            chunk = {
                "type": element_type,
                "text": str(element),
                "metadata": element.metadata.to_dict()
            }

            # 特殊处理不同元素类型
            if element_type == "Table":
                chunk["table_data"] = self.parse_table(element)
            elif element_type == "Image":
                chunk["image_description"] = self.describe_image(element)

            chunks.append(chunk)

        return chunks

    def parse_table(self, table_element) -> Dict:
        """解析表格为结构化数据"""
        # 转换为HTML
        html = table_element.metadata.text_as_html

        # 使用pandas解析
        import pandas as pd
        df = pd.read_html(html)[0]

        return {
            "html": html,
            "dataframe": df.to_dict(),
            "markdown": df.to_markdown(),  # 用于LLM
            "summary": self.summarize_table(df)
        }

    def summarize_table(self, df) -> str:
        """生成表格摘要"""
        rows, cols = df.shape
        columns = ", ".join(df.columns.tolist())

        summary = f"表格包含{rows}行{cols}列。列名:{columns}。"

        # 提取关键统计信息
        numeric_cols = df.select_dtypes(include=['number']).columns
        if len(numeric_cols) > 0:
            stats = df[numeric_cols].describe().to_string()
            summary += f"\n数值列统计:\n{stats}"

        return summary

    def describe_image(self, image_element) -> str:
        """
        描述图片内容(使用OCR或多模态LLM)
        """
        image_path = image_element.metadata.filename

        # 方案1:OCR提取文字
        image = Image.open(image_path)
        ocr_text = pytesseract.image_to_string(image, lang='chi_sim+eng')

        # 方案2:使用GPT-4V描述图片(生产环境推荐)
        # image_description = self.describe_with_gpt4v(image_path)

        return f"图片OCR文字:{ocr_text}"

    def describe_with_gpt4v(self, image_path: str) -> str:
        """使用GPT-4V描述图片"""
        import base64

        with open(image_path, "rb") as f:
            image_data = base64.b64encode(f.read()).decode()

        response = openai.chat.completions.create(
            model="gpt-4-vision-preview",
            messages=[
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "text",
                            "text": "请详细描述这张图片的内容,包括文字、图表、关键信息。"
                        },
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:image/jpeg;base64,{image_data}"
                            }
                        }
                    ]
                }
            ],
            max_tokens=500
        )

        return response.choices[0].message.content

坑10:缺少评估和监控

问题描述

没有评估指标,无法量化RAG系统的效果和发现问题。

评估框架

from dataclasses import dataclass
from typing import List, Dict
import numpy as np

@dataclass
class RAGEvaluation:
    """RAG评估指标"""
    recall_at_k: float       # 召回率@K
    mrr: float               # Mean Reciprocal Rank
    ndcg: float              # Normalized Discounted Cumulative Gain
    precision_at_k: float    # 精确率@K
    answer_quality: float    # 答案质量(人工评分)
    latency_p50: float       # 延迟P50
    latency_p99: float       # 延迟P99

class RAGEvaluator:
    """RAG系统评估器"""

    def evaluate_retrieval(
        self,
        queries: List[str],
        ground_truth: List[List[str]],  # 每个query的相关文档ID列表
        retrieval_results: List[List[str]],  # 检索结果ID列表
        k: int = 5
    ) -> Dict[str, float]:
        """评估检索效果"""

        recalls = []
        mrrs = []
        ndcgs = []
        precisions = []

        for gt, results in zip(ground_truth, retrieval_results):
            gt_set = set(gt[:k])
            results_set = set(results[:k])

            # Recall@K
            recall = len(gt_set & results_set) / len(gt_set) if gt_set else 0
            recalls.append(recall)

            # MRR
            for i, doc_id in enumerate(results):
                if doc_id in gt_set:
                    mrrs.append(1.0 / (i + 1))
                    break
            else:
                mrrs.append(0)

            # NDCG@K
            ndcg = self.calculate_ndcg(results, gt, k)
            ndcgs.append(ndcg)

            # Precision@K
            precision = len(gt_set & results_set) / k
            precisions.append(precision)

        return {
            "recall@k": np.mean(recalls),
            "mrr": np.mean(mrrs),
            "ndcg@k": np.mean(ndcgs),
            "precision@k": np.mean(precisions)
        }

    def calculate_ndcg(
        self,
        results: List[str],
        ground_truth: List[str],
        k: int
    ) -> float:
        """计算NDCG@K"""
        dcg = 0.0
        for i, doc_id in enumerate(results[:k]):
            if doc_id in ground_truth:
                # 相关性得分:在ground_truth中的排名越靠前,得分越高
                relevance = len(ground_truth) - ground_truth.index(doc_id)
                dcg += relevance / np.log2(i + 2)

        # Ideal DCG
        idcg = 0.0
        for i in range(min(k, len(ground_truth))):
            relevance = len(ground_truth) - i
            idcg += relevance / np.log2(i + 2)

        return dcg / idcg if idcg > 0 else 0

    def evaluate_end_to_end(
        self,
        test_cases: List[Dict]  # {"query": str, "expected_answer": str}
    ) -> Dict[str, float]:
        """端到端评估(使用LLM评判)"""

        scores = []
        for case in test_cases:
            query = case["query"]
            expected = case["expected_answer"]

            # 运行RAG系统
            actual = self.rag_system.query(query)

            # 使用LLM评分
            score = self.llm_judge(query, expected, actual)
            scores.append(score)

        return {
            "average_quality_score": np.mean(scores),
            "pass_rate": sum(1 for s in scores if s >= 0.7) / len(scores)
        }

    def llm_judge(
        self,
        query: str,
        expected: str,
        actual: str
    ) -> float:
        """使用LLM评判答案质量"""

        prompt = f"""
请评估以下答案的质量(0-1分):

问题:{query}

标准答案:{expected}

实际答案:{actual}

评分标准:
- 0.9-1.0: 完全正确,信息完整
- 0.7-0.9: 基本正确,细节略有不足
- 0.5-0.7: 部分正确
- 0.3-0.5: 包含错误信息
- 0.0-0.3: 完全错误

请只返回数字分数。
"""

        response = openai.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.1
        )

        try:
            score = float(response.choices[0].message.content.strip())
            return max(0.0, min(1.0, score))
        except:
            return 0.5  # 默认分数

生产监控

# Prometheus监控指标
from prometheus_client import Counter, Histogram, Gauge

# 请求计数
rag_requests_total = Counter(
    'rag_requests_total',
    'Total RAG requests',
    ['status']
)

# 延迟分布
rag_latency = Histogram(
    'rag_latency_seconds',
    'RAG request latency',
    ['stage'],  # retrieval, rerank, generation
    buckets=[0.1, 0.5, 1.0, 2.0, 5.0]
)

# 检索质量
rag_retrieval_quality = Gauge(
    'rag_retrieval_quality',
    'Retrieval quality score',
    ['metric']  # recall, mrr, ndcg
)

# 向量库大小
vector_db_size = Gauge(
    'vector_db_total_vectors',
    'Total vectors in database'
)

# 使用示例
def query_with_monitoring(query: str):
    rag_requests_total.labels(status='started').inc()

    try:
        # 检索阶段
        with rag_latency.labels(stage='retrieval').time():
            docs = retriever.retrieve(query)

        # Rerank阶段
        with rag_latency.labels(stage='rerank').time():
            ranked_docs = reranker.rerank(query, docs)

        # 生成阶段
        with rag_latency.labels(stage='generation').time():
            answer = llm.generate(query, ranked_docs)

        rag_requests_total.labels(status='success').inc()
        return answer

    except Exception as e:
        rag_requests_total.labels(status='error').inc()
        raise

三、生产级架构总结

3.1 完整架构图

┌─────────────────────────────────────────────────────────────────┐
│                         用户查询入口                              │
└───────────────────────────┬─────────────────────────────────────┘
                            │
                    ┌───────▼────────┐
                    │  Query Rewriter │
                    │  (查询改写)      │
                    └───────┬────────┘
                            │
            ┌───────────────┼───────────────┐
            │               │               │
    ┌───────▼─────┐  ┌─────▼──────┐  ┌────▼─────┐
    │ Vector Search│  │ BM25 Search │  │ HyDE     │
    └───────┬─────┘  └─────┬──────┘  └────┬─────┘
            │               │               │
            └───────────────┼───────────────┘
                    ┌───────▼────────┐
                    │  Fusion (RRF)   │
                    └───────┬────────┘
                    ┌───────▼────────┐
                    │  Reranker       │
                    │  (重排序)        │
                    └───────┬────────┘
                    ┌───────▼────────┐
                    │ Context Builder │
                    └───────┬────────┘
                    ┌───────▼────────┐
                    │  LLM Generator  │
                    └───────┬────────┘
                    ┌───────▼────────┐
                    │  返回结果       │
                    └────────────────┘

3.2 关键指标总结

指标优化前优化后提升
召回率@545%88%+96%
MRR0.520.84+62%
端到端延迟(P99)3.5s1.8s-49%
答案质量评分3.24.5+41%
月成本$5000$800-84%

四、参考资源

  • Milvus官方文档
  • BGE Embedding
  • LangChain
  • Unstructured