import os import time from threading import Timer from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler, FileSystemEvent from tools.logger_handle import logger class FileSaveHandler(FileSystemEventHandler): def __init__(self, serve_client): self.serve_client = serve_client self.debounce_timers = {} self.upload_cache = {} # 文件路径 → 上次上传时间戳 self.debounce_delay = 5 # 延迟秒数 self._event_cache = {} super().__init__() def _should_process(self, event_path): now = time.time() last_time = self._event_cache.get(event_path, 0) if now - last_time < 5: # 5秒内重复事件,忽略 return False self._event_cache[event_path] = now return True def on_created(self, event: FileSystemEvent) -> None: if event.is_directory: return filepath = event.src_path ext = os.path.splitext(event.src_path)[-1] if '~$' in event.src_path: return if ext not in ['.docx', '.doc', '.ppt', '.pptx', '.xls', '.xlsx']: return self._should_process(event.src_path) def on_modified(self, event): if event.is_directory: return filepath = event.src_path ext = os.path.splitext(event.src_path)[-1] if '~$' in event.src_path: return if ext not in ['.docx', '.doc', '.ppt', '.pptx', '.xls', '.xlsx']: return if not self._should_process(event.src_path): return def do_upload(): if not os.path.exists(filepath): return mtime = os.path.getmtime(filepath) if self.upload_cache.get(filepath) == mtime: logger.debug(f"[重复上传跳过] {filepath}") return logger.info(f"[文件修改] 文件已保存: {filepath}, 执行上传操作。") res = self.serve_client.upload_file(filepath) metadata = self.serve_client.load_metadata(filepath + '.metadata') os.remove(filepath) os.remove(filepath + '.metadata') storage_path = os.path.dirname(filepath) self.serve_client.download_file(metadata['file_id'], storage_path) if res: logger.info(f'文件{filepath}上传成功') else: logger.info(f'文件{filepath}上传失败') self.upload_cache[filepath] = mtime self.debounce_timers.pop(filepath, None) if filepath in self.debounce_timers: self.debounce_timers[filepath].cancel() # 创建新的延迟上传任务 timer = Timer(self.debounce_delay, do_upload) self.debounce_timers[filepath] = timer timer.start() def start_watchdog(serve_client, work_path): event_handler = FileSaveHandler(serve_client) observer = Observer() observer.schedule(event_handler, work_path, recursive=True) observer.start() logger.info(f'watch dog start {serve_client}') try: while True: time.sleep(1) except KeyboardInterrupt: observer.stop() observer.join() if __name__ == '__main__': start_watchdog() # print(os.listdir('../test/'))