HiHuo
首页
博客
手册
工具
关于
首页
博客
手册
工具
关于
  • 技术面试完全指南

    • 技术面试完全指南
    • 8年面试官告诉你:90%的简历在第一轮就被刷掉了
    • 刷了500道LeetCode,终于明白大厂算法面试到底考什么
    • 高频算法题精讲-双指针与滑动窗口
    • 03-高频算法题精讲-二分查找与排序
    • 04-高频算法题精讲-树与递归
    • 05-高频算法题精讲-图与拓扑排序
    • 06-高频算法题精讲-动态规划
    • Go面试必问:一道GMP问题,干掉90%的候选人
    • 08-数据库面试高频题
    • 09-分布式系统面试题
    • 10-Kubernetes与云原生面试题
    • 11-系统设计面试方法论
    • 前端面试高频题
    • AI 与机器学习面试题
    • 行为面试与软技能

Python 面试宝典

从语言特性到并发编程,从 Web 框架到性能优化,全面掌握 Python 核心技术

一、Python 语言特性(必考基础)

1.1 数据类型与结构

场景:面试官问"Python 的可变类型和不可变类型有哪些?区别是什么?"

不可变类型

# 1. 数字类型(int, float, complex, bool)
a = 10
b = a  # 指向同一对象
a = 20  # a 指向新对象,b 仍是 10

# 2. 字符串(str)
s = "hello"
s[0] = "H"  # TypeError: 'str' object does not support item assignment

# 修改字符串会创建新对象
s = "hello"
s = s.upper()  # 创建新字符串 "HELLO"

# 3. 元组(tuple)
t = (1, 2, 3)
t[0] = 10  # TypeError: 'tuple' object does not support item assignment元组内的可变对象可以修改
t = ([1, 2], 3)
t[0].append(3)  # OK,t = ([1, 2, 3], 3)
t[0] = [4, 5]   # Error

可变类型

# 1. 列表(list)
lst = [1, 2, 3]
lst.append(4)  # 原地修改
lst[0] = 10    # 原地修改

# 2. 字典(dict)
d = {"a": 1}
d["b"] = 2  # 原地修改

# 3. 集合(set)
s = {1, 2, 3}
s.add(4)  # 原地修改

实际影响:

# 函数参数传递
def modify_immutable(x):
    x = x + 10  # 创建新对象,不影响外部
    return x

def modify_mutable(lst):
    lst.append(4)  # 修改原对象

a = 10
modify_immutable(a)
print(a)  # 10

lst = [1, 2, 3]
modify_mutable(lst)
print(lst)  # [1, 2, 3, 4]

# 字典 key 必须是不可变类型
d = {[1, 2]: "value"}  # TypeError: unhashable type: 'list'
d = {(1, 2): "value"}  # OK

1.2 深拷贝与浅拷贝

场景:面试官问"深拷贝和浅拷贝的区别?何时使用?"

import copy

# 浅拷贝:拷贝第一层,嵌套对象仍共享
original = [[1, 2], [3, 4]]
shallow = copy.copy(original)

shallow[0].append(3)  # 修改嵌套列表
print(original)  # [[1, 2, 3], [3, 4]]  # 原始对象也被修改
print(shallow)   # [[1, 2, 3], [3, 4]]

shallow.append([5, 6])  # 修改第一层
print(original)  # [[1, 2, 3], [3, 4]]  # 原始对象不变
print(shallow)   # [[1, 2, 3], [3, 4], [5, 6]]

# 深拷贝:递归拷贝所有层级
original = [[1, 2], [3, 4]]
deep = copy.deepcopy(original)

deep[0].append(3)
print(original)  # [[1, 2], [3, 4]]  # 原始对象不变
print(deep)      # [[1, 2, 3], [3, 4]]

实际应用:

# 场景:缓存数据,避免被修改
class DataCache:
    def __init__(self):
        self._cache = {
            "users": [{"id": 1, "name": "张三"}, {"id": 2, "name": "李四"}]
        }

    def get_users(self):
        # 错误:返回引用,外部可以修改缓存
        return self._cache["users"]

    def get_users_safe(self):
        # 正确:返回深拷贝,外部修改不影响缓存
        return copy.deepcopy(self._cache["users"])

# 使用
cache = DataCache()
users = cache.get_users()
users[0]["name"] = "王五"  # 缓存被污染

users = cache.get_users_safe()
users[0]["name"] = "赵六"  # 缓存不受影响

1.3 装饰器原理

场景:面试官问"什么是装饰器?如何实现一个带参数的装饰器?"

基础装饰器

# 场景:记录函数执行时间
import time
from functools import wraps

def timer(func):
    @wraps(func)  # 保留原函数的元信息(__name__, __doc__)
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        end = time.time()
        print(f"{func.__name__} 耗时: {end - start:.2f}s")
        return result
    return wrapper

@timer
def slow_function():
    time.sleep(2)
    return "done"

# 等价于:slow_function = timer(slow_function)

slow_function()  # slow_function 耗时: 2.00s

带参数的装饰器

# 场景:重试装饰器
def retry(max_attempts=3, delay=1):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt < max_attempts - 1:
                        print(f"第 {attempt + 1} 次失败,{delay}s 后重试")
                        time.sleep(delay)
                    else:
                        raise e
        return wrapper
    return decorator

@retry(max_attempts=5, delay=2)
def fetch_data(url):
    # 模拟网络请求
    import random
    if random.random() < 0.7:
        raise Exception("网络错误")
    return "数据"

# 等价于:fetch_data = retry(max_attempts=5, delay=2)(fetch_data)

类装饰器

# 场景:缓存函数结果
class Memoize:
    def __init__(self, func):
        self.func = func
        self.cache = {}

    def __call__(self, *args):
        if args not in self.cache:
            self.cache[args] = self.func(*args)
        return self.cache[args]

@Memoize
def fibonacci(n):
    if n < 2:
        return n
    return fibonacci(n - 1) + fibonacci(n - 2)

# 使用
print(fibonacci(100))  # 瞬间返回,不会重复计算

实战:权限验证装饰器

from functools import wraps

def require_auth(roles):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # 假设从上下文获取当前用户
            current_user = get_current_user()
            if current_user.role not in roles:
                raise PermissionError(f"需要权限: {roles}")
            return func(*args, **kwargs)
        return wrapper
    return decorator

@require_auth(roles=["admin", "superuser"])
def delete_user(user_id):
    # 只有 admin 和 superuser 可以删除用户
    pass

# Flask 应用
from flask import Flask, request, jsonify

def login_required(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        token = request.headers.get("Authorization")
        if not token or not validate_token(token):
            return jsonify({"error": "未授权"}), 401
        return func(*args, **kwargs)
    return wrapper

@app.route("/api/users")
@login_required
def get_users():
    return jsonify({"users": []})

1.4 生成器与迭代器

场景:面试官问"生成器和普通函数的区别?yield 的作用?"

迭代器

# 可迭代对象(Iterable):实现 __iter__()
# 迭代器(Iterator):实现 __iter__() 和 __next__()

class Fibonacci:
    def __init__(self, n):
        self.n = n
        self.a, self.b = 0, 1
        self.count = 0

    def __iter__(self):
        return self

    def __next__(self):
        if self.count < self.n:
            result = self.a
            self.a, self.b = self.b, self.a + self.b
            self.count += 1
            return result
        else:
            raise StopIteration

# 使用
for num in Fibonacci(10):
    print(num, end=" ")  # 0 1 1 2 3 5 8 13 21 34

生成器

# 使用 yield 的函数是生成器函数
def fibonacci(n):
    a, b = 0, 1
    for _ in range(n):
        yield a  # 暂停执行,返回 a,下次从这里继续
        a, b = b, a + b

# 使用
for num in fibonacci(10):
    print(num, end=" ")  # 0 1 1 2 3 5 8 13 21 34

# 生成器表达式
gen = (x * x for x in range(10))  # 不会立即计算
print(next(gen))  # 0
print(next(gen))  # 1

优势:惰性计算,节省内存

# 读取大文件
def read_large_file(file_path):
    with open(file_path) as f:
        for line in f:  # 逐行读取,不会一次性加载到内存
            yield line.strip()

# 无限序列
def infinite_sequence():
    num = 0
    while True:
        yield num
        num += 1

# 管道处理
def read_logs(file_path):
    with open(file_path) as f:
        for line in f:
            yield line

def filter_errors(lines):
    for line in lines:
        if "ERROR" in line:
            yield line

def extract_timestamp(lines):
    for line in lines:
        yield line.split()[0]

# 使用
timestamps = extract_timestamp(filter_errors(read_logs("app.log")))
for ts in timestamps:
    print(ts)

二、并发编程

2.1 GIL(全局解释器锁)

场景:面试官问"什么是 GIL?对多线程有什么影响?如何绕过 GIL?"

GIL 原理

"""
GIL(Global Interpreter Lock):CPython 的全局锁
作用:保证同一时刻只有一个线程执行 Python 字节码

为什么需要 GIL?
1. CPython 的内存管理不是线程安全的(引用计数)
2. 简化实现,避免复杂的锁机制

影响:
1. CPU 密集型任务:多线程性能反而下降(线程切换开销)
2. IO 密集型任务:影响较小(IO 等待时释放 GIL)
"""

import threading
import time

# CPU 密集型任务
def cpu_bound(n):
    count = 0
    for i in range(n):
        count += i * i
    return count

# 单线程
start = time.time()
cpu_bound(10000000)
cpu_bound(10000000)
print(f"单线程耗时: {time.time() - start:.2f}s")  # 1.5s

# 多线程
start = time.time()
t1 = threading.Thread(target=cpu_bound, args=(10000000,))
t2 = threading.Thread(target=cpu_bound, args=(10000000,))
t1.start()
t2.start()
t1.join()
t2.join()
print(f"多线程耗时: {time.time() - start:.2f}s")  # 1.8s(更慢!)

绕过 GIL 的方法

(1) 多进程

from multiprocessing import Pool

def cpu_bound(n):
    count = 0
    for i in range(n):
        count += i * i
    return count

if __name__ == "__main__":
    start = time.time()
    with Pool(processes=2) as pool:
        results = pool.map(cpu_bound, [10000000, 10000000])
    print(f"多进程耗时: {time.time() - start:.2f}s")  # 0.8s(快!)

(2) C 扩展

# NumPy 等库使用 C 实现,释放 GIL
import numpy as np

start = time.time()
a = np.random.rand(10000000)
b = np.random.rand(10000000)
c = a + b  # C 代码执行,释放 GIL
print(f"NumPy 耗时: {time.time() - start:.2f}s")

(3) 异步 IO

import asyncio
import aiohttp

async def fetch(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

async def main():
    urls = [f"http://example.com/api/{i}" for i in range(100)]
    tasks = [fetch(url) for url in urls]
    results = await asyncio.gather(*tasks)  # 并发执行
    return results

# 单线程执行 100 个请求,耗时取决于网络延迟
asyncio.run(main())

2.2 多线程与多进程对比

场景:面试官问"什么时候用多线程?什么时候用多进程?"

"""
多线程(threading):
- 适合:IO 密集型(网络请求、文件读写、数据库查询)
- 优点:共享内存,通信简单,开销小
- 缺点:受 GIL 限制,无法利用多核

多进程(multiprocessing):
- 适合:CPU 密集型(数据处理、图像处理、科学计算)
- 优点:绕过 GIL,充分利用多核
- 缺点:进程间通信复杂,内存开销大

协程(asyncio):
- 适合:高并发 IO 密集型(Web 服务、爬虫)
- 优点:单线程,无锁,性能高
- 缺点:生态不完善,需要异步库支持
"""

# 示例 1:IO 密集型 - 多线程
import requests
from concurrent.futures import ThreadPoolExecutor

def download(url):
    response = requests.get(url)
    return len(response.content)

urls = [f"http://example.com/{i}" for i in range(100)]

# 多线程
with ThreadPoolExecutor(max_workers=10) as executor:
    results = executor.map(download, urls)

# 示例 2:CPU 密集型 - 多进程
from multiprocessing import Pool

def process_image(image_path):
    # 图像处理
    from PIL import Image
    img = Image.open(image_path)
    img = img.resize((800, 600))
    img.save(f"processed_{image_path}")

images = [f"img_{i}.jpg" for i in range(100)]

with Pool(processes=4) as pool:
    pool.map(process_image, images)

# 示例 3:高并发 IO - 协程
import asyncio

async def handle_request(request):
    # 处理请求
    await asyncio.sleep(0.1)  # 模拟 IO 操作
    return "response"

async def main():
    tasks = [handle_request(i) for i in range(10000)]
    results = await asyncio.gather(*tasks)

asyncio.run(main())

2.3 线程安全

场景:面试官问"如何保证 Python 多线程安全?"

问题场景

import threading

counter = 0

def increment():
    global counter
    for _ in range(100000):
        counter += 1  # 不是原子操作

threads = [threading.Thread(target=increment) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)  # 结果不是 1000000!

解决方案

(1) Lock(互斥锁)

import threading

counter = 0
lock = threading.Lock()

def increment():
    global counter
    for _ in range(100000):
        with lock:  # 或 lock.acquire() ... lock.release()
            counter += 1

threads = [threading.Thread(target=increment) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)  # 1000000

(2) RLock(可重入锁)

import threading

lock = threading.RLock()

def outer():
    with lock:
        print("outer acquired")
        inner()  # 同一线程可以多次获取

def inner():
    with lock:
        print("inner acquired")

outer()  # 正常执行

(3) Semaphore(信号量)

import threading
import time

# 限制同时访问的线程数
semaphore = threading.Semaphore(3)

def access_resource(n):
    with semaphore:
        print(f"线程 {n} 访问资源")
        time.sleep(2)

threads = [threading.Thread(target=access_resource, args=(i,)) for i in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

(4) Queue(线程安全队列)

from queue import Queue
import threading

def producer(queue):
    for i in range(10):
        queue.put(i)
        print(f"生产: {i}")

def consumer(queue):
    while True:
        item = queue.get()
        if item is None:
            break
        print(f"消费: {item}")
        queue.task_done()

q = Queue()
t1 = threading.Thread(target=producer, args=(q,))
t2 = threading.Thread(target=consumer, args=(q,))

t1.start()
t2.start()

t1.join()
q.put(None)  # 停止信号
t2.join()

(5) ThreadLocal(线程本地存储)

import threading

# 每个线程都有自己的副本,不共享
local_data = threading.local()

def process():
    local_data.value = threading.current_thread().name
    print(f"{threading.current_thread().name}: {local_data.value}")

threads = [threading.Thread(target=process, name=f"Thread-{i}") for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

三、Web 框架(Django/Flask)

3.1 Django 核心机制

场景:面试官问"Django 的请求处理流程?"

请求-响应流程

1. WSGI Server(gunicorn/uwsgi)接收请求
   ↓
2. Middleware 前置处理(process_request)
   ↓
3. URL 路由匹配(urls.py)
   ↓
4. View 视图函数执行
   - 调用 Model(ORM 查询)
   - 渲染 Template
   ↓
5. Middleware 后置处理(process_response)
   ↓
6. 返回 HTTP Response

ORM 查询优化

from django.db import models

class Author(models.Model):
    name = models.CharField(max_length=100)

class Book(models.Model):
    title = models.CharField(max_length=200)
    author = models.ForeignKey(Author, on_delete=models.CASCADE)

# 问题 1:N+1 查询
books = Book.objects.all()
for book in books:
    print(book.author.name)  # 每次都查询数据库

# SQL:
# SELECT * FROM book;  -- 1 次
# SELECT * FROM author WHERE id = 1;  -- N 次
# SELECT * FROM author WHERE id = 2;
# ...select_related(正向外键,一对一)
books = Book.objects.select_related("author").all()
for book in books:
    print(book.author.name)  # 不会再查询

# SQL:
# SELECT * FROM book LEFT JOIN author ON book.author_id = author.id;

# 问题 2:反向查询
authors = Author.objects.all()
for author in authors:
    print(author.book_set.count())  # N+1 查询prefetch_related(反向外键,多对多)
authors = Author.objects.prefetch_related("book_set").all()
for author in authors:
    print(author.book_set.count())

# SQL:
# SELECT * FROM author;  -- 1 次
# SELECT * FROM book WHERE author_id IN (1, 2, 3, ...);  -- 1 次

# 问题 3:只查询需要的字段
books = Book.objects.all()  # 查询所有字段

# 优化:only() / values()
books = Book.objects.only("title", "author__name")
books = Book.objects.values("title", "author__name")  # 返回字典

# 问题 4:批量操作
# 慢:
for book in books:
    book.price *= 1.1
    book.save()  # N 次 SQL

# 快:
Book.objects.filter(id__in=book_ids).update(price=F("price") * 1.1)  # 1 次 SQL

# 批量创建
Book.objects.bulk_create([
    Book(title="书1", author_id=1),
    Book(title="书2", author_id=2),
])  # 1 次 SQL

缓存策略

from django.core.cache import cache

# 1. 全站缓存(settings.py)
MIDDLEWARE = [
    'django.middleware.cache.UpdateCacheMiddleware',  # 最前
    # ...
    'django.middleware.cache.FetchFromCacheMiddleware',  # 最后
]

# 2. 视图缓存
from django.views.decorators.cache import cache_page

@cache_page(60 * 15)  # 缓存 15 分钟
def my_view(request):
    return HttpResponse("Cached response")

# 3. 模板片段缓存
{% load cache %}
{% cache 500 sidebar request.user.username %}
    ... 侧边栏内容 ...
{% endcache %}

# 4. 低层级缓存
def get_user(user_id):
    cache_key = f"user:{user_id}"
    user = cache.get(cache_key)

    if user is None:
        user = User.objects.get(id=user_id)
        cache.set(cache_key, user, timeout=3600)

    return user

# 5. 缓存失效
cache.delete(f"user:{user_id}")
cache.delete_pattern("user:*")  # Redis backend

3.2 Flask 核心机制

场景:面试官问"Flask 的上下文管理机制?"

应用上下文与请求上下文

from flask import Flask, request, g, current_app

app = Flask(__name__)

"""
两种上下文:
1. 应用上下文(Application Context):
   - current_app:当前应用实例
   - g:全局临时存储(请求级别)

2. 请求上下文(Request Context):
   - request:当前请求对象
   - session:当前会话对象

生命周期:
- 请求开始:压入请求上下文和应用上下文
- 请求结束:弹出上下文,清理 g 对象
"""

@app.before_request
def before_request():
    # 每次请求前执行
    g.user = get_current_user()  # 存储用户信息到 g

@app.route("/api/data")
def get_data():
    # 访问 g 对象
    user = g.user
    return {"user": user.name}

@app.teardown_request
def teardown_request(exception):
    # 请求结束时执行(无论成功或失败)
    db_session = g.pop("db_session", None)
    if db_session is not None:
        db_session.close()

蓝图(Blueprint)

# auth.py
from flask import Blueprint

auth_bp = Blueprint("auth", __name__, url_prefix="/auth")

@auth_bp.route("/login", methods=["POST"])
def login():
    return {"message": "登录成功"}

@auth_bp.route("/logout")
def logout():
    return {"message": "退出成功"}

# app.py
from flask import Flask
from auth import auth_bp

app = Flask(__name__)
app.register_blueprint(auth_bp)

# 访问:
# POST /auth/login
# GET /auth/logout

扩展集成

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from flask_login import LoginManager

app = Flask(__name__)
app.config["SQLALCHEMY_DATABASE_URI"] = "mysql://user:pass@localhost/db"

# 数据库
db = SQLAlchemy(app)

# 数据库迁移
migrate = Migrate(app, db)

# 用户认证
login_manager = LoginManager(app)
login_manager.login_view = "auth.login"

@login_manager.user_loader
def load_user(user_id):
    return User.query.get(int(user_id))

四、性能优化

4.1 代码优化技巧

场景:面试官问"如何优化 Python 代码性能?"

(1) 使用内置函数和库

import time

# 慢:Python 循环
start = time.time()
result = []
for i in range(1000000):
    result.append(i * 2)
print(f"循环耗时: {time.time() - start:.3f}s")  # 0.15s

# 快:列表推导式
start = time.time()
result = [i * 2 for i in range(1000000)]
print(f"推导式耗时: {time.time() - start:.3f}s")  # 0.08s

# 更快:map + lambda
start = time.time()
result = list(map(lambda x: x * 2, range(1000000)))
print(f"map 耗时: {time.time() - start:.3f}s")  # 0.06s

# 最快:NumPy
import numpy as np
start = time.time()
result = np.arange(1000000) * 2
print(f"NumPy 耗时: {time.time() - start:.3f}s")  # 0.002s

(2) 避免全局变量查找

import math

# 慢:每次查找 math.sqrt
def slow():
    result = []
    for i in range(100000):
        result.append(math.sqrt(i))

# 快:局部变量
def fast():
    sqrt = math.sqrt  # 局部变量
    result = []
    for i in range(100000):
        result.append(sqrt(i))

(3) 使用 slots

# 默认:每个实例有 __dict__,占用内存
class User:
    def __init__(self, name, age):
        self.name = name
        self.age = age

# 优化:使用 __slots__,节省内存
class User:
    __slots__ = ["name", "age"]

    def __init__(self, name, age):
        self.name = name
        self.age = age

# 内存对比(100 万个实例)
# 默认:~200MB
# __slots__:~80MB

(4) 使用生成器

# 内存占用大
def get_all_data():
    return [process(i) for i in range(1000000)]

data = get_all_data()  # 加载所有数据到内存

# 内存占用小
def get_data():
    for i in range(1000000):
        yield process(i)

data = get_data()  # 惰性生成

4.2 数据库查询优化

场景:Flask + SQLAlchemy 慢查询优化

from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import func

db = SQLAlchemy()

class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100))
    email = db.Column(db.String(100))
    orders = db.relationship("Order", backref="user")

class Order(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey("user.id"))
    total = db.Column(db.Float)

# 问题 1:N+1 查询
users = User.query.all()
for user in users:
    print(len(user.orders))  # N+1 查询joinedload
from sqlalchemy.orm import joinedload

users = User.query.options(joinedload(User.orders)).all()
for user in users:
    print(len(user.orders))  # 1 次查询

# 问题 2:查询所有字段
users = User.query.all()  # SELECT *只查询需要的字段
users = db.session.query(User.name, User.email).all()

# 问题 3:子查询优化
# 慢:查询每个用户的订单总额
users = User.query.all()
for user in users:
    total = db.session.query(func.sum(Order.total)).filter_by(user_id=user.id).scalar()

# 快:使用子查询
subquery = db.session.query(
    Order.user_id,
    func.sum(Order.total).label("total")
).group_by(Order.user_id).subquery()

users = db.session.query(User, subquery.c.total).join(
    subquery, User.id == subquery.c.user_id
).all()

# 问题 4:分页查询
# 慢:OFFSET 很大时性能差
users = User.query.offset(100000).limit(10).all()

# 快:游标分页
last_id = request.args.get("last_id", 0)
users = User.query.filter(User.id > last_id).limit(10).all()

4.3 并发性能优化

场景:Web 爬虫,抓取 1000 个网页

import requests
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
import asyncio
import aiohttp

urls = [f"http://example.com/{i}" for i in range(1000)]

# 方案 1:串行(慢)
start = time.time()
for url in urls:
    requests.get(url)
print(f"串行耗时: {time.time() - start:.2f}s")  # 1000s

# 方案 2:多线程
start = time.time()
with ThreadPoolExecutor(max_workers=50) as executor:
    futures = [executor.submit(requests.get, url) for url in urls]
    for future in as_completed(futures):
        result = future.result()
print(f"多线程耗时: {time.time() - start:.2f}s")  # 20s

# 方案 3:协程(最快)
async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

start = time.time()
asyncio.run(main())
print(f"协程耗时: {time.time() - start:.2f}s")  # 5s

五、常见面试题速答

5.1 基础篇

Q1:Python2 和 Python3 的区别?

# 1. print 语句 vs 函数
Python2: print "hello"
Python3: print("hello")

# 2. 整数除法
Python2: 3 / 2 = 1  # 整数除法
Python3: 3 / 2 = 1.5  # 真除法,3 // 2 = 1

# 3. Unicode
Python2: str 是字节串,unicode 是 Unicode
Python3: str 是 Unicode,bytes 是字节串

# 4. range
Python2: range() 返回列表,xrange() 返回迭代器
Python3: range() 返回迭代器

# 5. 异常语法
Python2: except Exception, e:
Python3: except Exception as e:

# 6. input
Python2: raw_input() 返回字符串,input() 执行表达式
Python3: input() 返回字符串

**Q2:*args 和 kwargs 的作用?

def func(*args, **kwargs):
    print(args)    # 元组:(1, 2, 3)
    print(kwargs)  # 字典:{'a': 4, 'b': 5}

func(1, 2, 3, a=4, b=5)

# 应用:装饰器
def decorator(func):
    def wrapper(*args, **kwargs):
        print("before")
        result = func(*args, **kwargs)  # 转发所有参数
        print("after")
        return result
    return wrapper

# 应用:合并字典
d1 = {"a": 1, "b": 2}
d2 = {"c": 3, "d": 4}
merged = {**d1, **d2}  # {'a': 1, 'b': 2, 'c': 3, 'd': 4}

Q3:is 和 == 的区别?

a = [1, 2, 3]
b = [1, 2, 3]
c = a

a == b  # True(值相等)
a is b  # False(不是同一对象)
a is c  # True(同一对象)

# 小整数池(-5 到 256)
x = 10
y = 10
x is y  # True(指向同一对象)

x = 300
y = 300
x is y  # False(CPython 实现,不是语言规范)

# 字符串驻留
s1 = "hello"
s2 = "hello"
s1 is s2  # True(字符串驻留)

5.2 进阶篇

Q4:什么是鸭子类型?

"""
"当看到一只鸟走起来像鸭子、游泳起来像鸭子、叫起来也像鸭子,
那么这只鸟就可以被称为鸭子"

Python 不关心对象的类型,只关心对象的行为(方法)
"""

class Duck:
    def quack(self):
        print("嘎嘎嘎")

class Person:
    def quack(self):
        print("我在学鸭子叫")

def make_it_quack(duck):
    duck.quack()  # 不检查类型,只要有 quack 方法就行

make_it_quack(Duck())    # 嘎嘎嘎
make_it_quack(Person())  # 我在学鸭子叫

# 应用:文件对象
def save_data(file_obj):
    file_obj.write("data")  # 不关心是文件、Socket 还是 StringIO

import io
save_data(open("file.txt", "w"))
save_data(io.StringIO())

Q5:什么是元类(Metaclass)?

"""
元类是类的类,用于控制类的创建

type 是 Python 中所有类的元类
"""

# 动态创建类
MyClass = type("MyClass", (object,), {"x": 10, "say": lambda self: print("hello")})

obj = MyClass()
obj.say()  # hello

# 自定义元类
class SingletonMeta(type):
    _instances = {}

    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]

class Database(metaclass=SingletonMeta):
    def __init__(self):
        print("初始化数据库连接")

db1 = Database()  # 初始化数据库连接
db2 = Database()  # 不会再初始化
print(db1 is db2)  # True

Q6:如何实现单例模式?

# 方法 1:模块级变量(推荐)
# singleton.py
class Database:
    def __init__(self):
        self.connection = None

database = Database()  # 模块导入时创建

# 使用
from singleton import database

# 方法 2:装饰器
def singleton(cls):
    instances = {}
    def get_instance(*args, **kwargs):
        if cls not in instances:
            instances[cls] = cls(*args, **kwargs)
        return instances[cls]
    return get_instance

@singleton
class Database:
    def __init__(self):
        print("初始化")

# 方法 3:元类(见上面)

# 方法 4:__new__
class Database:
    _instance = None

    def __new__(cls, *args, **kwargs):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

六、实战案例

6.1 性能分析工具

场景:定位代码性能瓶颈

# 1. cProfile:性能分析
import cProfile
import pstats

def slow_function():
    total = 0
    for i in range(1000000):
        total += i
    return total

# 分析
cProfile.run("slow_function()", "profile_stats")

# 查看结果
p = pstats.Stats("profile_stats")
p.sort_stats("cumulative").print_stats(10)  # 显示耗时最多的 10 个函数

# 2. line_profiler:逐行分析
# 安装:pip install line_profiler
# 使用:
from line_profiler import LineProfiler

def analyze_me():
    a = [i for i in range(100000)]
    b = [i * 2 for i in a]
    return sum(b)

lp = LineProfiler()
lp.add_function(analyze_me)
lp.run("analyze_me()")
lp.print_stats()

# 3. memory_profiler:内存分析
# 安装:pip install memory_profiler
from memory_profiler import profile

@profile
def memory_hog():
    a = [i for i in range(1000000)]
    b = [i * 2 for i in a]
    return sum(b)

memory_hog()

6.2 异步任务队列

场景:使用 Celery 处理异步任务

# celery_app.py
from celery import Celery

app = Celery("tasks", broker="redis://localhost:6379/0")

@app.task
def send_email(to, subject, body):
    # 模拟发送邮件
    import time
    time.sleep(5)
    print(f"邮件已发送至 {to}")
    return "success"

@app.task
def process_image(image_path):
    from PIL import Image
    img = Image.open(image_path)
    img.thumbnail((200, 200))
    img.save(f"thumb_{image_path}")
    return f"thumb_{image_path}"

# Flask 应用
from flask import Flask
from celery_app import send_email, process_image

app = Flask(__name__)

@app.route("/register", methods=["POST"])
def register():
    # 注册用户
    user = create_user(request.json)

    # 异步发送欢迎邮件
    send_email.delay(user.email, "欢迎注册", "感谢您的注册!")

    return {"message": "注册成功"}

# 启动 worker
# celery -A celery_app worker --loglevel=info

七、面试宝典总结

核心考点分布

语言特性:30%
并发编程:25%
Web 框架:20%
性能优化:15%
数据处理:10%

学习路径

Python 基础(数据类型、函数、装饰器、生成器)
并发编程(GIL、多线程、多进程、协程)
Django/Flask 框架(ORM、中间件、蓝图)
性能优化(代码优化、数据库优化、缓存)
实战项目(爬虫、API 服务、异步任务)

高频场景题

  1. 如何优化 Python 代码性能?
  2. 如何处理大规模数据(百万级/千万级)?
  3. 如何设计高并发 Web 服务?
  4. 如何排查内存泄漏?
  5. 如何实现分布式爬虫?

每个场景都要准备:技术选型 → 架构设计 → 性能优化