Hands-On Projects
1. Intelligent Q&A System
Build a complete RAG (Retrieval-Augmented Generation) question-answering system covering the full pipeline: knowledge-base construction, retrieval, and answer generation.
1.1 System Architecture
User question
↓
Question preprocessing
↓
Vector retrieval → vector database (FAISS/Chroma)
↓
Document reranking (Rerank)
↓
Prompt construction
↓
LLM answer generation
↓
Post-processing & citation annotation
↓
Response returned to the user
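The diagram includes a question-preprocessing step that the reference implementation in 1.2 leaves out. A minimal sketch of what that step might do (the cleanup rules here are illustrative assumptions, not part of the code below):

import re

def preprocess_query(query: str) -> str:
    """Basic query cleanup before retrieval: trim, collapse whitespace, strip control chars."""
    query = query.strip()
    query = re.sub(r"\s+", " ", query)              # collapse runs of whitespace
    query = re.sub(r"[\x00-\x1F\x7F]", "", query)   # drop control characters
    return query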
1.2 Full Code Implementation
import os
from typing import List, Dict
from dataclasses import dataclass
import numpy as np
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.document_loaders import DirectoryLoader, TextLoader
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import FAISS
from langchain.llms import OpenAI
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
import openai
@dataclass
class Document:
"""文档数据结构"""
content: str
metadata: Dict
embedding: np.ndarray = None
class KnowledgeBase:
"""知识库管理"""
def __init__(self, embedding_model="text-embedding-3-small"):
self.embeddings = OpenAIEmbeddings(model=embedding_model)
self.vectorstore = None
self.documents = []
def load_documents(self, directory: str, file_types: List[str] = ["txt", "md"]):
"""加载文档"""
all_docs = []
for file_type in file_types:
loader = DirectoryLoader(
directory,
glob=f"**/*.{file_type}",
loader_cls=TextLoader
)
docs = loader.load()
all_docs.extend(docs)
print(f"加载了 {len(all_docs)} 个文档")
return all_docs
def split_documents(self, documents: List, chunk_size=1000, chunk_overlap=200):
"""文档分块"""
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
separators=["\n\n", "\n", "。", ".", " ", ""]
)
chunks = text_splitter.split_documents(documents)
print(f"分割成 {len(chunks)} 个文本块")
return chunks
def build_index(self, chunks: List):
"""构建向量索引"""
print("正在构建向量索引...")
self.vectorstore = FAISS.from_documents(chunks, self.embeddings)
print("索引构建完成")
def save_index(self, path: str):
"""保存索引"""
self.vectorstore.save_local(path)
print(f"索引已保存到 {path}")
def load_index(self, path: str):
"""加载索引"""
self.vectorstore = FAISS.load_local(path, self.embeddings)
print(f"索引已从 {path} 加载")
def search(self, query: str, k: int = 5):
"""检索相关文档"""
docs = self.vectorstore.similarity_search_with_score(query, k=k)
return docs
class Reranker:
"""重排序模块"""
def __init__(self, model_name="cross-encoder/ms-marco-MiniLM-L-6-v2"):
from sentence_transformers import CrossEncoder
self.model = CrossEncoder(model_name)
def rerank(self, query: str, documents: List, top_k: int = 3):
"""对检索结果重排序"""
# 构造query-doc对
pairs = [[query, doc.page_content] for doc in documents]
# 计算相关性分数
scores = self.model.predict(pairs)
# 排序
ranked_indices = np.argsort(scores)[::-1][:top_k]
ranked_docs = [documents[i] for i in ranked_indices]
ranked_scores = [scores[i] for i in ranked_indices]
return ranked_docs, ranked_scores
class PromptBuilder:
"""Prompt构建器"""
@staticmethod
def build_qa_prompt(query: str, contexts: List[str]) -> str:
"""构建问答Prompt"""
# 格式化上下文
context_str = "\n\n".join([
f"[文档{i+1}]\n{ctx}"
for i, ctx in enumerate(contexts)
])
prompt = f"""你是一个专业的AI助手,请基于以下参考文档回答用户的问题。
要求:
1. 回答必须基于参考文档,不要编造信息
2. 如果参考文档中没有相关信息,请明确说明"根据提供的文档无法回答此问题"
3. 在回答中标注信息来源,使用[文档X]的格式
4. 回答要准确、简洁、专业
参考文档:
{context_str}
问题: {query}
回答:"""
return prompt
class QASystem:
"""问答系统"""
def __init__(self, knowledge_base: KnowledgeBase, use_rerank=True):
self.kb = knowledge_base
self.use_rerank = use_rerank
self.reranker = Reranker() if use_rerank else None
self.llm = OpenAI(temperature=0, max_tokens=500)
def answer(self, query: str, verbose=False) -> Dict:
"""回答问题"""
# 1. 检索
if verbose:
print(f"\n问题: {query}\n")
print("正在检索相关文档...")
retrieved_docs = self.kb.search(query, k=10)
if verbose:
print(f"检索到 {len(retrieved_docs)} 个候选文档")
# 2. 重排序(可选)
if self.use_rerank:
if verbose:
print("正在重排序...")
docs_only = [doc for doc, score in retrieved_docs]
ranked_docs, ranked_scores = self.reranker.rerank(query, docs_only, top_k=3)
if verbose:
print(f"重排序后保留 {len(ranked_docs)} 个文档")
else:
ranked_docs = [doc for doc, score in retrieved_docs[:3]]
# 3. 构建Prompt
contexts = [doc.page_content for doc in ranked_docs]
prompt = PromptBuilder.build_qa_prompt(query, contexts)
if verbose:
print(f"\nPrompt长度: {len(prompt)} 字符\n")
# 4. LLM生成
if verbose:
print("正在生成答案...")
answer = self.llm.predict(prompt)
# 5. 构建返回结果
result = {
"question": query,
"answer": answer,
"sources": [
{
"content": doc.page_content[:200] + "...",
"metadata": doc.metadata,
"relevance_score": float(ranked_scores[i]) if self.use_rerank else None
}
for i, doc in enumerate(ranked_docs)
]
}
return result
# ============ 使用示例 ============
def main():
# 1. 创建知识库
kb = KnowledgeBase()
# 2. 加载文档
docs = kb.load_documents("./knowledge_docs")
# 3. 分块
chunks = kb.split_documents(docs, chunk_size=800, chunk_overlap=150)
# 4. 构建索引
kb.build_index(chunks)
# 5. 保存索引(可选)
kb.save_index("./faiss_index")
# 或者直接加载已有索引
# kb.load_index("./faiss_index")
# 6. 创建问答系统
qa_system = QASystem(kb, use_rerank=True)
# 7. 测试问答
questions = [
"什么是RAG系统?",
"如何优化向量检索的召回率?",
"LoRA的原理是什么?"
]
for question in questions:
result = qa_system.answer(question, verbose=True)
print(f"\n{'='*60}")
print(f"问题: {result['question']}")
print(f"\n答案: {result['answer']}")
print(f"\n来源文档:")
for i, source in enumerate(result['sources'], 1):
print(f"\n[文档{i}]")
print(f"内容: {source['content']}")
print(f"来源: {source['metadata'].get('source', 'Unknown')}")
            if source['relevance_score'] is not None:
                print(f"相关性: {source['relevance_score']:.4f}")
print(f"{'='*60}\n")
if __name__ == "__main__":
main()
1.3 Streaming Output Optimization
from typing import Iterator
import openai
class StreamingQASystem(QASystem):
"""支持流式输出的问答系统"""
def answer_stream(self, query: str) -> Iterator[str]:
"""流式生成答案"""
# 检索和重排序
retrieved_docs = self.kb.search(query, k=10)
docs_only = [doc for doc, score in retrieved_docs]
if self.use_rerank:
ranked_docs, _ = self.reranker.rerank(query, docs_only, top_k=3)
else:
ranked_docs = docs_only[:3]
# 构建Prompt
contexts = [doc.page_content for doc in ranked_docs]
prompt = PromptBuilder.build_qa_prompt(query, contexts)
# 流式生成
for chunk in openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}],
stream=True
):
if chunk.choices[0].delta.get("content"):
yield chunk.choices[0].delta.content
# 使用
qa_system = StreamingQASystem(kb, use_rerank=True)
print("答案: ", end="", flush=True)
for token in qa_system.answer_stream("什么是RAG?"):
print(token, end="", flush=True)
print()
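Note that answer_stream above targets the pre-1.0 openai SDK (openai.ChatCompletion). If you are on openai>=1.0, an equivalent generator looks roughly like this (a sketch; it assumes the same prompt string built by PromptBuilder):

from openai import OpenAI

client = OpenAI()  # reads OPENAI_API_KEY from the environment

def stream_chat(prompt: str):
    """Yield answer fragments from the chat.completions streaming API (openai>=1.0)."""
    stream = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )
    for chunk in stream:
        delta = chunk.choices[0].delta.content
        if delta:
            yield delta

answer_stream can then yield from stream_chat(prompt) instead of calling openai.ChatCompletion directly.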
1.4 Evaluation System
from typing import List, Tuple
import json
class QAEvaluator:
"""问答系统评估器"""
def __init__(self, qa_system: QASystem):
self.qa_system = qa_system
def evaluate(self, test_data: List[Dict]) -> Dict:
"""
test_data格式:
[
{
"question": "问题",
"ground_truth": "标准答案",
"relevant_docs": ["相关文档ID列表"]
}
]
"""
metrics = {
"retrieval_recall": [],
"retrieval_precision": [],
"answer_relevance": [],
"faithfulness": []
}
for item in test_data:
question = item["question"]
ground_truth = item.get("ground_truth")
relevant_doc_ids = set(item.get("relevant_docs", []))
# 获取系统回答
result = self.qa_system.answer(question)
# 评估检索质量
retrieved_doc_ids = {
source["metadata"].get("doc_id")
for source in result["sources"]
}
# Recall: 相关文档被检索到的比例
recall = len(relevant_doc_ids & retrieved_doc_ids) / len(relevant_doc_ids) if relevant_doc_ids else 0
metrics["retrieval_recall"].append(recall)
# Precision: 检索到的文档中相关的比例
precision = len(relevant_doc_ids & retrieved_doc_ids) / len(retrieved_doc_ids) if retrieved_doc_ids else 0
metrics["retrieval_precision"].append(precision)
# 评估答案质量(需要LLM)
if ground_truth:
relevance = self._evaluate_answer_relevance(question, result["answer"])
metrics["answer_relevance"].append(relevance)
faithfulness = self._evaluate_faithfulness(result["answer"], result["sources"])
metrics["faithfulness"].append(faithfulness)
# 计算平均指标
return {
"retrieval_recall": np.mean(metrics["retrieval_recall"]),
"retrieval_precision": np.mean(metrics["retrieval_precision"]),
"answer_relevance": np.mean(metrics["answer_relevance"]),
"faithfulness": np.mean(metrics["faithfulness"])
}
def _evaluate_answer_relevance(self, question: str, answer: str) -> float:
"""评估答案相关性"""
prompt = f"""
问题: {question}
答案: {answer}
请评估答案是否充分回答了问题。打分0-10,其中10分表示完全回答了问题。
只输出数字,不要解释。
评分:"""
score = self.qa_system.llm.predict(prompt).strip()
try:
return float(score) / 10.0
        except ValueError:
return 0.5
def _evaluate_faithfulness(self, answer: str, sources: List[Dict]) -> float:
"""评估答案忠实度(是否基于文档)"""
context = "\n\n".join([s["content"] for s in sources])
prompt = f"""
参考文档:
{context}
生成的答案:
{answer}
答案中的所有信息是否都能在参考文档中找到依据?打分0-10,其中10分表示完全基于文档。
只输出数字。
评分:"""
score = self.qa_system.llm.predict(prompt).strip()
try:
return float(score) / 10.0
        except ValueError:
return 0.5
# 使用
test_data = [
{
"question": "什么是LoRA?",
"ground_truth": "LoRA是一种参数高效的微调方法...",
"relevant_docs": ["doc_123", "doc_456"]
}
]
evaluator = QAEvaluator(qa_system)
results = evaluator.evaluate(test_data)
print(json.dumps(results, indent=2, ensure_ascii=False))
2. Text Classification Project
Fine-tune BERT to build a text classifier.
2.1 Data Preparation
import pandas as pd
from sklearn.model_selection import train_test_split
from datasets import Dataset, DatasetDict
# 加载数据
df = pd.read_csv("text_classification_data.csv")
# 列: text, label
# 划分数据集
train_df, test_df = train_test_split(df, test_size=0.2, random_state=42)
train_df, val_df = train_test_split(train_df, test_size=0.1, random_state=42)
# 转换为HuggingFace Dataset
dataset = DatasetDict({
"train": Dataset.from_pandas(train_df),
"validation": Dataset.from_pandas(val_df),
"test": Dataset.from_pandas(test_df)
})
print(f"训练集: {len(dataset['train'])}")
print(f"验证集: {len(dataset['validation'])}")
print(f"测试集: {len(dataset['test'])}")
2.2 Fine-Tuning BERT
from transformers import (
BertTokenizer,
BertForSequenceClassification,
TrainingArguments,
Trainer
)
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
import torch
# 1. 加载模型和tokenizer
model_name = "bert-base-chinese" # 或 "bert-base-uncased" for English
tokenizer = BertTokenizer.from_pretrained(model_name)
model = BertForSequenceClassification.from_pretrained(
model_name,
num_labels=len(df['label'].unique())
)
# 2. 数据预处理
def preprocess_function(examples):
return tokenizer(
examples["text"],
truncation=True,
padding="max_length",
max_length=512
)
tokenized_dataset = dataset.map(preprocess_function, batched=True)
# 3. 定义评估指标
def compute_metrics(eval_pred):
logits, labels = eval_pred
predictions = logits.argmax(axis=-1)
accuracy = accuracy_score(labels, predictions)
precision, recall, f1, _ = precision_recall_fscore_support(
labels,
predictions,
average='weighted'
)
return {
"accuracy": accuracy,
"precision": precision,
"recall": recall,
"f1": f1
}
# 4. 训练参数
training_args = TrainingArguments(
output_dir="./results",
evaluation_strategy="epoch",
save_strategy="epoch",
learning_rate=2e-5,
per_device_train_batch_size=16,
per_device_eval_batch_size=16,
num_train_epochs=3,
weight_decay=0.01,
load_best_model_at_end=True,
metric_for_best_model="f1",
logging_dir="./logs",
logging_steps=100,
warmup_steps=500,
fp16=True, # 使用混合精度训练
)
# 5. 创建Trainer
trainer = Trainer(
model=model,
args=training_args,
train_dataset=tokenized_dataset["train"],
eval_dataset=tokenized_dataset["validation"],
compute_metrics=compute_metrics,
)
# 6. 训练
print("开始训练...")
trainer.train()
# 7. 评估
print("\n测试集评估:")
test_results = trainer.evaluate(tokenized_dataset["test"])
print(test_results)
# 8. 保存模型
model.save_pretrained("./text_classifier")
tokenizer.save_pretrained("./text_classifier")
print("模型已保存")
2.3 Inference and Deployment
from fastapi import FastAPI
from pydantic import BaseModel
from typing import List
from transformers import BertForSequenceClassification, BertTokenizer
import torch
app = FastAPI()
# 加载模型
model = BertForSequenceClassification.from_pretrained("./text_classifier")
tokenizer = BertTokenizer.from_pretrained("./text_classifier")
model.eval()
# 标签映射
label_names = ["负面", "中性", "正面"] # 根据实际情况修改
class TextInput(BaseModel):
texts: List[str]
class Prediction(BaseModel):
text: str
label: str
confidence: float
@app.post("/predict", response_model=List[Prediction])
def predict(input_data: TextInput):
"""批量预测"""
# Tokenize
inputs = tokenizer(
input_data.texts,
padding=True,
truncation=True,
max_length=512,
return_tensors="pt"
)
# 预测
with torch.no_grad():
outputs = model(**inputs)
logits = outputs.logits
probabilities = torch.softmax(logits, dim=-1)
# 解析结果
predictions = []
for i, text in enumerate(input_data.texts):
confidence, label_id = probabilities[i].max(dim=-1)
predictions.append(Prediction(
text=text,
label=label_names[label_id.item()],
confidence=confidence.item()
))
return predictions
# 运行: uvicorn main:app --reload
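With the server started via uvicorn main:app --reload, the endpoint can be exercised from a small client script (a sketch; host and port follow the uvicorn defaults):

import requests

resp = requests.post(
    "http://localhost:8000/predict",
    json={"texts": ["这家餐厅的服务太棒了", "物流太慢,体验很差"]},
)
for pred in resp.json():
    print(pred["text"], "->", pred["label"], f"({pred['confidence']:.2f})")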
3. Agent System
Build an agent based on the ReAct framework, which interleaves reasoning (Thought), tool use (Action), and tool feedback (Observation) until it reaches a final answer.
3.1 ReAct Framework Implementation
from typing import List, Dict, Callable
import re
from langchain.llms import OpenAI
from langchain.tools import Tool
class ReActAgent:
"""ReAct Agent实现"""
def __init__(self, llm, tools: List[Tool], max_iterations=5):
self.llm = llm
self.tools = {tool.name: tool for tool in tools}
self.max_iterations = max_iterations
def run(self, task: str, verbose=True) -> str:
"""执行任务"""
prompt_template = """你是一个能够使用工具解决问题的AI助手。
可用工具:
{tools}
使用以下格式:
Thought: 我需要思考下一步做什么
Action: 要使用的工具名称
Action Input: 工具的输入
Observation: 工具的输出
... (可以重复Thought/Action/Observation)
Thought: 我现在知道最终答案了
Final Answer: 最终答案
开始!
Question: {task}
{history}"""
tools_desc = "\n".join([
f"- {name}: {tool.description}"
for name, tool in self.tools.items()
])
history = ""
for i in range(self.max_iterations):
# 构建Prompt
prompt = prompt_template.format(
tools=tools_desc,
task=task,
history=history
)
# LLM生成
response = self.llm.predict(prompt)
if verbose:
print(f"\n{'='*60}")
print(f"Iteration {i+1}")
print(f"{'='*60}")
print(response)
# 解析响应
if "Final Answer:" in response:
# 找到最终答案
final_answer = response.split("Final Answer:")[-1].strip()
return final_answer
# 提取Action和Action Input
action_match = re.search(r"Action:\s*(.+)", response)
action_input_match = re.search(r"Action Input:\s*(.+)", response)
if action_match and action_input_match:
action = action_match.group(1).strip()
action_input = action_input_match.group(1).strip()
# 执行工具
if action in self.tools:
observation = self.tools[action].func(action_input)
if verbose:
print(f"\nObservation: {observation}")
# 更新历史
history += f"\n{response}\nObservation: {observation}\n"
else:
observation = f"错误: 工具 '{action}' 不存在"
history += f"\n{response}\nObservation: {observation}\n"
else:
# 无法解析Action,提示LLM
history += f"\n{response}\nObservation: 请使用正确的格式指定Action和Action Input\n"
return "达到最大迭代次数,未能完成任务"
# ============ 定义工具 ============
def search_wikipedia(query: str) -> str:
"""搜索Wikipedia"""
import wikipedia
try:
return wikipedia.summary(query, sentences=3)
    except Exception:
return "未找到相关信息"
def calculate(expression: str) -> str:
"""计算数学表达式"""
try:
result = eval(expression)
return str(result)
except Exception as e:
return f"计算错误: {str(e)}"
def get_current_time(query: str) -> str:
"""获取当前时间"""
from datetime import datetime
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# 创建工具列表
tools = [
Tool(
name="Wikipedia",
func=search_wikipedia,
description="搜索Wikipedia获取信息。输入应该是搜索查询。"
),
Tool(
name="Calculator",
func=calculate,
description="计算数学表达式。输入应该是有效的Python表达式,如'2+2'或'3*4'"
),
Tool(
name="CurrentTime",
func=get_current_time,
description="获取当前时间。不需要输入。"
)
]
# ============ 使用Agent ============
llm = OpenAI(temperature=0)
agent = ReActAgent(llm, tools, max_iterations=5)
# 测试任务
tasks = [
"埃菲尔铁塔的高度是多少?如果加上50米是多少?",
"现在几点了?距离2024年1月1日还有多少天?",
]
for task in tasks:
print(f"\n\n{'#'*60}")
print(f"Task: {task}")
print(f"{'#'*60}")
answer = agent.run(task, verbose=True)
print(f"\n{'='*60}")
print(f"Final Answer: {answer}")
print(f"{'='*60}\n")
3.2 LangChain Agent Implementation
from langchain.agents import initialize_agent, AgentType
from langchain.llms import OpenAI
from langchain.tools import Tool
from langchain.memory import ConversationBufferMemory
# 定义工具(同上)
# 创建Memory(记住对话历史)
memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True)
# 初始化Agent
agent = initialize_agent(
tools=tools,
llm=OpenAI(temperature=0),
agent=AgentType.CHAT_CONVERSATIONAL_REACT_DESCRIPTION,
memory=memory,
verbose=True,
max_iterations=5,
early_stopping_method="generate"
)
# 运行
response = agent.run("埃菲尔铁塔有多高?")
print(response)
# 继续对话(会记住上下文)
response = agent.run("如果再加上50米呢?")
print(response)
3.3 Tool Calling (Function Calling)
import openai
import json
# 定义函数schema
functions = [
{
"name": "get_weather",
"description": "获取指定城市的天气信息",
"parameters": {
"type": "object",
"properties": {
"city": {
"type": "string",
"description": "城市名称,如'北京'"
},
"unit": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "温度单位"
}
},
"required": ["city"]
}
},
{
"name": "search_database",
"description": "在数据库中搜索信息",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "搜索关键词"
}
},
"required": ["query"]
}
}
]
# 实际的函数实现
def get_weather(city: str, unit: str = "celsius") -> str:
# 实际应该调用天气API
return json.dumps({"city": city, "temperature": 22, "unit": unit, "condition": "晴"})
def search_database(query: str) -> str:
# 实际应该查询数据库
return json.dumps({"results": [f"关于{query}的结果1", f"关于{query}的结果2"]})
# Agent循环
def run_agent(user_message: str):
messages = [{"role": "user", "content": user_message}]
while True:
# 调用LLM
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=messages,
functions=functions,
function_call="auto"
)
message = response.choices[0].message
# 检查是否需要调用函数
if message.get("function_call"):
function_name = message["function_call"]["name"]
function_args = json.loads(message["function_call"]["arguments"])
print(f"\n调用函数: {function_name}")
print(f"参数: {function_args}")
# 执行函数
if function_name == "get_weather":
function_response = get_weather(**function_args)
elif function_name == "search_database":
function_response = search_database(**function_args)
print(f"函数返回: {function_response}\n")
# 将函数结果添加到对话
messages.append(message)
messages.append({
"role": "function",
"name": function_name,
"content": function_response
})
else:
# 没有函数调用,返回最终答案
return message["content"]
# 测试
question = "北京今天天气怎么样?温度用摄氏度表示"
answer = run_agent(question)
print(f"最终答案: {answer}")
4. Multimodal Applications
4.1 Image Captioning
from transformers import BlipProcessor, BlipForConditionalGeneration
from PIL import Image
import torch
# 加载模型
processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-large")
model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-large")
def generate_caption(image_path: str, prompt: str = None) -> str:
"""生成图片描述"""
image = Image.open(image_path).convert('RGB')
if prompt:
# 条件生成
inputs = processor(image, prompt, return_tensors="pt")
else:
# 无条件生成
inputs = processor(image, return_tensors="pt")
with torch.no_grad():
outputs = model.generate(**inputs, max_length=50, num_beams=5)
caption = processor.decode(outputs[0], skip_special_tokens=True)
return caption
# 使用
caption = generate_caption("example.jpg")
print(f"图片描述: {caption}")
# 条件生成
caption = generate_caption("example.jpg", prompt="a photography of")
print(f"条件描述: {caption}")
4.2 Text-to-Image Generation
from diffusers import StableDiffusionPipeline
import torch
# 加载模型
model_id = "runwayml/stable-diffusion-v1-5"
pipe = StableDiffusionPipeline.from_pretrained(
model_id,
torch_dtype=torch.float16
)
pipe = pipe.to("cuda")
def generate_image(
prompt: str,
negative_prompt: str = "",
num_inference_steps: int = 50,
guidance_scale: float = 7.5,
num_images: int = 1
):
"""根据文本生成图片"""
images = pipe(
prompt=prompt,
negative_prompt=negative_prompt,
num_inference_steps=num_inference_steps,
guidance_scale=guidance_scale,
num_images_per_prompt=num_images
).images
return images
# 使用
prompt = "a beautiful landscape with mountains and a lake, sunset, photorealistic"
negative_prompt = "blurry, low quality, distorted"
images = generate_image(prompt, negative_prompt, num_images=2)
# 保存图片
for i, image in enumerate(images):
image.save(f"generated_{i}.png")
print(f"图片已保存: generated_{i}.png")
4.3 Multimodal RAG
from transformers import CLIPProcessor, CLIPModel
from PIL import Image
import torch
import numpy as np
class MultimodalRAG:
"""支持图文检索的RAG系统"""
def __init__(self):
# CLIP模型(统一图文编码)
self.model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
self.processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
self.text_embeddings = []
self.image_embeddings = []
self.documents = []
def add_document(self, text: str = None, image_path: str = None, metadata: dict = None):
"""添加文档(可以是纯文本、纯图片或图文混合)"""
doc = {"text": text, "image_path": image_path, "metadata": metadata or {}}
# 编码文本
if text:
inputs = self.processor(text=[text], return_tensors="pt", padding=True)
with torch.no_grad():
text_emb = self.model.get_text_features(**inputs)
self.text_embeddings.append(text_emb.numpy())
else:
self.text_embeddings.append(None)
# 编码图片
if image_path:
image = Image.open(image_path)
inputs = self.processor(images=image, return_tensors="pt")
with torch.no_grad():
image_emb = self.model.get_image_features(**inputs)
self.image_embeddings.append(image_emb.numpy())
else:
self.image_embeddings.append(None)
self.documents.append(doc)
def search(self, query: str = None, query_image_path: str = None, k: int = 5):
"""多模态检索"""
# 编码查询
if query:
inputs = self.processor(text=[query], return_tensors="pt")
with torch.no_grad():
query_emb = self.model.get_text_features(**inputs).numpy()
elif query_image_path:
image = Image.open(query_image_path)
inputs = self.processor(images=image, return_tensors="pt")
with torch.no_grad():
query_emb = self.model.get_image_features(**inputs).numpy()
# 计算相似度
scores = []
for i in range(len(self.documents)):
score = 0
count = 0
if self.text_embeddings[i] is not None:
score += cosine_similarity(query_emb, self.text_embeddings[i])[0][0]
count += 1
if self.image_embeddings[i] is not None:
score += cosine_similarity(query_emb, self.image_embeddings[i])[0][0]
count += 1
scores.append(score / count if count > 0 else 0)
# 返回Top-K
top_indices = np.argsort(scores)[-k:][::-1]
results = [
(self.documents[i], scores[i])
for i in top_indices
]
return results
def cosine_similarity(a, b):
return np.dot(a, b.T) / (np.linalg.norm(a) * np.linalg.norm(b))
# 使用
mm_rag = MultimodalRAG()
# 添加文档
mm_rag.add_document(
text="一只可爱的猫咪坐在沙发上",
image_path="cat.jpg",
metadata={"source": "doc1"}
)
mm_rag.add_document(
text="美丽的日落风景",
image_path="sunset.jpg",
metadata={"source": "doc2"}
)
# 文本查询
results = mm_rag.search(query="猫", k=2)
for doc, score in results:
print(f"相似度: {score:.4f}")
print(f"文本: {doc['text']}")
print(f"图片: {doc['image_path']}")
print()
# 图片查询
results = mm_rag.search(query_image_path="query.jpg", k=2)
5. Project Deployment
5.1 FastAPI Service
from fastapi import FastAPI, HTTPException, File, UploadFile
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing import Optional
import uvicorn
app = FastAPI(title="AI服务API")
# ============ 问答接口 ============
class QARequest(BaseModel):
question: str
stream: bool = False
@app.post("/qa")
async def qa_endpoint(request: QARequest):
"""问答接口"""
try:
if request.stream:
# 流式返回
def generate():
for token in qa_system.answer_stream(request.question):
yield token
return StreamingResponse(generate(), media_type="text/plain")
else:
# 一次性返回
result = qa_system.answer(request.question)
return result
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# ============ 分类接口 ============
class ClassificationRequest(BaseModel):
texts: list[str]
@app.post("/classify")
async def classify_endpoint(request: ClassificationRequest):
"""文本分类接口"""
try:
predictions = classifier.predict(request.texts)
return {"predictions": predictions}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# ============ 图片描述接口 ============
@app.post("/caption")
async def caption_endpoint(file: UploadFile = File(...)):
"""图片描述生成"""
try:
# 保存上传的图片
with open(f"temp_{file.filename}", "wb") as f:
f.write(await file.read())
# 生成描述
caption = generate_caption(f"temp_{file.filename}")
return {"caption": caption}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# ============ 健康检查 ============
@app.get("/health")
async def health_check():
return {"status": "healthy"}
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
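Since /qa supports stream=true, a client should consume the response incrementally rather than waiting for the full body; a minimal sketch with requests:

import requests

with requests.post(
    "http://localhost:8000/qa",
    json={"question": "什么是RAG?", "stream": True},
    stream=True,
) as resp:
    for chunk in resp.iter_content(chunk_size=None, decode_unicode=True):
        print(chunk, end="", flush=True)
print()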
5.2 Docker Deployment
# Dockerfile
FROM nvidia/cuda:12.1.0-runtime-ubuntu22.04
# 安装Python
RUN apt-get update && apt-get install -y python3 python3-pip
# 设置工作目录
WORKDIR /app
# 复制依赖文件
COPY requirements.txt .
RUN pip3 install --no-cache-dir -r requirements.txt
# 复制代码
COPY . .
# 暴露端口
EXPOSE 8000
# 启动命令
CMD ["python3", "main.py"]
# docker-compose.yml
version: '3.8'
services:
ai-service:
build: .
ports:
- "8000:8000"
volumes:
- ./models:/app/models
- ./data:/app/data
environment:
- OPENAI_API_KEY=${OPENAI_API_KEY}
deploy:
resources:
reservations:
devices:
- driver: nvidia
count: 1
capabilities: [gpu]
# 构建和运行
docker-compose up --build
# 测试
curl -X POST "http://localhost:8000/qa" \
-H "Content-Type: application/json" \
-d '{"question": "什么是AI?"}'
5.3 Monitoring and Logging
import logging
from prometheus_client import Counter, Histogram, generate_latest
from fastapi import Request, Response
import time
# ============ 配置日志 ============
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('app.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# ============ Prometheus指标 ============
REQUEST_COUNT = Counter('requests_total', 'Total requests', ['endpoint', 'status'])
REQUEST_LATENCY = Histogram('request_latency_seconds', 'Request latency', ['endpoint'])
# ============ 中间件 ============
@app.middleware("http")
async def log_requests(request: Request, call_next):
start_time = time.time()
# 记录请求
logger.info(f"Request: {request.method} {request.url.path}")
try:
response = await call_next(request)
status = response.status_code
# 记录响应
duration = time.time() - start_time
logger.info(f"Response: {status} ({duration:.2f}s)")
# 更新指标
REQUEST_COUNT.labels(endpoint=request.url.path, status=status).inc()
REQUEST_LATENCY.labels(endpoint=request.url.path).observe(duration)
return response
except Exception as e:
logger.error(f"Error: {str(e)}", exc_info=True)
REQUEST_COUNT.labels(endpoint=request.url.path, status=500).inc()
raise
# ============ 指标端点 ============
@app.get("/metrics")
async def metrics():
return Response(generate_latest(), media_type="text/plain")