123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242 |
- import base64
- import hashlib
- import json
- import os.path
- import re
- import requests
- from Crypto.Cipher import AES
- from tools.logger_handle import logger
- from tools.oss_client import oss_handle
class ServerClient:
    """Client for the knowledge-base document service.

    The instance logs in on construction and keeps the bearer token in
    ``self.headers``, which is sent with every subsequent request. Response
    payloads arrive as hex-encoded AES-CBC ciphertext and are decoded with
    :meth:`decryption`.
    """

    # Browser User-Agent sent with every request.
    _USER_AGENT = ('Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
                   '(KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36')
    # MIME type attached to every uploaded document.
    _DOCX_MIME = 'application/vnd.openxmlformats-officedocument.wordprocessingml.document'

    def __init__(self, server_host, user_name, password):
        """Store the credentials and log in immediately.

        :param server_host: base URL of the service, without a trailing slash.
        :param user_name: account name used for the password grant.
        :param password: plain-text password; it is encrypted before sending.
        """
        self.user_name = user_name
        self.password = password
        self.serve_host = server_host
        self.access_token = None
        self.login_reply = None
        self.headers = {'User-Agent': self._USER_AGENT}
        self.login()

    def login(self):
        """Request an access token and install it in ``self.headers``.

        :raises KeyError: if the reply carries no ``access_token``.
        """
        url = self.serve_host + '/auth/oauth2/token'
        reply = requests.post(
            url,
            params={
                'username': self.user_name,
                'password': self.encryption(self.password),
                'grant_type': 'password',
                'scope': 'server',
                'client_id': 'knowledge',
                'client_secret': 'knowledge',
            },
            headers={
                'Content-Type': 'multipart/form-data',
                'user-agent': self._USER_AGENT,
            },
        )
        token_payload = reply.json()  # parse the body once instead of twice
        self.access_token = token_payload['access_token']
        self.login_reply = token_payload
        self.headers['Authorization'] = f'Bearer {self.access_token}'

    def decryption(self, hex_str, secret='anZz000000000000'):
        """Decrypt a hex-encoded AES-CBC payload.

        The key doubles as the IV. Control characters (including the zero
        padding) are replaced with spaces before parsing.

        :param hex_str: ciphertext as a hexadecimal string.
        :param secret: 16-character key, also used as the IV.
        :return: the parsed JSON value, or the stripped text when the
            plaintext is not valid JSON.
        """
        key_bytes = secret.encode('utf-8')
        cipher = AES.new(key_bytes, AES.MODE_CBC, key_bytes)
        decrypted = cipher.decrypt(bytes.fromhex(hex_str))
        # Undecodable bytes are dropped; for valid UTF-8 this equals a strict decode.
        decoded = decrypted.decode('utf-8', errors='ignore')
        # Strip control characters (same class as JS [\u0000-\u001F\u007F-\u009F]).
        cleaned = re.sub(r'[\x00-\x1F\x7F-\x9F]', ' ', decoded)
        try:
            return json.loads(cleaned)
        except json.JSONDecodeError:
            return cleaned.strip()

    def zero_pad(self, data, block_size=16):
        """Right-pad ``data`` with NUL bytes up to a multiple of ``block_size``.

        Input whose length is already a multiple of ``block_size`` (including
        empty input) is returned unchanged.

        :param data: bytes to pad.
        :param block_size: cipher block size in bytes.
        :return: the padded bytes.
        """
        remainder = len(data) % block_size
        # Test the remainder, not ``block_size - remainder``: the latter is
        # never 0, so aligned input used to gain a whole extra block of zeros.
        if remainder == 0:
            return data
        return data + b'\x00' * (block_size - remainder)

    def encryption(self, payload, secret='a25vd2xlZGdl0000'.encode('utf-8')):
        """Encrypt ``payload`` with AES-CBC and return the ciphertext as hex.

        :param payload: text to encrypt.
        :param secret: 16-byte key, also used as the IV.
        :return: hexadecimal ciphertext string.
        """
        padded_data = self.zero_pad(payload.encode('utf-8'))
        cipher = AES.new(secret, AES.MODE_CBC, secret)
        return cipher.encrypt(padded_data).hex()

    def get_base_path(self):
        """Return the ``records`` list of knowledge bases owned by the user.

        :return: list of records, or ``[]`` when the call fails or the
            decrypted payload is not a JSON object.
        """
        url = self.serve_host + '/mgr/document/dcLibrary/knowledges/owner?size=1000&current=1'
        reply_data = requests.get(url, headers=self.headers).json()
        if reply_data.get('code') != 0:
            return []
        data = self.decryption(reply_data['data'])
        # decryption() falls back to a plain string for non-JSON plaintext.
        if not isinstance(data, dict):
            return []
        return data.get('records', [])

    def get_folder_by_id(self, _id):
        """Return the ``data`` list of the folder tree rooted at ``_id``.

        :param _id: folder/document id.
        :return: list of tree nodes, or ``[]`` when the call fails or the
            decrypted payload is not a JSON object.
        """
        url = f'{self.serve_host}/mgr/document/dcLibrary/tree?id={_id}'
        # NOTE(review): this header stays on self.headers for all later
        # requests; kept as-is because other endpoints may rely on it - confirm.
        self.headers['documentid'] = _id
        reply_data = requests.get(url, headers=self.headers).json()
        if reply_data.get('code') != 0:
            return []
        data = self.decryption(reply_data['data'])
        if not isinstance(data, dict):
            return []
        return data.get('data', [])

    def check_resp(self, resp):
        """Log in again when the response carries code ``-2``.

        :param resp: response object whose JSON body has a ``code`` field.
        :return: ``True`` if a re-login happened (the caller should retry),
            ``False`` otherwise.
        """
        if resp.json()['code'] == -2:
            self.login()
            return True
        return False

    def download_file(self, file_info, storage_path):
        """Download a document and write a ``.metadata`` sidecar next to it.

        :param file_info: dict with ``id``, ``filePath``, ``bucketName``,
            ``name`` and ``updateTime``.
        :param storage_path: directory that receives both files.
        :return: ``True`` on success, ``False`` if anything failed (logged).
        """
        try:
            resp = requests.get(
                f'{self.serve_host}/mgr/document/dcLibrary/file/get/file/{file_info["id"]}',
                headers=self.headers)
            # Do not store an HTTP error page as if it were the document.
            resp.raise_for_status()
            local_file_name = file_info['filePath'].replace('/', '_')
            local_path = os.path.join(storage_path, local_file_name)
            with open(local_path, 'wb') as f:
                f.write(resp.content)
            # Build the metadata before opening the sidecar so a missing key
            # does not leave an empty .metadata file behind.
            # NOTE(review): the OSS access/secret keys are written in clear text.
            metadata = {
                'serve': oss_handle.service,
                'access_key': oss_handle.access_key,
                'secret_key': oss_handle.secret_key,
                'file_id': file_info["id"],
                'bucket_name': file_info['bucketName'],
                'md5': self.get_file_md5(local_path),
                'file_name': file_info['name'],
                'oss_path': file_info['filePath'],
                'update_time': file_info['updateTime'],
            }
            with open(local_path + '.metadata', 'w', encoding='utf-8') as f:
                f.write(json.dumps(metadata))
        except Exception as e:
            logger.exception(e)
            return False
        return True

    def upload_file(self, file_path):
        """Upload a local document back to the id stored in its sidecar.

        :param file_path: path of the document; ``<file_path>.metadata`` must
            exist and contain ``file_id`` and ``file_name``.
        :return: ``True`` when the server answers with code 0, else ``False``.
        """
        try:
            file_metadata = self.load_metadata(file_path)
            file_id = file_metadata['file_id']
            url = f'{self.serve_host}/mgr/document/dcLibrary/saveByPython/{file_id}'
            # The context manager closes the handle even when the request fails.
            with open(file_path, 'rb') as fh:
                files = [('file', (file_metadata['file_name'], fh, self._DOCX_MIME))]
                resp = requests.post(url, headers=self.headers, files=files, data={})
                if self.check_resp(resp):
                    # The first attempt consumed the stream; rewind so the
                    # retry does not upload an empty body.
                    fh.seek(0)
                    resp = requests.post(url, headers=self.headers, files=files, data={})
            result = resp.json()
            if result['code'] == 0:
                return True
            logger.error(result)
            return False
        except Exception as e:
            logger.exception(e)
            return False

    @staticmethod
    def get_file_md5(file_path):
        """Return the hexadecimal MD5 digest of the file at ``file_path``."""
        md5 = hashlib.md5()
        with open(file_path, 'rb') as f:
            # Read in chunks so large files are not loaded into memory at once.
            while chunk := f.read(8192):
                md5.update(chunk)
        return md5.hexdigest()

    @staticmethod
    def load_metadata(local_path):
        """Load and return the JSON stored in ``<local_path>.metadata``."""
        with open(f'{local_path}.metadata', 'r', encoding='utf-8') as f:
            return json.loads(f.read())

    def create_cloud_file(self, local_file='D:/bussion/法律法规/道路交通安全法.docx',
                          folder_id='5b091f1007c880528bd913530987d5e5'):
        """Upload ``local_file`` as a new document into ``folder_id``.

        :param local_file: path of the document to upload.
        :param folder_id: id of the destination folder.
        :return: ``True`` when the server answers with code 0, else ``False``.
        """
        payload = {'module': '/jvs-knowledge-ui/jvs-knowledge-import/'}
        with open(local_file, 'rb') as fh:
            files = [('file', (os.path.basename(local_file), fh, self._DOCX_MIME))]
            # Use the configured host rather than a hard-coded address.
            resp = requests.post(
                f'{self.serve_host}/mgr/document/dcLibrary/upload/{folder_id}',
                headers=self.headers, data=payload, files=files)
        return resp.json().get('code') == 0

    def get_folder_tree(self):
        """Return the decrypted "my folder" tree, or ``[]`` on failure."""
        resp = requests.get(f'{self.serve_host}/mgr/document/dcLibrary/tree/myFolder',
                            headers=self.headers)
        body = resp.json()
        if body.get('code') == 0:
            return self.decryption(body['data'])
        return []

    def get_template_file_info(self, file_id):
        """Return the decrypted template info for ``file_id``.

        :return: the decrypted payload, or ``False`` on any failure.
        """
        try:
            resp = requests.get(
                f'{self.serve_host}/mgr/document/dcLibrary/template/get/{file_id}',
                headers=self.headers)
            body = resp.json()
            if body.get('code') == 0:
                return self.decryption(body['data'])
            return False
        except Exception as e:
            logger.error('获取模板信息出错')
            logger.exception(e)
            return False

    def download_template(self, url, file_path):
        """Download ``url`` (sent without auth headers) into ``file_path``.

        :return: ``True`` on success, ``False`` on any failure (logged).
        """
        try:
            resp = requests.get(url)
            # Do not store an HTTP error page as if it were the template.
            resp.raise_for_status()
            with open(file_path, 'wb') as f:
                f.write(resp.content)
            return True
        except Exception as e:
            logger.error('下载模板文件出错')
            logger.exception(e)
            return False

    def get_file_info(self, file_id):
        """Return the decrypted info record for ``file_id``.

        :param file_id: id of the document to look up.
        :return: the decrypted payload, or ``False`` when the reply has no
            data or the request fails.
        """
        # Keep the bearer token out of the logs.
        logger.info({
            name: ('<redacted>' if name.lower() == 'authorization' else value)
            for name, value in self.headers.items()
        })
        url = f'{self.serve_host}/mgr/document/no/auth/dcLibrary/info/{file_id}'
        try:
            resp = requests.get(url, headers=self.headers)
            # Retry only after check_resp() has refreshed an expired token.
            if self.check_resp(resp):
                resp = requests.get(url, headers=self.headers)
            body = resp.json()
            if not body['data']:
                return False
            return self.decryption(body['data'])
        except Exception as e:
            logger.exception(e)
            return False
if __name__ == '__main__':
    # Manual smoke test: log in, decrypt a pasted payload and print its rows.
    # NOTE(review): host and credentials are hard-coded here; consider moving
    # them to configuration or environment variables.
    client = ServerClient('http://120.195.49.22:7215', 'admin', 'jxkj123456')
    res = client.decryption(
        "")
    # decryption() returns a plain string when the plaintext is not JSON
    # (as with the empty placeholder above), so only index into a dict.
    if isinstance(res, dict):
        for row in res.get('data', []):
            print(row)
    else:
        print(res)
|