文件与异常处理
文件操作
打开文件
# 基本语法
file = open("filename.txt", "mode", encoding="utf-8")
# ... 操作文件
file.close()
# 推荐:使用 with 语句(自动关闭文件)
with open("filename.txt", "r", encoding="utf-8") as file:
content = file.read()
# 文件在 with 块结束后自动关闭
文件模式
| 模式 | 说明 |
|---|---|
r | 只读(默认),文件不存在则报错 |
w | 写入,文件存在则清空,不存在则创建 |
a | 追加,在文件末尾添加内容 |
x | 创建,文件已存在则报错 |
r+ | 读写 |
w+ | 读写(清空原内容) |
a+ | 读写(追加模式) |
b | 二进制模式(如 rb, wb) |
读取文件
# 读取全部内容
with open("example.txt", "r", encoding="utf-8") as f:
content = f.read()
print(content)
# 读取指定字符数
with open("example.txt", "r", encoding="utf-8") as f:
chunk = f.read(100) # 读取前100个字符
# 读取一行
with open("example.txt", "r", encoding="utf-8") as f:
line = f.readline()
print(line)
# 读取所有行(返回列表)
with open("example.txt", "r", encoding="utf-8") as f:
lines = f.readlines()
for line in lines:
print(line.strip()) # strip() 去除换行符
# 逐行读取(推荐:内存友好)
with open("example.txt", "r", encoding="utf-8") as f:
for line in f:
print(line.strip())
写入文件
# 写入字符串
with open("output.txt", "w", encoding="utf-8") as f:
f.write("Hello, World!\n")
f.write("Python 文件操作\n")
# 写入多行
lines = ["第一行\n", "第二行\n", "第三行\n"]
with open("output.txt", "w", encoding="utf-8") as f:
f.writelines(lines)
# 追加内容
with open("output.txt", "a", encoding="utf-8") as f:
f.write("追加的内容\n")
# 使用 print 写入文件
with open("output.txt", "w", encoding="utf-8") as f:
print("使用 print 写入", file=f)
print("第二行内容", file=f)
典型案例:日志记录器
import datetime
def log_message(message, filename="app.log"):
"""记录日志到文件"""
timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
log_entry = f"[{timestamp}] {message}\n"
with open(filename, "a", encoding="utf-8") as f:
f.write(log_entry)
# 使用
log_message("程序启动")
log_message("用户登录: 张三")
log_message("操作完成")
文件位置
with open("example.txt", "r", encoding="utf-8") as f:
print(f.tell()) # 当前位置: 0
f.read(10) # 读取10个字符
print(f.tell()) # 当前位置: 10
f.seek(0) # 移动到开头
print(f.tell()) # 0
f.seek(0, 2) # 移动到末尾
# seek(offset, whence)
# whence: 0=开头, 1=当前位置, 2=末尾
二进制文件
# 读取二进制文件
with open("image.png", "rb") as f:
data = f.read()
print(len(data), "字节")
# 写入二进制文件
with open("copy.png", "wb") as f:
f.write(data)
# 复制文件
def copy_file(src, dst, chunk_size=4096):
with open(src, "rb") as fsrc:
with open(dst, "wb") as fdst:
while True:
chunk = fsrc.read(chunk_size)
if not chunk:
break
fdst.write(chunk)
JSON 文件
import json
# 写入 JSON
data = {
"name": "张三",
"age": 25,
"skills": ["Python", "JavaScript"]
}
with open("data.json", "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
# 读取 JSON
with open("data.json", "r", encoding="utf-8") as f:
loaded_data = json.load(f)
print(loaded_data["name"]) # 张三
# JSON 字符串操作
json_str = json.dumps(data, ensure_ascii=False)
parsed = json.loads(json_str)
CSV 文件
import csv
# 写入 CSV
data = [
["姓名", "年龄", "城市"],
["张三", 25, "北京"],
["李四", 30, "上海"],
["王五", 28, "广州"]
]
with open("data.csv", "w", encoding="utf-8", newline="") as f:
writer = csv.writer(f)
writer.writerows(data)
# 读取 CSV
with open("data.csv", "r", encoding="utf-8") as f:
reader = csv.reader(f)
for row in reader:
print(row)
# 使用 DictReader/DictWriter
data = [
{"name": "张三", "age": 25},
{"name": "李四", "age": 30}
]
with open("data.csv", "w", encoding="utf-8", newline="") as f:
fieldnames = ["name", "age"]
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(data)
with open("data.csv", "r", encoding="utf-8") as f:
reader = csv.DictReader(f)
for row in reader:
print(f"{row['name']}: {row['age']}岁")
路径操作
import os
from pathlib import Path
# os 模块
print(os.getcwd()) # 当前目录
print(os.listdir(".")) # 列出目录内容
os.makedirs("path/to/dir", exist_ok=True) # 创建目录
os.path.exists("file.txt") # 检查是否存在
os.path.isfile("file.txt") # 是否是文件
os.path.isdir("dir") # 是否是目录
os.path.join("path", "to", "file.txt") # 拼接路径
os.path.basename("/path/to/file.txt") # 文件名
os.path.dirname("/path/to/file.txt") # 目录名
os.path.splitext("file.txt") # ('file', '.txt')
# pathlib 模块(推荐)
p = Path("path/to/file.txt")
print(p.exists()) # 是否存在
print(p.is_file()) # 是否是文件
print(p.name) # 文件名
print(p.stem) # 不带扩展名的文件名
print(p.suffix) # 扩展名
print(p.parent) # 父目录
# 创建目录
Path("new_dir").mkdir(exist_ok=True)
# 遍历目录
for file in Path(".").glob("*.py"):
print(file)
# 递归遍历
for file in Path(".").rglob("*.txt"):
print(file)
典型案例:配置文件管理
import json
from pathlib import Path
class Config:
def __init__(self, config_file="config.json"):
self.config_file = Path(config_file)
self.data = self.load()
def load(self):
"""加载配置"""
if self.config_file.exists():
with open(self.config_file, "r", encoding="utf-8") as f:
return json.load(f)
return {}
def save(self):
"""保存配置"""
with open(self.config_file, "w", encoding="utf-8") as f:
json.dump(self.data, f, ensure_ascii=False, indent=2)
def get(self, key, default=None):
"""获取配置项"""
return self.data.get(key, default)
def set(self, key, value):
"""设置配置项"""
self.data[key] = value
self.save()
# 使用
config = Config()
config.set("username", "admin")
config.set("theme", "dark")
print(config.get("username")) # admin
异常处理
基本语法
try:
# 可能出错的代码
result = 10 / 0
except ZeroDivisionError:
# 处理特定异常
print("不能除以零")
except Exception as e:
# 处理其他异常
print(f"发生错误: {e}")
else:
# 没有异常时执行
print(f"结果: {result}")
finally:
# 无论如何都会执行
print("清理操作")
常见异常类型
# 常见内置异常
# ZeroDivisionError - 除以零
# ValueError - 值错误
# TypeError - 类型错误
# IndexError - 索引越界
# KeyError - 字典键不存在
# FileNotFoundError - 文件不存在
# AttributeError - 属性错误
# ImportError - 导入错误
# NameError - 名称未定义
# 捕获多种异常
try:
x = int(input("输入数字: "))
result = 10 / x
except (ValueError, ZeroDivisionError) as e:
print(f"输入错误: {e}")
# 异常层次结构
# BaseException
# ├── SystemExit
# ├── KeyboardInterrupt
# └── Exception
# ├── ValueError
# ├── TypeError
# ├── KeyError
# └── ...
抛出异常
def divide(a, b):
if b == 0:
raise ValueError("除数不能为零")
return a / b
try:
divide(10, 0)
except ValueError as e:
print(e) # 除数不能为零
# 重新抛出异常
try:
divide(10, 0)
except ValueError:
print("记录错误日志...")
raise # 重新抛出
自定义异常
class ValidationError(Exception):
"""自定义验证错误"""
def __init__(self, field, message):
self.field = field
self.message = message
super().__init__(f"{field}: {message}")
class AgeError(ValidationError):
"""年龄验证错误"""
pass
def validate_age(age):
if not isinstance(age, int):
raise TypeError("年龄必须是整数")
if age < 0:
raise AgeError("age", "年龄不能为负数")
if age > 150:
raise AgeError("age", "年龄不能超过150")
return True
try:
validate_age(-5)
except AgeError as e:
print(f"验证失败 - {e.field}: {e.message}")
except TypeError as e:
print(f"类型错误: {e}")
上下文管理器
# with 语句的本质
class MyContext:
def __enter__(self):
print("进入上下文")
return self
def __exit__(self, exc_type, exc_val, exc_tb):
print("退出上下文")
if exc_type:
print(f"发生异常: {exc_val}")
return False # False 让异常继续传播
with MyContext() as ctx:
print("执行操作")
# 进入上下文
# 执行操作
# 退出上下文
# 使用 contextlib
from contextlib import contextmanager
@contextmanager
def timer():
import time
start = time.time()
yield
end = time.time()
print(f"耗时: {end - start:.4f}秒")
with timer():
# 执行一些操作
sum(range(1000000))
典型案例:数据库连接管理
class DatabaseConnection:
"""模拟数据库连接"""
def __init__(self, host, port):
self.host = host
self.port = port
self.connected = False
def connect(self):
print(f"连接到 {self.host}:{self.port}")
self.connected = True
def disconnect(self):
print("断开连接")
self.connected = False
def execute(self, query):
if not self.connected:
raise RuntimeError("未连接到数据库")
print(f"执行查询: {query}")
return [{"id": 1, "name": "张三"}]
def __enter__(self):
self.connect()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.disconnect()
return False
# 使用
with DatabaseConnection("localhost", 5432) as db:
result = db.execute("SELECT * FROM users")
print(result)
# 自动断开连接
典型案例:安全的文件处理
def safe_read_file(filename, default=None):
"""安全读取文件,出错返回默认值"""
try:
with open(filename, "r", encoding="utf-8") as f:
return f.read()
except FileNotFoundError:
print(f"文件不存在: {filename}")
return default
except PermissionError:
print(f"没有读取权限: {filename}")
return default
except Exception as e:
print(f"读取文件失败: {e}")
return default
def safe_write_file(filename, content, backup=True):
"""安全写入文件,支持备份"""
from pathlib import Path
import shutil
path = Path(filename)
# 备份原文件
if backup and path.exists():
backup_path = path.with_suffix(path.suffix + ".bak")
shutil.copy(path, backup_path)
try:
with open(filename, "w", encoding="utf-8") as f:
f.write(content)
return True
except Exception as e:
print(f"写入失败: {e}")
# 恢复备份
if backup and backup_path.exists():
shutil.move(backup_path, path)
return False
综合案例:待办事项应用
import json
from pathlib import Path
from datetime import datetime
class TodoApp:
"""待办事项应用"""
def __init__(self, data_file="todos.json"):
self.data_file = Path(data_file)
self.todos = self.load()
def load(self):
"""加载待办事项"""
try:
if self.data_file.exists():
with open(self.data_file, "r", encoding="utf-8") as f:
return json.load(f)
except json.JSONDecodeError:
print("数据文件损坏,创建新文件")
except Exception as e:
print(f"加载失败: {e}")
return []
def save(self):
"""保存待办事项"""
try:
with open(self.data_file, "w", encoding="utf-8") as f:
json.dump(self.todos, f, ensure_ascii=False, indent=2)
return True
except Exception as e:
print(f"保存失败: {e}")
return False
def add(self, title):
"""添加待办事项"""
if not title.strip():
raise ValueError("标题不能为空")
todo = {
"id": len(self.todos) + 1,
"title": title.strip(),
"completed": False,
"created_at": datetime.now().isoformat()
}
self.todos.append(todo)
self.save()
return todo
def complete(self, todo_id):
"""完成待办事项"""
for todo in self.todos:
if todo["id"] == todo_id:
todo["completed"] = True
todo["completed_at"] = datetime.now().isoformat()
self.save()
return todo
raise ValueError(f"未找到ID为 {todo_id} 的待办事项")
def delete(self, todo_id):
"""删除待办事项"""
for i, todo in enumerate(self.todos):
if todo["id"] == todo_id:
deleted = self.todos.pop(i)
self.save()
return deleted
raise ValueError(f"未找到ID为 {todo_id} 的待办事项")
def list(self, show_completed=True):
"""列出待办事项"""
for todo in self.todos:
if show_completed or not todo["completed"]:
status = "[x]" if todo["completed"] else "[ ]"
print(f"{todo['id']}. {status} {todo['title']}")
def clear_completed(self):
"""清除已完成的待办事项"""
self.todos = [t for t in self.todos if not t["completed"]]
self.save()
def main():
"""主程序"""
app = TodoApp()
while True:
print("\n=== 待办事项 ===")
print("1. 查看列表")
print("2. 添加事项")
print("3. 完成事项")
print("4. 删除事项")
print("5. 清除已完成")
print("0. 退出")
try:
choice = input("\n请选择: ").strip()
if choice == "0":
print("再见!")
break
elif choice == "1":
app.list()
elif choice == "2":
title = input("输入标题: ")
todo = app.add(title)
print(f"已添加: {todo['title']}")
elif choice == "3":
todo_id = int(input("输入ID: "))
app.complete(todo_id)
print("已完成")
elif choice == "4":
todo_id = int(input("输入ID: "))
app.delete(todo_id)
print("已删除")
elif choice == "5":
app.clear_completed()
print("已清除所有已完成事项")
else:
print("无效选择")
except ValueError as e:
print(f"错误: {e}")
except KeyboardInterrupt:
print("\n再见!")
break
if __name__ == "__main__":
main()
练习
练习1:统计单词频率
from collections import Counter
def word_frequency(filename):
try:
with open(filename, "r", encoding="utf-8") as f:
text = f.read().lower()
# 简单分词
words = text.split()
# 清理标点
words = [w.strip(".,!?;:\"'()[]") for w in words]
return Counter(words)
except FileNotFoundError:
print(f"文件不存在: {filename}")
return Counter()
freq = word_frequency("sample.txt")
print(freq.most_common(10))
练习2:文件批量重命名
from pathlib import Path
def batch_rename(directory, old_ext, new_ext):
"""批量修改文件扩展名"""
path = Path(directory)
count = 0
for file in path.glob(f"*{old_ext}"):
new_name = file.with_suffix(new_ext)
try:
file.rename(new_name)
count += 1
print(f"重命名: {file.name} -> {new_name.name}")
except Exception as e:
print(f"失败: {file.name} - {e}")
return count
# batch_rename("./images", ".jpeg", ".jpg")
小结
- 使用
with语句处理文件,确保资源正确释放 - 读取大文件时使用逐行读取,避免内存问题
json模块处理 JSON 数据csv模块处理 CSV 文件pathlib是现代 Python 推荐的路径处理方式try-except-else-finally处理异常- 自定义异常继承
Exception类 - 上下文管理器用于资源管理