import json
import os
import re
import ipaddress
from urllib.parse import urlparse

import pythoncom
# NOTE: a bare `import win32com` does not load the `client` submodule, so
# `win32com.client.GetActiveObject` would raise AttributeError. Importing the
# submodule explicitly still binds the top-level `win32com` name.
import win32com.client

from tools.config import base_path, file_type_map
from tools.oss_client import MinioClient

# Mapping with 'html', 'cookies' and 'token' keys, all initialised to None.
# NOTE(review): presumably a shared reply template filled in elsewhere —
# confirm against callers.
REPLY = {'html': None, 'cookies': None, 'token': None}


def extract_domain_or_ip(url):
    """Extract the domain, or IP (plus port), from a URL as an underscore-separated key.

    For a domain host, only the hostname is used and the port is dropped.
    For an IP-address host, the result is prefixed with ``u`` and keeps the
    port when one is present. In both cases every ``.`` and ``:`` is then
    replaced by ``_``.

    Examples:
        ``https://www.example.com:443/a`` -> ``www_example_com``
        ``http://192.168.1.1:8080/a``     -> ``u192_168_1_1_8080``

    Args:
        url: The URL to parse.

    Returns:
        The normalised host string, or ``None`` when the URL cannot be parsed
        or contains no hostname (the error is printed, not raised).
    """
    try:
        parsed_url = urlparse(url)
        host = parsed_url.hostname  # host part (domain or IP)
        port = parsed_url.port  # port; raises ValueError when malformed

        if host is None:
            # Previously this case only worked by accident (None.replace
            # raised AttributeError into the handler below); fail explicitly
            # so the printed message says what is actually wrong.
            raise ValueError(f"no hostname in URL: {url!r}")

        result = host
        # Only IP addresses keep their port.
        if is_ip_address(host):
            result = f"u{host}:{port}" if port else 'u' + host

        return result.replace('.', '_').replace(':', '_')
    except Exception as e:
        # Deliberately best-effort: report and signal failure with None.
        print(f"解析错误: {e}")
        return None


def is_ip_address(host) -> bool:
    """Return True when *host* is a valid IPv4 or IPv6 address literal.

    Any value ``ipaddress.ip_address`` rejects with ``ValueError`` yields
    False.
    """
    try:
        ipaddress.ip_address(host)
        return True
    except ValueError:
        return False


def detect_oss_file_changes(oss_client, file_info) -> bool:
    """Compare the cloud object's MD5 with the MD5 stored in local metadata.

    The metadata is read from ``<base_path>/<file_info['file_name']>.metadata``
    (a JSON document with an ``md5`` key).

    NOTE: despite the name, this returns True when the two MD5 values are
    EQUAL and False when they differ.

    Args:
        oss_client: Object providing ``get_cloud_file_md5(bucket_name, file_path)``.
        file_info: Mapping with ``bucket_name``, ``file_path`` and
            ``file_name`` keys.

    Returns:
        True if the cloud MD5 equals the recorded MD5, otherwise False.

    Raises:
        OSError: If the metadata file cannot be opened.
        KeyError: If a required key is missing from *file_info* or the metadata.
    """
    cloud_file_md5 = oss_client.get_cloud_file_md5(
        file_info['bucket_name'], file_info['file_path']
    )
    metadata_path = os.path.join(base_path, file_info['file_name'] + '.metadata')
    with open(metadata_path, 'r', encoding='utf-8') as f:
        metadata = json.load(f)
    return cloud_file_md5 == metadata['md5']


def detect_local_file_changes(file_path) -> bool:
    """Compare a local file's current MD5 with the MD5 stored in its metadata.

    The metadata is read from ``<base_path>/<file_path>.metadata`` (a JSON
    document with an ``md5`` key); the current MD5 is obtained from
    ``MinioClient.get_file_md5(file_path)``.

    NOTE: despite the name, this returns True when the two MD5 values are
    EQUAL and False when they differ.

    Args:
        file_path: Path of the file, as used both for the metadata lookup and
            for the MD5 computation.

    Returns:
        True if the recorded MD5 equals the current MD5, otherwise False.

    Raises:
        OSError: If the metadata file cannot be opened.
        KeyError: If the metadata has no ``md5`` key.
    """
    metadata_path = os.path.join(base_path, file_path + '.metadata')
    with open(metadata_path, 'r', encoding='utf-8') as f:
        metadata = json.load(f)
    current_file_md5 = MinioClient.get_file_md5(file_path)
    return metadata['md5'] == current_file_md5


def is_file_open_in_wps(file_path: str) -> bool:
    """Return True when *file_path* is open in the running office application.

    The text after the last ``.`` in *file_path* selects an entry in
    ``file_type_map``: element 0 is passed to ``GetActiveObject`` and
    element 1 names the attribute on the application object that is iterated
    as the collection of open documents. Each document's ``FullName`` is
    compared case-insensitively with the absolute form of *file_path*.
    NOTE(review): presumably element 0 is a WPS COM ProgID — confirm in
    ``tools.config``.

    This is a best-effort check: any failure (unknown suffix, application
    not running, COM error) yields False rather than an exception.
    """
    abs_path = os.path.abspath(file_path).lower()
    suffix = file_path.split('.')[-1]

    # Initialise COM outside the try/finally below so CoUninitialize is only
    # called when initialisation actually succeeded.
    try:
        pythoncom.CoInitialize()
    except Exception:
        return False

    try:
        entry = file_type_map[suffix]
        app = win32com.client.GetActiveObject(entry[0])
        return any(
            doc.FullName.lower() == abs_path
            for doc in getattr(app, entry[1])
        )
    except Exception:
        return False
    finally:
        pythoncom.CoUninitialize()