Python 文件系统操作深度实战——pathlib、watchdog、文件监控
Python 文件系统操作深度实战——pathlib、watchdog、文件监控
适读人群:Python 开发者、运维工程师、做数据管道的同学 | 阅读时长:约16分钟 | 核心价值:掌握现代 Python 文件操作的完整工程方案
那个让服务器差点崩掉的文件处理脚本
大概三年前,我刚开始在公司做数据工程,有个任务是监控一个数据落地目录,有新文件进来就自动处理并入库。
当时写的方案极其简陋:一个 while True 循环,每5秒扫一次目录,拿到所有文件列表,和内存里的历史列表比对,发现新文件就处理。
这个方案稳定运行了一个多月,然后某天目录里突然多了3万个小文件(上游系统故障,积压了好几天的数据),我的脚本开始每5秒 os.listdir() 扫3万个文件,CPU 直接飙到90%,最后把整个数据处理服务都拖慢了。
这件事让我彻底反思了文件系统操作的工程方案,从此建立了几个核心原则:
- 用
pathlib替代os.path,代码更清晰 - 用
watchdog替代轮询,基于内核事件,效率提升100倍 - 大目录操作要用生成器,不要一次全加载进内存
一、pathlib——现代 Python 文件操作
pathlib 是 Python 3.4 引入的,用面向对象的方式操作路径,比 os.path 的函数式调用清晰很多。
基础操作对比
from pathlib import Path
import os
# 旧方式 vs pathlib
# 路径拼接
old_way = os.path.join("/data", "2024", "01", "report.csv")
new_way = Path("/data") / "2024" / "01" / "report.csv"
print(new_way) # /data/2024/01/report.csv
# 获取文件信息
p = Path("/data/2024/01/report.csv")
print(p.name) # report.csv
print(p.stem) # report
print(p.suffix) # .csv
print(p.parent) # /data/2024/01
print(p.parts) # ('/', 'data', '2024', '01', 'report.csv')
# 判断和检查
print(p.exists())
print(p.is_file())
print(p.is_dir())
# 文件属性
if p.exists():
stat = p.stat()
print(f"大小: {stat.st_size:,} bytes")
from datetime import datetime
print(f"修改时间: {datetime.fromtimestamp(stat.st_mtime)}")目录遍历——生成器方案
from pathlib import Path
import time
def iter_files(
root: Path,
pattern: str = "*",
min_size: int = 0,
newer_than: float = 0,
) -> iter:
"""
高效遍历目录,返回生成器(不一次性加载所有文件)
:param root: 根目录
:param pattern: glob 模式
:param min_size: 最小文件大小(字节)
:param newer_than: 只返回比这个时间戳新的文件
"""
for path in root.rglob(pattern):
if not path.is_file():
continue
try:
stat = path.stat()
if stat.st_size < min_size:
continue
if newer_than and stat.st_mtime <= newer_than:
continue
yield path
except (PermissionError, OSError):
continue
# 找出最近1小时内修改的所有 CSV 文件
root = Path("/data")
one_hour_ago = time.time() - 3600
for csv_file in iter_files(root, "*.csv", newer_than=one_hour_ago):
print(f"新文件: {csv_file} ({csv_file.stat().st_size:,} bytes)")文件读写最佳实践
from pathlib import Path
import json
import csv
from typing import Any
class FileManager:
"""文件读写工具类"""
@staticmethod
def write_safely(path: Path, content: str, encoding: str = "utf-8") -> bool:
"""原子写入:先写临时文件,再重命名,避免写入中途崩溃导致文件损坏"""
path = Path(path)
path.parent.mkdir(parents=True, exist_ok=True)
# 临时文件和目标文件在同一目录,保证重命名是原子操作
tmp_path = path.with_suffix(path.suffix + ".tmp")
try:
tmp_path.write_text(content, encoding=encoding)
tmp_path.replace(path) # 原子重命名
return True
except Exception as e:
tmp_path.unlink(missing_ok=True)
raise e
@staticmethod
def read_json(path: Path) -> Any:
return json.loads(Path(path).read_text(encoding="utf-8"))
@staticmethod
def write_json(path: Path, data: Any, indent: int = 2) -> None:
content = json.dumps(data, ensure_ascii=False, indent=indent)
FileManager.write_safely(Path(path), content)
@staticmethod
def iter_csv(path: Path, encoding: str = "utf-8") -> iter:
"""逐行读取 CSV,内存友好"""
with open(path, "r", encoding=encoding, newline="") as f:
reader = csv.DictReader(f)
yield from reader
@staticmethod
def archive(source: Path, archive_dir: Path) -> Path:
"""将文件归档到按日期组织的目录"""
from datetime import datetime
today = datetime.now().strftime("%Y/%m/%d")
target_dir = archive_dir / today
target_dir.mkdir(parents=True, exist_ok=True)
# 如果目标已存在,加时间戳后缀
target = target_dir / source.name
if target.exists():
ts = datetime.now().strftime("%H%M%S")
target = target_dir / f"{source.stem}_{ts}{source.suffix}"
source.rename(target)
return target二、watchdog——基于内核事件的文件监控
pip install watchdogwatchdog 底层使用操作系统原生的文件系统事件(Linux 的 inotify,macOS 的 FSEvents,Windows 的 ReadDirectoryChangesW),不是轮询,CPU 消耗极低。
完整的文件监控系统
import time
import logging
import threading
from pathlib import Path
from queue import Queue, Empty
from dataclasses import dataclass
from typing import Callable
from watchdog.observers import Observer
from watchdog.events import (
FileSystemEventHandler,
FileCreatedEvent,
FileModifiedEvent,
FileMovedEvent,
FileDeletedEvent,
)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class FileEvent:
"""统一的文件事件数据类"""
event_type: str # created / modified / moved / deleted
src_path: str
dest_path: str | None = None # 仅 moved 事件有值
timestamp: float = 0.0
def __post_init__(self):
if not self.timestamp:
import time
self.timestamp = time.time()
class BufferedFileHandler(FileSystemEventHandler):
"""
带防抖缓冲的文件事件处理器
同一文件在 debounce_seconds 内多次触发只处理最后一次
"""
def __init__(
self,
event_queue: Queue,
patterns: list[str] = None,
debounce_seconds: float = 0.5,
):
super().__init__()
self.event_queue = event_queue
self.patterns = patterns or ["*"]
self.debounce_seconds = debounce_seconds
self._pending: dict[str, tuple[FileEvent, float]] = {}
self._lock = threading.Lock()
self._start_debounce_worker()
def _matches(self, path: str) -> bool:
from fnmatch import fnmatch
return any(fnmatch(Path(path).name, p) for p in self.patterns)
def _queue_event(self, event: FileEvent):
if not self._matches(event.src_path):
return
with self._lock:
self._pending[event.src_path] = (event, time.time())
def _start_debounce_worker(self):
"""防抖工作线程:定期把稳定的事件推入队列"""
def worker():
while True:
time.sleep(0.1)
now = time.time()
with self._lock:
ready = [
(path, evt) for path, (evt, ts) in self._pending.items()
if now - ts >= self.debounce_seconds
]
for path, _ in ready:
del self._pending[path]
for path, evt in ready:
self.event_queue.put(evt)
t = threading.Thread(target=worker, daemon=True)
t.start()
def on_created(self, event):
if not event.is_directory:
self._queue_event(FileEvent("created", event.src_path))
def on_modified(self, event):
if not event.is_directory:
self._queue_event(FileEvent("modified", event.src_path))
def on_moved(self, event):
if not event.is_directory:
self._queue_event(FileEvent("moved", event.src_path, event.dest_path))
def on_deleted(self, event):
if not event.is_directory:
self._queue_event(FileEvent("deleted", event.src_path))
class DirectoryWatcher:
"""
目录监控服务
监控目录变化,通过回调处理新文件
"""
def __init__(
self,
watch_dir: str | Path,
on_file_event: Callable[[FileEvent], None],
patterns: list[str] = None,
recursive: bool = True,
):
self.watch_dir = Path(watch_dir)
self.on_file_event = on_file_event
self.patterns = patterns or ["*"]
self.recursive = recursive
self._event_queue: Queue = Queue()
self._observer = Observer()
self._running = False
def start(self, blocking: bool = True):
"""启动监控"""
self.watch_dir.mkdir(parents=True, exist_ok=True)
handler = BufferedFileHandler(
self._event_queue,
patterns=self.patterns,
)
self._observer.schedule(handler, str(self.watch_dir), recursive=self.recursive)
self._observer.start()
self._running = True
logger.info(f"开始监控目录: {self.watch_dir}")
if blocking:
self._consume_events()
else:
t = threading.Thread(target=self._consume_events, daemon=True)
t.start()
def _consume_events(self):
"""消费事件队列"""
while self._running:
try:
event = self._event_queue.get(timeout=1.0)
try:
self.on_file_event(event)
except Exception as e:
logger.error(f"处理文件事件出错: {e}", exc_info=True)
except Empty:
continue
def stop(self):
self._running = False
self._observer.stop()
self._observer.join()
logger.info("监控已停止")
# 使用示例:监控数据目录,自动处理新 CSV 文件
def process_csv_file(event: FileEvent):
if event.event_type not in ("created", "moved"):
return
path = Path(event.dest_path or event.src_path)
if path.suffix.lower() != ".csv":
return
logger.info(f"发现新 CSV 文件: {path}")
# 等待文件写入完成(简单检测:大小稳定)
prev_size = -1
for _ in range(10):
curr_size = path.stat().st_size if path.exists() else 0
if curr_size == prev_size and curr_size > 0:
break
prev_size = curr_size
time.sleep(0.2)
# 处理文件
try:
rows = list(FileManager.iter_csv(path))
logger.info(f"处理完成: {len(rows)} 行数据")
# 处理完毕,归档
FileManager.archive(path, Path("/data/archive"))
except Exception as e:
logger.error(f"处理失败: {path}, 错误: {e}")
# 启动监控(非阻塞)
watcher = DirectoryWatcher(
watch_dir="/data/incoming",
on_file_event=process_csv_file,
patterns=["*.csv", "*.json"],
recursive=False,
)
# watcher.start(blocking=False)
# time.sleep(60)
# watcher.stop()三、踩坑实录
踩坑实录1:watchdog 在 macOS 上漏事件
现象:macOS 上用 watchdog 监控,某些文件创建事件会丢失,特别是文件写入速度很快的时候。
原因:macOS FSEvents API 有合并事件的机制,短时间内大量事件可能被合并。
解法:1)降低事件合并延迟;2)结合启动时的全量扫描,补漏。
踩坑实录2:path.rename() 跨设备失败
现象:将文件从 /tmp 移动到 /data 时,path.rename() 抛出 OSError: [Errno 18] Invalid cross-device link。
原因:rename 只能在同一设备内原子操作,跨设备需要先复制再删除。
解法:用 shutil.move() 代替,它会自动处理跨设备情况。
踩坑实录3:文件刚创建就处理,内容还没写完
现象:watchdog 触发 created 事件时,文件可能还在写入中,读取到的是不完整数据。
原因:操作系统先创建文件(触发 created),再写入内容,两个操作不是原子的。
解法:收到 created 事件后,等待文件大小稳定(连续两次读取大小相同)再处理。
四、选型建议
| 场景 | 推荐方案 |
|---|---|
| 简单文件路径操作 | pathlib |
| 监控目录变化 | watchdog(内核事件) |
| 一次性全量扫描 | pathlib.rglob(生成器) |
| 跨平台文件操作 | pathlib + shutil |
| 大文件读写 | 逐行/逐块读取,避免全量加载内存 |
从那次服务器差点崩掉之后,我写的所有数据处理服务都用 watchdog 替代了轮询,CPU 从偶发90%降到了常态不足5%。架构上的改进,效果远比代码优化更显著。
