RAG Systems

1. RAG Architecture

1.1 What is RAG

RAG (Retrieval-Augmented Generation) combines retrieval with generation: relevant external knowledge is retrieved and fed to a large language model to ground and augment its output.

Core advantages:

  • Fewer hallucinations
  • Knowledge can be updated without retraining the model
  • Better explainability (answers can be traced back to their sources)
  • Lower cost than fine-tuning

RAG vs fine-tuning:

Aspect            | RAG                       | Fine-tuning
------------------|---------------------------|----------------------------
Knowledge updates | Real-time                 | Requires retraining
Cost              | Low                       | High
Domain adaptation | Fast                      | Slow
Source of answers | Traceable                 | Black box
Best suited for   | Knowledge-intensive tasks | Task-specific optimization

1.2 End-to-End Pipeline

┌──────────────────┐
│  User question   │
└────────┬─────────┘
         ↓
┌──────────────────┐
│  Query rewrite/  │
│    expansion     │
└────────┬─────────┘
         ↓
┌──────────────────┐     ┌──────────────────┐
│ Embedding model  │────→│ Vector database  │
│                  │     │   (retrieval)    │
└────────┬─────────┘     └────────┬─────────┘
         │                        │
         └───────────┬────────────┘
                     ↓
           ┌──────────────────┐
           │      Rerank      │
           └────────┬─────────┘
                    ↓
           ┌──────────────────┐
           │ Prompt building  │
           │ (context + query)│
           └────────┬─────────┘
                    ↓
           ┌──────────────────┐
           │  LLM generation  │
           └────────┬─────────┘
                    ↓
           ┌──────────────────┐
           │   Final answer   │
           └──────────────────┘

1.3 Core Components

1. Document processing

from langchain.document_loaders import PyPDFLoader, TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter

# Load documents
loader = PyPDFLoader("document.pdf")
documents = loader.load()

# Split into chunks
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,        # chunk size (characters)
    chunk_overlap=200,      # overlap between adjacent chunks
    separators=["\n\n", "\n", "。", ".", " ", ""]
)
chunks = text_splitter.split_documents(documents)

2. Vectorization

from langchain.embeddings import OpenAIEmbeddings

embeddings = OpenAIEmbeddings(model="text-embedding-ada-002")
vectors = embeddings.embed_documents([chunk.page_content for chunk in chunks])

3. Storage and retrieval

from langchain.vectorstores import Chroma

vectorstore = Chroma.from_documents(
    documents=chunks,
    embedding=embeddings,
    persist_directory="./chroma_db"
)

# Retrieve
query = "What is RAG?"
docs = vectorstore.similarity_search(query, k=5)

4. Answer generation

from langchain.llms import OpenAI
from langchain.chains import RetrievalQA

llm = OpenAI(temperature=0)
qa_chain = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=vectorstore.as_retriever()
)

answer = qa_chain.run(query)

2. Embedding Models

2.1 Comparison of Mainstream Models

Model                  | Dim  | Max length | MTEB score | Speed  | Cost
-----------------------|------|------------|------------|--------|---------
text-embedding-ada-002 | 1536 | 8191       | 60.99      | Fast   | API call
text-embedding-3-small | 1536 | 8191       | 62.26      | Fast   | API call
text-embedding-3-large | 3072 | 8191       | 64.59      | Medium | API call
sentence-t5-xxl        | 768  | 512        | 63.57      | Slow   | Free
bge-large-zh           | 1024 | 512        | 64.53      | Medium | Free
m3e-large              | 1024 | 512        | 63.12      | Medium | Free
gte-large              | 1024 | 512        | 63.78      | Medium | Free

2.2 OpenAI Embeddings

Usage example:

import openai

# Note: this uses the legacy openai<1.0 SDK interface (openai.Embedding.create)
def get_embedding(text, model="text-embedding-3-small"):
    text = text.replace("\n", " ")
    response = openai.Embedding.create(
        input=[text],
        model=model
    )
    return response['data'][0]['embedding']

# Batch processing
texts = ["text 1", "text 2", "text 3"]
embeddings = openai.Embedding.create(
    input=texts,
    model="text-embedding-3-small"
)

for i, emb in enumerate(embeddings['data']):
    print(f"Text {i+1} embedding dimension: {len(emb['embedding'])}")

Pricing (a rough cost estimate follows the list):

  • text-embedding-3-small: $0.02 / 1M tokens
  • text-embedding-3-large: $0.13 / 1M tokens
  • text-embedding-ada-002: $0.10 / 1M tokens
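
As a back-of-the-envelope check on these prices, the sketch below estimates the cost of embedding a whole corpus; the corpus size and average chunk length are made-up numbers for illustration.

# Hypothetical corpus: 50,000 chunks averaging 400 tokens each
num_chunks = 50_000
tokens_per_chunk = 400
total_tokens = num_chunks * tokens_per_chunk  # 20,000,000 tokens

price_per_million = {
    "text-embedding-3-small": 0.02,
    "text-embedding-3-large": 0.13,
    "text-embedding-ada-002": 0.10,
}

for model, price in price_per_million.items():
    cost = total_tokens / 1_000_000 * price
    print(f"{model}: ${cost:.2f}")  # approx. $0.40, $2.60, $2.00 respectively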

2.3 Open-Source Models

sentence-transformers:

from sentence_transformers import SentenceTransformer

# English model
model = SentenceTransformer('all-MiniLM-L6-v2')

# Multilingual model (covers Chinese)
model = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')

# Encode
sentences = ["这是第一句话", "这是第二句话"]  # "This is the first sentence", "This is the second sentence"
embeddings = model.encode(sentences)

# Compute similarity
from sentence_transformers.util import cos_sim

similarity = cos_sim(embeddings[0], embeddings[1])
print(f"Similarity: {similarity.item():.4f}")

BGE (BAAI General Embedding):

from FlagEmbedding import FlagModel

# Chinese model
model = FlagModel('BAAI/bge-large-zh-v1.5', use_fp16=True)

# Encode documents
docs = ["文档1", "文档2"]
doc_embeddings = model.encode(docs)

# Encode queries (the retrieval instruction prefix is added for queries)
queries = ["为这个句子生成表示"]
query_embeddings = model.encode_queries(queries)

# Compute similarity
scores = query_embeddings @ doc_embeddings.T

M3E (Moka Massive Mixed Embedding):

from sentence_transformers import SentenceTransformer

model = SentenceTransformer('moka-ai/m3e-large')

# Handles mixed Chinese and English input
texts = [
    "Machine Learning is awesome",
    "机器学习很棒",
    "Deep Learning深度学习"
]
embeddings = model.encode(texts)

2.4 Selection Guidance

Scenario 1: Production use, adequate budget → OpenAI text-embedding-3-small (best price/performance)

Scenario 2: Mostly Chinese, self-hosted deployment → bge-large-zh or m3e-large (see the wiring sketch below)

Scenario 3: Resource-constrained → all-MiniLM-L6-v2 (384 dimensions, fast)

Scenario 4: Multilingual → text-embedding-3-large or paraphrase-multilingual
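
Whichever model you pick, the rest of the pipeline can stay the same. A minimal wiring sketch, assuming the chunks produced in section 1.3 and the older langchain.embeddings import paths used throughout this page:

from langchain.embeddings import OpenAIEmbeddings, HuggingFaceEmbeddings
from langchain.vectorstores import Chroma

USE_OPENAI = False  # flip based on budget / deployment constraints

if USE_OPENAI:
    embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
else:
    # Self-hosted Chinese-centric model (scenario 2)
    embeddings = HuggingFaceEmbeddings(model_name="BAAI/bge-large-zh-v1.5")

# The vector store does not care which backend produced the embeddings
vectorstore = Chroma.from_documents(chunks, embeddings, persist_directory="./chroma_db")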

3. Vector Databases

3.1 Database Comparison

Database | Type          | Index methods | Performance | Ease of use | Best for
---------|---------------|---------------|-------------|-------------|------------------------------
FAISS    | Library       | IVF, HNSW     | Very high   | Medium      | Single-node, high performance
Chroma   | Embedded      | HNSW          | High        | High        | Prototyping
Milvus   | Distributed   | IVF, HNSW     | High        | Medium      | Production
Pinecone | Cloud service | Proprietary   | High        | Very high   | Fast time-to-market
Weaviate | Distributed   | HNSW          | High        | High        | Hybrid search
Qdrant   | Distributed   | HNSW          | High        | High        | Rust implementation, efficient

3.2 FAISS

A high-performance vector search library open-sourced by Meta.

Basic usage:

import faiss
import numpy as np

# Create an index
dimension = 128
index = faiss.IndexFlatL2(dimension)  # L2 (Euclidean) distance

# Add vectors
vectors = np.random.random((1000, dimension)).astype('float32')
index.add(vectors)

# Search
query = np.random.random((1, dimension)).astype('float32')
k = 5
distances, indices = index.search(query, k)

print(f"Indices of the {k} nearest vectors: {indices[0]}")
print(f"Distances: {distances[0]}")

Advanced index types:

# IVF (Inverted File Index)
# Suited to large-scale data
nlist = 100  # number of cluster centroids
quantizer = faiss.IndexFlatL2(dimension)
index = faiss.IndexIVFFlat(quantizer, dimension, nlist)

# Train the index
index.train(vectors)
index.add(vectors)

# Set how many clusters to probe at search time
index.nprobe = 10
distances, indices = index.search(query, k)

# HNSW (Hierarchical Navigable Small World)
# High recall
M = 32  # number of neighbors per node
index = faiss.IndexHNSWFlat(dimension, M)
index.add(vectors)

# Product Quantization (compression)
m = 8  # number of sub-vectors
index = faiss.IndexIVFPQ(quantizer, dimension, nlist, m, 8)
index.train(vectors)
index.add(vectors)

Performance comparison:

import time

def benchmark_index(index, vectors, queries):
    start = time.time()
    index.add(vectors)
    add_time = time.time() - start

    start = time.time()
    distances, indices = index.search(queries, 10)
    search_time = time.time() - start

    return {
        'add_time': add_time,
        'search_time': search_time,
        'qps': len(queries) / search_time
    }

# Compare index types (ivf_index / hnsw_index refer to the trained indexes built above)
queries = np.random.random((100, dimension)).astype('float32')
results = {
    'Flat': benchmark_index(faiss.IndexFlatL2(dimension), vectors, queries),
    'IVF': benchmark_index(ivf_index, vectors, queries),
    'HNSW': benchmark_index(hnsw_index, vectors, queries)
}

3.3 Chroma

A lightweight, embedded vector database.

import chromadb
from chromadb.config import Settings

# Create a client (legacy pre-0.4 configuration; newer versions use chromadb.PersistentClient)
client = chromadb.Client(Settings(
    chroma_db_impl="duckdb+parquet",
    persist_directory="./chroma_data"
))

# Create a collection
collection = client.create_collection(
    name="my_collection",
    metadata={"hnsw:space": "cosine"}
)

# Add documents
collection.add(
    documents=["content of doc 1", "content of doc 2", "content of doc 3"],
    metadatas=[{"source": "doc1"}, {"source": "doc2"}, {"source": "doc3"}],
    ids=["id1", "id2", "id3"]
)

# Query
results = collection.query(
    query_texts=["query text"],
    n_results=5,
    where={"source": "doc1"}  # metadata filter
)

print(results['documents'])
print(results['distances'])

3.4 Milvus

A production-grade, distributed vector database.

from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType, utility

# Connect
connections.connect(host="localhost", port="19530")

# Define the schema
fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
    FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=128),
    FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535)
]
schema = CollectionSchema(fields, description="document collection")

# Create the collection
collection = Collection(name="documents", schema=schema)

# Create the index
index_params = {
    "metric_type": "L2",
    "index_type": "IVF_FLAT",
    "params": {"nlist": 1024}
}
collection.create_index(field_name="embedding", index_params=index_params)

# Insert data
import numpy as np
entities = [
    np.random.random((10000, 128)).tolist(),  # embeddings
    [f"doc {i}" for i in range(10000)]        # texts
]
collection.insert(entities)

# Load into memory
collection.load()

# Search
search_params = {"metric_type": "L2", "params": {"nprobe": 10}}
results = collection.search(
    data=[np.random.random(128).tolist()],
    anns_field="embedding",
    param=search_params,
    limit=10,
    output_fields=["text"]
)

for hits in results:
    for hit in hits:
        print(f"ID: {hit.id}, distance: {hit.distance}, text: {hit.entity.get('text')}")

3.5 Pinecone

A fully managed vector database service.

import pinecone

# Initialize (legacy pinecone-client v2 style)
pinecone.init(api_key="your-api-key", environment="us-west1-gcp")

# Create an index
pinecone.create_index("my-index", dimension=1536, metric="cosine")

# Connect to the index
index = pinecone.Index("my-index")

# Upsert vectors
index.upsert(vectors=[
    ("id1", [0.1] * 1536, {"text": "doc 1"}),
    ("id2", [0.2] * 1536, {"text": "doc 2"}),
])

# Query
results = index.query(
    vector=[0.15] * 1536,
    top_k=5,
    include_metadata=True
)

for match in results['matches']:
    print(f"ID: {match['id']}, Score: {match['score']}, Text: {match['metadata']['text']}")

4. Retrieval Strategies

4.1 Vector Retrieval

Basic similarity measures:

1. Cosine similarity

import numpy as np

def cosine_similarity(a, b):
    return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))

# Range: [-1, 1]; 1 means the vectors point in the same direction

2. Euclidean distance (L2)

def euclidean_distance(a, b):
    return np.linalg.norm(a - b)

# Range: [0, ∞); 0 means identical vectors

3. Dot product

def dot_product(a, b):
    return np.dot(a, b)

# Equivalent to cosine similarity once the vectors are L2-normalized

Approximate nearest neighbor (ANN) algorithms:

HNSW (Hierarchical Navigable Small World):

Layered graph structure:
- Upper layers: sparse, long-range links (quickly locate the right region)
- Bottom layer: dense, short-range links (precise search)

Pros: high recall, high QPS
Cons: large memory footprint, deletions are not well supported

IVF (Inverted File Index):

1. Clustering: partition the vectors into nlist clusters
2. Build an inverted list: cluster_id → vector_ids
3. Search: find the nprobe nearest clusters and search only within them

Pros: memory-friendly, scales to large datasets
Cons: slightly lower recall than HNSW (the nprobe sweep below illustrates the trade-off)
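
A minimal sketch of that recall/speed trade-off using FAISS on synthetic data; the dataset size, nlist, and nprobe values are arbitrary, and recall@10 is measured against an exact flat index.

import faiss
import numpy as np

d = 128
xb = np.random.random((100_000, d)).astype('float32')  # database vectors
xq = np.random.random((100, d)).astype('float32')      # query vectors

# Ground truth from an exact (flat) index
flat = faiss.IndexFlatL2(d)
flat.add(xb)
_, gt = flat.search(xq, 10)

# IVF index
quantizer = faiss.IndexFlatL2(d)
ivf = faiss.IndexIVFFlat(quantizer, d, 1024)
ivf.train(xb)
ivf.add(xb)

for nprobe in (1, 8, 32, 128):
    ivf.nprobe = nprobe
    _, pred = ivf.search(xq, 10)
    recall = np.mean([len(set(p) & set(g)) / 10 for p, g in zip(pred, gt)])
    print(f"nprobe={nprobe:4d}  recall@10={recall:.3f}")  # recall rises (and latency grows) with nprobe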

4.2 Hybrid Retrieval

Combine sparse retrieval (BM25) with dense retrieval (vectors).

BM25:

from rank_bm25 import BM25Okapi
import numpy as np

# Build the BM25 index
corpus = [doc.split() for doc in documents]
bm25 = BM25Okapi(corpus)

# Retrieve
query = "machine learning"
scores = bm25.get_scores(query.split())
top_docs = np.argsort(scores)[-5:][::-1]

Hybrid retriever:

class HybridRetriever:
    def __init__(self, vector_store, bm25_index, alpha=0.5):
        self.vector_store = vector_store
        self.bm25 = bm25_index
        self.alpha = alpha  # weight of the vector score

    def search(self, query, k=10):
        # Dense (vector) retrieval
        vector_results = self.vector_store.similarity_search_with_score(query, k=k*2)
        vector_scores = {doc.metadata['id']: score for doc, score in vector_results}

        # Sparse (BM25) retrieval
        bm25_scores = self.bm25.get_scores(query.split())

        # Normalize both score sets to [0, 1]
        vector_scores_norm = self._normalize(vector_scores)
        bm25_scores_norm = self._normalize(dict(enumerate(bm25_scores)))

        # Fuse (assumes doc.metadata['id'] is the document's position in `documents`)
        combined_scores = {}
        all_ids = set(vector_scores_norm.keys()) | set(bm25_scores_norm.keys())

        for doc_id in all_ids:
            v_score = vector_scores_norm.get(doc_id, 0)
            b_score = bm25_scores_norm.get(doc_id, 0)
            combined_scores[doc_id] = self.alpha * v_score + (1 - self.alpha) * b_score

        # Return the top-k
        top_ids = sorted(combined_scores, key=combined_scores.get, reverse=True)[:k]
        return [documents[i] for i in top_ids]

    def _normalize(self, scores):
        if not scores:
            return {}
        min_score = min(scores.values())
        max_score = max(scores.values())
        if max_score == min_score:
            return {k: 1.0 for k in scores}
        return {k: (v - min_score) / (max_score - min_score) for k, v in scores.items()}

4.3 Rerank

Re-rank the first-pass candidates with a more precise model.

Cross-encoder reranking:

from sentence_transformers import CrossEncoder
import numpy as np

# Load the reranking model
reranker = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')

def rerank(query, documents, top_k=5):
    # Build (query, document) pairs; `documents` is a list of strings here
    pairs = [[query, doc] for doc in documents]

    # Score each pair for relevance
    scores = reranker.predict(pairs)

    # Sort by score
    ranked_indices = np.argsort(scores)[::-1][:top_k]
    return [documents[i] for i in ranked_indices], scores[ranked_indices]

# Usage: pass the page contents of the first-pass results
initial_docs = [doc.page_content for doc in vector_store.similarity_search(query, k=20)]
reranked_docs, scores = rerank(query, initial_docs, top_k=5)

Cohere Rerank API:

import cohere

co = cohere.Client('your-api-key')

response = co.rerank(
    query="What is machine learning?",
    documents=["doc 1", "doc 2", "doc 3"],
    top_n=3,
    model="rerank-multilingual-v2.0"
)

for result in response.results:
    print(f"index: {result.index}, score: {result.relevance_score}")

5. Optimization Techniques

5.1 Chunking Strategies

Fixed-length chunking:

def fixed_length_split(text, chunk_size=500, overlap=50):
    chunks = []
    start = 0
    while start < len(text):
        end = start + chunk_size
        chunks.append(text[start:end])
        start += chunk_size - overlap
    return chunks

Sentence/token-based chunking:

from langchain.text_splitter import SentenceTransformersTokenTextSplitter

splitter = SentenceTransformersTokenTextSplitter(
    tokens_per_chunk=256,   # chunk size in tokens
    chunk_overlap=50,
    model_name="sentence-transformers/all-MiniLM-L6-v2"
)
chunks = splitter.split_text(text)

Semantic chunking:

import numpy as np
from nltk.tokenize import sent_tokenize
from sklearn.metrics.pairwise import cosine_similarity

def semantic_chunking(text, embeddings_model, threshold=0.5):
    sentences = sent_tokenize(text)
    embeddings = embeddings_model.encode(sentences)

    chunks = []
    current_idx = [0]  # indices of the sentences in the current chunk

    for i in range(1, len(sentences)):
        # Similarity between this sentence and the centroid of the current chunk
        chunk_centroid = np.mean(embeddings[current_idx], axis=0).reshape(1, -1)
        similarity = cosine_similarity(embeddings[i].reshape(1, -1), chunk_centroid)[0][0]

        if similarity > threshold:
            current_idx.append(i)
        else:
            chunks.append(' '.join(sentences[j] for j in current_idx))
            current_idx = [i]

    chunks.append(' '.join(sentences[j] for j in current_idx))
    return chunks

Best practices:

  • chunk_size: 500-1000 characters (Chinese), 200-500 tokens (English)
  • overlap: 10-20% of chunk_size
  • Keep sentences intact
  • Attach metadata (source, page number, title); see the sketch below
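
A minimal sketch of attaching metadata at chunking time. The file name, page numbers, and titles are made-up; create_documents copies each metadata dict onto the chunks produced from the corresponding text.

from langchain.text_splitter import RecursiveCharacterTextSplitter

splitter = RecursiveCharacterTextSplitter(chunk_size=800, chunk_overlap=150)

# One metadata dict per input text
pages = ["text of page 1 ...", "text of page 2 ..."]
metadatas = [
    {"source": "manual.pdf", "page": 1, "title": "Introduction"},
    {"source": "manual.pdf", "page": 2, "title": "Installation"},
]
chunks = splitter.create_documents(pages, metadatas=metadatas)

print(chunks[0].metadata)  # {'source': 'manual.pdf', 'page': 1, 'title': 'Introduction'}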

5.2 Index Optimization

1. Pre-filtering (metadata filtering)

# Filter by metadata first, then run the vector search
results = collection.query(
    query_embeddings=[query_vector],
    n_results=10,
    where={"year": {"$gte": 2020}, "category": "AI"}
)

2. Hierarchical retrieval

# Level 1: coarse retrieval (chapter level)
chapter_results = chapter_index.search(query, k=3)

# Level 2: fine retrieval (paragraph level)
paragraph_results = []
for chapter in chapter_results:
    chapter_paragraphs = paragraph_index.search(
        query,
        k=5,
        filter={"chapter_id": chapter.id}
    )
    paragraph_results.extend(chapter_paragraphs)

3. Index compression

# FAISS product quantization (PQ)
m = 8      # number of sub-vectors
nbits = 8  # bits per sub-vector code
index = faiss.IndexIVFPQ(quantizer, dimension, nlist, m, nbits)

# Storage per vector: m * nbits = 64 bits = 8 bytes,
# vs. dimension * 4 bytes = 512 bytes of raw float32 for 128 dims, i.e. roughly 64x compression

5.3 Prompt Engineering

Basic template:

def build_prompt(query, contexts):
    context_str = "\n\n".join([f"[{i+1}] {ctx}" for i, ctx in enumerate(contexts)])

    prompt = f"""
Answer the question using the reference information below. If the answer is not in the references, say "The provided information is not sufficient to answer."

References:
{context_str}

Question: {query}

Answer:
"""
    return prompt

Advanced techniques:

1. Context compression

from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import LLMChainExtractor

# Use an LLM to extract only the relevant parts of each retrieved document
compressor = LLMChainExtractor.from_llm(llm)
compression_retriever = ContextualCompressionRetriever(
    base_compressor=compressor,
    base_retriever=vectorstore.as_retriever()
)

compressed_docs = compression_retriever.get_relevant_documents(query)

2. Citation annotation

prompt = f"""
Answer the question using the references below, and cite your sources in the answer (using [1], [2], ...).

References:
[1] {context1}
[2] {context2}
[3] {context3}

Question: {query}

Answer (with citations):
"""

3. Multi-hop reasoning

def multi_hop_rag(query, max_hops=3):
    contexts = []
    current_query = query

    for hop in range(max_hops):
        # Retrieve
        docs = vectorstore.similarity_search(current_query, k=3)
        contexts.extend(docs)

        # Generate a follow-up sub-question
        if hop < max_hops - 1:
            prompt = f"""
Given the following information:
{docs}

Original question: {query}

What should be asked next to better answer the original question? Produce one more specific sub-question.

Sub-question:
"""
            current_query = llm.predict(prompt)

    # Final answer (generate_answer builds a prompt from the accumulated contexts)
    return generate_answer(query, contexts)

6. End-to-End Implementations

6.1 LangChain Implementation

from langchain.chains import RetrievalQA
from langchain.llms import OpenAI
from langchain.document_loaders import DirectoryLoader
from langchain.text_splitter import CharacterTextSplitter
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Chroma

# 1. Load documents
loader = DirectoryLoader('./documents', glob="**/*.txt")
documents = loader.load()

# 2. Split into chunks
text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
texts = text_splitter.split_documents(documents)

# 3. Build the vector store
embeddings = OpenAIEmbeddings()
vectorstore = Chroma.from_documents(texts, embeddings, persist_directory="./chroma_db")

# 4. Build the retrieval chain
llm = OpenAI(temperature=0)
qa_chain = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=vectorstore.as_retriever(search_kwargs={"k": 5}),
    return_source_documents=True
)

# 5. Query
query = "What is a vector database?"
result = qa_chain({"query": query})

print(f"Answer: {result['result']}")
print("\nSource documents:")
for doc in result['source_documents']:
    print(f"- {doc.metadata['source']}: {doc.page_content[:100]}...")

6.2 LlamaIndex Implementation

from llama_index import VectorStoreIndex, SimpleDirectoryReader, ServiceContext
from llama_index.llms import OpenAI
from llama_index.embeddings import OpenAIEmbedding

# 1. Load documents
documents = SimpleDirectoryReader('./documents').load_data()

# 2. Configure the service (ServiceContext is the pre-0.10 llama_index API)
llm = OpenAI(model="gpt-3.5-turbo", temperature=0)
embed_model = OpenAIEmbedding(model="text-embedding-3-small")
service_context = ServiceContext.from_defaults(llm=llm, embed_model=embed_model)

# 3. Build the index
index = VectorStoreIndex.from_documents(documents, service_context=service_context)

# 4. Query
query_engine = index.as_query_engine(similarity_top_k=5)
response = query_engine.query("What is a vector database?")

print(f"Answer: {response}")
print("\nSources:")
for node in response.source_nodes:
    print(f"- similarity: {node.score:.4f}")
    print(f"  content: {node.text[:100]}...")

7. Common Interview Questions

Question 1: How does the chunking strategy affect RAG quality?

Answer:

Key factors:

1. Chunk size

Too small (< 200 characters):
✗ Incomplete context
✗ Fragmented retrieval results
✓ High precision

Medium (500-1000 characters):
✓ Balances completeness and precision
✓ Best for most scenarios

Too large (> 2000 characters):
✓ Rich context
✗ More noise
✗ Lower relevance

2. Overlap

# No overlap
chunks = ["ABC", "DEF", "GHI"]  # key information may be cut at chunk boundaries

# With overlap (about 20% recommended)
chunks = ["ABC", "BCD", "CDE", "DEF"]
# Benefit: preserves continuity across chunk boundaries (see the usage example below)
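
As a concrete illustration, running the fixed_length_split helper defined in section 5.1 on a toy string (sizes chosen only for readability):

text = "ABCDEFGHI"

print(fixed_length_split(text, chunk_size=3, overlap=0))
# ['ABC', 'DEF', 'GHI']   -> a fact spanning the C/D boundary is split across chunks

print(fixed_length_split(text, chunk_size=3, overlap=1))
# ['ABC', 'CDE', 'EFG', 'GHI', 'I']   -> each boundary character reappears in the next chunk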

Empirical comparison:

import pandas as pd

def evaluate_chunking_strategy(docs, queries, chunk_sizes, overlaps):
    results = []

    for chunk_size in chunk_sizes:
        for overlap in overlaps:
            splitter = RecursiveCharacterTextSplitter(
                chunk_size=chunk_size,
                chunk_overlap=overlap
            )
            chunks = splitter.split_documents(docs)

            # Build an index and a QA chain for this configuration
            vectorstore = Chroma.from_documents(chunks, embeddings)
            qa_chain = RetrievalQA.from_chain_type(llm, retriever=vectorstore.as_retriever())

            # Compute metrics (evaluate() is your own scoring function over the test queries)
            score = evaluate(qa_chain, queries)
            results.append({
                'chunk_size': chunk_size,
                'overlap': overlap,
                'num_chunks': len(chunks),
                'score': score
            })

    return pd.DataFrame(results)

# Run the experiment
results = evaluate_chunking_strategy(
    docs=documents,
    queries=test_questions,
    chunk_sizes=[300, 500, 800, 1000, 1500],
    overlaps=[0, 50, 100, 150, 200]
)

# Visualize
import seaborn as sns
pivot = results.pivot(index='chunk_size', columns='overlap', values='score')
sns.heatmap(pivot, annot=True, fmt='.3f')

Recommended settings:

  • Technical documentation: 800-1000 characters, 150-200 overlap
  • Dialogue / Q&A: 300-500 characters, 50-100 overlap
  • Long-form articles: 1000-1500 characters, 200-300 overlap

Question 2: How do you evaluate a RAG system?

Answer:

Evaluation dimensions:

1. Retrieval quality

Hit rate:

def hit_rate(relevant_docs, retrieved_docs):
    """Whether the top-k results contain at least one relevant document."""
    return any(doc in relevant_docs for doc in retrieved_docs)

# Average over the test set
hit_rates = [hit_rate(rel, retr) for rel, retr in zip(relevant_list, retrieved_list)]
avg_hit_rate = np.mean(hit_rates)

MRR (Mean Reciprocal Rank):

def mrr(relevant_docs, retrieved_docs):
    """Reciprocal rank of the first relevant document."""
    for i, doc in enumerate(retrieved_docs):
        if doc in relevant_docs:
            return 1.0 / (i + 1)
    return 0.0

avg_mrr = np.mean([mrr(rel, retr) for rel, retr in zip(relevant_list, retrieved_list)])

NDCG (Normalized Discounted Cumulative Gain):

from sklearn.metrics import ndcg_score

# relevance: [3, 2, 3, 0, 1, 2] (graded relevance judgments)
# Higher-ranked positions carry more weight
ndcg = ndcg_score([relevance_true], [relevance_pred])

2. Generation quality

BLEU (machine-translation metric):

from nltk.translate.bleu_score import sentence_bleu

reference = [['this', 'is', 'a', 'test']]
candidate = ['this', 'is', 'test']
score = sentence_bleu(reference, candidate)

ROUGE (summarization metric):

from rouge import Rouge

rouge = Rouge()
scores = rouge.get_scores(predicted_answer, reference_answer)
print(f"ROUGE-L: {scores[0]['rouge-l']['f']:.4f}")

BERTScore (semantic similarity):

from bert_score import score

P, R, F1 = score([predicted], [reference], lang='zh', model_type='bert-base-chinese')
print(f"BERTScore F1: {F1.mean().item():.4f}")

3. End-to-end evaluation

Faithfulness:

def check_faithfulness(answer, context):
    """Is the answer grounded in the context (i.e. no hallucination)?"""
    prompt = f"""
Context: {context}
Answer: {answer}

Question: Can every claim in the answer be supported by the context? Reply "yes" or "no" and explain why.
"""
    result = llm.predict(prompt)
    return "yes" in result.lower()

faithfulness = np.mean([check_faithfulness(ans, ctx) for ans, ctx in zip(answers, contexts)])

Relevance:

def check_relevance(question, answer):
    """Does the answer actually address the question?"""
    prompt = f"""
Question: {question}
Answer: {answer}

Does this answer directly address the question? Give a score from 1 to 5 (5 = fully answers it). Reply with the number only.
"""
    score = int(llm.predict(prompt))
    return score / 5.0

A complete evaluation framework (RAGAS):

from datasets import Dataset
from ragas import evaluate
from ragas.metrics import faithfulness, answer_relevancy, context_recall, context_precision

# Prepare the data (ragas expects a Hugging Face Dataset)
eval_data = Dataset.from_dict({
    'question': questions,
    'answer': answers,
    'contexts': contexts,
    'ground_truths': ground_truths
})

# Evaluate
result = evaluate(
    dataset=eval_data,
    metrics=[faithfulness, answer_relevancy, context_recall, context_precision]
)

print(result)
"""
{'faithfulness': 0.82,
 'answer_relevancy': 0.91,
 'context_recall': 0.73,
 'context_precision': 0.85}
"""

Question 3: What can you do when vector-retrieval recall is low?

Answer:

Diagnosis:

def diagnose_retrieval(query, relevant_docs, k=10):
    # Retrieve
    results = vectorstore.similarity_search_with_score(query, k=k)

    # Analyze
    print(f"Query: {query}")
    print(f"Number of relevant documents: {len(relevant_docs)}")

    found = 0
    for i, (doc, score) in enumerate(results):
        is_relevant = doc.metadata['id'] in relevant_docs
        found += is_relevant
        print(f"{i+1}. {'✓' if is_relevant else '✗'} score:{score:.4f} ID:{doc.metadata['id']}")

    recall = found / len(relevant_docs)
    print(f"\nRecall: {recall:.2%}")

Remedies:

1. Query expansion

def expand_query(query, llm):
    prompt = f"""
Original question: {query}

Generate 3 questions that are semantically similar but phrased differently, to improve retrieval:
1.
2.
3.
"""
    expanded = llm.predict(prompt).split('\n')

    # Retrieve with each query and merge the results
    all_docs = []
    for q in [query] + expanded:
        docs = vectorstore.similarity_search(q, k=5)
        all_docs.extend(docs)

    # Deduplicate by document id
    unique_docs = list({doc.metadata['id']: doc for doc in all_docs}.values())
    return unique_docs

2. HyDE (Hypothetical Document Embeddings)

def hyde_retrieval(query, llm, vectorstore):
    # Generate a hypothetical document
    prompt = f"""
Question: {query}

Write a passage that could plausibly answer this question (even if you are not sure of the exact answer):
"""
    hypothetical_doc = llm.predict(prompt)

    # Retrieve using the hypothetical document as the query
    results = vectorstore.similarity_search(hypothetical_doc, k=10)
    return results

3. Multi-path recall

def multi_path_retrieval(query, k=10):
    results = []

    # Path 1: vector retrieval (RRF uses each document's rank, i.e. its position in the list)
    vector_results = vectorstore.similarity_search(query, k=k)
    results.extend([('vector', doc, rank) for rank, doc in enumerate(vector_results)])

    # Path 2: BM25
    bm25_results = bm25.get_top_n(query.split(), documents, n=k)
    results.extend([('bm25', doc, rank) for rank, doc in enumerate(bm25_results)])

    # Path 3: keyword matching
    keyword_results = keyword_search(query, k=k)
    results.extend([('keyword', doc, rank) for rank, doc in enumerate(keyword_results)])

    # Fusion (Reciprocal Rank Fusion)
    return reciprocal_rank_fusion(results)

def reciprocal_rank_fusion(results, k=60):
    scores = {}
    for source, doc, rank in results:
        doc_id = doc.metadata['id']
        scores[doc_id] = scores.get(doc_id, 0) + 1.0 / (k + rank + 1)

    return sorted(scores.items(), key=lambda x: x[1], reverse=True)

4. Tune index parameters

# FAISS HNSW
index = faiss.IndexHNSWFlat(dimension, 64)  # larger M improves recall
index.hnsw.efConstruction = 200   # search depth during graph construction
index.hnsw.efSearch = 100         # search depth at query time

# IVF
index.nprobe = 50  # probe more cluster centroids

Question 4: How do you handle multimodal RAG (mixed text and images)?

Answer:

Architecture:

Documents
├── Text   → text embedding   → vector store 1
└── Images → vision embedding → vector store 2

Query
├── Text query  → search vector store 1 and vector store 2
└── Image query → search vector store 2

Fuse results → LLM generation

Implementation:

from transformers import CLIPModel, CLIPProcessor
import torch

# 1. Multimodal encoder (CLIP)
model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

def encode_text(text):
    inputs = processor(text=[text], return_tensors="pt", padding=True)
    with torch.no_grad():
        embeddings = model.get_text_features(**inputs)
    return embeddings.numpy()

def encode_image(image):
    inputs = processor(images=image, return_tensors="pt")
    with torch.no_grad():
        embeddings = model.get_image_features(**inputs)
    return embeddings.numpy()

# 2. Build the multimodal index
from PIL import Image

text_embeddings = []
image_embeddings = []
metadata = []

for doc in documents:
    # Text
    text_emb = encode_text(doc['text'])
    text_embeddings.append(text_emb)

    # Image
    if 'image_path' in doc:
        image = Image.open(doc['image_path'])
        image_emb = encode_image(image)
        image_embeddings.append(image_emb)

    metadata.append({'type': 'text' if 'image_path' not in doc else 'multimodal',
                     'id': doc['id']})

# 3. Store everything in one shared vector space (CLIP text and image vectors are comparable)
combined_embeddings = text_embeddings + image_embeddings
vectorstore.add(combined_embeddings, metadata)

# 4. Multimodal retrieval
def multimodal_search(query, query_type='text', k=5):
    if query_type == 'text':
        query_emb = encode_text(query)
    else:  # image
        query_emb = encode_image(query)

    results = vectorstore.search(query_emb, k=k)
    return results

# 5. Multimodal generation (GPT-4V, legacy openai<1.0 SDK call)
def generate_answer(query, text_docs, image_docs):
    prompt = {
        "text": f"Question: {query}\n\nText context:\n" + "\n".join(text_docs),
        "images": image_docs
    }

    response = openai.ChatCompletion.create(
        model="gpt-4-vision-preview",
        messages=[
            {"role": "user", "content": [
                {"type": "text", "text": prompt["text"]},
                *[{"type": "image_url", "image_url": {"url": img}} for img in prompt["images"]]
            ]}
        ]
    )

    return response.choices[0].message.content

Use cases:

  • Product documentation (manuals + figures)
  • Medical applications (records + imaging)
  • Education (textbooks + diagrams)
  • E-commerce (product descriptions + photos)

Question 5: How do you choose between RAG and fine-tuning?

Answer:

Decision matrix:

Factor                     | RAG           | Fine-tuning   | Recommendation
---------------------------|---------------|---------------|-------------------
Knowledge update frequency | High          | Low           | RAG
Need to cite sources       | Yes           | No            | RAG
Task-specific optimization | No            | Yes           | Fine-tuning
Limited budget             | Yes           | No            | RAG
On-premise deployment      | Either works  | Either works  | -
Latency requirements       | Relaxed       | Strict        | Fine-tuning
Training data volume       | Small (<1000) | Large (>1000) | RAG / Fine-tuning

Combining both:

# RAG + fine-tuning
# 1. Fine-tune the base model on domain data (fine_tune is pseudocode here)
fine_tuned_model = fine_tune(base_model, domain_data)

# 2. Use the fine-tuned model inside the RAG pipeline
qa_chain = RetrievalQA.from_chain_type(
    llm=fine_tuned_model,  # the fine-tuned model
    retriever=vectorstore.as_retriever()
)

# Benefit: domain knowledge from fine-tuning plus factual grounding from RAG

Decision flow:

def choose_strategy(task_requirements):
    if task_requirements['knowledge_updates'] == 'frequent':
        return 'RAG'

    if task_requirements['need_citation']:
        return 'RAG'

    if task_requirements['task_specific'] and task_requirements['budget'] in ('medium', 'high'):
        if task_requirements['data_size'] > 1000:
            return 'Fine-tuning'
        else:
            return 'RAG + PEFT (LoRA)'

    if task_requirements['latency'] == 'critical':
        return 'Fine-tuning'

    return 'RAG'  # default recommendation