import base64
import hashlib
import json
import os.path
import re

import requests
from Crypto.Cipher import AES

from tools.logger_handle import logger


class ServerClient:
    """Client for the document-library HTTP API exposed by ``server_host``.

    Constructing an instance logs in immediately and stores the bearer token
    in ``self.headers`` so that every later request is authenticated.
    Response payloads arrive as hex-encoded AES-CBC ciphertext and are decoded
    with :meth:`decryption`.
    """

    _USER_AGENT = ('Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
                   '(KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36')
    _DOCX_MIME = 'application/vnd.openxmlformats-officedocument.wordprocessingml.document'
    # Control characters replaced by a space after decryption
    # (mirrors the JS range [\u0000-\u001F\u007F-\u009F]).
    _CONTROL_CHARS = re.compile(r'[\x00-\x1F\x7F-\x9F]')

    def __init__(self, server_host, user_name, password):
        """Store the credentials and log in.

        :param server_host: base URL of the server, without trailing slash.
        :param user_name: account name sent to the token endpoint.
        :param password: plain-text password; encrypted before being sent.
        """
        self.user_name = user_name
        self.password = password
        self.serve_host = server_host
        self.access_token = None
        self.login_reply = None
        self.headers = {
            'User-Agent': self._USER_AGENT,
        }
        self.login()

    def login(self):
        """Request an access token and install it in ``self.headers``.

        :raises KeyError: if the reply carries no ``access_token``.
        """
        url = self.serve_host + '/auth/oauth2/token'
        reply = requests.post(url, params={
            'username': self.user_name,
            'password': self.encryption(self.password),
            'grant_type': 'password',
            'scope': 'server',
            'client_id': 'knowledge',
            'client_secret': 'knowledge'
        }, headers={'Content-Type': 'multipart/form-data',
                    'user-agent': self._USER_AGENT})
        reply_data = reply.json()  # parse the body once
        self.access_token = reply_data['access_token']
        self.login_reply = reply_data
        # self.headers is updated in place, so callers holding a reference
        # to it (e.g. a pending retry) see the refreshed token.
        self.headers['Authorization'] = f'Bearer {self.access_token}'

    def decryption(self, hex_str, secret='anZz000000000000'):
        """Decrypt a hex-encoded AES-CBC payload.

        The key doubles as the IV. Bytes that are not valid UTF-8 are dropped
        and control characters (including zero padding) become spaces.

        :param hex_str: ciphertext as a hexadecimal string.
        :param secret: key (and IV) text.
        :return: the parsed JSON value, or the stripped plain text when the
            decrypted content is not valid JSON.
        """
        key_bytes = secret.encode('utf-8')
        cipher = AES.new(key_bytes, AES.MODE_CBC, key_bytes)
        decrypted = cipher.decrypt(bytes.fromhex(hex_str))
        # errors='ignore' yields the strict result whenever strict decoding
        # would have succeeded, so a single lenient decode is sufficient.
        decoded = decrypted.decode('utf-8', errors='ignore')
        cleaned = self._CONTROL_CHARS.sub(' ', decoded)
        try:
            return json.loads(cleaned)
        except json.JSONDecodeError:
            return cleaned.strip()

    def zero_pad(self, data, block_size=16):
        """Right-pad ``data`` with NUL bytes to a multiple of ``block_size``.

        Input whose length is already a multiple of ``block_size`` is
        returned unchanged.
        """
        remainder = len(data) % block_size
        if remainder == 0:
            return data
        return data + b'\x00' * (block_size - remainder)

    def encryption(self, payload, secret='a25vd2xlZGdl0000'.encode('utf-8')):
        """Encrypt ``payload`` with AES-CBC (key used as IV, zero padding).

        :param payload: text to encrypt.
        :param secret: key (and IV) bytes.
        :return: ciphertext as a lowercase hexadecimal string.
        """
        padded_data = self.zero_pad(payload.encode('utf-8'))
        cipher = AES.new(secret, AES.MODE_CBC, secret)
        return cipher.encrypt(padded_data).hex()

    @staticmethod
    def _list_field(payload, key):
        """Return ``payload[key]`` (default ``[]``) when payload is a dict.

        ``decryption`` falls back to returning a string when the content is
        not JSON; in that case there is nothing to extract.
        """
        if isinstance(payload, dict):
            return payload.get(key, [])
        return []

    def get_base_path(self):
        """Return the ``records`` list of the ``knowledges/owner`` endpoint.

        :return: the records, or ``[]`` when the reply code is not 0.
        """
        url = self.serve_host + '/mgr/document/dcLibrary/knowledges/owner?size=1000&current=1'
        reply_data = requests.get(url, headers=self.headers).json()
        if reply_data.get('code') != 0:
            return []
        return self._list_field(self.decryption(reply_data['data']), 'records')

    def get_folder_by_id(self, _id):
        """Return the ``data`` list of the tree endpoint for ``_id``.

        :return: the entries, or ``[]`` when the reply code is not 0.
        """
        url = f'{self.serve_host}/mgr/document/dcLibrary/tree?id={_id}'
        # NOTE(review): this header is stored on the shared dict and is
        # therefore sent with every later request too; kept as in the
        # original in case other endpoints rely on it - confirm.
        self.headers['documentid'] = _id
        reply_data = requests.get(url, headers=self.headers).json()
        if reply_data.get('code') != 0:
            return []
        return self._list_field(self.decryption(reply_data['data']), 'data')

    def check_resp(self, resp):
        """Log in again when the reply code is -2.

        :return: True if a re-login happened (the caller should retry the
            request), False otherwise.
        """
        if resp.json().get('code') == -2:
            self.login()
            return True
        return False

    def download_file(self, file_info, storage_path):
        """Download a file and write a ``.metadata`` JSON sidecar next to it.

        The local name is ``file_info['filePath']`` with ``/`` replaced by
        ``_``.

        :param file_info: mapping with the keys ``id``, ``filePath``,
            ``bucketName``, ``name`` and ``updateTime``.
        :param storage_path: directory that receives both files.
        :return: True on success, False if any step raised (logged).
        """
        try:
            resp = requests.get(
                f'{self.serve_host}/mgr/document/dcLibrary/file/get/file/{file_info["id"]}',
                headers=self.headers)
            local_file_name = file_info['filePath'].replace('/', '_')
            local_path = os.path.join(storage_path, local_file_name)
            with open(local_path, 'wb') as f:
                f.write(resp.content)
            with open(local_path + '.metadata', 'w', encoding='utf-8') as f:
                f.write(json.dumps({
                    'file_id': file_info["id"],
                    'bucket_name': file_info['bucketName'],
                    'md5': self.get_file_md5(local_path),
                    'file_name': file_info['name'],
                    'oss_path': file_info['filePath'],
                    'update_time': file_info['updateTime']
                }))
        except Exception as e:
            logger.exception(e)
            return False
        return True

    def upload_file(self, file_path):
        """Upload a local file back to the document it was downloaded from.

        The target document id and file name are read from the
        ``<file_path>.metadata`` sidecar. If the first attempt triggers a
        re-login, the upload is repeated once.

        :return: True when the server replies with code 0, else False.
        """
        try:
            file_metadata = self.load_metadata(file_path)
            url = (f'{self.serve_host}/mgr/document/dcLibrary/saveByPython/'
                   f'{file_metadata["file_id"]}')
            # The context manager guarantees the handle is closed.
            with open(file_path, 'rb') as fh:
                files = [('file', (file_metadata['file_name'], fh, self._DOCX_MIME))]
                resp = requests.post(url, headers=self.headers, files=files, data={})
                if self.check_resp(resp):
                    # The first attempt consumed the stream; rewind so the
                    # retry does not send an empty body.
                    fh.seek(0)
                    resp = requests.post(url, headers=self.headers, files=files, data={})
            result = resp.json()
            if result.get('code') == 0:
                return True
            logger.error(result)
            return False
        except Exception as e:
            logger.exception(e)
            return False

    @staticmethod
    def get_file_md5(file_path):
        """Return the hexadecimal MD5 digest of the file at ``file_path``."""
        md5 = hashlib.md5()
        with open(file_path, 'rb') as f:
            # Read in chunks so large files do not have to fit in memory.
            while chunk := f.read(8192):
                md5.update(chunk)
        return md5.hexdigest()

    @staticmethod
    def load_metadata(local_path):
        """Load and return the JSON stored in ``<local_path>.metadata``."""
        with open(f'{local_path}.metadata', 'r', encoding='utf-8') as f:
            return json.loads(f.read())

    def create_cloud_file(self, local_file='D:/bussion/法律法规/道路交通安全法.docx',
                          folder_id='5b091f1007c880528bd913530987d5e5'):
        """Upload ``local_file`` as a new document into folder ``folder_id``.

        :return: True when the server replies with code 0, else False.
        """
        payload = {'module': '/jvs-knowledge-ui/jvs-knowledge-import/'}
        with open(local_file, 'rb') as fh:
            files = [('file', (os.path.basename(local_file), fh, self._DOCX_MIME))]
            resp = requests.post(
                f'{self.serve_host}/mgr/document/dcLibrary/upload/{folder_id}',
                headers=self.headers, data=payload, files=files)
        return resp.json().get('code') == 0

    def get_folder_tree(self):
        """Return the decrypted ``tree/myFolder`` payload.

        :return: the decrypted data, or ``[]`` when the reply code is not 0.
        """
        resp = requests.get(f'{self.serve_host}/mgr/document/dcLibrary/tree/myFolder',
                            headers=self.headers)
        reply_data = resp.json()
        if reply_data.get('code') == 0:
            return self.decryption(reply_data['data'])
        return []

    def get_template_file_info(self, file_id):
        """Return the decrypted template info for ``file_id``.

        :return: the decrypted data, or False when the reply code is not 0
            or the request failed (logged).
        """
        try:
            resp = requests.get(
                f'{self.serve_host}/mgr/document/dcLibrary/template/get/{file_id}',
                headers=self.headers)
            reply_data = resp.json()
            if reply_data.get('code') == 0:
                return self.decryption(reply_data['data'])
            return False
        except Exception as e:
            logger.error('获取模板信息出错')
            logger.exception(e)
            return False

    def download_template(self, url, file_path):
        """Download ``url`` (sent without this client's headers) to ``file_path``.

        :return: True on success, False if the request or write failed (logged).
        """
        try:
            resp = requests.get(url)
            with open(file_path, 'wb') as f:
                f.write(resp.content)
            return True
        except Exception as e:
            logger.error('下载模板文件出错')
            logger.exception(e)
            return False

    def get_file_info(self, file_id):
        """Return the decrypted info record for ``file_id``.

        If the first attempt triggers a re-login, the request is repeated
        once with the refreshed token.

        :param file_id: id of the document to look up.
        :return: the decrypted data, or False when the reply has no data or
            the request failed (logged).
        """
        # NOTE(review): this writes the Authorization bearer token to the
        # log; consider redacting it.
        logger.info(self.headers)
        try:
            url = f'{self.serve_host}/mgr/document/no/auth/dcLibrary/info/{file_id}'
            resp = requests.get(url, headers=self.headers)
            if self.check_resp(resp):
                resp = requests.get(url, headers=self.headers)
            data = resp.json()['data']
            if not data:
                return False
            return self.decryption(data)
        except Exception as e:
            logger.exception(e)
            return False


if __name__ == '__main__':
    # NOTE(review): host and credentials are hard-coded in source; consider
    # loading them from the environment or a config file.
    client = ServerClient('http://120.195.49.22:7215', 'admin', 'jxkj123456')
    res = client.decryption(
        "")
    print(res)