暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Python 监控文件系统

netkiller 2021-06-21
1118

监视文件系统

watchdog

watchdog 提供了指定目录/文件的变化监控,对于指定目录内的操作,被视为一次事件。如添加删除文件或目录、重命名文件或目录、修改文件内容等,每种变化都会触发一次事件,事件是用户定义的业务逻辑代码。

安装

pip install watchdog

复制

演示程序

import sys
import time
import logging
from watchdog.observers import Observer
from watchdog.events import LoggingEventHandler

if __name__ == "__main__":

path = r'/tmp'

logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
event_handler = LoggingEventHandler()
observer = Observer()
observer.schedule(event_handler, path, recursive=True)
observer.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()

复制

Observer

# 设置超时时间        
watchdog.observers.Observer(timeout=30)

# 监控指定路径path,该路径触发任何事件都会调用event_handler来处理,如果path是目录,recursive=True 开启会递归模式,监控该目录下的所有变化。
observer.schedule(event_handler, path, recursive=False)

# 添加一个事件处理器到watch中
observer.add_handler_for_watch(event_handler, watch)

# 从watch中移除一个事件处理器
observer.remove_handler_for_watch(event_handler, watch)

# 移除一个watch及这个watch上的所有事件处理器
observer.unschedule(watch)

# 移除所有watch及关联的事件处理器
observer.unschedule_all()
# 等同于observer.unschedule_all()
observer.on_thread_stop()

# 启动 observer 线程
observer.start()

# 停止observer线程
observer.stop()

复制
  • InotifyObserver,Linux系统默认使用的观察目录的调度事件,效率比较高。
  • PollingObserver,与平台无关,轮询目录以检测文件的更改,效率比较低。
  • WindowsApiObserver,Windows系统默认使用的观察目录的调度事件,效率比较高。
  • FSEventsObserver,macOS 系统默认使用的调度事件
  • KqueueObserver,FreeBSD 系统默认使用

默认 Observer 会判断操作系统类型,选择最佳的方式。下面👇是 Observer 的源码

# Linux系统
if platform.is_linux():
try:
from .inotify import InotifyObserver as Observer
except UnsupportedLibc:
from .polling import PollingObserver as Observer
# darwin系统
elif platform.is_darwin():
# FIXME: catching too broad. Error prone
try:
from .fsevents import FSEventsObserver as Observer
except:
try:
from .kqueue import KqueueObserver as Observer
warnings.warn("Failed to import fsevents. Fall back to kqueue")
except:
from .polling import PollingObserver as Observer
warnings.warn("Failed to import fsevents and kqueue. Fall back to polling.")
# bsd系统
elif platform.is_bsd():
from .kqueue import KqueueObserver as Observer
# windows系统
elif platform.is_windows():
# TODO: find a reliable way of checking Windows version and import
# polling explicitly for Windows XP
try:
from .read_directory_changes import WindowsApiObserver as Observer
except:
from .polling import PollingObserver as Observer
warnings.warn("Failed to import read_directory_changes. Fall back to polling.")

else:
from .polling import PollingObserver as Observer

复制

手工选择 Observer,注意下面代码 observer = PollingObserver()

import sys
import time
import logging
# from watchdog.observers import Observer
from watchdog.observers.polling import PollingObserver
from watchdog.events import LoggingEventHandler

if __name__ == "__main__":
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
path = sys.argv[1] if len(sys.argv) > 1 else '.'
event_handler = LoggingEventHandler()
observer = PollingObserver()
observer.schedule(event_handler, path, recursive=True)
observer.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()

复制

创建/删除/修改/移动

定义创建/删除/修改/移动四种事件

from watchdog.observers import Observer
from watchdog.events import *
import time


class DemoFileSystemEventHandler(FileSystemEventHandler):

def __init__(self):
FileSystemEventHandler.__init__(self)

def on_moved(self, event):
if event.is_directory:
print("directory moved from {0} to {1}".format(
event.src_path, event.dest_path))
else:
print("file moved from {0} to {1}".format(
event.src_path, event.dest_path))

def on_created(self, event):
if event.is_directory:
print("directory created:{0}".format(event.src_path))
else:
print("file created:{0}".format(event.src_path))

def on_deleted(self, event):
if event.is_directory:
print("directory deleted:{0}".format(event.src_path))
else:
print("file deleted:{0}".format(event.src_path))

def on_modified(self, event):
if event.is_directory:
print("directory modified:{0}".format(event.src_path))
else:
print("file modified:{0}".format(event.src_path))


if __name__ == "__main__":

path = r'/tmp'

observer = Observer()
event_handler = DemoFileSystemEventHandler()
observer.schedule(event_handler, path, recursive=True)
observer.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()

复制

on_any_event 任何事件都会触发

import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler


class MyHandler(FileSystemEventHandler):
def on_any_event(self, event):
if event.is_directory:
print('Directory', event.event_type, event.src_path)
else:
print('File', event.event_type, event.src_path)


if __name__ == "__main__":

monitor_path = '/tmp'

event_handler = MyHandler()
observer = Observer()
observer.schedule(event_handler, path=monitor_path, recursive=True)
observer.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()

复制

多事件绑定

import time
import logging
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from watchdog.events import LoggingEventHandler


class MyEventHandler(FileSystemEventHandler):
def on_any_event(self, event):
if event.is_directory:
print('Directory', event.event_type, event.src_path)
else:
print('File', event.event_type, event.src_path)


if __name__ == "__main__":

watch_path = '/tmp'

logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
loggingEventHandler = LoggingEventHandler()
myEventHandler = MyEventHandler()

observer = Observer()

watch1 = observer.schedule(
loggingEventHandler, path=watch_path, recursive=True)
watch2 = observer.schedule(
myEventHandler, path=watch_path, recursive=True)

observer.add_handler_for_watch(loggingEventHandler, watch1)
observer.add_handler_for_watch(myEventHandler, watch2)

observer.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()

复制

自动备份程序

事情的 起因 - 经过 - 结果

做视频剪辑有一段时间了,苹果笔记本电脑硬盘比较小,视频剪辑空间不够,所以必须外挂一个剪辑盘。

我使用了由两块 SSD 组成的 RAID 硬盘盒子,起初是追求速度快,使用 RAID 0 模式,后期为了安全 切换到了 RAID 1 模式。这样硬件的安全就解决了,再也不用担心硬盘损坏数据丢失了。

某一天,就如日常剪辑一样,当我试图打开 Final cut pro 资源库的时候,悲剧了!!!

Fcpx 的工程文件打不开了,闪退后提示错误,这次翻车丢失了《Netkiller Architect 手札》6个视频,不过还好,那些视频都是初期做品,录制和剪辑都不算好,本就有重做的想法。

这次经历让我不在相信 fcpx 的资源库管理能力,鸡蛋放在一个篮子里及其危险的,《Netkiller Python 手札》我把一章内容放在一个资源库中,每节一个事件。

之前也偶尔备份,手工复制,有时比较懒,就没有备份。于是我便写了一个程序,来自动备份视频剪辑盘。

  • 插入U设备,发现移动硬盘
  • 剪辑盘和备份盘没有其他程序占用
  • 开始备份
  • 备份完成后自动弹出移动设备
<!-- -->

复制
#!/usr/bin/python3
# -*- coding: UTF-8 -*-
# ================================================================================
# 视频剪辑盘自动备份程序
# 作者: netkiller | netkiller@msn.com | http://www.netkiller.cn
# ================================================================================

import time
import os
import signal
import sys
import atexit
import subprocess
# from atexit import register
from watchdog.observers import Observer
from watchdog.events import *


class FileEventHandler(FileSystemEventHandler):
src_path = ''
dst_path = ''
watchpath = {}

def __init__(self, logger, watchpath):
FileSystemEventHandler.__init__(self)
self.watchpath = watchpath
self.logger = logger

def execute(self, cmd):
fr = os.popen(cmd)
# text = fr.read()
text = fr.readlines()
fr.close()
# print(len(text), text)
return(text)

def isBusy(self, path):
cmd = "lsof {}".format(path)
# print(cmd)
if len(self.execute(cmd)) == 0:
self.logger.info("目录空闲: {0}".format(path))
return False
else:
self.logger.info("目录繁忙: {0}".format(path))
return True

def mirror(self):
self.logger.info("开始备份")
cmd = "/usr/bin/rsync -auzv --delete --exclude=.Spotlight-V100 --exclude=.fseventsd {0} {1}".format(
self.src_path, self.dst_path)
# self.logger.info(cmd)

sp = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
for line in iter(sp.stdout.readline, b''):
self.logger.info(line.decode('UTF8').replace("\n", ''))
sp.stdout.close()
sp.wait()
self.logger.info("备份完成")

def umount(self):
if not self.isBusy(self.src_path):
self.logger.info("卸载移动硬盘")
os.system("diskutil eject {0}".format(
os.path.basename(self.src_path)))
else:
self.logger.info("移动硬盘 {0} 繁忙".format(self.src_path))

def backup(self):
if not os.path.exists(self.dst_path):
os.mkdir(self.dst_path)
if not self.isBusy(self.src_path):
if not self.isBusy(self.dst_path):
self.mirror()
time.sleep(5)
self.umount()

def on_created(self, event):
if event.is_directory:
self.logger.info("发现移动硬盘:{0}".format(event.src_path))
# self.path = self.watchpath[event.src_path]
self.src_path = event.src_path
self.dst_path = self.watchpath[event.src_path]
self.backup()

def on_deleted(self, event):
if event.is_directory:
self.logger.info("卸载移动硬盘: {0}".format(event.src_path))


class Backup():

pidfile = "/tmp/netkiller.pid"
logdir = "/tmp"
logfile = logdir+"/netkiller.log"
logger = None
loop = True
job = True

watchpath = {'/Volumes/Video': '/tmp/Backup',
'/Volumes/Photo': '/tmp/Backup'}

observer = Observer()

def __init__(self):
logging.basicConfig(level=logging.NOTSET,
format='%(asctime)s %(levelname)-8s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
filename=self.logfile,
filemode='a')
self.logger = logging.getLogger()

# 保存进程ID
def savepid(self, pid):
with open(self.pidfile, 'w') as f:
f.write(str(os.getpid()))
# 注册退出函数,进程退出时自动移除pidfile文件
atexit.register(os.remove, self.pidfile)

# 从pidfile中读取进程ID
def getpid(self):
pid = 0
try:
with open(self.pidfile, 'r') as f:
pid = int(f.readline())
except FileNotFoundError as identifier:
print(identifier)
return pid

# 信号处理
def signalHandler(self, signum, frame):
# global loop, job
# print("SIGN ",str(signum));
if signum == signal.SIGHUP:
# 优雅重启
self.job = False
self.logger.info("优雅重启完毕")
elif signum == signal.SIGINT:
# 正常退出
self.loop = False
self.job = False
self.observer.stop()
self.logger.info("正常退出")
elif signum == signal.SIGUSR1:
# 日志切割
pass

def daemonize(self):
# global job
self.logger.info("启动守护进程")
signal.signal(signal.SIGHUP, self.signalHandler)
signal.signal(signal.SIGINT, self.signalHandler)
signal.signal(signal.SIGUSR1, self.signalHandler)
signal.alarm(5)

pid = os.fork()
sys.stdout.flush()
sys.stderr.flush()
if pid:
sys.exit(0)
# print(os.getpid())
self.savepid(str(os.getpid()))
while self.loop:
self.logger.info("Start!!!")
self.main()
self.logger.info("Exit!!!")

def start(self):
if os.path.isfile(self.pidfile):
print("程序已经启动")
sys.exit(1)
else:
self.daemonize()

def stop(self):
try:
os.kill(self.getpid(), signal.SIGINT)
except ProcessLookupError as identifier:
print(identifier)

def reloads(self):
try:
os.kill(self.getpid(), signal.SIGHUP)
except ProcessLookupError as identifier:
print(identifier)

def logrotate(self):
try:
os.kill(self.getpid(), signal.SIGUSR1)
except ProcessLookupError as identifier:
print(identifier)

def main(self):
# 业务逻辑
event_handler = FileEventHandler(self.logger, self.watchpath)
for src, dst in self.watchpath.items():
self.logger.info("监控 {0} => {1}".format(src, dst))
self.observer.schedule(
event_handler, path=src, recursive=False)
self.logger.info("启动 Watchdog")
self.observer.start()
self.observer.join()
# 业务逻辑


def usage():
print(sys.argv[0] + " start | stop | restart | reload | log")


if __name__ == "__main__":
# print(sys.argv)
backup = Backup()
if len(sys.argv) > 1:
arg = sys.argv[1]
if arg == "start":
backup.start()
elif arg == "stop":
backup.stop()
elif arg == "restart":
backup.stop()
backup.start()
elif arg == "reload":
backup.reloads()
else:
usage()
else:
usage()

复制


文章转载自netkiller,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论