monitor_file.py 3.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. import os
  2. import time
  3. from threading import Timer
  4. from watchdog.observers import Observer
  5. from watchdog.events import FileSystemEventHandler, FileSystemEvent
  6. from tools.logger_handle import logger
  7. class FileSaveHandler(FileSystemEventHandler):
  8. def __init__(self, serve_client):
  9. self.serve_client = serve_client
  10. self.debounce_timers = {}
  11. self.upload_cache = {} # 文件路径 → 上次上传时间戳
  12. self.debounce_delay = 5 # 延迟秒数
  13. self._event_cache = {}
  14. super().__init__()
  15. def _should_process(self, event_path):
  16. now = time.time()
  17. last_time = self._event_cache.get(event_path, 0)
  18. if now - last_time < 5: # 5秒内重复事件,忽略
  19. return False
  20. self._event_cache[event_path] = now
  21. return True
  22. def on_created(self, event: FileSystemEvent) -> None:
  23. if event.is_directory:
  24. return
  25. filepath = event.src_path
  26. ext = os.path.splitext(event.src_path)[-1]
  27. if '~$' in event.src_path or ext not in ['.docx', '.doc', '.ppt', '.pptx', '.xls', '.xlsx']:
  28. return
  29. self._should_process(event.src_path)
  30. def on_modified(self, event):
  31. if event.is_directory:
  32. return
  33. filepath = event.src_path
  34. ext = os.path.splitext(event.src_path)[-1]
  35. if '~$' in event.src_path:
  36. return
  37. if ext not in ['.docx', '.doc', '.ppt', '.pptx', '.xls', '.xlsx']:
  38. return
  39. if not self._should_process(event.src_path):
  40. return
  41. def do_upload():
  42. if not os.path.exists(filepath):
  43. return
  44. ext = os.path.splitext(event.src_path)[-1]
  45. if ext not in ['.docx', '.doc', '.ppt', '.pptx', '.xls', '.xlsx']:
  46. return
  47. mtime = os.path.getmtime(filepath)
  48. if self.upload_cache.get(filepath) == mtime:
  49. logger.debug(f"[重复上传跳过] {filepath}")
  50. return
  51. logger.info(f"[文件修改] 文件已保存: {filepath}, 执行上传操作。")
  52. metadata = self.serve_client.load_metadata(filepath + '.metadata')
  53. os.remove(filepath)
  54. os.remove(filepath + '.metadata')
  55. storage_path = os.path.dirname(filepath)
  56. self.serve_client.download_file(metadata['file_id'], storage_path)
  57. self.upload_cache[filepath] = mtime
  58. self.debounce_timers.pop(filepath, None)
  59. if filepath in self.debounce_timers:
  60. self.debounce_timers[filepath].cancel()
  61. # 创建新的延迟上传任务
  62. timer = Timer(self.debounce_delay, do_upload)
  63. self.debounce_timers[filepath] = timer
  64. timer.start()
  65. def start_watchdog(serve_client, work_path):
  66. event_handler = FileSaveHandler(serve_client)
  67. observer = Observer()
  68. observer.schedule(event_handler, work_path, recursive=True)
  69. observer.start()
  70. logger.info(f'watch dog start {serve_client}')
  71. try:
  72. while True:
  73. time.sleep(1)
  74. except KeyboardInterrupt:
  75. observer.stop()
  76. observer.join()
  77. if __name__ == '__main__':
  78. start_watchdog()