monitor_file.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. import os
  2. import time
  3. from threading import Timer
  4. from watchdog.observers import Observer
  5. from watchdog.events import FileSystemEventHandler, FileSystemEvent
  6. from tools.logger_handle import logger
  7. class FileSaveHandler(FileSystemEventHandler):
  8. def __init__(self, serve_client):
  9. self.serve_client = serve_client
  10. self.debounce_timers = {}
  11. self.upload_cache = {} # 文件路径 → 上次上传时间戳
  12. self.debounce_delay = 5 # 延迟秒数
  13. self._event_cache = {}
  14. super().__init__()
  15. def _should_process(self, event_path):
  16. now = time.time()
  17. last_time = self._event_cache.get(event_path, 0)
  18. if now - last_time < 5: # 5秒内重复事件,忽略
  19. return False
  20. self._event_cache[event_path] = now
  21. return True
  22. def on_created(self, event: FileSystemEvent) -> None:
  23. if event.is_directory:
  24. return
  25. filepath = event.src_path
  26. ext = os.path.splitext(event.src_path)[-1]
  27. if '~$' in event.src_path or ext not in ['.docx', '.doc', '.ppt', '.pptx', '.xls', '.xlsx']:
  28. return
  29. self._should_process(event.src_path)
  30. def on_modified(self, event):
  31. if event.is_directory:
  32. return
  33. filepath = event.src_path
  34. ext = os.path.splitext(event.src_path)[-1]
  35. if '~$' in event.src_path:
  36. return
  37. if ext not in ['.docx', '.doc', '.ppt', '.pptx', '.xls', '.xlsx']:
  38. return
  39. if not self._should_process(event.src_path):
  40. return
  41. def do_upload():
  42. if not os.path.exists(filepath):
  43. return
  44. mtime = os.path.getmtime(filepath)
  45. if self.upload_cache.get(filepath) == mtime:
  46. logger.debug(f"[重复上传跳过] {filepath}")
  47. return
  48. logger.info(f"[文件修改] 文件已保存: {filepath}, 执行上传操作。")
  49. res = self.serve_client.upload_file(filepath)
  50. metadata = self.serve_client.load_metadata(filepath + '.metadata')
  51. os.remove(filepath)
  52. os.remove(filepath + '.metadata')
  53. storage_path = os.path.dirname(filepath)
  54. self.serve_client.download_file(metadata['file_id'], storage_path)
  55. self.upload_cache[filepath] = mtime
  56. self.debounce_timers.pop(filepath, None)
  57. if filepath in self.debounce_timers:
  58. self.debounce_timers[filepath].cancel()
  59. # 创建新的延迟上传任务
  60. timer = Timer(self.debounce_delay, do_upload)
  61. self.debounce_timers[filepath] = timer
  62. timer.start()
  63. def start_watchdog(serve_client, work_path):
  64. event_handler = FileSaveHandler(serve_client)
  65. observer = Observer()
  66. observer.schedule(event_handler, work_path, recursive=True)
  67. observer.start()
  68. logger.info(f'watch dog start {serve_client}')
  69. try:
  70. while True:
  71. time.sleep(1)
  72. except KeyboardInterrupt:
  73. observer.stop()
  74. observer.join()
if __name__ == '__main__':
    # NOTE(review): start_watchdog is declared with two required parameters
    # (serve_client, work_path); calling it without arguments raises
    # TypeError. Supply both before running this module as a script.
    start_watchdog()