监视文件系统
watchdog
watchdog 提供了指定目录/文件的变化监控,对于指定目录内的操作,被视为一次事件。如添加删除文件或目录、重命名文件或目录、修改文件内容等,每种变化都会触发一次事件,事件是用户定义的业务逻辑代码。
安装
pip install watchdog
复制
演示程序
import sys
import time
import logging
from watchdog.observers import Observer
from watchdog.events import LoggingEventHandler
if __name__ == "__main__":
path = r'/tmp'
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
event_handler = LoggingEventHandler()
observer = Observer()
observer.schedule(event_handler, path, recursive=True)
observer.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()复制
Observer
# 设置超时时间
watchdog.observers.Observer(timeout=30)
# 监控指定路径path,该路径触发任何事件都会调用event_handler来处理,如果path是目录,recursive=True 开启会递归模式,监控该目录下的所有变化。
observer.schedule(event_handler, path, recursive=False)
# 添加一个事件处理器到watch中
observer.add_handler_for_watch(event_handler, watch)
# 从watch中移除一个事件处理器
observer.remove_handler_for_watch(event_handler, watch)
# 移除一个watch及这个watch上的所有事件处理器
observer.unschedule(watch)
# 移除所有watch及关联的事件处理器
observer.unschedule_all()
# 等同于observer.unschedule_all()
observer.on_thread_stop()
# 启动 observer 线程
observer.start()
# 停止observer线程
observer.stop()复制
InotifyObserver,Linux系统默认使用的观察目录的调度事件,效率比较高。 PollingObserver,与平台无关,轮询目录以检测文件的更改,效率比较低。 WindowsApiObserver,Windows系统默认使用的观察目录的调度事件,效率比较高。 FSEventsObserver,macOS 系统默认使用的调度事件 KqueueObserver,FreeBSD 系统默认使用
默认 Observer 会判断操作系统类型,选择最佳的方式。下面👇是 Observer 的源码
# Linux系统
if platform.is_linux():
try:
from .inotify import InotifyObserver as Observer
except UnsupportedLibc:
from .polling import PollingObserver as Observer
# darwin系统
elif platform.is_darwin():
# FIXME: catching too broad. Error prone
try:
from .fsevents import FSEventsObserver as Observer
except:
try:
from .kqueue import KqueueObserver as Observer
warnings.warn("Failed to import fsevents. Fall back to kqueue")
except:
from .polling import PollingObserver as Observer
warnings.warn("Failed to import fsevents and kqueue. Fall back to polling.")
# bsd系统
elif platform.is_bsd():
from .kqueue import KqueueObserver as Observer
# windows系统
elif platform.is_windows():
# TODO: find a reliable way of checking Windows version and import
# polling explicitly for Windows XP
try:
from .read_directory_changes import WindowsApiObserver as Observer
except:
from .polling import PollingObserver as Observer
warnings.warn("Failed to import read_directory_changes. Fall back to polling.")
else:
from .polling import PollingObserver as Observer复制
手工选择 Observer,注意下面代码 observer = PollingObserver()
import sys
import time
import logging
# from watchdog.observers import Observer
from watchdog.observers.polling import PollingObserver
from watchdog.events import LoggingEventHandler
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
path = sys.argv[1] if len(sys.argv) > 1 else '.'
event_handler = LoggingEventHandler()
observer = PollingObserver()
observer.schedule(event_handler, path, recursive=True)
observer.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()复制
创建/删除/修改/移动
定义创建/删除/修改/移动四种事件
from watchdog.observers import Observer
from watchdog.events import *
import time
class DemoFileSystemEventHandler(FileSystemEventHandler):
def __init__(self):
FileSystemEventHandler.__init__(self)
def on_moved(self, event):
if event.is_directory:
print("directory moved from {0} to {1}".format(
event.src_path, event.dest_path))
else:
print("file moved from {0} to {1}".format(
event.src_path, event.dest_path))
def on_created(self, event):
if event.is_directory:
print("directory created:{0}".format(event.src_path))
else:
print("file created:{0}".format(event.src_path))
def on_deleted(self, event):
if event.is_directory:
print("directory deleted:{0}".format(event.src_path))
else:
print("file deleted:{0}".format(event.src_path))
def on_modified(self, event):
if event.is_directory:
print("directory modified:{0}".format(event.src_path))
else:
print("file modified:{0}".format(event.src_path))
if __name__ == "__main__":
path = r'/tmp'
observer = Observer()
event_handler = DemoFileSystemEventHandler()
observer.schedule(event_handler, path, recursive=True)
observer.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()复制
on_any_event 任何事件都会触发
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
class MyHandler(FileSystemEventHandler):
def on_any_event(self, event):
if event.is_directory:
print('Directory', event.event_type, event.src_path)
else:
print('File', event.event_type, event.src_path)
if __name__ == "__main__":
monitor_path = '/tmp'
event_handler = MyHandler()
observer = Observer()
observer.schedule(event_handler, path=monitor_path, recursive=True)
observer.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()复制
多事件绑定
import time
import logging
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from watchdog.events import LoggingEventHandler
class MyEventHandler(FileSystemEventHandler):
def on_any_event(self, event):
if event.is_directory:
print('Directory', event.event_type, event.src_path)
else:
print('File', event.event_type, event.src_path)
if __name__ == "__main__":
watch_path = '/tmp'
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
loggingEventHandler = LoggingEventHandler()
myEventHandler = MyEventHandler()
observer = Observer()
watch1 = observer.schedule(
loggingEventHandler, path=watch_path, recursive=True)
watch2 = observer.schedule(
myEventHandler, path=watch_path, recursive=True)
observer.add_handler_for_watch(loggingEventHandler, watch1)
observer.add_handler_for_watch(myEventHandler, watch2)
observer.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()复制
自动备份程序
事情的 起因 - 经过 - 结果
做视频剪辑有一段时间了,苹果笔记本电脑硬盘比较小,视频剪辑空间不够,所以必须外挂一个剪辑盘。
我使用了由两块 SSD 组成的 RAID 硬盘盒子,起初是追求速度快,使用 RAID 0 模式,后期为了安全 切换到了 RAID 1 模式。这样硬件的安全就解决了,再也不用担心硬盘损坏数据丢失了。
某一天,就如日常剪辑一样,当我试图打开 Final cut pro 资源库的时候,悲剧了!!!
Fcpx 的工程文件打不开了,闪退后提示错误,这次翻车丢失了《Netkiller Architect 手札》6个视频,不过还好,那些视频都是初期做品,录制和剪辑都不算好,本就有重做的想法。
这次经历让我不在相信 fcpx 的资源库管理能力,鸡蛋放在一个篮子里及其危险的,《Netkiller Python 手札》我把一章内容放在一个资源库中,每节一个事件。
之前也偶尔备份,手工复制,有时比较懒,就没有备份。于是我便写了一个程序,来自动备份视频剪辑盘。
插入U设备,发现移动硬盘 剪辑盘和备份盘没有其他程序占用 开始备份 备份完成后自动弹出移动设备
<!-- -->
复制
#!/usr/bin/python3
# -*- coding: UTF-8 -*-
# ================================================================================
# 视频剪辑盘自动备份程序
# 作者: netkiller | netkiller@msn.com | http://www.netkiller.cn
# ================================================================================
import time
import os
import signal
import sys
import atexit
import subprocess
# from atexit import register
from watchdog.observers import Observer
from watchdog.events import *
class FileEventHandler(FileSystemEventHandler):
src_path = ''
dst_path = ''
watchpath = {}
def __init__(self, logger, watchpath):
FileSystemEventHandler.__init__(self)
self.watchpath = watchpath
self.logger = logger
def execute(self, cmd):
fr = os.popen(cmd)
# text = fr.read()
text = fr.readlines()
fr.close()
# print(len(text), text)
return(text)
def isBusy(self, path):
cmd = "lsof {}".format(path)
# print(cmd)
if len(self.execute(cmd)) == 0:
self.logger.info("目录空闲: {0}".format(path))
return False
else:
self.logger.info("目录繁忙: {0}".format(path))
return True
def mirror(self):
self.logger.info("开始备份")
cmd = "/usr/bin/rsync -auzv --delete --exclude=.Spotlight-V100 --exclude=.fseventsd {0} {1}".format(
self.src_path, self.dst_path)
# self.logger.info(cmd)
sp = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
for line in iter(sp.stdout.readline, b''):
self.logger.info(line.decode('UTF8').replace("\n", ''))
sp.stdout.close()
sp.wait()
self.logger.info("备份完成")
def umount(self):
if not self.isBusy(self.src_path):
self.logger.info("卸载移动硬盘")
os.system("diskutil eject {0}".format(
os.path.basename(self.src_path)))
else:
self.logger.info("移动硬盘 {0} 繁忙".format(self.src_path))
def backup(self):
if not os.path.exists(self.dst_path):
os.mkdir(self.dst_path)
if not self.isBusy(self.src_path):
if not self.isBusy(self.dst_path):
self.mirror()
time.sleep(5)
self.umount()
def on_created(self, event):
if event.is_directory:
self.logger.info("发现移动硬盘:{0}".format(event.src_path))
# self.path = self.watchpath[event.src_path]
self.src_path = event.src_path
self.dst_path = self.watchpath[event.src_path]
self.backup()
def on_deleted(self, event):
if event.is_directory:
self.logger.info("卸载移动硬盘: {0}".format(event.src_path))
class Backup():
pidfile = "/tmp/netkiller.pid"
logdir = "/tmp"
logfile = logdir+"/netkiller.log"
logger = None
loop = True
job = True
watchpath = {'/Volumes/Video': '/tmp/Backup',
'/Volumes/Photo': '/tmp/Backup'}
observer = Observer()
def __init__(self):
logging.basicConfig(level=logging.NOTSET,
format='%(asctime)s %(levelname)-8s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
filename=self.logfile,
filemode='a')
self.logger = logging.getLogger()
# 保存进程ID
def savepid(self, pid):
with open(self.pidfile, 'w') as f:
f.write(str(os.getpid()))
# 注册退出函数,进程退出时自动移除pidfile文件
atexit.register(os.remove, self.pidfile)
# 从pidfile中读取进程ID
def getpid(self):
pid = 0
try:
with open(self.pidfile, 'r') as f:
pid = int(f.readline())
except FileNotFoundError as identifier:
print(identifier)
return pid
# 信号处理
def signalHandler(self, signum, frame):
# global loop, job
# print("SIGN ",str(signum));
if signum == signal.SIGHUP:
# 优雅重启
self.job = False
self.logger.info("优雅重启完毕")
elif signum == signal.SIGINT:
# 正常退出
self.loop = False
self.job = False
self.observer.stop()
self.logger.info("正常退出")
elif signum == signal.SIGUSR1:
# 日志切割
pass
def daemonize(self):
# global job
self.logger.info("启动守护进程")
signal.signal(signal.SIGHUP, self.signalHandler)
signal.signal(signal.SIGINT, self.signalHandler)
signal.signal(signal.SIGUSR1, self.signalHandler)
signal.alarm(5)
pid = os.fork()
sys.stdout.flush()
sys.stderr.flush()
if pid:
sys.exit(0)
# print(os.getpid())
self.savepid(str(os.getpid()))
while self.loop:
self.logger.info("Start!!!")
self.main()
self.logger.info("Exit!!!")
def start(self):
if os.path.isfile(self.pidfile):
print("程序已经启动")
sys.exit(1)
else:
self.daemonize()
def stop(self):
try:
os.kill(self.getpid(), signal.SIGINT)
except ProcessLookupError as identifier:
print(identifier)
def reloads(self):
try:
os.kill(self.getpid(), signal.SIGHUP)
except ProcessLookupError as identifier:
print(identifier)
def logrotate(self):
try:
os.kill(self.getpid(), signal.SIGUSR1)
except ProcessLookupError as identifier:
print(identifier)
def main(self):
# 业务逻辑
event_handler = FileEventHandler(self.logger, self.watchpath)
for src, dst in self.watchpath.items():
self.logger.info("监控 {0} => {1}".format(src, dst))
self.observer.schedule(
event_handler, path=src, recursive=False)
self.logger.info("启动 Watchdog")
self.observer.start()
self.observer.join()
# 业务逻辑
def usage():
print(sys.argv[0] + " start | stop | restart | reload | log")
if __name__ == "__main__":
# print(sys.argv)
backup = Backup()
if len(sys.argv) > 1:
arg = sys.argv[1]
if arg == "start":
backup.start()
elif arg == "stop":
backup.stop()
elif arg == "restart":
backup.stop()
backup.start()
elif arg == "reload":
backup.reloads()
else:
usage()
else:
usage()复制
文章转载自netkiller,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。
评论
相关阅读
python排序sort()和sorted()区别
wzf0072
61次阅读
2025-04-09 19:57:46
AI与我共创WEB界面
布衣
50次阅读
2025-04-14 22:13:51
解决pyqt5 textbrowser控件超链接锚点问题
zayki
33次阅读
2025-04-27 16:58:59
python中标识符的命名规则和命名规范
周同学带您玩AI
31次阅读
2025-04-21 10:34:44
python 实现消费者优先级队列
天翼云开发者社区
26次阅读
2025-04-25 11:08:21
优雅遍历和删除特定开头的key
陌殇流苏
24次阅读
2025-04-25 12:17:03
《深入剖析Python的生成器表达式与列表推导式:探寻代码背后的哲学与艺术》
程序员阿伟
21次阅读
2025-04-27 16:22:14
python自动更新dns A记录
godba
11次阅读
2025-04-23 11:19:04
python中的常见数据类型
周同学带您玩AI
11次阅读
2025-04-21 10:34:43
Python运维神器:这些模块让你事半功倍
韩公子的Linux大集市
11次阅读
2025-04-09 11:13:18