Practical Projects

1. Intelligent Question-Answering System

Build a complete RAG question-answering system covering the full pipeline: knowledge base construction, retrieval, and generation.

1.1 System Architecture

User question
    ↓
Question preprocessing
    ↓
Vector retrieval → Vector database (FAISS/Chroma)
    ↓
Document reranking (Rerank)
    ↓
Prompt construction
    ↓
LLM answer generation
    ↓
Post-processing & citation annotation
    ↓
Return result

1.2 Complete Code Implementation

import os
from typing import List, Dict
from dataclasses import dataclass
import numpy as np
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.document_loaders import DirectoryLoader, TextLoader
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import FAISS
from langchain.llms import OpenAI
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
import openai

@dataclass
class Document:
    """文档数据结构"""
    content: str
    metadata: Dict
    embedding: np.ndarray = None

class KnowledgeBase:
    """知识库管理"""

    def __init__(self, embedding_model="text-embedding-3-small"):
        self.embeddings = OpenAIEmbeddings(model=embedding_model)
        self.vectorstore = None
        self.documents = []

    def load_documents(self, directory: str, file_types: List[str] = ["txt", "md"]):
        """加载文档"""
        all_docs = []

        for file_type in file_types:
            loader = DirectoryLoader(
                directory,
                glob=f"**/*.{file_type}",
                loader_cls=TextLoader
            )
            docs = loader.load()
            all_docs.extend(docs)

        print(f"加载了 {len(all_docs)} 个文档")
        return all_docs

    def split_documents(self, documents: List, chunk_size=1000, chunk_overlap=200):
        """文档分块"""
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            separators=["\n\n", "\n", "。", ".", " ", ""]
        )

        chunks = text_splitter.split_documents(documents)
        print(f"分割成 {len(chunks)} 个文本块")

        return chunks

    def build_index(self, chunks: List):
        """构建向量索引"""
        print("正在构建向量索引...")
        self.vectorstore = FAISS.from_documents(chunks, self.embeddings)
        print("索引构建完成")

    def save_index(self, path: str):
        """保存索引"""
        self.vectorstore.save_local(path)
        print(f"索引已保存到 {path}")

    def load_index(self, path: str):
        """加载索引"""
        self.vectorstore = FAISS.load_local(path, self.embeddings)
        print(f"索引已从 {path} 加载")

    def search(self, query: str, k: int = 5):
        """检索相关文档"""
        docs = self.vectorstore.similarity_search_with_score(query, k=k)
        return docs

class Reranker:
    """重排序模块"""

    def __init__(self, model_name="cross-encoder/ms-marco-MiniLM-L-6-v2"):
        from sentence_transformers import CrossEncoder
        self.model = CrossEncoder(model_name)

    def rerank(self, query: str, documents: List, top_k: int = 3):
        """对检索结果重排序"""
        # 构造query-doc对
        pairs = [[query, doc.page_content] for doc in documents]

        # Compute relevance scores
        scores = self.model.predict(pairs)

        # Sort by score, descending
        ranked_indices = np.argsort(scores)[::-1][:top_k]
        ranked_docs = [documents[i] for i in ranked_indices]
        ranked_scores = [scores[i] for i in ranked_indices]

        return ranked_docs, ranked_scores

class PromptBuilder:
    """Prompt构建器"""

    @staticmethod
    def build_qa_prompt(query: str, contexts: List[str]) -> str:
        """构建问答Prompt"""
        # 格式化上下文
        context_str = "\n\n".join([
            f"[文档{i+1}]\n{ctx}"
            for i, ctx in enumerate(contexts)
        ])

        prompt = f"""你是一个专业的AI助手,请基于以下参考文档回答用户的问题。

要求:
1. 回答必须基于参考文档,不要编造信息
2. 如果参考文档中没有相关信息,请明确说明"根据提供的文档无法回答此问题"
3. 在回答中标注信息来源,使用[文档X]的格式
4. 回答要准确、简洁、专业

参考文档:
{context_str}

问题: {query}

回答:"""

        return prompt

class QASystem:
    """问答系统"""

    def __init__(self, knowledge_base: KnowledgeBase, use_rerank=True):
        self.kb = knowledge_base
        self.use_rerank = use_rerank
        self.reranker = Reranker() if use_rerank else None
        self.llm = OpenAI(temperature=0, max_tokens=500)

    def answer(self, query: str, verbose=False) -> Dict:
        """回答问题"""
        # 1. 检索
        if verbose:
            print(f"\n问题: {query}\n")
            print("正在检索相关文档...")

        retrieved_docs = self.kb.search(query, k=10)

        if verbose:
            print(f"检索到 {len(retrieved_docs)} 个候选文档")

        # 2. Reranking (optional)
        if self.use_rerank:
            if verbose:
                print("正在重排序...")

            docs_only = [doc for doc, score in retrieved_docs]
            ranked_docs, ranked_scores = self.reranker.rerank(query, docs_only, top_k=3)

            if verbose:
                print(f"重排序后保留 {len(ranked_docs)} 个文档")
        else:
            ranked_docs = [doc for doc, score in retrieved_docs[:3]]

        # 3. Build the prompt
        contexts = [doc.page_content for doc in ranked_docs]
        prompt = PromptBuilder.build_qa_prompt(query, contexts)

        if verbose:
            print(f"\nPrompt长度: {len(prompt)} 字符\n")

        # 4. LLM generation
        if verbose:
            print("Generating answer...")

        answer = self.llm.predict(prompt)

        # 5. Assemble the result
        result = {
            "question": query,
            "answer": answer,
            "sources": [
                {
                    "content": doc.page_content[:200] + "...",
                    "metadata": doc.metadata,
                    "relevance_score": float(ranked_scores[i]) if self.use_rerank else None
                }
                for i, doc in enumerate(ranked_docs)
            ]
        }

        return result

# ============ Usage Example ============

def main():
    # 1. Create the knowledge base
    kb = KnowledgeBase()

    # 2. Load documents
    docs = kb.load_documents("./knowledge_docs")

    # 3. Chunk
    chunks = kb.split_documents(docs, chunk_size=800, chunk_overlap=150)

    # 4. Build the index
    kb.build_index(chunks)

    # 5. Save the index (optional)
    kb.save_index("./faiss_index")

    # Or load an existing index directly
    # kb.load_index("./faiss_index")

    # 6. Create the QA system
    qa_system = QASystem(kb, use_rerank=True)

    # 7. Test QA
    questions = [
        "What is a RAG system?",
        "How can I improve recall in vector retrieval?",
        "What is the principle behind LoRA?"
    ]
    ]

    for question in questions:
        result = qa_system.answer(question, verbose=True)

        print(f"\n{'='*60}")
        print(f"问题: {result['question']}")
        print(f"\n答案: {result['answer']}")
        print(f"\n来源文档:")
        for i, source in enumerate(result['sources'], 1):
            print(f"\n[文档{i}]")
            print(f"内容: {source['content']}")
            print(f"来源: {source['metadata'].get('source', 'Unknown')}")
            if source['relevance_score']:
                print(f"相关性: {source['relevance_score']:.4f}")
        print(f"{'='*60}\n")

if __name__ == "__main__":
    main()

1.3 Streaming Output Optimization

from typing import Iterator
import openai

class StreamingQASystem(QASystem):
    """支持流式输出的问答系统"""

    def answer_stream(self, query: str) -> Iterator[str]:
        """流式生成答案"""
        # 检索和重排序
        retrieved_docs = self.kb.search(query, k=10)
        docs_only = [doc for doc, score in retrieved_docs]

        if self.use_rerank:
            ranked_docs, _ = self.reranker.rerank(query, docs_only, top_k=3)
        else:
            ranked_docs = docs_only[:3]

        # Build the prompt
        contexts = [doc.page_content for doc in ranked_docs]
        prompt = PromptBuilder.build_qa_prompt(query, contexts)

        # Stream the generation (legacy pre-1.0 openai SDK)
        for chunk in openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": prompt}],
            stream=True
        ):
            if chunk.choices[0].delta.get("content"):
                yield chunk.choices[0].delta.content

# Usage
qa_system = StreamingQASystem(kb, use_rerank=True)

print("答案: ", end="", flush=True)
for token in qa_system.answer_stream("什么是RAG?"):
    print(token, end="", flush=True)
print()
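
The snippet above uses the legacy pre-1.0 openai SDK. On openai>=1.0 the equivalent streaming call looks roughly like this (a sketch; the model name and client setup are assumptions):

from openai import OpenAI

client = OpenAI()  # reads OPENAI_API_KEY from the environment

def stream_answer(prompt: str):
    # stream=True yields incremental chunks instead of one complete response
    stream = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    for chunk in stream:
        content = chunk.choices[0].delta.content
        if content:
            yield content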

1.4 Evaluation System

from typing import List, Tuple
import json

class QAEvaluator:
    """问答系统评估器"""

    def __init__(self, qa_system: QASystem):
        self.qa_system = qa_system

    def evaluate(self, test_data: List[Dict]) -> Dict:
        """
        test_data格式:
        [
            {
                "question": "问题",
                "ground_truth": "标准答案",
                "relevant_docs": ["相关文档ID列表"]
            }
        ]
        """
        metrics = {
            "retrieval_recall": [],
            "retrieval_precision": [],
            "answer_relevance": [],
            "faithfulness": []
        }

        for item in test_data:
            question = item["question"]
            ground_truth = item.get("ground_truth")
            relevant_doc_ids = set(item.get("relevant_docs", []))

            # Get the system's answer
            result = self.qa_system.answer(question)

            # Evaluate retrieval quality
            retrieved_doc_ids = {
                source["metadata"].get("doc_id")
                for source in result["sources"]
            }

            # Recall: fraction of relevant documents that were retrieved
            recall = len(relevant_doc_ids & retrieved_doc_ids) / len(relevant_doc_ids) if relevant_doc_ids else 0
            metrics["retrieval_recall"].append(recall)

            # Precision: fraction of retrieved documents that are relevant
            precision = len(relevant_doc_ids & retrieved_doc_ids) / len(retrieved_doc_ids) if retrieved_doc_ids else 0
            metrics["retrieval_precision"].append(precision)

            # Evaluate answer quality (requires an LLM)
            if ground_truth:
                relevance = self._evaluate_answer_relevance(question, result["answer"])
                metrics["answer_relevance"].append(relevance)

                faithfulness = self._evaluate_faithfulness(result["answer"], result["sources"])
                metrics["faithfulness"].append(faithfulness)

        # Average each metric (guard against empty lists, e.g. when no ground truth is provided)
        return {
            name: float(np.mean(values)) if values else 0.0
            for name, values in metrics.items()
        }

    def _evaluate_answer_relevance(self, question: str, answer: str) -> float:
        """Evaluate answer relevance"""
        prompt = f"""
Question: {question}
Answer: {answer}

Rate how well the answer addresses the question on a scale of 0-10, where 10 means the question is fully answered.
Output only the number, no explanation.

Score:"""

        score = self.qa_system.llm.predict(prompt).strip()
        try:
            return float(score) / 10.0
        except ValueError:
            # Fall back to a neutral score if the LLM output is not a number
            return 0.5

    def _evaluate_faithfulness(self, answer: str, sources: List[Dict]) -> float:
        """Evaluate answer faithfulness (whether it is grounded in the documents)"""
        context = "\n\n".join([s["content"] for s in sources])

        prompt = f"""
Reference documents:
{context}

Generated answer:
{answer}

Can every piece of information in the answer be supported by the reference documents? Rate 0-10, where 10 means fully grounded in the documents.
Output only the number.

Score:"""

        score = self.qa_system.llm.predict(prompt).strip()
        try:
            return float(score) / 10.0
        except ValueError:
            # Fall back to a neutral score if the LLM output is not a number
            return 0.5

# Usage
test_data = [
    {
        "question": "What is LoRA?",
        "ground_truth": "LoRA is a parameter-efficient fine-tuning method...",
        "relevant_docs": ["doc_123", "doc_456"]
    }
]

evaluator = QAEvaluator(qa_system)
results = evaluator.evaluate(test_data)
print(json.dumps(results, indent=2, ensure_ascii=False))
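
Note that the retrieval metrics key on a doc_id field in chunk metadata, which the pipeline in 1.2 never sets. A minimal way to attach one before indexing (the field name and id scheme are assumptions; align them with the ids used in your test data):

# Tag each chunk with a doc_id derived from its source file so that
# retrieved chunks can be matched against ground-truth document ids
for i, chunk in enumerate(chunks):
    chunk.metadata["doc_id"] = chunk.metadata.get("source", f"doc_{i}")
kb.build_index(chunks)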

2. Text Classification Project

Build a text classifier by fine-tuning BERT.

2.1 Data Preparation

import pandas as pd
from sklearn.model_selection import train_test_split
from datasets import Dataset, DatasetDict

# Load the data
df = pd.read_csv("text_classification_data.csv")
# Columns: text, label

# Split the dataset
train_df, test_df = train_test_split(df, test_size=0.2, random_state=42)
train_df, val_df = train_test_split(train_df, test_size=0.1, random_state=42)

# Convert to HuggingFace Dataset
dataset = DatasetDict({
    "train": Dataset.from_pandas(train_df),
    "validation": Dataset.from_pandas(val_df),
    "test": Dataset.from_pandas(test_df)
})

print(f"训练集: {len(dataset['train'])}")
print(f"验证集: {len(dataset['validation'])}")
print(f"测试集: {len(dataset['test'])}")

2.2 BERT Fine-Tuning

from transformers import (
    BertTokenizer,
    BertForSequenceClassification,
    TrainingArguments,
    Trainer
)
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
import torch

# 1. Load the model and tokenizer
model_name = "bert-base-chinese"  # or "bert-base-uncased" for English
tokenizer = BertTokenizer.from_pretrained(model_name)
model = BertForSequenceClassification.from_pretrained(
    model_name,
    num_labels=len(df['label'].unique())
)

# 2. Preprocess the data
def preprocess_function(examples):
    return tokenizer(
        examples["text"],
        truncation=True,
        padding="max_length",
        max_length=512
    )

tokenized_dataset = dataset.map(preprocess_function, batched=True)

# 3. Define evaluation metrics
def compute_metrics(eval_pred):
    logits, labels = eval_pred
    predictions = logits.argmax(axis=-1)

    accuracy = accuracy_score(labels, predictions)
    precision, recall, f1, _ = precision_recall_fscore_support(
        labels,
        predictions,
        average='weighted'
    )

    return {
        "accuracy": accuracy,
        "precision": precision,
        "recall": recall,
        "f1": f1
    }

# 4. Training arguments
training_args = TrainingArguments(
    output_dir="./results",
    evaluation_strategy="epoch",
    save_strategy="epoch",
    learning_rate=2e-5,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    num_train_epochs=3,
    weight_decay=0.01,
    load_best_model_at_end=True,
    metric_for_best_model="f1",
    logging_dir="./logs",
    logging_steps=100,
    warmup_steps=500,
    fp16=True,  # mixed-precision training
)

# 5. Create the Trainer
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_dataset["train"],
    eval_dataset=tokenized_dataset["validation"],
    compute_metrics=compute_metrics,
)

# 6. Train
print("Starting training...")
trainer.train()

# 7. Evaluate
print("\nTest set evaluation:")
test_results = trainer.evaluate(tokenized_dataset["test"])
print(test_results)

# 8. Save the model
model.save_pretrained("./text_classifier")
tokenizer.save_pretrained("./text_classifier")
print("Model saved")

2.3 Inference and Deployment

from fastapi import FastAPI
from pydantic import BaseModel
from typing import List
import torch

app = FastAPI()

# Load the model
model = BertForSequenceClassification.from_pretrained("./text_classifier")
tokenizer = BertTokenizer.from_pretrained("./text_classifier")
model.eval()

# Label mapping
label_names = ["negative", "neutral", "positive"]  # adjust to your actual label set

class TextInput(BaseModel):
    texts: List[str]

class Prediction(BaseModel):
    text: str
    label: str
    confidence: float

@app.post("/predict", response_model=List[Prediction])
def predict(input_data: TextInput):
    """批量预测"""
    # Tokenize
    inputs = tokenizer(
        input_data.texts,
        padding=True,
        truncation=True,
        max_length=512,
        return_tensors="pt"
    )

    # Predict
    with torch.no_grad():
        outputs = model(**inputs)
        logits = outputs.logits
        probabilities = torch.softmax(logits, dim=-1)

    # Parse the results
    predictions = []
    for i, text in enumerate(input_data.texts):
        confidence, label_id = probabilities[i].max(dim=-1)
        predictions.append(Prediction(
            text=text,
            label=label_names[label_id.item()],
            confidence=confidence.item()
        ))

    return predictions

# Run: uvicorn main:app --reload
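
Once the service is running, it can be called from any HTTP client; for example with requests (assuming the server listens on localhost:8000):

import requests

# Two sample Chinese inputs, since the classifier was fine-tuned on bert-base-chinese
resp = requests.post(
    "http://localhost:8000/predict",
    json={"texts": ["这部电影太好看了", "质量一般般"]}
)
for pred in resp.json():
    print(pred["text"], "->", pred["label"], f"({pred['confidence']:.2f})")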

3. Agent System

Build an intelligent agent based on the ReAct framework.

3.1 ReAct Framework Implementation

from typing import List, Dict, Callable
import re
from langchain.llms import OpenAI
from langchain.tools import Tool

class ReActAgent:
    """ReAct Agent实现"""

    def __init__(self, llm, tools: List[Tool], max_iterations=5):
        self.llm = llm
        self.tools = {tool.name: tool for tool in tools}
        self.max_iterations = max_iterations

    def run(self, task: str, verbose=True) -> str:
        """执行任务"""
        prompt_template = """你是一个能够使用工具解决问题的AI助手。

可用工具:
{tools}

使用以下格式:
Thought: 我需要思考下一步做什么
Action: 要使用的工具名称
Action Input: 工具的输入
Observation: 工具的输出
... (可以重复Thought/Action/Observation)
Thought: 我现在知道最终答案了
Final Answer: 最终答案

开始!

Question: {task}
{history}"""

        tools_desc = "\n".join([
            f"- {name}: {tool.description}"
            for name, tool in self.tools.items()
        ])

        history = ""

        for i in range(self.max_iterations):
            # Build the prompt
            prompt = prompt_template.format(
                tools=tools_desc,
                task=task,
                history=history
            )

            # LLM generation
            response = self.llm.predict(prompt)

            if verbose:
                print(f"\n{'='*60}")
                print(f"Iteration {i+1}")
                print(f"{'='*60}")
                print(response)

            # Parse the response
            if "Final Answer:" in response:
                # The final answer was found
                final_answer = response.split("Final Answer:")[-1].strip()
                return final_answer

            # Extract Action and Action Input
            action_match = re.search(r"Action:\s*(.+)", response)
            action_input_match = re.search(r"Action Input:\s*(.+)", response)

            if action_match and action_input_match:
                action = action_match.group(1).strip()
                action_input = action_input_match.group(1).strip()

                # Execute the tool
                if action in self.tools:
                    observation = self.tools[action].func(action_input)

                    if verbose:
                        print(f"\nObservation: {observation}")

                    # Update the history
                    history += f"\n{response}\nObservation: {observation}\n"
                else:
                    observation = f"错误: 工具 '{action}' 不存在"
                    history += f"\n{response}\nObservation: {observation}\n"
            else:
                # Could not parse an Action; prompt the LLM to use the correct format
                history += f"\n{response}\nObservation: Please specify Action and Action Input in the correct format\n"

        return "达到最大迭代次数,未能完成任务"

# ============ Define Tools ============

def search_wikipedia(query: str) -> str:
    """Search Wikipedia"""
    import wikipedia
    try:
        return wikipedia.summary(query, sentences=3)
    except Exception:
        return "No relevant information found"

def calculate(expression: str) -> str:
    """Evaluate a math expression"""
    # Note: eval on untrusted input is unsafe; acceptable for a demo,
    # but use a proper expression parser in production
    try:
        result = eval(expression)
        return str(result)
    except Exception as e:
        return f"Calculation error: {str(e)}"

def get_current_time(query: str) -> str:
    """获取当前时间"""
    from datetime import datetime
    return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

# Create the tool list
tools = [
    Tool(
        name="Wikipedia",
        func=search_wikipedia,
        description="搜索Wikipedia获取信息。输入应该是搜索查询。"
    ),
    Tool(
        name="Calculator",
        func=calculate,
        description="计算数学表达式。输入应该是有效的Python表达式,如'2+2'或'3*4'"
    ),
    Tool(
        name="CurrentTime",
        func=get_current_time,
        description="获取当前时间。不需要输入。"
    )
]

# ============ Use the Agent ============

llm = OpenAI(temperature=0)
agent = ReActAgent(llm, tools, max_iterations=5)

# Test tasks
tasks = [
    "How tall is the Eiffel Tower? What is that height plus 50 meters?",
    "What time is it now? How many days until January 1, 2024?",
]

for task in tasks:
    print(f"\n\n{'#'*60}")
    print(f"Task: {task}")
    print(f"{'#'*60}")

    answer = agent.run(task, verbose=True)

    print(f"\n{'='*60}")
    print(f"Final Answer: {answer}")
    print(f"{'='*60}\n")

3.2 LangChain Agent Implementation

from langchain.agents import initialize_agent, AgentType
from langchain.llms import OpenAI
from langchain.tools import Tool
from langchain.memory import ConversationBufferMemory

# Define the tools (same as above)

# Create memory (remembers the conversation history)
memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True)

# Initialize the agent
agent = initialize_agent(
    tools=tools,
    llm=OpenAI(temperature=0),
    agent=AgentType.CHAT_CONVERSATIONAL_REACT_DESCRIPTION,
    memory=memory,
    verbose=True,
    max_iterations=5,
    early_stopping_method="generate"
)

# Run
response = agent.run("How tall is the Eiffel Tower?")
print(response)

# Continue the conversation (the context is remembered)
response = agent.run("And if we add another 50 meters?")
print(response)

3.3 Tool Calling (Function Calling)

import openai
import json

# Define the function schemas
functions = [
    {
        "name": "get_weather",
        "description": "获取指定城市的天气信息",
        "parameters": {
            "type": "object",
            "properties": {
                "city": {
                    "type": "string",
                    "description": "城市名称,如'北京'"
                },
                "unit": {
                    "type": "string",
                    "enum": ["celsius", "fahrenheit"],
                    "description": "温度单位"
                }
            },
            "required": ["city"]
        }
    },
    {
        "name": "search_database",
        "description": "在数据库中搜索信息",
        "parameters": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "搜索关键词"
                }
            },
            "required": ["query"]
        }
    }
]

# Actual function implementations
def get_weather(city: str, unit: str = "celsius") -> str:
    # In practice this would call a real weather API
    return json.dumps({"city": city, "temperature": 22, "unit": unit, "condition": "sunny"})

def search_database(query: str) -> str:
    # In practice this would query a real database
    return json.dumps({"results": [f"Result 1 about {query}", f"Result 2 about {query}"]})

# Agent loop
def run_agent(user_message: str):
    messages = [{"role": "user", "content": user_message}]

    while True:
        # Call the LLM
        response = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages=messages,
            functions=functions,
            function_call="auto"
        )

        message = response.choices[0].message

        # Check whether a function call is requested
        if message.get("function_call"):
            function_name = message["function_call"]["name"]
            function_args = json.loads(message["function_call"]["arguments"])

            print(f"\n调用函数: {function_name}")
            print(f"参数: {function_args}")

            # Execute the function
            if function_name == "get_weather":
                function_response = get_weather(**function_args)
            elif function_name == "search_database":
                function_response = search_database(**function_args)
            else:
                function_response = json.dumps({"error": f"Unknown function: {function_name}"})

            print(f"Function returned: {function_response}\n")

            # Append the function result to the conversation
            messages.append(message)
            messages.append({
                "role": "function",
                "name": function_name,
                "content": function_response
            })
        else:
            # No function call; return the final answer
            return message["content"]

# Test
question = "What's the weather like in Beijing today? Give the temperature in Celsius"
answer = run_agent(question)
print(f"Final answer: {answer}")

4. Multimodal Applications

4.1 Image Captioning

from transformers import BlipProcessor, BlipForConditionalGeneration
from PIL import Image
import torch

# Load the model
processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-large")
model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-large")

def generate_caption(image_path: str, prompt: str = None) -> str:
    """生成图片描述"""
    image = Image.open(image_path).convert('RGB')

    if prompt:
        # Conditional generation
        inputs = processor(image, prompt, return_tensors="pt")
    else:
        # Unconditional generation
        inputs = processor(image, return_tensors="pt")

    with torch.no_grad():
        outputs = model.generate(**inputs, max_length=50, num_beams=5)

    caption = processor.decode(outputs[0], skip_special_tokens=True)
    return caption

# Usage
caption = generate_caption("example.jpg")
print(f"Caption: {caption}")

# Conditional generation
caption = generate_caption("example.jpg", prompt="a photography of")
print(f"Conditional caption: {caption}")

4.2 Text-to-Image

from diffusers import StableDiffusionPipeline
import torch

# Load the model
model_id = "runwayml/stable-diffusion-v1-5"
pipe = StableDiffusionPipeline.from_pretrained(
    model_id,
    torch_dtype=torch.float16
)
pipe = pipe.to("cuda")

def generate_image(
    prompt: str,
    negative_prompt: str = "",
    num_inference_steps: int = 50,
    guidance_scale: float = 7.5,
    num_images: int = 1
):
    """根据文本生成图片"""
    images = pipe(
        prompt=prompt,
        negative_prompt=negative_prompt,
        num_inference_steps=num_inference_steps,
        guidance_scale=guidance_scale,
        num_images_per_prompt=num_images
    ).images

    return images

# Usage
prompt = "a beautiful landscape with mountains and a lake, sunset, photorealistic"
negative_prompt = "blurry, low quality, distorted"

images = generate_image(prompt, negative_prompt, num_images=2)

# Save the images
for i, image in enumerate(images):
    image.save(f"generated_{i}.png")
    print(f"Image saved: generated_{i}.png")

4.3 Multimodal RAG

from transformers import CLIPProcessor, CLIPModel
from PIL import Image
import torch
import numpy as np

class MultimodalRAG:
    """支持图文检索的RAG系统"""

    def __init__(self):
        # CLIP model (unified text-image encoding)
        self.model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
        self.processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

        self.text_embeddings = []
        self.image_embeddings = []
        self.documents = []

    def add_document(self, text: str = None, image_path: str = None, metadata: dict = None):
        """添加文档(可以是纯文本、纯图片或图文混合)"""
        doc = {"text": text, "image_path": image_path, "metadata": metadata or {}}

        # Encode the text
        if text:
            inputs = self.processor(text=[text], return_tensors="pt", padding=True)
            with torch.no_grad():
                text_emb = self.model.get_text_features(**inputs)
            self.text_embeddings.append(text_emb.numpy())
        else:
            self.text_embeddings.append(None)

        # Encode the image
        if image_path:
            image = Image.open(image_path)
            inputs = self.processor(images=image, return_tensors="pt")
            with torch.no_grad():
                image_emb = self.model.get_image_features(**inputs)
            self.image_embeddings.append(image_emb.numpy())
        else:
            self.image_embeddings.append(None)

        self.documents.append(doc)

    def search(self, query: str = None, query_image_path: str = None, k: int = 5):
        """多模态检索"""
        # 编码查询
        if query:
            inputs = self.processor(text=[query], return_tensors="pt")
            with torch.no_grad():
                query_emb = self.model.get_text_features(**inputs).numpy()
        elif query_image_path:
            image = Image.open(query_image_path)
            inputs = self.processor(images=image, return_tensors="pt")
            with torch.no_grad():
                query_emb = self.model.get_image_features(**inputs).numpy()

        # Compute similarities
        scores = []
        for i in range(len(self.documents)):
            score = 0
            count = 0

            if self.text_embeddings[i] is not None:
                score += cosine_similarity(query_emb, self.text_embeddings[i])[0][0]
                count += 1

            if self.image_embeddings[i] is not None:
                score += cosine_similarity(query_emb, self.image_embeddings[i])[0][0]
                count += 1

            scores.append(score / count if count > 0 else 0)

        # Return the top-k
        top_indices = np.argsort(scores)[-k:][::-1]
        results = [
            (self.documents[i], scores[i])
            for i in top_indices
        ]

        return results

def cosine_similarity(a, b):
    return np.dot(a, b.T) / (np.linalg.norm(a) * np.linalg.norm(b))

# Usage
mm_rag = MultimodalRAG()

# Add documents
mm_rag.add_document(
    text="A cute cat sitting on a sofa",
    image_path="cat.jpg",
    metadata={"source": "doc1"}
)
mm_rag.add_document(
    text="A beautiful sunset landscape",
    image_path="sunset.jpg",
    metadata={"source": "doc2"}
)

# Text query
results = mm_rag.search(query="cat", k=2)
for doc, score in results:
    print(f"Similarity: {score:.4f}")
    print(f"Text: {doc['text']}")
    print(f"Image: {doc['image_path']}")
    print()

# Image query
results = mm_rag.search(query_image_path="query.jpg", k=2)

5. Project Deployment

5.1 FastAPI Service

from fastapi import FastAPI, HTTPException, File, UploadFile
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing import Optional
import uvicorn

app = FastAPI(title="AI服务API")

# ============ QA Endpoint ============

class QARequest(BaseModel):
    question: str
    stream: bool = False

@app.post("/qa")
async def qa_endpoint(request: QARequest):
    """问答接口"""
    try:
        if request.stream:
            # 流式返回
            def generate():
                for token in qa_system.answer_stream(request.question):
                    yield token

            return StreamingResponse(generate(), media_type="text/plain")
        else:
            # One-shot response
            result = qa_system.answer(request.question)
            return result
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

# ============ Classification Endpoint ============

class ClassificationRequest(BaseModel):
    texts: list[str]

@app.post("/classify")
async def classify_endpoint(request: ClassificationRequest):
    """文本分类接口"""
    try:
        predictions = classifier.predict(request.texts)
        return {"predictions": predictions}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

# ============ Image Captioning Endpoint ============

@app.post("/caption")
async def caption_endpoint(file: UploadFile = File(...)):
    """图片描述生成"""
    try:
        # 保存上传的图片
        with open(f"temp_{file.filename}", "wb") as f:
            f.write(await file.read())

        # Generate the caption
        caption = generate_caption(f"temp_{file.filename}")

        return {"caption": caption}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

# ============ Health Check ============

@app.get("/health")
async def health_check():
    return {"status": "healthy"}

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

5.2 Docker Deployment

# Dockerfile
FROM nvidia/cuda:12.1.0-runtime-ubuntu22.04

# Install Python
RUN apt-get update && apt-get install -y python3 python3-pip

# Set the working directory
WORKDIR /app

# Copy the dependency file
COPY requirements.txt .
RUN pip3 install --no-cache-dir -r requirements.txt

# Copy the code
COPY . .

# Expose the port
EXPOSE 8000

# Startup command
CMD ["python3", "main.py"]
# docker-compose.yml
version: '3.8'

services:
  ai-service:
    build: .
    ports:
      - "8000:8000"
    volumes:
      - ./models:/app/models
      - ./data:/app/data
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]
# Build and run
docker-compose up --build

# Test
curl -X POST "http://localhost:8000/qa" \
     -H "Content-Type: application/json" \
     -d '{"question": "什么是AI?"}'

5.3 Monitoring and Logging

import logging
from prometheus_client import Counter, Histogram, generate_latest
from fastapi import Request, Response  # Response is used by the /metrics endpoint below
import time

# ============ Configure Logging ============

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('app.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# ============ Prometheus Metrics ============

REQUEST_COUNT = Counter('requests_total', 'Total requests', ['endpoint', 'status'])
REQUEST_LATENCY = Histogram('request_latency_seconds', 'Request latency', ['endpoint'])

# ============ Middleware ============

@app.middleware("http")
async def log_requests(request: Request, call_next):
    start_time = time.time()

    # Log the request
    logger.info(f"Request: {request.method} {request.url.path}")

    try:
        response = await call_next(request)
        status = response.status_code

        # Log the response
        duration = time.time() - start_time
        logger.info(f"Response: {status} ({duration:.2f}s)")

        # Update the metrics
        REQUEST_COUNT.labels(endpoint=request.url.path, status=status).inc()
        REQUEST_LATENCY.labels(endpoint=request.url.path).observe(duration)

        return response
    except Exception as e:
        logger.error(f"Error: {str(e)}", exc_info=True)
        REQUEST_COUNT.labels(endpoint=request.url.path, status=500).inc()
        raise

# ============ Metrics Endpoint ============

@app.get("/metrics")
async def metrics():
    return Response(generate_latest(), media_type="text/plain")
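
On the Prometheus side, a minimal scrape config pointing at the /metrics endpoint might look like this (the job name and target are assumptions):

# prometheus.yml
scrape_configs:
  - job_name: 'ai-service'
    scrape_interval: 15s
    static_configs:
      - targets: ['localhost:8000']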