HiHuo
首页
博客
手册
工具
关于
首页
博客
手册
工具
关于
  • Python 新手入门教程

    • Python新手入门教程 - 零基础学Python,8章+HTTP服务实战 | HiHuo
    • Python简介与环境搭建 - 安装配置Python开发环境 | HiHuo
    • Python基础语法 - 缩进、变量、注释、输入输出 | HiHuo
    • Python数据类型详解 - 字符串、列表、字典、元组 | HiHuo
    • Python流程控制 - if条件判断、for/while循环 | HiHuo
    • Python函数与模块 - 定义函数、参数传递、模块导入 | HiHuo
    • Python面向对象编程 - 类、对象、继承、多态 | HiHuo
    • Python文件与异常处理 - 读写文件、try/except异常捕获 | HiHuo
    • Python HTTP服务项目实战 - 构建待办事项API服务 | HiHuo

文件与异常处理

文件操作

打开文件

# 基本语法
file = open("filename.txt", "mode", encoding="utf-8")
# ... 操作文件
file.close()

# 推荐:使用 with 语句(自动关闭文件)
with open("filename.txt", "r", encoding="utf-8") as file:
    content = file.read()
# 文件在 with 块结束后自动关闭

文件模式

模式说明
r只读(默认),文件不存在则报错
w写入,文件存在则清空,不存在则创建
a追加,在文件末尾添加内容
x创建,文件已存在则报错
r+读写
w+读写(清空原内容)
a+读写(追加模式)
b二进制模式(如 rb, wb)

读取文件

# 读取全部内容
with open("example.txt", "r", encoding="utf-8") as f:
    content = f.read()
    print(content)

# 读取指定字符数
with open("example.txt", "r", encoding="utf-8") as f:
    chunk = f.read(100)  # 读取前100个字符

# 读取一行
with open("example.txt", "r", encoding="utf-8") as f:
    line = f.readline()
    print(line)

# 读取所有行(返回列表)
with open("example.txt", "r", encoding="utf-8") as f:
    lines = f.readlines()
    for line in lines:
        print(line.strip())  # strip() 去除换行符

# 逐行读取(推荐:内存友好)
with open("example.txt", "r", encoding="utf-8") as f:
    for line in f:
        print(line.strip())

写入文件

# 写入字符串
with open("output.txt", "w", encoding="utf-8") as f:
    f.write("Hello, World!\n")
    f.write("Python 文件操作\n")

# 写入多行
lines = ["第一行\n", "第二行\n", "第三行\n"]
with open("output.txt", "w", encoding="utf-8") as f:
    f.writelines(lines)

# 追加内容
with open("output.txt", "a", encoding="utf-8") as f:
    f.write("追加的内容\n")

# 使用 print 写入文件
with open("output.txt", "w", encoding="utf-8") as f:
    print("使用 print 写入", file=f)
    print("第二行内容", file=f)

典型案例:日志记录器

import datetime

def log_message(message, filename="app.log"):
    """记录日志到文件"""
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    log_entry = f"[{timestamp}] {message}\n"

    with open(filename, "a", encoding="utf-8") as f:
        f.write(log_entry)

# 使用
log_message("程序启动")
log_message("用户登录: 张三")
log_message("操作完成")

文件位置

with open("example.txt", "r", encoding="utf-8") as f:
    print(f.tell())      # 当前位置: 0

    f.read(10)           # 读取10个字符
    print(f.tell())      # 当前位置: 10

    f.seek(0)            # 移动到开头
    print(f.tell())      # 0

    f.seek(0, 2)         # 移动到末尾
    # seek(offset, whence)
    # whence: 0=开头, 1=当前位置, 2=末尾

二进制文件

# 读取二进制文件
with open("image.png", "rb") as f:
    data = f.read()
    print(len(data), "字节")

# 写入二进制文件
with open("copy.png", "wb") as f:
    f.write(data)

# 复制文件
def copy_file(src, dst, chunk_size=4096):
    with open(src, "rb") as fsrc:
        with open(dst, "wb") as fdst:
            while True:
                chunk = fsrc.read(chunk_size)
                if not chunk:
                    break
                fdst.write(chunk)

JSON 文件

import json

# 写入 JSON
data = {
    "name": "张三",
    "age": 25,
    "skills": ["Python", "JavaScript"]
}

with open("data.json", "w", encoding="utf-8") as f:
    json.dump(data, f, ensure_ascii=False, indent=2)

# 读取 JSON
with open("data.json", "r", encoding="utf-8") as f:
    loaded_data = json.load(f)
    print(loaded_data["name"])  # 张三

# JSON 字符串操作
json_str = json.dumps(data, ensure_ascii=False)
parsed = json.loads(json_str)

CSV 文件

import csv

# 写入 CSV
data = [
    ["姓名", "年龄", "城市"],
    ["张三", 25, "北京"],
    ["李四", 30, "上海"],
    ["王五", 28, "广州"]
]

with open("data.csv", "w", encoding="utf-8", newline="") as f:
    writer = csv.writer(f)
    writer.writerows(data)

# 读取 CSV
with open("data.csv", "r", encoding="utf-8") as f:
    reader = csv.reader(f)
    for row in reader:
        print(row)

# 使用 DictReader/DictWriter
data = [
    {"name": "张三", "age": 25},
    {"name": "李四", "age": 30}
]

with open("data.csv", "w", encoding="utf-8", newline="") as f:
    fieldnames = ["name", "age"]
    writer = csv.DictWriter(f, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerows(data)

with open("data.csv", "r", encoding="utf-8") as f:
    reader = csv.DictReader(f)
    for row in reader:
        print(f"{row['name']}: {row['age']}岁")

路径操作

import os
from pathlib import Path

# os 模块
print(os.getcwd())              # 当前目录
print(os.listdir("."))          # 列出目录内容
os.makedirs("path/to/dir", exist_ok=True)  # 创建目录
os.path.exists("file.txt")      # 检查是否存在
os.path.isfile("file.txt")      # 是否是文件
os.path.isdir("dir")            # 是否是目录
os.path.join("path", "to", "file.txt")  # 拼接路径
os.path.basename("/path/to/file.txt")   # 文件名
os.path.dirname("/path/to/file.txt")    # 目录名
os.path.splitext("file.txt")    # ('file', '.txt')

# pathlib 模块(推荐)
p = Path("path/to/file.txt")
print(p.exists())               # 是否存在
print(p.is_file())              # 是否是文件
print(p.name)                   # 文件名
print(p.stem)                   # 不带扩展名的文件名
print(p.suffix)                 # 扩展名
print(p.parent)                 # 父目录

# 创建目录
Path("new_dir").mkdir(exist_ok=True)

# 遍历目录
for file in Path(".").glob("*.py"):
    print(file)

# 递归遍历
for file in Path(".").rglob("*.txt"):
    print(file)

典型案例:配置文件管理

import json
from pathlib import Path

class Config:
    def __init__(self, config_file="config.json"):
        self.config_file = Path(config_file)
        self.data = self.load()

    def load(self):
        """加载配置"""
        if self.config_file.exists():
            with open(self.config_file, "r", encoding="utf-8") as f:
                return json.load(f)
        return {}

    def save(self):
        """保存配置"""
        with open(self.config_file, "w", encoding="utf-8") as f:
            json.dump(self.data, f, ensure_ascii=False, indent=2)

    def get(self, key, default=None):
        """获取配置项"""
        return self.data.get(key, default)

    def set(self, key, value):
        """设置配置项"""
        self.data[key] = value
        self.save()


# 使用
config = Config()
config.set("username", "admin")
config.set("theme", "dark")
print(config.get("username"))  # admin

异常处理

基本语法

try:
    # 可能出错的代码
    result = 10 / 0
except ZeroDivisionError:
    # 处理特定异常
    print("不能除以零")
except Exception as e:
    # 处理其他异常
    print(f"发生错误: {e}")
else:
    # 没有异常时执行
    print(f"结果: {result}")
finally:
    # 无论如何都会执行
    print("清理操作")

常见异常类型

# 常见内置异常
# ZeroDivisionError    - 除以零
# ValueError           - 值错误
# TypeError            - 类型错误
# IndexError           - 索引越界
# KeyError             - 字典键不存在
# FileNotFoundError    - 文件不存在
# AttributeError       - 属性错误
# ImportError          - 导入错误
# NameError            - 名称未定义

# 捕获多种异常
try:
    x = int(input("输入数字: "))
    result = 10 / x
except (ValueError, ZeroDivisionError) as e:
    print(f"输入错误: {e}")

# 异常层次结构
# BaseException
#  ├── SystemExit
#  ├── KeyboardInterrupt
#  └── Exception
#       ├── ValueError
#       ├── TypeError
#       ├── KeyError
#       └── ...

抛出异常

def divide(a, b):
    if b == 0:
        raise ValueError("除数不能为零")
    return a / b

try:
    divide(10, 0)
except ValueError as e:
    print(e)  # 除数不能为零

# 重新抛出异常
try:
    divide(10, 0)
except ValueError:
    print("记录错误日志...")
    raise  # 重新抛出

自定义异常

class ValidationError(Exception):
    """自定义验证错误"""
    def __init__(self, field, message):
        self.field = field
        self.message = message
        super().__init__(f"{field}: {message}")


class AgeError(ValidationError):
    """年龄验证错误"""
    pass


def validate_age(age):
    if not isinstance(age, int):
        raise TypeError("年龄必须是整数")
    if age < 0:
        raise AgeError("age", "年龄不能为负数")
    if age > 150:
        raise AgeError("age", "年龄不能超过150")
    return True


try:
    validate_age(-5)
except AgeError as e:
    print(f"验证失败 - {e.field}: {e.message}")
except TypeError as e:
    print(f"类型错误: {e}")

上下文管理器

# with 语句的本质
class MyContext:
    def __enter__(self):
        print("进入上下文")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        print("退出上下文")
        if exc_type:
            print(f"发生异常: {exc_val}")
        return False  # False 让异常继续传播

with MyContext() as ctx:
    print("执行操作")
# 进入上下文
# 执行操作
# 退出上下文

# 使用 contextlib
from contextlib import contextmanager

@contextmanager
def timer():
    import time
    start = time.time()
    yield
    end = time.time()
    print(f"耗时: {end - start:.4f}秒")

with timer():
    # 执行一些操作
    sum(range(1000000))

典型案例:数据库连接管理

class DatabaseConnection:
    """模拟数据库连接"""

    def __init__(self, host, port):
        self.host = host
        self.port = port
        self.connected = False

    def connect(self):
        print(f"连接到 {self.host}:{self.port}")
        self.connected = True

    def disconnect(self):
        print("断开连接")
        self.connected = False

    def execute(self, query):
        if not self.connected:
            raise RuntimeError("未连接到数据库")
        print(f"执行查询: {query}")
        return [{"id": 1, "name": "张三"}]

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.disconnect()
        return False


# 使用
with DatabaseConnection("localhost", 5432) as db:
    result = db.execute("SELECT * FROM users")
    print(result)
# 自动断开连接

典型案例:安全的文件处理

def safe_read_file(filename, default=None):
    """安全读取文件,出错返回默认值"""
    try:
        with open(filename, "r", encoding="utf-8") as f:
            return f.read()
    except FileNotFoundError:
        print(f"文件不存在: {filename}")
        return default
    except PermissionError:
        print(f"没有读取权限: {filename}")
        return default
    except Exception as e:
        print(f"读取文件失败: {e}")
        return default


def safe_write_file(filename, content, backup=True):
    """安全写入文件,支持备份"""
    from pathlib import Path
    import shutil

    path = Path(filename)

    # 备份原文件
    if backup and path.exists():
        backup_path = path.with_suffix(path.suffix + ".bak")
        shutil.copy(path, backup_path)

    try:
        with open(filename, "w", encoding="utf-8") as f:
            f.write(content)
        return True
    except Exception as e:
        print(f"写入失败: {e}")
        # 恢复备份
        if backup and backup_path.exists():
            shutil.move(backup_path, path)
        return False

综合案例:待办事项应用

import json
from pathlib import Path
from datetime import datetime


class TodoApp:
    """待办事项应用"""

    def __init__(self, data_file="todos.json"):
        self.data_file = Path(data_file)
        self.todos = self.load()

    def load(self):
        """加载待办事项"""
        try:
            if self.data_file.exists():
                with open(self.data_file, "r", encoding="utf-8") as f:
                    return json.load(f)
        except json.JSONDecodeError:
            print("数据文件损坏,创建新文件")
        except Exception as e:
            print(f"加载失败: {e}")
        return []

    def save(self):
        """保存待办事项"""
        try:
            with open(self.data_file, "w", encoding="utf-8") as f:
                json.dump(self.todos, f, ensure_ascii=False, indent=2)
            return True
        except Exception as e:
            print(f"保存失败: {e}")
            return False

    def add(self, title):
        """添加待办事项"""
        if not title.strip():
            raise ValueError("标题不能为空")

        todo = {
            "id": len(self.todos) + 1,
            "title": title.strip(),
            "completed": False,
            "created_at": datetime.now().isoformat()
        }
        self.todos.append(todo)
        self.save()
        return todo

    def complete(self, todo_id):
        """完成待办事项"""
        for todo in self.todos:
            if todo["id"] == todo_id:
                todo["completed"] = True
                todo["completed_at"] = datetime.now().isoformat()
                self.save()
                return todo
        raise ValueError(f"未找到ID为 {todo_id} 的待办事项")

    def delete(self, todo_id):
        """删除待办事项"""
        for i, todo in enumerate(self.todos):
            if todo["id"] == todo_id:
                deleted = self.todos.pop(i)
                self.save()
                return deleted
        raise ValueError(f"未找到ID为 {todo_id} 的待办事项")

    def list(self, show_completed=True):
        """列出待办事项"""
        for todo in self.todos:
            if show_completed or not todo["completed"]:
                status = "[x]" if todo["completed"] else "[ ]"
                print(f"{todo['id']}. {status} {todo['title']}")

    def clear_completed(self):
        """清除已完成的待办事项"""
        self.todos = [t for t in self.todos if not t["completed"]]
        self.save()


def main():
    """主程序"""
    app = TodoApp()

    while True:
        print("\n=== 待办事项 ===")
        print("1. 查看列表")
        print("2. 添加事项")
        print("3. 完成事项")
        print("4. 删除事项")
        print("5. 清除已完成")
        print("0. 退出")

        try:
            choice = input("\n请选择: ").strip()

            if choice == "0":
                print("再见!")
                break
            elif choice == "1":
                app.list()
            elif choice == "2":
                title = input("输入标题: ")
                todo = app.add(title)
                print(f"已添加: {todo['title']}")
            elif choice == "3":
                todo_id = int(input("输入ID: "))
                app.complete(todo_id)
                print("已完成")
            elif choice == "4":
                todo_id = int(input("输入ID: "))
                app.delete(todo_id)
                print("已删除")
            elif choice == "5":
                app.clear_completed()
                print("已清除所有已完成事项")
            else:
                print("无效选择")

        except ValueError as e:
            print(f"错误: {e}")
        except KeyboardInterrupt:
            print("\n再见!")
            break


if __name__ == "__main__":
    main()

练习

练习1:统计单词频率

from collections import Counter

def word_frequency(filename):
    try:
        with open(filename, "r", encoding="utf-8") as f:
            text = f.read().lower()
            # 简单分词
            words = text.split()
            # 清理标点
            words = [w.strip(".,!?;:\"'()[]") for w in words]
            return Counter(words)
    except FileNotFoundError:
        print(f"文件不存在: {filename}")
        return Counter()

freq = word_frequency("sample.txt")
print(freq.most_common(10))

练习2:文件批量重命名

from pathlib import Path

def batch_rename(directory, old_ext, new_ext):
    """批量修改文件扩展名"""
    path = Path(directory)
    count = 0

    for file in path.glob(f"*{old_ext}"):
        new_name = file.with_suffix(new_ext)
        try:
            file.rename(new_name)
            count += 1
            print(f"重命名: {file.name} -> {new_name.name}")
        except Exception as e:
            print(f"失败: {file.name} - {e}")

    return count

# batch_rename("./images", ".jpeg", ".jpg")

小结

  • 使用 with 语句处理文件,确保资源正确释放
  • 读取大文件时使用逐行读取,避免内存问题
  • json 模块处理 JSON 数据
  • csv 模块处理 CSV 文件
  • pathlib 是现代 Python 推荐的路径处理方式
  • try-except-else-finally 处理异常
  • 自定义异常继承 Exception 类
  • 上下文管理器用于资源管理
Prev
Python面向对象编程 - 类、对象、继承、多态 | HiHuo
Next
Python HTTP服务项目实战 - 构建待办事项API服务 | HiHuo