# monitor_file.py

  1. import os
  2. import time
  3. from threading import Timer
  4. from watchdog.observers import Observer
  5. from watchdog.events import FileSystemEventHandler, FileSystemEvent
  6. from tools.logger_handle import logger
  7. class FileSaveHandler(FileSystemEventHandler):
  8. def __init__(self, serve_client):
  9. self.serve_client = serve_client
  10. self.debounce_timers = {}
  11. self.upload_cache = {} # 文件路径 → 上次上传时间戳
  12. self.debounce_delay = 5 # 延迟秒数
  13. self._event_cache = {}
  14. super().__init__()
  15. def _should_process(self, event_path):
  16. now = time.time()
  17. last_time = self._event_cache.get(event_path, 0)
  18. if now - last_time < 5: # 5秒内重复事件,忽略
  19. return False
  20. self._event_cache[event_path] = now
  21. return True
  22. def on_created(self, event: FileSystemEvent) -> None:
  23. if event.is_directory:
  24. return
  25. filepath = event.src_path
  26. ext = os.path.splitext(event.src_path)[-1]
  27. if '~$' in event.src_path:
  28. return
  29. if ext not in ['.docx', '.doc', '.ppt', '.pptx', '.xls', '.xlsx']:
  30. return
  31. self._should_process(event.src_path)
  32. def on_modified(self, event):
  33. if event.is_directory:
  34. return
  35. filepath = event.src_path
  36. ext = os.path.splitext(event.src_path)[-1]
  37. if '~$' in event.src_path:
  38. return
  39. if ext not in ['.docx', '.doc', '.ppt', '.pptx', '.xls', '.xlsx']:
  40. return
  41. if not self._should_process(event.src_path):
  42. return
  43. def do_upload():
  44. if not os.path.exists(filepath):
  45. return
  46. mtime = os.path.getmtime(filepath)
  47. if self.upload_cache.get(filepath) == mtime:
  48. logger.debug(f"[重复上传跳过] {filepath}")
  49. return
  50. logger.info(f"[文件修改] 文件已保存: {filepath}, 执行上传操作。")
  51. res = self.serve_client.upload_file(filepath)
  52. metadata = self.serve_client.load_metadata(filepath + '.metadata')
  53. os.remove(filepath)
  54. os.remove(filepath + '.metadata')
  55. storage_path = os.path.dirname(filepath)
  56. self.serve_client.download_file(metadata['file_id'], storage_path)
  57. if res:
  58. logger.info(f'文件{filepath}上传成功')
  59. else:
  60. logger.info(f'文件{filepath}上传失败')
  61. self.upload_cache[filepath] = mtime
  62. self.debounce_timers.pop(filepath, None)
  63. if filepath in self.debounce_timers:
  64. self.debounce_timers[filepath].cancel()
  65. # 创建新的延迟上传任务
  66. timer = Timer(self.debounce_delay, do_upload)
  67. self.debounce_timers[filepath] = timer
  68. timer.start()
  69. def start_watchdog(serve_client, work_path):
  70. event_handler = FileSaveHandler(serve_client)
  71. observer = Observer()
  72. observer.schedule(event_handler, work_path, recursive=True)
  73. observer.start()
  74. logger.info(f'watch dog start {serve_client}')
  75. try:
  76. while True:
  77. time.sleep(1)
  78. except KeyboardInterrupt:
  79. observer.stop()
  80. observer.join()
  81. if __name__ == '__main__':
  82. start_watchdog()
  83. # print(os.listdir('../test/'))