目录

1.背景说明

2.watchdog介绍

3. watchdog使用步骤

4.本机环境

5.代码示例

6.代码优化

7.异常字符处理


1.背景说明

        在实际做项目开发的时候,往往需要适时监控某一个文件夹作为辅助,当有文件生成、拷贝进去、或者移动走的时候能够实时触发一个信号,经过自己的不断地调研和大量的阅读,发现了python中一个好用的实时文件监控库-watchdog。

2.watchdog介绍

        Watchdog是一个Python库,用于监视文件系统中的变化,例如文件或目录的创建、修改、删除等。它可以用于编写监控文件变化的应用程序,例如自动备份、文件同步等。

3. watchdog使用步骤

以下是使用Watchdog的基本步骤:

1.安装Watchdog库:可以使用pip命令安装Watchdog库,例如在命令行中运行pip install watchdog

2.导入Watchdog模块:在Python脚本中导入Watchdog库,例如from watchdog.observers import Observerfrom watchdog.events import FileSystemEventHandler

3.创建事件处理器:继承FileSystemEventHandler类,并重写需要处理的事件方法,例如on_createdon_modifiedon_deleted等。

4.创建观察者对象和添加事件处理器:创建一个Observer对象,并将事件处理器对象添加到观察者中,例如observer = Observer()observer.schedule(event_handler, path, recursive=True)

5.启动观察者:使用observer.start()方法启动观察者,开始监视文件系统中的变化。

6.处理事件:观察者开始运行后,会调用事件处理器中相应的方法来处理文件系统的变化事件。

7.停止观察者:使用observer.stop()方法停止观察者的运行,并调用observer.join()等待观察者线程的结束。

4.本机环境

  • win10   64位系统企业版
  • python版本:3.6.8 (x64)
  • watchdog :0.9.0

5.代码示例

下面是一个简单的示例代码,演示如何使用Watchdog监视文件夹中的文件创建事件:

from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class MyHandler(FileSystemEventHandler):
    def on_created(self, event):
        print(f"File created: {event.src_path}")

if __name__ == "__main__":
    path = "path/to/folder"
    event_handler = MyHandler()
    observer = Observer()
    observer.schedule(event_handler, path, recursive=True)
    observer.start()

    try:
        while True:
            pass
    except KeyboardInterrupt:
        observer.stop()
    observer.join()

在上面的示例中,当文件夹中有新文件创建时,会调用on_created方法,并打印出文件路径。

通过这种方式,你可以根据自己的需求来处理不同类型的文件系统变化事件。

示例2:

import time

from watchdog.events import *
from watchdog.observers import Observer

class FileEventHandler(FileSystemEventHandler):
    def __init__(self):
        FileSystemEventHandler.__init__(self)

    def on_moved(self, event):
        if event.is_directory:
            print("directory moved from {0} to {1}".format(event.src_path, event.dest_path))
        else:
            print("file moved from {0} to {1}".format(event.src_path, event.dest_path))

    def on_created(self, event):
        if event.is_directory:
            print("directory created:{0}".format(event.src_path))
        else:
            print("file created:{0}".format(event.src_path))

    def on_deleted(self, event):
        if event.is_directory:
            print("directory deleted:{0}".format(event.src_path))
        else:
            print("file deleted:{0}".format(event.src_path))

    def on_modified(self, event):
        if event.is_directory:
            print("directory modified:{0}".format(event.src_path))
        else:
            print("file modified:{0}".format(event.src_path))


if __name__ == "__main__":
    observer = Observer()
    event_handler = FileEventHandler()
    observer.schedule(event_handler, "./testFile", True)
    observer.start()
    print('监控当前目录下的文件夹开始:')
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()

初始testFile文件夹中是空的,向testFile文件中丢入一副图像,后台运行触发效果如下:

6.代码优化

可以使用配置文件的方式,在外部设置要监控的文件夹路径和要移动到的文件夹路径。

# -*- coding: utf-8 -*-
import os ,shutil,time
from watchdog.observers import Observer
from watchdog.events import *
import configparser   
###################监控文件夹下文件的变化并复制到另一目录后删除###################
class FileEventHandler(FileSystemEventHandler):
    def __init__(self):
        FileSystemEventHandler.__init__(self)

    def on_created(self, event):
        if event.is_directory:
            print("directory created:{0}".format(event.src_path))
        else:
            filename= format(event.src_path)
            print(filename,type(filename))
            time.sleep(0.5)
            if os.path.exists(filename):
                #copyCommand = 'copy %s %s' % (filename, fn)
                copyCommand = os.system('xcopy "'+filename+'" D:\MonitoringSystem\video_source')
                time.sleep(0.5)
                if copyCommand == 0:
                    print('copy successed!')                           
                else:
                    print('copy failed!',copyCommand)
                        
if __name__ == "__main__":
    global fn
    config = configparser.ConfigParser()
    config.read('Config.ini',encoding='utf-8')
    watchpath = config.get('watch config', 'watchpath')
    fn = config.get('watch config', 'videopath')
    root_dir= watchpath 
    observer = Observer()
    event_handler = FileEventHandler()
    observer.schedule(event_handler,root_dir,True)    
    observer.start()
    print('视频监控程序运行中,请不要关闭......')
    print("*"*60)
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()

7.异常字符处理

说明:有时候我们在用watchdog监控文件夹并复制文件到别的文件夹的时候,因为文件夹名字中有空格往往会导致赋值失败,这时候需要对字符串进行相关处理后,才可以移动到关文件,推荐一个参考程序:

import os
fn = "D:\Record"
aa="E:\J04_2020.03.19 00_20_02_manual.txt"
try:
    #bb=os.system('xcopy "E:\J04_2020.03.19 00_20_02_manual.txt" D:\Record')
    bb=os.system('xcopy "'+aa+'" D:\Record')
    print('复制成功!',bb)
except:
    print('文件名不正确!')
finally:
    print('无奈啊!')
    

Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐