Building an Enterprise-Grade RAG System from Scratch: 10 Pitfalls I Hit
1. Background and Requirements
1.1 Project Background
Our enterprise RAG (Retrieval-Augmented Generation) system needs to support:
- Document scale: 1M+ enterprise documents (PDF, Word, Markdown, code)
- Concurrency: 1,000+ QPS
- Latency: P99 < 2s (retrieval + generation)
- Accuracy: Top-5 recall > 85%
1.2 RAG Architecture Overview
The end-to-end pipeline is query rewriting → hybrid retrieval → fusion → reranking → context building → generation; the full diagram is in Section 3.1.
1.3 Technology Stack
| Component | Choice | Version |
|---|---|---|
| Vector database | Milvus | 2.3.0 |
| Embedding model | BGE-M3 | - |
| LLM | GPT-4 / Claude-3 | - |
| Framework | LangChain | 0.1.0 |
| Reranking | BGE-Reranker-Large | - |
| Document parsing | Unstructured | 0.10.0 |
2. The Ten Core Pitfalls
Pitfall 1: Picking the Wrong Vector Database
The problem
We initially chose Pinecone (managed cloud), but ran into:
- High cost: $70+/month for 1M vectors
- Unstable latency: P99 fluctuated between 200-800ms
- Data sovereignty: enterprise data had to stay on-premises
Benchmarking three vector databases
Test setup: 1M vectors, 768 dimensions, Top-10 retrieval
| Metric | Milvus 2.3 | Qdrant 1.7 | Weaviate 1.22 |
|---|---|---|---|
| Write throughput | 8000 vec/s | 6000 vec/s | 5500 vec/s |
| Query latency (P50) | 15ms | 18ms | 22ms |
| Query latency (P99) | 45ms | 65ms | 85ms |
| Memory footprint | 2.1GB | 2.8GB | 3.5GB |
| Disk usage | 1.8GB | 2.2GB | 2.6GB |
| Clustering | ✅ | ✅ | ✅ |
| Hybrid search | ✅ | ✅ | ✅ |
| Multi-tenancy | ✅ | ❌ | ✅ |
| Cloud-native | ✅ | ⚠️ | ⚠️ |
Final choice: Milvus
Reasons:
- Best performance (roughly 40% lower P99 latency than the alternatives)
- Cloud-native architecture, straightforward to deploy on K8s
- Multi-tenant isolation (important for us)
- Active community and documentation
Production-grade Milvus deployment
# Milvus Helm values.yaml
cluster:
  enabled: true

# Deploy components independently (disable standalone mode)
standalone:
  enabled: false

# etcd cluster (metadata store)
etcd:
  replicaCount: 3
  persistence:
    enabled: true
    size: 50Gi
    storageClass: ssd

# MinIO (object storage for vector data)
minio:
  mode: distributed
  replicas: 4
  persistence:
    enabled: true
    size: 500Gi
    storageClass: ssd

# Pulsar (message queue)
pulsar:
  enabled: false  # replaced with Kafka

# Kafka (an external Kafka cluster is recommended)
externalKafka:
  enabled: true
  brokers:
    - kafka-broker-1:9092
    - kafka-broker-2:9092
    - kafka-broker-3:9092

# QueryNode (query nodes)
queryNode:
  replicas: 4
  resources:
    requests:
      cpu: "4"
      memory: 16Gi
    limits:
      cpu: "8"
      memory: 32Gi

# DataNode (data nodes)
dataNode:
  replicas: 3
  resources:
    requests:
      cpu: "2"
      memory: 8Gi
    limits:
      cpu: "4"
      memory: 16Gi

# IndexNode (index-building nodes)
indexNode:
  replicas: 2
  resources:
    requests:
      cpu: "4"
      memory: 16Gi
    limits:
      cpu: "8"
      memory: 32Gi

# Proxy (access layer)
proxy:
  replicas: 2
  resources:
    requests:
      cpu: "2"
      memory: 4Gi
    limits:
      cpu: "4"
      memory: 8Gi

# Performance tuning
config:
  common:
    retentionDuration: 86400  # 24 hours
  queryNode:
    gracefulTime: 1000
    cacheSize: 16  # GB
  indexNode:
    buildParallel: 4
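Before wiring the pipeline against the cluster, a quick connectivity check confirms the Helm release is healthy. A minimal sketch with pymilvus, assuming the proxy is reachable as milvus-proxy:19530 (the same endpoint used in the code later in this post):

# smoke_check.py - verify the Milvus deployment is reachable
from pymilvus import connections, utility

connections.connect(alias="default", host="milvus-proxy", port="19530")
print("server version:", utility.get_server_version())
print("collections:", utility.list_collections())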
Pitfall 2: Choosing the Wrong Embedding Model
The problem
We started with OpenAI text-embedding-ada-002:
- Cost: $100 per 1M calls
- Latency: API P99 of 300ms+
- Chinese quality: recall on Chinese documents was only 62%
Embedding model comparison
Test set: 10,000 mixed Chinese/English enterprise QA pairs (built in-house)
| Model | Dim | Recall@5 | Recall@10 | Inference speed | Deployment |
|---|---|---|---|---|---|
| OpenAI Ada-002 | 1536 | 68% | 79% | API | Cloud |
| BGE-Large-zh | 1024 | 82% | 91% | 45 docs/s | Local |
| BGE-M3 | 1024 | 88% | 95% | 38 docs/s | Local |
| M3E-Large | 1024 | 79% | 88% | 50 docs/s | Local |
| GTE-Large-zh | 1024 | 81% | 90% | 42 docs/s | Local |
Final choice: BGE-M3 (BAAI/bge-m3)
Advantages:
- Multilingual: strong on both Chinese and English
- Hybrid retrieval: dense + sparse + multi-vector in a single model
- Highest recall: 88% at Top-5
- Local deployment: predictable cost
BGE-M3 deployment in practice
# embedding_service.py
from typing import Dict, List

import numpy as np
import torch
from FlagEmbedding import BGEM3FlagModel


class EmbeddingService:
    def __init__(
        self,
        model_path: str = "/models/bge-m3",
        device: str = "cuda" if torch.cuda.is_available() else "cpu",
        batch_size: int = 32,
    ):
        self.model = BGEM3FlagModel(
            model_path,
            use_fp16=True,  # half precision for faster inference
            device=device
        )
        self.batch_size = batch_size

    def encode_queries(
        self,
        queries: List[str],
        max_length: int = 512
    ) -> Dict[str, np.ndarray]:
        """Encode queries (supports hybrid retrieval)."""
        embeddings = self.model.encode(
            queries,
            batch_size=self.batch_size,
            max_length=max_length,
            return_dense=True,         # dense vectors
            return_sparse=True,        # sparse (BM25-like) weights
            return_colbert_vecs=False  # skip ColBERT multi-vectors
        )
        return {
            "dense": embeddings['dense_vecs'],
            "sparse": embeddings['lexical_weights']
        }

    def encode_documents(
        self,
        documents: List[str],
        max_length: int = 8192  # M3 supports long inputs
    ) -> Dict[str, np.ndarray]:
        """Encode documents (same output format as encode_queries)."""
        embeddings = self.model.encode(
            documents,
            batch_size=self.batch_size,
            max_length=max_length,
            return_dense=True,
            return_sparse=True,
            return_colbert_vecs=False
        )
        return {
            "dense": embeddings['dense_vecs'],
            "sparse": embeddings['lexical_weights']
        }


# Batch-processing helper
def batch_encode(service: EmbeddingService, texts: List[str], batch_size: int = 32):
    """Encode a large corpus in batches."""
    all_embeddings = []
    for i in range(0, len(texts), batch_size):
        batch = texts[i:i + batch_size]
        embeddings = service.encode_documents(batch)
        all_embeddings.append(embeddings)
    return all_embeddings
Serving with FastAPI
# main.py
from typing import List

import uvicorn
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

from embedding_service import EmbeddingService

app = FastAPI()
embedding_service = EmbeddingService()


class EmbedRequest(BaseModel):
    texts: List[str]
    max_length: int = 512


class EmbedResponse(BaseModel):
    embeddings: List[List[float]]
    dim: int


@app.post("/embed/query", response_model=EmbedResponse)
async def embed_query(request: EmbedRequest):
    try:
        result = embedding_service.encode_queries(
            request.texts,
            max_length=request.max_length
        )
        return EmbedResponse(
            embeddings=result["dense"].tolist(),
            dim=result["dense"].shape[1]
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/embed/document", response_model=EmbedResponse)
async def embed_document(request: EmbedRequest):
    try:
        result = embedding_service.encode_documents(
            request.texts,
            max_length=request.max_length
        )
        return EmbedResponse(
            embeddings=result["dense"].tolist(),
            dim=result["dense"].shape[1]
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


if __name__ == "__main__":
    # Pass the app as an import string so uvicorn can spawn multiple workers
    uvicorn.run("main:app", host="0.0.0.0", port=8080, workers=4)
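A quick sanity check against the running service (the localhost URL assumes local testing or port-forwarding; in-cluster callers would use http://embedding-service:8080):

# client_check.py - call the embedding endpoint and inspect the response shape
import requests

resp = requests.post(
    "http://localhost:8080/embed/query",
    json={"texts": ["How do I configure the Milvus HNSW index?"], "max_length": 512},
    timeout=10,
)
resp.raise_for_status()
payload = resp.json()
print(payload["dim"], len(payload["embeddings"]))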
Kubernetes deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: embedding-service
spec:
  replicas: 4
  selector:
    matchLabels:
      app: embedding-service
  template:
    metadata:
      labels:
        app: embedding-service
    spec:
      containers:
        - name: embedding
          image: bge-m3-service:v1
          resources:
            requests:
              memory: "8Gi"
              cpu: "4"
            limits:
              memory: "16Gi"
              cpu: "8"
          env:
            - name: BATCH_SIZE
              value: "32"
            - name: MAX_LENGTH
              value: "8192"
          ports:
            - containerPort: 8080
---
apiVersion: v1
kind: Service
metadata:
  name: embedding-service
spec:
  selector:
    app: embedding-service
  ports:
    - port: 8080
      targetPort: 8080
Pitfall 3: A Poor Chunking Strategy
The problem
We started with fixed-length chunking (512 tokens), which caused:
- Broken semantics: paragraphs cut in the middle
- Lost context: retrieved fragments lacked surrounding context
- Low recall: key information spread across chunks
Chunking strategy comparison
| Strategy | Recall@5 | Generation quality | Complexity | Best for |
|---|---|---|---|---|
| Fixed length | 65% | ⭐⭐ | Simple | Not recommended |
| Sentence boundary | 72% | ⭐⭐⭐ | Simple | Short documents |
| Paragraph boundary | 78% | ⭐⭐⭐⭐ | Medium | Structured documents |
| Semantic chunking | 85% | ⭐⭐⭐⭐⭐ | Complex | Recommended |
| Sliding window + overlap | 81% | ⭐⭐⭐⭐ | Medium | General |
Best practice: semantic chunking + context enhancement
Semantic chunking implementation
import re
from typing import Dict, List

from langchain.text_splitter import RecursiveCharacterTextSplitter


class SemanticChunker:
    def __init__(
        self,
        chunk_size: int = 800,      # target chunk size
        chunk_overlap: int = 200,   # overlap between chunks
        min_chunk_size: int = 100   # minimum chunk size
    ):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.min_chunk_size = min_chunk_size
        # Separator priority: paragraph > line > sentence > clause > word
        self.splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            separators=[
                "\n\n",  # paragraph
                "\n",    # line
                "。",    # Chinese full stop
                ". ",    # English full stop
                "! ",    # exclamation mark
                "? ",    # question mark
                ";",     # semicolon
                ",",     # comma
                " ",     # space
                ""       # character
            ],
            length_function=len,
            is_separator_regex=False,
        )

    def chunk_document(
        self,
        text: str,
        metadata: Dict = None
    ) -> List[Dict]:
        """Split a document into chunks."""
        chunks = self.splitter.create_documents(
            texts=[text],
            metadatas=[metadata] if metadata else None
        )
        # Drop chunks that are too small
        chunks = [
            chunk for chunk in chunks
            if len(chunk.page_content) >= self.min_chunk_size
        ]
        # Attach surrounding context
        enhanced_chunks = []
        for i, chunk in enumerate(chunks):
            # Use the neighbouring chunks as extended context
            prev_text = chunks[i-1].page_content if i > 0 else ""
            next_text = chunks[i+1].page_content if i < len(chunks) - 1 else ""
            enhanced_chunk = {
                "content": chunk.page_content,
                "metadata": chunk.metadata or {},
                "chunk_id": i,
                "total_chunks": len(chunks),
                # context window
                "context_before": prev_text[-100:] if prev_text else "",
                "context_after": next_text[:100] if next_text else "",
            }
            enhanced_chunks.append(enhanced_chunk)
        return enhanced_chunks


# Chunking strategies tailored to document type
class AdaptiveChunker:
    """Adaptive chunker."""

    @staticmethod
    def chunk_markdown(text: str) -> List[Dict]:
        """Split Markdown documents by heading level."""
        chunks = []
        sections = re.split(r'\n(#{1,6}\s+.+)\n', text)
        for i in range(1, len(sections), 2):
            header = sections[i]
            content = sections[i+1] if i+1 < len(sections) else ""
            chunks.append({
                "content": f"{header}\n{content}",
                "metadata": {
                    "type": "markdown",
                    "header": header.strip("#").strip(),
                    "level": header.count("#")
                }
            })
        return chunks

    @staticmethod
    def chunk_code(text: str, language: str) -> List[Dict]:
        """Split source files by function/class."""
        if language == "python":
            # Parse Python code with the ast module
            import ast
            tree = ast.parse(text)
            chunks = []
            for node in ast.walk(tree):
                if isinstance(node, (ast.FunctionDef, ast.ClassDef)):
                    chunk_text = ast.get_source_segment(text, node)
                    chunks.append({
                        "content": chunk_text,
                        "metadata": {
                            "type": "code",
                            "language": "python",
                            "name": node.name,
                            "node_type": type(node).__name__
                        }
                    })
            return chunks
        else:
            # Other languages: fall back to simple regex splitting
            return []

    @staticmethod
    def chunk_table(text: str) -> List[Dict]:
        """Split tabular data while keeping rows intact."""
        lines = text.strip().split('\n')
        header = lines[0]
        chunks = []
        chunk_size = 50  # rows per chunk
        for i in range(1, len(lines), chunk_size):
            chunk_lines = [header] + lines[i:i+chunk_size]
            chunks.append({
                "content": "\n".join(chunk_lines),
                "metadata": {
                    "type": "table",
                    "rows": len(chunk_lines) - 1
                }
            })
        return chunks
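A small ingestion sketch showing how the two chunkers above can be routed by file type (the file path and the routing rule are illustrative, not part of our pipeline):

# Hypothetical ingestion snippet: route by content type, then chunk
chunker = SemanticChunker(chunk_size=800, chunk_overlap=200)

doc_text = open("handbook.md", encoding="utf-8").read()  # sample path
if doc_text.lstrip().startswith("#"):
    chunks = AdaptiveChunker.chunk_markdown(doc_text)
else:
    chunks = chunker.chunk_document(doc_text, metadata={"source": "handbook.md"})

print(len(chunks), chunks[0]["metadata"])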
Pitfall 4: Misconfigured Indexes
The problem
With the default FLAT index, query latency exceeded 2s once we passed 1M vectors, which was unusable.
Milvus index type comparison
| Index type | Query latency | Recall | Memory | Build time | Best for |
|---|---|---|---|---|---|
| FLAT | 1800ms | 100% | High | None | <100K vectors |
| IVF_FLAT | 45ms | 95-98% | Medium | Fast | General |
| IVF_SQ8 | 35ms | 93-96% | Low | Fast | Memory-constrained |
| IVF_PQ | 25ms | 90-95% | Very low | Medium | Very large scale |
| HNSW | 15ms | 98-99% | High | Slow | Recommended |
| ANNOY | 40ms | 94-97% | Medium | Fast | Alternative |
Final choice: HNSW (Hierarchical Navigable Small World)
Configuration in practice:
from pymilvus import Collection, CollectionSchema, DataType, FieldSchema, connections

# Connect to Milvus
connections.connect(
    alias="default",
    host="milvus-proxy",
    port="19530"
)

# Define the collection schema
fields = [
    FieldSchema(name="id", dtype=DataType.VARCHAR, max_length=100, is_primary=True),
    FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=1024),
    FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=10000),
    FieldSchema(name="metadata", dtype=DataType.JSON),
]
schema = CollectionSchema(
    fields=fields,
    description="Enterprise RAG Collection",
    enable_dynamic_field=True  # allow dynamic fields
)
collection = Collection(
    name="enterprise_docs",
    schema=schema,
    using='default',
    shards_num=4  # 4 shards
)

# Create the HNSW index
index_params = {
    "metric_type": "IP",  # inner product - suitable for normalized vectors
    "index_type": "HNSW",
    "params": {
        "M": 32,               # max connections per layer (16-64)
        "efConstruction": 200  # search breadth during index build (100-500)
    }
}
collection.create_index(
    field_name="embedding",
    index_params=index_params
)

# Load the collection into memory
collection.load()

# Query-time parameters
search_params = {
    "metric_type": "IP",
    "params": {
        "ef": 64  # search breadth at query time (between top_k and 500)
    }
}

# Run a search (query_embedding is a 1024-dim query vector)
results = collection.search(
    data=[query_embedding],
    anns_field="embedding",
    param=search_params,
    limit=10,
    output_fields=["text", "metadata"]
)
Index parameter tuning guide
# HNSW parameter tuning script
import time
from typing import List

import numpy as np


def tune_hnsw_params(
    collection: Collection,
    test_queries: np.ndarray,       # test query vectors
    ground_truth: List[List[str]]   # true Top-K results per query
):
    """Sweep HNSW search parameters."""
    # Try a range of ef values
    ef_values = [16, 32, 64, 128, 256, 512]
    results = []
    for ef in ef_values:
        search_params = {
            "metric_type": "IP",
            "params": {"ef": ef}
        }
        # Measure query latency
        start_time = time.time()
        search_results = collection.search(
            data=test_queries,
            anns_field="embedding",
            param=search_params,
            limit=10
        )
        latency = (time.time() - start_time) / len(test_queries) * 1000  # ms per query
        # Measure recall
        recall = calculate_recall(search_results, ground_truth)
        results.append({
            "ef": ef,
            "latency_ms": latency,
            "recall": recall
        })
        print(f"ef={ef}: latency={latency:.2f}ms, recall={recall:.2%}")
    return results


def calculate_recall(results, ground_truth, k=10):
    """Compute Top-K recall."""
    total_recall = 0
    for i, hits in enumerate(results):
        retrieved = set([hit.id for hit in hits])
        relevant = set(ground_truth[i][:k])
        recall = len(retrieved & relevant) / len(relevant)
        total_recall += recall
    return total_recall / len(results)
Sample tuning results:
| ef | Latency (ms) | Recall | Recommended for |
|---|---|---|---|
| 16 | 12 | 92% | Extremely latency-sensitive |
| 32 | 15 | 96% | Balanced |
| 64 | 18 | 98% | Production default |
| 128 | 25 | 99% | High-recall requirements |
| 256 | 40 | 99.5% | Offline analysis |
Pitfall 5: No Query Rewriting or Expansion
The problem
User queries are short and vague:
- "last year's revenue" (no company name or specific period)
- "how to configure" (no configuration target)
- "bug fix" (no concrete description)
Retrieving with the raw query performed terribly: Top-5 recall was only 45%.
Query understanding and rewriting
import json
from typing import Dict, List

import openai


class QueryRewriter:
    """Query rewriter."""

    def __init__(self, llm_client):
        self.llm = llm_client

    def rewrite_query(self, query: str, context: Dict = None) -> List[str]:
        """
        Generate several rewritten versions:
        1. The original query
        2. An expanded query (with added context)
        3. Synonym paraphrases
        4. Decomposed sub-queries
        """
        prompt = f"""
You are a query rewriting expert. Rewrite the user query below into 3-5 variants.
User query: {query}
Context: {context or "none"}
Rewriting requirements:
1. Fill in missing entities and time information
2. Use synonyms and related terminology
3. If the query is compound, decompose it into sub-queries
4. Preserve the intent while improving retrievability
Return the result as JSON:
{{
    "rewrites": ["rewrite 1", "rewrite 2", "rewrite 3", ...]
}}
"""
        response = self.llm.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.3
        )
        result = json.loads(response.choices[0].message.content)
        return result["rewrites"]

    def expand_with_synonyms(self, query: str) -> List[str]:
        """Expand the query with synonyms."""
        # Simplified version: a hand-built dictionary (Chinese terms matching our corpus)
        synonym_dict = {
            "营收": ["收入", "销售额", "营业额"],     # revenue -> income, sales, turnover
            "配置": ["设置", "参数", "选项"],         # configuration -> settings, parameters, options
            "bug": ["错误", "问题", "故障", "缺陷"],  # bug -> error, issue, fault, defect
        }
        expanded = [query]
        for word, synonyms in synonym_dict.items():
            if word in query:
                for syn in synonyms:
                    expanded.append(query.replace(word, syn))
        return list(set(expanded))  # deduplicate

    def generate_hypothetical_document(self, query: str) -> str:
        """
        HyDE: generate a hypothetical document.
        Ask the LLM for an "imagined answer", then retrieve real documents similar to it.
        """
        prompt = f"""
Suppose you need to answer this question: {query}
Write a detailed answer (200-300 words), even if you are unsure it is accurate.
The answer will be used to retrieve relevant documents.
Answer:
"""
        response = self.llm.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.7,
            max_tokens=400
        )
        return response.choices[0].message.content
Multi-query fusion retrieval
class FusionRetriever:
    """Fusion retriever - merges the results of multiple queries."""

    def __init__(
        self,
        collection: Collection,
        embedding_service: EmbeddingService
    ):
        self.collection = collection
        self.embedding_service = embedding_service

    def reciprocal_rank_fusion(
        self,
        all_results: List[List[Dict]],
        k: int = 60
    ) -> List[Dict]:
        """
        Reciprocal Rank Fusion (RRF):
        merge several ranked result lists into one.
        """
        # Accumulate a score per document id
        doc_scores = {}
        for results in all_results:
            for rank, doc in enumerate(results):
                doc_id = doc['id']
                # RRF formula: 1 / (k + rank)
                score = 1.0 / (k + rank + 1)
                if doc_id in doc_scores:
                    doc_scores[doc_id]['score'] += score
                else:
                    doc_scores[doc_id] = {
                        'id': doc_id,
                        'score': score,
                        'doc': doc
                    }
        # Sort by fused score
        ranked = sorted(
            doc_scores.values(),
            key=lambda x: x['score'],
            reverse=True
        )
        return [item['doc'] for item in ranked]

    def retrieve_with_fusion(
        self,
        query: str,
        top_k: int = 10,
        rewrite_count: int = 3
    ) -> List[Dict]:
        """Query rewriting + fusion retrieval."""
        # 1. Generate rewritten queries (llm_client is an OpenAI-compatible client configured elsewhere)
        rewriter = QueryRewriter(llm_client)
        queries = [query] + rewriter.rewrite_query(query)[:rewrite_count]
        # 2. Retrieve for each query
        all_results = []
        for q in queries:
            # Embed the query
            q_embedding = self.embedding_service.encode_queries([q])
            # Search
            results = self.collection.search(
                data=[q_embedding["dense"][0]],
                anns_field="embedding",
                param={"metric_type": "IP", "params": {"ef": 64}},
                limit=top_k * 2,  # over-retrieve per query
                output_fields=["text", "metadata"]
            )
            # Convert Milvus hits into plain dicts so RRF can score them by id
            all_results.append([
                {"id": hit.id, "text": hit.entity.get("text"),
                 "metadata": hit.entity.get("metadata"), "distance": hit.distance}
                for hit in results[0]
            ])
        # 3. Fuse the ranked lists
        fused_results = self.reciprocal_rank_fusion(all_results, k=60)
        return fused_results[:top_k]
Pitfall 6: Skipping Reranking
The problem
In the Top-10 from vector search, the truly relevant documents often sit at positions 5-10; feeding all ten to the LLM injects noise.
Rerank model comparison
| Model | MRR@10 | Latency (ms) | Best for |
|---|---|---|---|
| No rerank | 0.62 | 0 | Baseline |
| BM25 | 0.68 | 5 | Lightweight |
| BGE-Reranker-Base | 0.78 | 45 | Balanced |
| BGE-Reranker-Large | 0.84 | 85 | Recommended |
| Cross-Encoder | 0.82 | 120 | High precision |
Choice: BGE-Reranker-Large
Reranker implementation
from typing import Dict, List, Tuple

from FlagEmbedding import FlagReranker


class RerankerService:
    def __init__(self, model_path: str = "/models/bge-reranker-large"):
        self.reranker = FlagReranker(
            model_path,
            use_fp16=True,
            device="cuda"
        )

    def rerank(
        self,
        query: str,
        documents: List[str],
        top_k: int = 5
    ) -> List[Tuple[int, float]]:
        """
        Rerank documents.
        Returns: [(doc_idx, score), ...]
        """
        # Build query-document pairs
        pairs = [[query, doc] for doc in documents]
        # Score each pair
        scores = self.reranker.compute_score(
            pairs,
            batch_size=32,
            max_length=1024
        )
        # Sort and keep Top-K
        scored_docs = list(enumerate(scores))
        scored_docs.sort(key=lambda x: x[1], reverse=True)
        return scored_docs[:top_k]


# Wiring the reranker into the retrieval flow
class RAGRetriever:
    def __init__(
        self,
        collection: Collection,
        embedding_service: EmbeddingService,
        reranker: RerankerService
    ):
        self.collection = collection
        self.embedding_service = embedding_service
        self.reranker = reranker

    def retrieve(
        self,
        query: str,
        top_k: int = 5,
        rerank_top_n: int = 20
    ) -> List[Dict]:
        """
        Two-stage retrieval:
        1. Vector search recalls Top-N (N=20)
        2. Reranking narrows it down to Top-K (K=5)
        """
        # Stage 1: vector search
        q_embedding = self.embedding_service.encode_queries([query])
        vector_results = self.collection.search(
            data=[q_embedding["dense"][0]],
            anns_field="embedding",
            param={"metric_type": "IP", "params": {"ef": 64}},
            limit=rerank_top_n,
            output_fields=["text", "metadata"]
        )[0]
        # Pull out the document texts
        documents = [hit.entity.get("text") for hit in vector_results]
        # Stage 2: rerank
        reranked_indices = self.reranker.rerank(
            query=query,
            documents=documents,
            top_k=top_k
        )
        # Return the reranked results
        final_results = []
        for idx, score in reranked_indices:
            hit = vector_results[idx]
            final_results.append({
                "id": hit.id,
                "text": hit.entity.get("text"),
                "metadata": hit.entity.get("metadata"),
                "vector_score": hit.distance,
                "rerank_score": score,
                "final_rank": len(final_results) + 1
            })
        return final_results
Result:
- MRR@5: 0.62 → 0.84 (+35%)
- Subjective generation quality: 3.2 → 4.5 (out of 5)
Pitfall 7: Poor Context Window Management
The problem
- Context too long: stuffing in 10 chunks (~8,000 tokens) drove up LLM cost and latency
- Context too short: only 2 chunks left the model short on information and hurt answer quality
Dynamic context selection
from typing import Dict, List


class ContextBuilder:
    """Context builder with a token budget."""

    def __init__(
        self,
        max_tokens: int = 4000,  # maximum context length
        min_chunks: int = 2,     # minimum number of chunks
        max_chunks: int = 8      # maximum number of chunks
    ):
        self.max_tokens = max_tokens
        self.min_chunks = min_chunks
        self.max_chunks = max_chunks

    def build_context(
        self,
        query: str,
        chunks: List[Dict],
        tokenizer
    ) -> str:
        """
        Build the context:
        1. Take chunks sorted by relevance score
        2. Greedily select until the token budget is reached
        3. Attach linking metadata between chunks
        """
        selected_chunks = []
        total_tokens = 0
        # Reserve tokens for the query and system prompt
        query_tokens = len(tokenizer.encode(query))
        system_tokens = 200  # rough estimate
        available_tokens = self.max_tokens - query_tokens - system_tokens
        for chunk in chunks[:self.max_chunks]:
            chunk_text = chunk['text']
            chunk_tokens = len(tokenizer.encode(chunk_text))
            # Would this chunk exceed the budget?
            if total_tokens + chunk_tokens > available_tokens:
                if len(selected_chunks) >= self.min_chunks:
                    break  # already have enough chunks, stop adding
                else:
                    # Truncate the chunk so we still reach the minimum count
                    remaining_tokens = available_tokens - total_tokens
                    chunk_text = self.truncate_text(
                        chunk_text,
                        remaining_tokens,
                        tokenizer
                    )
                    chunk_tokens = len(tokenizer.encode(chunk_text))
            selected_chunks.append({
                **chunk,
                'text': chunk_text
            })
            total_tokens += chunk_tokens
        # Format the final context
        context = self.format_context(selected_chunks)
        return context

    def format_context(self, chunks: List[Dict]) -> str:
        """Format the context and attach metadata."""
        formatted_parts = []
        for i, chunk in enumerate(chunks):
            metadata = chunk.get('metadata', {})
            source = metadata.get('source', 'unknown source')
            chunk_id = metadata.get('chunk_id', i)
            part = f"""
[Document fragment {i+1}/{len(chunks)}]
Source: {source}
Fragment ID: {chunk_id}
Relevance score: {chunk.get('rerank_score', 0):.3f}
Content:
{chunk['text']}
---
"""
            formatted_parts.append(part)
        return "\n".join(formatted_parts)

    def truncate_text(
        self,
        text: str,
        max_tokens: int,
        tokenizer
    ) -> str:
        """Truncate text to a token budget."""
        tokens = tokenizer.encode(text)
        if len(tokens) <= max_tokens:
            return text
        # Truncate and decode
        truncated_tokens = tokens[:max_tokens]
        truncated_text = tokenizer.decode(truncated_tokens)
        # Prefer to cut at a sentence boundary
        sentences = truncated_text.split('。')
        if len(sentences) > 1:
            return '。'.join(sentences[:-1]) + '。'
        return truncated_text
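The tokenizer argument only needs encode()/decode(). A small usage sketch with tiktoken's cl100k_base as an approximation of the target LLM's tokenizer; reranked_chunks stands for the output of RAGRetriever.retrieve above, and the query string is just a sample:

# Sketch: feed reranked chunks through the token-budgeted context builder
import tiktoken

tokenizer = tiktoken.get_encoding("cl100k_base")
builder = ContextBuilder(max_tokens=4000, min_chunks=2, max_chunks=8)

context = builder.build_context(
    query="What changed in the Q3 revenue recognition policy?",  # sample query
    chunks=reranked_chunks,   # output of RAGRetriever.retrieve()
    tokenizer=tokenizer,
)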
Pitfall 8: No Hybrid Retrieval
The problem
Pure vector search is weak on exact matches such as product model numbers or function names.
For example, for the query "sklearn.metrics.accuracy_score", vector search may return unrelated metrics functions.
Hybrid retrieval approach
class HybridRetriever:
    """Hybrid retrieval: vector search + BM25."""

    def __init__(
        self,
        collection: Collection,
        embedding_service: EmbeddingService
    ):
        self.collection = collection
        self.embedding_service = embedding_service
        # Initialise BM25
        from rank_bm25 import BM25Okapi
        import jieba
        # Load the document corpus (must be built beforehand)
        self.documents = self._load_documents()
        self.tokenized_docs = [
            list(jieba.cut(doc['text']))
            for doc in self.documents
        ]
        self.bm25 = BM25Okapi(self.tokenized_docs)

    def hybrid_search(
        self,
        query: str,
        top_k: int = 10,
        alpha: float = 0.5  # weight of the vector score
    ) -> List[Dict]:
        """
        Hybrid search.
        alpha=1.0: pure vector search
        alpha=0.0: pure BM25
        alpha=0.5: balanced
        """
        # 1. Vector search
        q_embedding = self.embedding_service.encode_queries([query])
        vector_results = self.collection.search(
            data=[q_embedding["dense"][0]],
            anns_field="embedding",
            param={"metric_type": "IP", "params": {"ef": 64}},
            limit=top_k * 2
        )[0]
        # Normalise vector scores to [0, 1]
        vector_scores = {}
        if len(vector_results) > 0:
            max_score = max(hit.distance for hit in vector_results)
            min_score = min(hit.distance for hit in vector_results)
            score_range = max_score - min_score if max_score != min_score else 1
            for hit in vector_results:
                normalized = (hit.distance - min_score) / score_range
                vector_scores[hit.id] = normalized
        # 2. BM25 search
        import jieba
        query_tokens = list(jieba.cut(query))
        bm25_scores_raw = self.bm25.get_scores(query_tokens)
        # Normalise BM25 scores
        max_bm25 = max(bm25_scores_raw) if len(bm25_scores_raw) > 0 else 1
        bm25_scores = {
            doc['id']: score / max_bm25
            for doc, score in zip(self.documents, bm25_scores_raw)
        }
        # 3. Blend the scores
        all_doc_ids = set(vector_scores.keys()) | set(bm25_scores.keys())
        hybrid_scores = {}
        for doc_id in all_doc_ids:
            vec_score = vector_scores.get(doc_id, 0)
            bm25_score = bm25_scores.get(doc_id, 0)
            # Weighted fusion
            hybrid_scores[doc_id] = (
                alpha * vec_score + (1 - alpha) * bm25_score
            )
        # 4. Sort and return Top-K
        ranked_ids = sorted(
            hybrid_scores.items(),
            key=lambda x: x[1],
            reverse=True
        )[:top_k]
        # Fetch document details
        results = []
        for doc_id, score in ranked_ids:
            doc = next((d for d in self.documents if d['id'] == doc_id), None)
            if doc:
                results.append({
                    **doc,
                    'hybrid_score': score,
                    'vector_score': vector_scores.get(doc_id, 0),
                    'bm25_score': bm25_scores.get(doc_id, 0)
                })
        return results
Recall comparison by query type:
| Query type | Vector only | BM25 only | Hybrid (α=0.5) |
|---|---|---|---|
| Exact match | 68% | 92% | 95% |
| Semantic | 88% | 71% | 91% |
| Overall | 78% | 81% | 93% |
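In practice α does not have to be fixed: identifier-like queries (dotted paths, snake_case names, version numbers) do better leaning toward BM25, while natural-language queries lean toward dense retrieval. A heuristic sketch of our own making, not a library feature (the regex and thresholds are illustrative; hybrid_retriever is a HybridRetriever instance):

import re

def pick_alpha(query: str) -> float:
    """Heuristic: lean toward BM25 for identifier-like queries, toward vectors otherwise."""
    # Dotted paths, snake_case names, or version-like tokens suggest an exact-match query
    if re.search(r"[A-Za-z_]\w*\.[\w.]+|[a-z]+_[a-z_]+|\bv?\d+\.\d+(\.\d+)?\b", query):
        return 0.3  # favour BM25
    return 0.7      # favour dense retrieval

results = hybrid_retriever.hybrid_search(query, top_k=10, alpha=pick_alpha(query))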
Pitfall 9: Ignoring Multimodal Documents
The problem
Enterprise documents are full of tables, images, and formulas; plain-text parsing loses that information.
Multimodal parsing
from typing import Dict, List

import openai
import pytesseract
from PIL import Image
from unstructured.partition.auto import partition
from unstructured.staging.base import elements_to_json


class MultimodalDocumentParser:
    """Multimodal document parser."""

    def parse_document(self, file_path: str) -> List[Dict]:
        """Parse a document (PDF, Word, PPT, ...)."""
        # Let Unstructured auto-detect and partition the file
        elements = partition(filename=file_path)
        chunks = []
        for element in elements:
            element_type = type(element).__name__
            chunk = {
                "type": element_type,
                "text": str(element),
                "metadata": element.metadata.to_dict()
            }
            # Type-specific handling
            if element_type == "Table":
                chunk["table_data"] = self.parse_table(element)
            elif element_type == "Image":
                chunk["image_description"] = self.describe_image(element)
            chunks.append(chunk)
        return chunks

    def parse_table(self, table_element) -> Dict:
        """Parse a table into structured data."""
        # HTML representation produced by Unstructured
        html = table_element.metadata.text_as_html
        # Parse with pandas
        import pandas as pd
        df = pd.read_html(html)[0]
        return {
            "html": html,
            "dataframe": df.to_dict(),
            "markdown": df.to_markdown(),  # for the LLM
            "summary": self.summarize_table(df)
        }

    def summarize_table(self, df) -> str:
        """Generate a short table summary."""
        rows, cols = df.shape
        columns = ", ".join(df.columns.astype(str).tolist())
        summary = f"The table has {rows} rows and {cols} columns. Columns: {columns}."
        # Add basic statistics for numeric columns
        numeric_cols = df.select_dtypes(include=['number']).columns
        if len(numeric_cols) > 0:
            stats = df[numeric_cols].describe().to_string()
            summary += f"\nNumeric column statistics:\n{stats}"
        return summary

    def describe_image(self, image_element) -> str:
        """
        Describe an image (OCR or a multimodal LLM).
        """
        image_path = image_element.metadata.filename
        # Option 1: OCR the text
        image = Image.open(image_path)
        ocr_text = pytesseract.image_to_string(image, lang='chi_sim+eng')
        # Option 2: describe the image with GPT-4V (recommended in production)
        # image_description = self.describe_with_gpt4v(image_path)
        return f"Image OCR text: {ocr_text}"

    def describe_with_gpt4v(self, image_path: str) -> str:
        """Describe an image with GPT-4V."""
        import base64
        with open(image_path, "rb") as f:
            image_data = base64.b64encode(f.read()).decode()
        response = openai.chat.completions.create(
            model="gpt-4-vision-preview",
            messages=[
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "text",
                            "text": "Describe this image in detail, including any text, charts, and key information."
                        },
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:image/jpeg;base64,{image_data}"
                            }
                        }
                    ]
                }
            ],
            max_tokens=500
        )
        return response.choices[0].message.content
Pitfall 10: No Evaluation or Monitoring
The problem
Without evaluation metrics we could not quantify the RAG system's quality or spot regressions.
Evaluation framework
from dataclasses import dataclass
from typing import Dict, List

import numpy as np
import openai


@dataclass
class RAGEvaluation:
    """RAG evaluation metrics."""
    recall_at_k: float     # recall@K
    mrr: float             # Mean Reciprocal Rank
    ndcg: float            # Normalized Discounted Cumulative Gain
    precision_at_k: float  # precision@K
    answer_quality: float  # answer quality (human rating)
    latency_p50: float     # latency P50
    latency_p99: float     # latency P99


class RAGEvaluator:
    """RAG system evaluator."""

    def evaluate_retrieval(
        self,
        queries: List[str],
        ground_truth: List[List[str]],       # relevant document IDs per query
        retrieval_results: List[List[str]],  # retrieved document IDs per query
        k: int = 5
    ) -> Dict[str, float]:
        """Evaluate retrieval quality."""
        recalls = []
        mrrs = []
        ndcgs = []
        precisions = []
        for gt, results in zip(ground_truth, retrieval_results):
            gt_set = set(gt[:k])
            results_set = set(results[:k])
            # Recall@K
            recall = len(gt_set & results_set) / len(gt_set) if gt_set else 0
            recalls.append(recall)
            # MRR
            for i, doc_id in enumerate(results):
                if doc_id in gt_set:
                    mrrs.append(1.0 / (i + 1))
                    break
            else:
                mrrs.append(0)
            # NDCG@K
            ndcg = self.calculate_ndcg(results, gt, k)
            ndcgs.append(ndcg)
            # Precision@K
            precision = len(gt_set & results_set) / k
            precisions.append(precision)
        return {
            "recall@k": np.mean(recalls),
            "mrr": np.mean(mrrs),
            "ndcg@k": np.mean(ndcgs),
            "precision@k": np.mean(precisions)
        }

    def calculate_ndcg(
        self,
        results: List[str],
        ground_truth: List[str],
        k: int
    ) -> float:
        """Compute NDCG@K."""
        dcg = 0.0
        for i, doc_id in enumerate(results[:k]):
            if doc_id in ground_truth:
                # Relevance score: earlier in ground_truth means more relevant
                relevance = len(ground_truth) - ground_truth.index(doc_id)
                dcg += relevance / np.log2(i + 2)
        # Ideal DCG
        idcg = 0.0
        for i in range(min(k, len(ground_truth))):
            relevance = len(ground_truth) - i
            idcg += relevance / np.log2(i + 2)
        return dcg / idcg if idcg > 0 else 0

    def evaluate_end_to_end(
        self,
        test_cases: List[Dict]  # {"query": str, "expected_answer": str}
    ) -> Dict[str, float]:
        """End-to-end evaluation (LLM as judge)."""
        scores = []
        for case in test_cases:
            query = case["query"]
            expected = case["expected_answer"]
            # Run the RAG system (self.rag_system is injected elsewhere)
            actual = self.rag_system.query(query)
            # Score the answer with an LLM judge
            score = self.llm_judge(query, expected, actual)
            scores.append(score)
        return {
            "average_quality_score": np.mean(scores),
            "pass_rate": sum(1 for s in scores if s >= 0.7) / len(scores)
        }

    def llm_judge(
        self,
        query: str,
        expected: str,
        actual: str
    ) -> float:
        """Judge answer quality with an LLM."""
        prompt = f"""
Rate the quality of the answer below on a 0-1 scale:
Question: {query}
Reference answer: {expected}
Actual answer: {actual}
Scoring rubric:
- 0.9-1.0: fully correct, complete
- 0.7-0.9: mostly correct, minor gaps
- 0.5-0.7: partially correct
- 0.3-0.5: contains errors
- 0.0-0.3: completely wrong
Return only the numeric score.
"""
        response = openai.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.1
        )
        try:
            score = float(response.choices[0].message.content.strip())
            return max(0.0, min(1.0, score))
        except ValueError:
            return 0.5  # fallback score
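For reference, a toy call showing the input shapes evaluate_retrieval expects (the document IDs are made up):

# Toy example of the retrieval-evaluation entry point
evaluator = RAGEvaluator()
metrics = evaluator.evaluate_retrieval(
    queries=["q1", "q2"],
    ground_truth=[["doc_3", "doc_7"], ["doc_1"]],
    retrieval_results=[["doc_3", "doc_9", "doc_7"], ["doc_4", "doc_1"]],
    k=5,
)
print(metrics)  # e.g. {'recall@k': 1.0, 'mrr': 0.75, ...}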
Production monitoring
# Prometheus metrics
from prometheus_client import Counter, Histogram, Gauge

# Request counter
rag_requests_total = Counter(
    'rag_requests_total',
    'Total RAG requests',
    ['status']
)

# Latency distribution
rag_latency = Histogram(
    'rag_latency_seconds',
    'RAG request latency',
    ['stage'],  # retrieval, rerank, generation
    buckets=[0.1, 0.5, 1.0, 2.0, 5.0]
)

# Retrieval quality
rag_retrieval_quality = Gauge(
    'rag_retrieval_quality',
    'Retrieval quality score',
    ['metric']  # recall, mrr, ndcg
)

# Vector store size
vector_db_size = Gauge(
    'vector_db_total_vectors',
    'Total vectors in database'
)

# Usage example
def query_with_monitoring(query: str):
    rag_requests_total.labels(status='started').inc()
    try:
        # Retrieval stage
        with rag_latency.labels(stage='retrieval').time():
            docs = retriever.retrieve(query)
        # Rerank stage
        with rag_latency.labels(stage='rerank').time():
            ranked_docs = reranker.rerank(query, docs)
        # Generation stage
        with rag_latency.labels(stage='generation').time():
            answer = llm.generate(query, ranked_docs)
        rag_requests_total.labels(status='success').inc()
        return answer
    except Exception:
        rag_requests_total.labels(status='error').inc()
        raise
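These metric objects only become useful once an HTTP endpoint exposes them for scraping. A minimal sketch using prometheus_client's built-in exporter; the port is an assumption, and in a K8s setup the scrape target would be declared separately (annotations or a ServiceMonitor):

# Expose /metrics for Prometheus to scrape (port is an assumption)
from prometheus_client import start_http_server

if __name__ == "__main__":
    start_http_server(9100)  # serves /metrics on :9100
    # ... start the RAG service / request loop here ...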
3. Production Architecture Summary
3.1 Full Architecture
          User query
              │
              ▼
        Query Rewriter
              │
      ┌───────┼───────┐
      ▼       ▼       ▼
 Vector     BM25     HyDE
 Search    Search
      │       │       │
      └───────┼───────┘
              ▼
         Fusion (RRF)
              │
              ▼
          Reranker
              │
              ▼
       Context Builder
              │
              ▼
        LLM Generator
              │
              ▼
           Response
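The diagram maps directly onto the classes built up in Section 2. A minimal sketch of the glue code, assuming the instances (query_rewriter, hybrid_retriever, fusion_retriever, reranker_service, context_builder, tokenizer) are constructed elsewhere and generate_answer stands in for the actual LLM call:

# Sketch of the full query path using the components defined above
def answer_query(query: str) -> str:
    rewrites = [query] + query_rewriter.rewrite_query(query)[:3]
    candidates = fusion_retriever.reciprocal_rank_fusion([
        hybrid_retriever.hybrid_search(q, top_k=20) for q in rewrites
    ])
    reranked = reranker_service.rerank(query, [c["text"] for c in candidates], top_k=5)
    top_chunks = [dict(candidates[idx], rerank_score=score) for idx, score in reranked]
    context = context_builder.build_context(query, top_chunks, tokenizer)
    return generate_answer(query, context)  # placeholder LLM call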
3.2 Key Metrics
| Metric | Before | After | Change |
|---|---|---|---|
| Recall@5 | 45% | 88% | +96% |
| MRR | 0.52 | 0.84 | +62% |
| End-to-end latency (P99) | 3.5s | 1.8s | -49% |
| Answer quality score | 3.2 | 4.5 | +41% |
| Monthly cost | $5000 | $800 | -84% |