RAG Systems
1. RAG Architecture
1.1 What is RAG
RAG (Retrieval-Augmented Generation) combines retrieval with generation: external knowledge is retrieved at query time and fed to the LLM to ground its output.
Key advantages:
- Fewer hallucinations
- Knowledge can be updated without retraining
- Strong explainability (answers can be traced back to their sources)
- Low cost compared with fine-tuning
RAG vs fine-tuning:
| Aspect | RAG | Fine-tuning |
|---|---|---|
| Knowledge updates | Real-time | Requires retraining |
| Cost | Low | High |
| Domain adaptation | Fast | Slow |
| Source attribution | Traceable | Black box |
| Best suited for | Knowledge-intensive tasks | Task-specific optimization |
1.2 End-to-End Pipeline
┌──────────────────────────┐
│        User query        │
└────────────┬─────────────┘
             │
┌────────────▼─────────────┐
│  Query rewrite / expand  │
└────────────┬─────────────┘
             │
┌────────────▼─────────────┐    ┌──────────────────┐
│     Embedding model      │───▶│ Vector database  │
└──────────────────────────┘    │   (retrieval)    │
                                └────────┬─────────┘
             ┌───────────────────────────┘
┌────────────▼─────────────┐
│   Rerank (re-ranking)    │
└────────────┬─────────────┘
             │
┌────────────▼─────────────┐
│   Prompt construction    │
│   (context + question)   │
└────────────┬─────────────┘
             │
┌────────────▼─────────────┐
│      LLM generation      │
└────────────┬─────────────┘
             │
┌────────────▼─────────────┐
│       Final answer       │
└──────────────────────────┘
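Read top to bottom, the diagram maps onto a handful of calls. A minimal sketch of the loop, where rewrite_query, embed, vector_search, rerank and llm_generate are hypothetical helpers standing in for the components covered in the rest of this document (build_prompt is shown in section 5.3):
def answer(question, k=20, top_n=5):
    # 1. Query rewriting / expansion
    rewritten = rewrite_query(question)
    # 2. Embed the query and retrieve candidates from the vector database
    query_vec = embed(rewritten)
    candidates = vector_search(query_vec, k=k)
    # 3. Re-rank the candidates and keep the best few
    contexts = rerank(rewritten, candidates)[:top_n]
    # 4. Build the prompt from context + question and generate
    prompt = build_prompt(question, contexts)
    return llm_generate(prompt)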
1.3 Core Components
1. Document processing
from langchain.document_loaders import PyPDFLoader, TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
# Load documents
loader = PyPDFLoader("document.pdf")
documents = loader.load()
# Split into chunks
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,      # chunk size (characters)
    chunk_overlap=200,    # overlap between adjacent chunks
    separators=["\n\n", "\n", "。", ".", " ", ""]
)
chunks = text_splitter.split_documents(documents)
2. Vectorization
from langchain.embeddings import OpenAIEmbeddings
embeddings = OpenAIEmbeddings(model="text-embedding-ada-002")
vectors = embeddings.embed_documents([chunk.page_content for chunk in chunks])
3. Storage and retrieval
from langchain.vectorstores import Chroma
vectorstore = Chroma.from_documents(
    documents=chunks,
    embedding=embeddings,
    persist_directory="./chroma_db"
)
# Retrieve
query = "What is RAG?"
docs = vectorstore.similarity_search(query, k=5)
4. Answer generation
from langchain.llms import OpenAI
from langchain.chains import RetrievalQA
llm = OpenAI(temperature=0)
qa_chain = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=vectorstore.as_retriever()
)
answer = qa_chain.run(query)
2. Embedding Models
2.1 Comparison of Popular Models
| Model | Dimensions | Max tokens | MTEB score | Speed | Cost |
|---|---|---|---|---|---|
| text-embedding-ada-002 | 1536 | 8191 | 60.99 | Fast | Paid API |
| text-embedding-3-small | 1536 | 8191 | 62.26 | Fast | Paid API |
| text-embedding-3-large | 3072 | 8191 | 64.59 | Medium | Paid API |
| sentence-t5-xxl | 768 | 512 | 63.57 | Slow | Free |
| bge-large-zh | 1024 | 512 | 64.53 | Medium | Free |
| m3e-large | 1024 | 512 | 63.12 | Medium | Free |
| gte-large | 1024 | 512 | 63.78 | Medium | Free |
2.2 OpenAI Embeddings
Usage example:
import openai
# Note: this uses the legacy openai<1.0 SDK interface (openai.Embedding.create)
def get_embedding(text, model="text-embedding-3-small"):
    text = text.replace("\n", " ")
    response = openai.Embedding.create(
        input=[text],
        model=model
    )
    return response['data'][0]['embedding']
# Batch processing
texts = ["text one", "text two", "text three"]
embeddings = openai.Embedding.create(
    input=texts,
    model="text-embedding-3-small"
)
for i, emb in enumerate(embeddings['data']):
    print(f"Embedding dimension of text {i+1}: {len(emb['embedding'])}")
Pricing:
- text-embedding-3-small: $0.02 / 1M tokens
- text-embedding-3-large: $0.13 / 1M tokens
- text-embedding-ada-002: $0.10 / 1M tokens
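For budgeting, the prices above can be combined with a token count from tiktoken. A minimal sketch (prices taken from the list above; check current OpenAI pricing before relying on it):
import tiktoken
# cl100k_base is the tokenizer used by the embedding models listed above
enc = tiktoken.get_encoding("cl100k_base")
PRICE_PER_1M = {  # USD per 1M tokens, from the list above
    "text-embedding-3-small": 0.02,
    "text-embedding-3-large": 0.13,
    "text-embedding-ada-002": 0.10,
}
def estimate_embedding_cost(texts, model="text-embedding-3-small"):
    total_tokens = sum(len(enc.encode(t)) for t in texts)
    return total_tokens, total_tokens / 1_000_000 * PRICE_PER_1M[model]
tokens, cost = estimate_embedding_cost(["some document text"] * 10_000)
print(f"{tokens} tokens, estimated ${cost:.4f}")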
2.3 Open-Source Models
sentence-transformers:
from sentence_transformers import SentenceTransformer
# English model
model = SentenceTransformer('all-MiniLM-L6-v2')
# Multilingual model (covers Chinese)
model = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')
# Encode
sentences = ["这是第一句话", "这是第二句话"]
embeddings = model.encode(sentences)
# Compute similarity
from sentence_transformers.util import cos_sim
similarity = cos_sim(embeddings[0], embeddings[1])
print(f"Similarity: {similarity.item():.4f}")
BGE (BAAI General Embedding):
from FlagEmbedding import FlagModel
# Chinese model
model = FlagModel('BAAI/bge-large-zh-v1.5', use_fp16=True)
# Encode documents
docs = ["文档1", "文档2"]
doc_embeddings = model.encode(docs)
# Encode queries (encode_queries prepends BGE's retrieval instruction automatically)
queries = ["什么是RAG?"]
query_embeddings = model.encode_queries(queries)
# Compute similarity
scores = query_embeddings @ doc_embeddings.T
M3E (Moka Massive Mixed Embedding):
from sentence_transformers import SentenceTransformer
model = SentenceTransformer('moka-ai/m3e-large')
# Handles mixed Chinese and English text
texts = [
    "Machine Learning is awesome",
    "机器学习很棒",
    "Deep Learning深度学习"
]
embeddings = model.encode(texts)
2.4 Selection Guide
Scenario 1: production, budget available → OpenAI text-embedding-3-small (best cost/performance)
Scenario 2: mostly Chinese, private deployment → bge-large-zh or m3e-large
Scenario 3: resource-constrained → all-MiniLM-L6-v2 (384 dimensions, fast)
Scenario 4: multilingual → text-embedding-3-large or paraphrase-multilingual
3. Vector Databases
3.1 Comparison
| Database | Type | Index methods | Performance | Ease of use | Typical use |
|---|---|---|---|---|---|
| FAISS | Library | IVF, HNSW | Very high | Medium | Single machine, highest performance |
| Chroma | Embedded | HNSW | High | High | Prototyping |
| Milvus | Distributed | IVF, HNSW | High | Medium | Production |
| Pinecone | Managed cloud | Proprietary | High | Very high | Fastest to ship |
| Weaviate | Distributed | HNSW | High | High | Hybrid search |
| Qdrant | Distributed | HNSW | High | High | Rust implementation, efficient |
3.2 FAISS
Meta's open-source high-performance vector search library.
Basic usage:
import faiss
import numpy as np
# Create an index
dimension = 128
index = faiss.IndexFlatL2(dimension)  # L2 distance
# Add vectors
vectors = np.random.random((1000, dimension)).astype('float32')
index.add(vectors)
# Search
query = np.random.random((1, dimension)).astype('float32')
k = 5
distances, indices = index.search(query, k)
print(f"Indices of the {k} nearest vectors: {indices[0]}")
print(f"Distances: {distances[0]}")
Advanced indexes:
# IVF (Inverted File Index)
# Suitable for large-scale data
nlist = 100  # number of cluster centroids
quantizer = faiss.IndexFlatL2(dimension)
index = faiss.IndexIVFFlat(quantizer, dimension, nlist)
# Train the index
index.train(vectors)
index.add(vectors)
# At search time, set how many clusters to probe
index.nprobe = 10
distances, indices = index.search(query, k)
# HNSW (Hierarchical Navigable Small World)
# High recall
M = 32  # number of links per node
index = faiss.IndexHNSWFlat(dimension, M)
index.add(vectors)
# Product Quantization (compression)
m = 8  # number of sub-vectors
index = faiss.IndexIVFPQ(quantizer, dimension, nlist, m, 8)
index.train(vectors)
index.add(vectors)
Performance comparison:
import time
def benchmark_index(index_type, vectors, queries):
    start = time.time()
    index_type.add(vectors)
    add_time = time.time() - start
    start = time.time()
    distances, indices = index_type.search(queries, 10)
    search_time = time.time() - start
    return {
        'add_time': add_time,
        'search_time': search_time,
        'qps': len(queries) / search_time
    }
# Compare different index types
# (dim, vecs, queries and the pre-built ivf_index / hnsw_index are assumed to be defined;
#  IVF and PQ indexes must be trained before vectors are added)
results = {
    'Flat': benchmark_index(faiss.IndexFlatL2(dim), vecs, queries),
    'IVF': benchmark_index(ivf_index, vecs, queries),
    'HNSW': benchmark_index(hnsw_index, vecs, queries)
}
3.3 Chroma
A lightweight embedded vector database.
import chromadb
# Create a persistent client (chromadb >= 0.4; older versions used
# chromadb.Client(Settings(chroma_db_impl="duckdb+parquet", persist_directory=...)))
client = chromadb.PersistentClient(path="./chroma_data")
# Create a collection
collection = client.create_collection(
    name="my_collection",
    metadata={"hnsw:space": "cosine"}
)
# Add documents
collection.add(
    documents=["content of document 1", "content of document 2", "content of document 3"],
    metadatas=[{"source": "doc1"}, {"source": "doc2"}, {"source": "doc3"}],
    ids=["id1", "id2", "id3"]
)
# Query
results = collection.query(
    query_texts=["query text"],
    n_results=5,
    where={"source": "doc1"}  # metadata filter
)
print(results['documents'])
print(results['distances'])
3.4 Milvus
A production-grade distributed vector database.
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType, utility
# Connect
connections.connect(host="localhost", port="19530")
# Define the schema
fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
    FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=128),
    FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535)
]
schema = CollectionSchema(fields, description="document collection")
# Create the collection
collection = Collection(name="documents", schema=schema)
# Create an index
index_params = {
    "metric_type": "L2",
    "index_type": "IVF_FLAT",
    "params": {"nlist": 1024}
}
collection.create_index(field_name="embedding", index_params=index_params)
# Insert data
import numpy as np
entities = [
    np.random.random((10000, 128)).tolist(),  # embeddings
    [f"document {i}" for i in range(10000)]   # texts
]
collection.insert(entities)
# Load into memory
collection.load()
# Search
search_params = {"metric_type": "L2", "params": {"nprobe": 10}}
results = collection.search(
    data=[np.random.random(128).tolist()],
    anns_field="embedding",
    param=search_params,
    limit=10,
    output_fields=["text"]
)
for hits in results:
    for hit in hits:
        print(f"ID: {hit.id}, distance: {hit.distance}, text: {hit.entity.get('text')}")
3.5 Pinecone
A fully managed vector database service.
import pinecone
# Initialize (this is the legacy pinecone-client v2 interface;
# v3+ uses `from pinecone import Pinecone` instead of pinecone.init)
pinecone.init(api_key="your-api-key", environment="us-west1-gcp")
# Create an index
pinecone.create_index("my-index", dimension=1536, metric="cosine")
# Connect to the index
index = pinecone.Index("my-index")
# Upsert vectors
index.upsert(vectors=[
    ("id1", [0.1] * 1536, {"text": "document 1"}),
    ("id2", [0.2] * 1536, {"text": "document 2"}),
])
# Query
results = index.query(
    vector=[0.15] * 1536,
    top_k=5,
    include_metadata=True
)
for match in results['matches']:
    print(f"ID: {match['id']}, Score: {match['score']}, Text: {match['metadata']['text']}")
4. Retrieval Strategies
4.1 Vector Retrieval
Basic similarity measures:
1. Cosine similarity
import numpy as np
def cosine_similarity(a, b):
    return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
# Range: [-1, 1]; 1 means identical direction
2. Euclidean distance (L2)
def euclidean_distance(a, b):
    return np.linalg.norm(a - b)
# Range: [0, ∞); 0 means identical
3. Dot product
def dot_product(a, b):
    return np.dot(a, b)
# Equivalent to cosine similarity once the vectors are normalized
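The equivalence noted in the comment above is easy to check numerically; a minimal sketch:
import numpy as np
a = np.random.random(128)
b = np.random.random(128)
# Normalize to unit length; the dot product of the normalized vectors
# equals the cosine similarity of the originals (up to floating-point error)
a_n = a / np.linalg.norm(a)
b_n = b / np.linalg.norm(b)
print(np.dot(a_n, b_n))
print(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))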
Approximate nearest neighbor (ANN) algorithms:
HNSW (Hierarchical Navigable Small World):
Layered graph structure:
- Upper layers: sparse long-range links (quickly narrow down the region)
- Bottom layer: dense short-range links (precise search)
Pros: high recall, high QPS
Cons: large memory footprint, no native support for deletion
IVF (Inverted File Index):
1. Clustering: partition the vectors into nlist clusters
2. Build inverted lists: cluster_id → vector_ids
3. Search: find the nprobe nearest clusters and search only inside them
Pros: memory-friendly, scales to large datasets
Cons: recall slightly lower than HNSW (see the recall-measurement sketch below)
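To see the recall trade-off concretely, an ANN index can be compared against exact (Flat) search on the same data. A minimal FAISS sketch on synthetic vectors:
import faiss
import numpy as np
dimension, n, nq, k = 128, 10000, 100, 10
xb = np.random.random((n, dimension)).astype('float32')
xq = np.random.random((nq, dimension)).astype('float32')
# Ground truth from exact search
flat = faiss.IndexFlatL2(dimension)
flat.add(xb)
_, gt = flat.search(xq, k)
# IVF index: recall grows as nprobe increases
quantizer = faiss.IndexFlatL2(dimension)
ivf = faiss.IndexIVFFlat(quantizer, dimension, 100)
ivf.train(xb)
ivf.add(xb)
for nprobe in (1, 5, 20):
    ivf.nprobe = nprobe
    _, approx = ivf.search(xq, k)
    recall = np.mean([len(set(gt[i]) & set(approx[i])) / k for i in range(nq)])
    print(f"nprobe={nprobe}: recall@{k} = {recall:.3f}")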
4.2 Hybrid Retrieval
Combines sparse retrieval (BM25) with dense (vector) retrieval.
BM25:
from rank_bm25 import BM25Okapi
import numpy as np
# Build the BM25 index
# (whitespace tokenization works for English; for Chinese, segment with a tokenizer such as jieba first)
corpus = [doc.split() for doc in documents]
bm25 = BM25Okapi(corpus)
# Retrieve
query = "machine learning"
scores = bm25.get_scores(query.split())
top_docs = np.argsort(scores)[-5:][::-1]
Hybrid retriever:
class HybridRetriever:
    def __init__(self, vector_store, bm25_index, alpha=0.5):
        self.vector_store = vector_store
        self.bm25 = bm25_index
        self.alpha = alpha  # weight of the vector score

    def search(self, query, k=10):
        # Vector retrieval
        # (note: some stores return distances where lower is better;
        #  convert them to similarities before mixing if needed)
        vector_results = self.vector_store.similarity_search_with_score(query, k=k*2)
        vector_scores = {doc.metadata['id']: score for doc, score in vector_results}
        # BM25 retrieval
        bm25_scores = self.bm25.get_scores(query.split())
        # Normalize both score sets to [0, 1]
        vector_scores_norm = self._normalize(vector_scores)
        bm25_scores_norm = self._normalize(dict(enumerate(bm25_scores)))
        # Fuse
        combined_scores = {}
        all_ids = set(vector_scores_norm.keys()) | set(bm25_scores_norm.keys())
        for doc_id in all_ids:
            v_score = vector_scores_norm.get(doc_id, 0)
            b_score = bm25_scores_norm.get(doc_id, 0)
            combined_scores[doc_id] = self.alpha * v_score + (1 - self.alpha) * b_score
        # Return top-k
        top_ids = sorted(combined_scores, key=combined_scores.get, reverse=True)[:k]
        return [documents[i] for i in top_ids]

    def _normalize(self, scores):
        if not scores:
            return {}
        min_score = min(scores.values())
        max_score = max(scores.values())
        if max_score == min_score:
            return {k: 1.0 for k in scores}
        return {k: (v - min_score) / (max_score - min_score) for k, v in scores.items()}
4.3 Rerank
Re-rank the initial candidates with a more precise (but slower) model.
Cross-encoder reranking:
from sentence_transformers import CrossEncoder
import numpy as np
# Load the rerank model
reranker = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')
def rerank(query, documents, top_k=5):
    # Build query-document pairs
    pairs = [[query, doc] for doc in documents]
    # Score relevance
    scores = reranker.predict(pairs)
    # Sort by score
    ranked_indices = np.argsort(scores)[::-1][:top_k]
    return [documents[i] for i in ranked_indices], scores[ranked_indices]
# Usage
initial_docs = vector_store.similarity_search(query, k=20)
reranked_docs, scores = rerank(query, initial_docs, top_k=5)
Cohere Rerank API:
import cohere
co = cohere.Client('your-api-key')
response = co.rerank(
    query="What is machine learning?",
    documents=["document 1", "document 2", "document 3"],
    top_n=3,
    model="rerank-multilingual-v2.0"
)
# Recent SDK versions expose the ranked items on response.results
for result in response.results:
    print(f"index: {result.index}, score: {result.relevance_score}")
5. Optimization Techniques
5.1 Chunking Strategies
Fixed-length chunking:
def fixed_length_split(text, chunk_size=500, overlap=50):
    chunks = []
    start = 0
    while start < len(text):
        end = start + chunk_size
        chunks.append(text[start:end])
        start += chunk_size - overlap
    return chunks
Sentence/token-based chunking:
from langchain.text_splitter import SentenceTransformersTokenTextSplitter
splitter = SentenceTransformersTokenTextSplitter(
    tokens_per_chunk=256,  # chunk size in tokens
    chunk_overlap=50,
    model_name="sentence-transformers/all-MiniLM-L6-v2"
)
chunks = splitter.split_text(text)
Semantic chunking:
from nltk.tokenize import sent_tokenize
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np
def semantic_chunking(text, embeddings_model, threshold=0.5):
    sentences = sent_tokenize(text)
    embeddings = embeddings_model.encode(sentences)
    chunks = []
    current_idx = [0]  # indices of sentences in the current chunk
    for i in range(1, len(sentences)):
        # Similarity between the sentence and the centroid of the current chunk
        centroid = np.mean([embeddings[j] for j in current_idx], axis=0)
        similarity = cosine_similarity(
            embeddings[i].reshape(1, -1),
            centroid.reshape(1, -1)
        )[0][0]
        if similarity > threshold:
            current_idx.append(i)
        else:
            chunks.append(' '.join(sentences[j] for j in current_idx))
            current_idx = [i]
    chunks.append(' '.join(sentences[j] for j in current_idx))
    return chunks
Best practices:
- chunk_size: 500-1000 characters (Chinese), 200-500 tokens (English)
- overlap: 10-20% of chunk_size
- keep sentences intact
- attach metadata (source, page number, title); see the sketch below
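A minimal sketch of attaching metadata while splitting with LangChain (the source/page/title fields here are illustrative). create_documents copies each text's metadata onto every chunk it produces, so retrieved chunks stay traceable to their origin:
from langchain.text_splitter import RecursiveCharacterTextSplitter
raw_texts = ["full text of document one ...", "full text of document two ..."]
metadatas = [
    {"source": "manual.pdf", "page": 3, "title": "Installation"},
    {"source": "faq.md", "page": 1, "title": "FAQ"},
]
splitter = RecursiveCharacterTextSplitter(chunk_size=800, chunk_overlap=150)
chunks = splitter.create_documents(raw_texts, metadatas=metadatas)
print(chunks[0].metadata)  # {'source': 'manual.pdf', 'page': 3, 'title': 'Installation'}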
5.2 Index Optimization
1. Pre-filtering (metadata filtering)
# Filter by metadata first, then run the vector search
results = collection.query(
    query_embeddings=[query_vector],
    n_results=10,
    where={"year": {"$gte": 2020}, "category": "AI"}
)
2. Hierarchical retrieval
# Level 1: coarse-grained retrieval (chapter level)
chapter_results = chapter_index.search(query, k=3)
# Level 2: fine-grained retrieval (paragraph level)
paragraph_results = []
for chapter in chapter_results:
    chapter_paragraphs = paragraph_index.search(
        query,
        k=5,
        filter={"chapter_id": chapter.id}
    )
    paragraph_results.extend(chapter_paragraphs)
3. Index compression
# FAISS PQ quantization
m = 8        # number of sub-vectors
nbits = 8    # bits per sub-vector code
index = faiss.IndexIVFPQ(quantizer, dimension, nlist, m, nbits)
# Compression ratio for float32 vectors: (dimension * 32) / (m * nbits)
# e.g. 128 dims * 32 bits = 4096 bits → 8 codes * 8 bits = 64 bits (1/64 of the original)
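A back-of-the-envelope memory estimate for one million vectors under the parameters above (PQ codes only, ignoring coarse-quantizer and inverted-list overhead):
n_vectors = 1_000_000
dimension = 128
m, nbits = 8, 8
flat_bytes = n_vectors * dimension * 4     # raw float32 storage
pq_bytes = n_vectors * (m * nbits) // 8    # PQ codes
print(f"Flat:  {flat_bytes / 1e6:.0f} MB")  # ~512 MB
print(f"IVFPQ: {pq_bytes / 1e6:.0f} MB")    # ~8 MB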
5.3 Prompt Engineering
Basic template:
def build_prompt(query, contexts):
    context_str = "\n\n".join([f"[{i+1}] {ctx}" for i, ctx in enumerate(contexts)])
    prompt = f"""
Answer the question using the reference information below. If the references do not contain the answer, reply "The provided information is not sufficient to answer."
References:
{context_str}
Question: {query}
Answer:
"""
    return prompt
Advanced techniques:
1. Context compression
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import LLMChainExtractor
# Use an LLM to extract only the relevant parts of each document
compressor = LLMChainExtractor.from_llm(llm)
compression_retriever = ContextualCompressionRetriever(
    base_compressor=compressor,
    base_retriever=vectorstore.as_retriever()
)
compressed_docs = compression_retriever.get_relevant_documents(query)
2. Citation annotation
prompt = f"""
Answer the question using the reference information below, and mark the sources used in the answer (with [1], [2], etc.).
References:
[1] {context1}
[2] {context2}
[3] {context3}
Question: {query}
Answer (with citations):
"""
3. Multi-hop reasoning
def multi_hop_rag(query, max_hops=3):
    contexts = []
    current_query = query
    for hop in range(max_hops):
        # Retrieve
        docs = vectorstore.similarity_search(current_query, k=3)
        contexts.extend(docs)
        # Generate a follow-up sub-question
        if hop < max_hops - 1:
            prompt = f"""
Given the following information:
{docs}
Original question: {query}
To answer the original question better, what should be asked next? Generate one more specific sub-question.
Sub-question:
"""
            current_query = llm.predict(prompt)
    # Final answer
    return generate_answer(query, contexts)
6. End-to-End Implementations
6.1 LangChain
from langchain.chains import RetrievalQA
from langchain.llms import OpenAI
from langchain.document_loaders import DirectoryLoader
from langchain.text_splitter import CharacterTextSplitter
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Chroma
# 1. Load documents
loader = DirectoryLoader('./documents', glob="**/*.txt")
documents = loader.load()
# 2. Split into chunks
text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
texts = text_splitter.split_documents(documents)
# 3. Build the vector store
embeddings = OpenAIEmbeddings()
vectorstore = Chroma.from_documents(texts, embeddings, persist_directory="./chroma_db")
# 4. Build the retrieval chain
llm = OpenAI(temperature=0)
qa_chain = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=vectorstore.as_retriever(search_kwargs={"k": 5}),
    return_source_documents=True
)
# 5. Query
query = "What is a vector database?"
result = qa_chain({"query": query})
print(f"Answer: {result['result']}")
print(f"\nSource documents:")
for doc in result['source_documents']:
    print(f"- {doc.metadata['source']}: {doc.page_content[:100]}...")
6.2 LlamaIndex
# Note: this targets the legacy llama_index (0.x) API with ServiceContext;
# newer versions configure the LLM and embedding model via Settings.
from llama_index import VectorStoreIndex, SimpleDirectoryReader, ServiceContext
from llama_index.llms import OpenAI
from llama_index.embeddings import OpenAIEmbedding
# 1. Load documents
documents = SimpleDirectoryReader('./documents').load_data()
# 2. Configure services
llm = OpenAI(model="gpt-3.5-turbo", temperature=0)
embed_model = OpenAIEmbedding(model="text-embedding-3-small")
service_context = ServiceContext.from_defaults(llm=llm, embed_model=embed_model)
# 3. Build the index
index = VectorStoreIndex.from_documents(documents, service_context=service_context)
# 4. Query
query_engine = index.as_query_engine(similarity_top_k=5)
response = query_engine.query("What is a vector database?")
print(f"Answer: {response}")
print(f"\nSources:")
for node in response.source_nodes:
    print(f"- similarity: {node.score:.4f}")
    print(f"  content: {node.text[:100]}...")
7. Common Interview Questions
Question 1: How does the chunking strategy affect RAG quality?
Answer:
Key factors:
1. Chunk size
Too small (< 200 characters):
✗ incomplete context
✗ fragmented retrieval results
✓ high precision
Moderate (500-1000 characters):
✓ balances completeness and precision
✓ best for most scenarios
Too large (> 2000 characters):
✓ rich context
✗ more noise
✗ lower relevance
2. Overlap
# No overlap: key information can be cut at chunk boundaries
chunks = ["ABC", "DEF", "GHI"]
# With overlap (≈20% recommended): keeps information continuous across boundaries
chunks = ["ABC", "BCD", "CDE", "DEF"]
Experimental comparison:
import pandas as pd
def evaluate_chunking_strategy(docs, queries, chunk_sizes, overlaps):
    results = []
    for chunk_size in chunk_sizes:
        for overlap in overlaps:
            splitter = RecursiveCharacterTextSplitter(
                chunk_size=chunk_size,
                chunk_overlap=overlap
            )
            chunks = splitter.split_documents(docs)
            # Build an index and a QA chain
            vectorstore = Chroma.from_documents(chunks, embeddings)
            qa_chain = RetrievalQA.from_chain_type(llm, retriever=vectorstore.as_retriever())
            # Compute a quality score (evaluate is a user-supplied metric function)
            score = evaluate(qa_chain, queries)
            results.append({
                'chunk_size': chunk_size,
                'overlap': overlap,
                'num_chunks': len(chunks),
                'score': score
            })
    return pd.DataFrame(results)
# Run the experiment
results = evaluate_chunking_strategy(
    docs=documents,
    queries=test_questions,
    chunk_sizes=[300, 500, 800, 1000, 1500],
    overlaps=[0, 50, 100, 150, 200]
)
# Visualize
import seaborn as sns
pivot = results.pivot(index='chunk_size', columns='overlap', values='score')
sns.heatmap(pivot, annot=True, fmt='.3f')
Best practices:
- Technical documentation: 800-1000 characters, 150-200 overlap
- Dialogue / Q&A: 300-500 characters, 50-100 overlap
- Long-form articles: 1000-1500 characters, 200-300 overlap
Question 2: How do you evaluate a RAG system?
Answer:
Evaluation dimensions:
1. Retrieval quality
Hit rate:
def hit_rate(relevant_docs, retrieved_docs):
    """Whether any relevant document appears in the top-k results"""
    return any(doc in relevant_docs for doc in retrieved_docs)
# Average over a test set
hit_rates = [hit_rate(rel, retr) for rel, retr in zip(relevant_list, retrieved_list)]
avg_hit_rate = np.mean(hit_rates)
MRR (Mean Reciprocal Rank):
def mrr(relevant_docs, retrieved_docs):
    """Reciprocal rank of the first relevant document"""
    for i, doc in enumerate(retrieved_docs):
        if doc in relevant_docs:
            return 1.0 / (i + 1)
    return 0.0
avg_mrr = np.mean([mrr(rel, retr) for rel, retr in zip(relevant_list, retrieved_list)])
NDCG (Normalized Discounted Cumulative Gain):
from sklearn.metrics import ndcg_score
# relevance: [3, 2, 3, 0, 1, 2] (graded relevance scores)
# Higher-ranked positions get more weight
ndcg = ndcg_score([relevance_true], [relevance_pred])
2. Generation quality
BLEU (machine-translation metric):
from nltk.translate.bleu_score import sentence_bleu
reference = [['this', 'is', 'a', 'test']]
candidate = ['this', 'is', 'test']
score = sentence_bleu(reference, candidate)
ROUGE (summarization metric):
from rouge import Rouge
rouge = Rouge()
scores = rouge.get_scores(predicted_answer, reference_answer)
print(f"ROUGE-L: {scores[0]['rouge-l']['f']:.4f}")
BERTScore (semantic similarity):
from bert_score import score
P, R, F1 = score([predicted], [reference], lang='zh', model_type='bert-base-chinese')
print(f"BERTScore F1: {F1.mean():.4f}")
3. End-to-end evaluation
Faithfulness:
def check_faithfulness(answer, context):
    """Is the answer grounded in the context (no hallucinations)?"""
    prompt = f"""
Context: {context}
Answer: {answer}
Question: Is every claim in the answer supported by the context? Reply "yes" or "no" and explain why.
"""
    result = llm.predict(prompt)
    return "yes" in result.lower()
faithfulness = np.mean([check_faithfulness(ans, ctx) for ans, ctx in zip(answers, contexts)])
Relevance:
def check_relevance(question, answer):
    """Does the answer actually address the question?"""
    prompt = f"""
Question: {question}
Answer: {answer}
Does this answer directly address the question? Reply with a single integer score from 1 to 5 (5 = fully answers it).
"""
    score = int(llm.predict(prompt))  # assumes the model returns only the number
    return score / 5.0
A complete evaluation framework (RAGAS):
from ragas import evaluate
from ragas.metrics import faithfulness, answer_relevancy, context_recall, context_precision
from datasets import Dataset
# Prepare the data as a HuggingFace Dataset
eval_data = Dataset.from_dict({
    'question': questions,
    'answer': answers,
    'contexts': contexts,
    'ground_truths': ground_truths
})
# Evaluate
result = evaluate(
    dataset=eval_data,
    metrics=[faithfulness, answer_relevancy, context_recall, context_precision]
)
print(result)
"""
{'faithfulness': 0.82,
 'answer_relevancy': 0.91,
 'context_recall': 0.73,
 'context_precision': 0.85}
"""
Question 3: What can you do when vector-retrieval recall is low?
Answer:
Diagnosis:
def diagnose_retrieval(query, relevant_docs, k=10):
    # Retrieve
    results = vectorstore.similarity_search_with_score(query, k=k)
    # Analyze
    print(f"Query: {query}")
    print(f"Number of relevant documents: {len(relevant_docs)}")
    found = 0
    for i, (doc, score) in enumerate(results):
        is_relevant = doc.metadata['id'] in relevant_docs
        found += is_relevant
        print(f"{i+1}. {'✓' if is_relevant else '✗'} score:{score:.4f} ID:{doc.metadata['id']}")
    recall = found / len(relevant_docs)
    print(f"\nRecall: {recall:.2%}")
Optimization options:
1. Query expansion
def expand_query(query, llm):
    prompt = f"""
Original question: {query}
Generate 3 questions that are semantically similar but phrased differently, to improve retrieval:
1.
2.
3.
"""
    expanded = llm.predict(prompt).split('\n')
    # Retrieve with every query and merge the results
    all_docs = []
    for q in [query] + expanded:
        docs = vectorstore.similarity_search(q, k=5)
        all_docs.extend(docs)
    # Deduplicate by document id
    unique_docs = list({doc.metadata['id']: doc for doc in all_docs}.values())
    return unique_docs
2. HyDE (Hypothetical Document Embeddings)
def hyde_retrieval(query, llm, vectorstore):
    # Generate a hypothetical document
    prompt = f"""
Question: {query}
Write a passage that would answer this question (even if you are not sure of the exact answer):
"""
    hypothetical_doc = llm.predict(prompt)
    # Retrieve using the hypothetical document as the query
    results = vectorstore.similarity_search(hypothetical_doc, k=10)
    return results
3. Multi-path retrieval
def multi_path_retrieval(query, k=10):
    ranked_lists = []
    # Path 1: vector retrieval
    ranked_lists.append(vectorstore.similarity_search(query, k=k))
    # Path 2: BM25 (get_top_n takes a tokenized query and returns documents in rank order)
    ranked_lists.append(bm25.get_top_n(query.split(), documents, n=k))
    # Path 3: keyword matching (keyword_search is an assumed helper returning ranked docs)
    ranked_lists.append(keyword_search(query, k=k))
    # Fuse with Reciprocal Rank Fusion
    return reciprocal_rank_fusion(ranked_lists)[:k]
def reciprocal_rank_fusion(ranked_lists, k=60):
    scores = {}
    for ranked in ranked_lists:
        for rank, doc in enumerate(ranked):
            doc_id = doc.metadata['id']
            scores[doc_id] = scores.get(doc_id, 0) + 1.0 / (k + rank + 1)
    return sorted(scores.items(), key=lambda x: x[1], reverse=True)
4. Tune index parameters
# FAISS HNSW
index = faiss.IndexHNSWFlat(dimension, 64)  # a larger M improves recall
index.hnsw.efConstruction = 200  # search depth while building the graph
index.hnsw.efSearch = 100        # search depth at query time
# IVF
index.nprobe = 50  # probe more clusters
Question 4: How do you handle multimodal RAG (mixed text and images)?
Answer:
Architecture:
Documents
├── text   → text embedding   → vector store 1
└── images → vision embedding → vector store 2
Query
├── text query  → search vector stores 1 and 2
└── image query → search vector store 2
Fused results → LLM generation
Implementation:
from transformers import CLIPModel, CLIPProcessor
import torch
# 1. Multimodal encoder (CLIP)
model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
def encode_text(text):
    inputs = processor(text=[text], return_tensors="pt", padding=True)
    with torch.no_grad():
        embeddings = model.get_text_features(**inputs)
    return embeddings.numpy()
def encode_image(image):
    inputs = processor(images=image, return_tensors="pt")
    with torch.no_grad():
        embeddings = model.get_image_features(**inputs)
    return embeddings.numpy()
# 2. Build the multimodal index
from PIL import Image
text_embeddings = []
image_embeddings = []
metadata = []
for doc in documents:
    # Text
    text_emb = encode_text(doc['text'])
    text_embeddings.append(text_emb)
    # Image
    if 'image_path' in doc:
        image = Image.open(doc['image_path'])
        image_emb = encode_image(image)
        image_embeddings.append(image_emb)
    metadata.append({'type': 'text' if 'image_path' not in doc else 'multimodal',
                     'id': doc['id']})
# 3. Store everything in a shared vector space
# (vectorstore.add is a generic stand-in for whatever store is used)
combined_embeddings = text_embeddings + image_embeddings
vectorstore.add(combined_embeddings, metadata)
# 4. Multimodal retrieval
def multimodal_search(query, query_type='text', k=5):
    if query_type == 'text':
        query_emb = encode_text(query)
    else:  # image
        query_emb = encode_image(query)
    results = vectorstore.search(query_emb, k=k)
    return results
# 5. Multimodal generation (GPT-4V, legacy openai<1.0 SDK interface)
def generate_answer(query, text_docs, image_docs):
    prompt = {
        "text": f"Question: {query}\n\nText context:\n" + "\n".join(text_docs),
        "images": image_docs
    }
    response = openai.ChatCompletion.create(
        model="gpt-4-vision-preview",
        messages=[
            {"role": "user", "content": [
                {"type": "text", "text": prompt["text"]},
                *[{"type": "image_url", "image_url": {"url": img}} for img in prompt["images"]]
            ]}
        ]
    )
    return response.choices[0].message.content
Typical applications:
- Product documentation (manuals + figures)
- Medical diagnosis (records + imaging)
- Education (textbooks + diagrams)
- E-commerce (product descriptions + photos)
Question 5: RAG vs fine-tuning: how do you choose?
Answer:
Decision matrix:
| Factor | RAG | Fine-tuning | Recommendation |
|---|---|---|---|
| Knowledge update frequency | High | Low | RAG |
| Source citations required | Yes | No | RAG |
| Task-specific optimization | No | Yes | Fine-tuning |
| Limited budget | Yes | No | RAG |
| Private deployment | Either | Either | - |
| Latency requirements | Lenient | Strict | Fine-tuning |
| Training data volume | Small (<1000) | Large (>1000) | RAG / Fine-tuning |
Combined approach:
# RAG + fine-tuning
# 1. Fine-tune the base model on domain data
fine_tuned_model = fine_tune(base_model, domain_data)
# 2. Use the fine-tuned model inside the RAG pipeline
qa_chain = RetrievalQA.from_chain_type(
    llm=fine_tuned_model,  # the fine-tuned model
    retriever=vectorstore.as_retriever()
)
# Benefit: domain knowledge (fine-tuning) plus factual grounding (RAG)
Decision flow:
def choose_strategy(task_requirements):
    if task_requirements['knowledge_updates'] == 'frequent':
        return 'RAG'
    if task_requirements['need_citation']:
        return 'RAG'
    if task_requirements['task_specific'] and task_requirements['budget'] == 'high':
        if task_requirements['data_size'] > 1000:
            return 'Fine-tuning'
        else:
            return 'RAG + PEFT (LoRA)'
    if task_requirements['latency'] == 'critical':
        return 'Fine-tuning'
    return 'RAG'  # RAG is the default recommendation