123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104 |
- import os
- import time
- from threading import Timer
- from watchdog.observers import Observer
- from watchdog.events import FileSystemEventHandler, FileSystemEvent
- from tools.logger_handle import logger
class FileSaveHandler(FileSystemEventHandler):
    """Upload Office documents through ``serve_client`` shortly after they are saved.

    A modify event on a watched document (re)arms a per-file ``threading.Timer``.
    When the timer fires, the file is uploaded. Only after a successful upload
    are the local file and its ``.metadata`` sidecar deleted and the file
    downloaded again (by ``file_id``) into the same directory.
    """

    # Extensions (compared case-insensitively) that are synchronised.
    WATCHED_EXTENSIONS = frozenset({'.docx', '.doc', '.ppt', '.pptx', '.xls', '.xlsx'})

    def __init__(self, serve_client, debounce_delay=5, dedup_window=5):
        """Create the handler.

        Args:
            serve_client: object providing ``upload_file``, ``load_metadata``
                and ``download_file``.
            debounce_delay: seconds to wait after an accepted modify event
                before uploading.
            dedup_window: repeated events for the same path within this many
                seconds of the last accepted one are ignored.
        """
        self.serve_client = serve_client
        self.debounce_timers = {}  # file path -> pending upload Timer
        self.upload_cache = {}  # file path -> mtime of the last successfully uploaded version
        self.debounce_delay = debounce_delay
        self.dedup_window = dedup_window
        self._event_cache = {}  # file path -> time.time() of the last accepted event
        super().__init__()

    def _should_process(self, event_path):
        """Return False if *event_path* had an accepted event within ``dedup_window`` seconds.

        Accepted events refresh the stored timestamp; ignored ones do not.
        """
        now = time.time()
        if now - self._event_cache.get(event_path, 0) < self.dedup_window:
            return False
        self._event_cache[event_path] = now
        return True

    def _is_watched_file(self, event):
        """Return True for file events on Office documents that are not ``~$`` lock files."""
        if event.is_directory:
            return False
        path = event.src_path
        if '~$' in path:
            return False
        return os.path.splitext(path)[1].lower() in self.WATCHED_EXTENSIONS

    def on_created(self, event: FileSystemEvent) -> None:
        """Stamp a newly created document as just-seen without uploading it.

        The result of ``_should_process`` is deliberately discarded. Stamping
        the path makes ``on_modified`` ignore events for it during the next
        ``dedup_window`` seconds. NOTE(review): this presumably stops the
        re-download performed after an upload from triggering another upload.
        Confirm before changing.
        """
        if self._is_watched_file(event):
            self._should_process(event.src_path)

    def on_modified(self, event: FileSystemEvent) -> None:
        """Schedule a debounced upload of a modified document."""
        if not self._is_watched_file(event):
            return
        filepath = event.src_path
        if not self._should_process(filepath):
            return

        # Re-arm the debounce timer: a still-pending upload for this path is dropped.
        pending = self.debounce_timers.get(filepath)
        if pending is not None:
            pending.cancel()

        def fire():
            self._upload(filepath, timer)

        timer = Timer(self.debounce_delay, fire)
        self.debounce_timers[filepath] = timer
        timer.start()

    def _upload(self, filepath, timer=None):
        """Upload *filepath*, then replace the local copy with the server's version.

        Runs on the debounce Timer's thread. The local file and its
        ``.metadata`` sidecar are removed only after ``upload_file`` reports
        success, so a failed upload never discards local edits.

        Args:
            filepath: path of the document to upload.
            timer: the Timer that triggered this call. Its entry in
                ``debounce_timers`` is removed afterwards only if it has not
                been replaced by a newer timer.
        """
        try:
            if not os.path.exists(filepath):
                return
            mtime = os.path.getmtime(filepath)
            if self.upload_cache.get(filepath) == mtime:
                logger.debug(f"[重复上传跳过] {filepath}")
                return
            logger.info(f"[文件修改] 文件已保存: {filepath}, 执行上传操作。")
            res = self.serve_client.upload_file(filepath)
            if not res:
                # Keep the local file: deleting it here would lose the unsaved-to-server edits.
                logger.info(f'文件{filepath}上传失败')
                return
            metadata = self.serve_client.load_metadata(filepath + '.metadata')
            os.remove(filepath)
            os.remove(filepath + '.metadata')
            storage_path = os.path.dirname(filepath)
            self.serve_client.download_file(metadata['file_id'], storage_path)
            logger.info(f'文件{filepath}上传成功')
            self.upload_cache[filepath] = mtime
        except Exception:
            # Top of a worker thread: log it, otherwise the traceback is lost to threading's excepthook.
            logger.exception(f'文件{filepath}上传失败')
        finally:
            # Only forget the timer if a newer event has not already replaced it.
            if self.debounce_timers.get(filepath) is timer:
                self.debounce_timers.pop(filepath, None)
def start_watchdog(serve_client, work_path):
    """Watch *work_path* recursively and upload saved Office documents via *serve_client*.

    Blocks the calling thread until a ``KeyboardInterrupt``. The observer is
    stopped and joined on every exit from the wait loop, including unexpected
    exceptions, which still propagate after cleanup.

    Args:
        serve_client: client handed to ``FileSaveHandler`` for upload/download.
        work_path: directory tree to monitor.
    """
    event_handler = FileSaveHandler(serve_client)
    observer = Observer()
    observer.schedule(event_handler, work_path, recursive=True)
    observer.start()
    logger.info(f'watch dog start {serve_client}')
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        pass  # Normal shutdown path (Ctrl+C).
    finally:
        observer.stop()
        observer.join()
if __name__ == '__main__':
    # start_watchdog() requires a serve_client and a work_path, and neither
    # can be constructed from this module. The previous bare call
    # start_watchdog() always died with a TypeError, so fail with an explicit
    # message instead.
    raise SystemExit('start_watchdog(serve_client, work_path) must be called with a serve client and a directory to watch')
|