serve_client.py 8.8 KB


  1. import base64
  2. import hashlib
  3. import json
  4. import os.path
  5. import re
  6. import requests
  7. from Crypto.Cipher import AES
  8. from config import TARGET_URL
  9. from tools.logger_handle import logger
  class ServerClient:
      """Client for the document-library HTTP API served at ``server_host``.

      Creating an instance logs in immediately; the bearer token from the
      login reply is stored in ``self.headers`` and sent with later requests.
      Response payloads arrive as hex-encoded AES-CBC ciphertext and are
      decoded with :meth:`decryption`.
      """

      # Browser-like User-Agent sent with every request.
      _USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36'
      # MIME type attached to every uploaded file part.
      _DOCX_MIME = 'application/vnd.openxmlformats-officedocument.wordprocessingml.document'

      def __init__(self, server_host, user_name, password):
          """Remember the credentials and log in right away.

          :param server_host: base URL; request paths are appended to it as-is
          :param user_name: account name sent to the token endpoint
          :param password: plain-text password (encrypted before it is sent)
          """
          self.user_name = user_name
          self.password = password
          self.serve_host = server_host
          self.access_token = None
          self.login_reply = None
          self.headers = {
              'User-Agent': self._USER_AGENT,
          }
          self.login()

      def login(self):
          """Fetch an access token and install it as the ``Authorization`` header.

          :raises KeyError: if the reply JSON has no ``access_token`` entry
          """
          url = self.serve_host + '/auth/oauth2/token'
          reply = requests.post(
              url,
              params={
                  'username': self.user_name,
                  'password': self.encryption(self.password),
                  'grant_type': 'password',
                  'scope': 'server',
                  'client_id': 'knowledge',
                  'client_secret': 'knowledge',
              },
              headers={
                  'Content-Type': 'multipart/form-data',
                  'user-agent': self._USER_AGENT,
              },
          )
          body = reply.json()  # parse the reply once and reuse it
          self.access_token = body['access_token']
          self.login_reply = body
          self.headers['Authorization'] = f'Bearer {self.access_token}'

      def decryption(self, hex_str, secret='anZz000000000000'):
          """Decrypt a hex-encoded AES-CBC payload.

          The key bytes are also used as the IV. Control characters in the
          plaintext (which include zero padding) are replaced by spaces before
          parsing.

          :param hex_str: ciphertext as a hexadecimal string
          :param secret: key text, UTF-8 encoded to obtain the key bytes
          :return: the parsed JSON value, or the stripped text when the
              plaintext is not valid JSON
          """
          key_bytes = secret.encode('utf-8')
          cipher = AES.new(key_bytes, AES.MODE_CBC, key_bytes)
          decrypted = cipher.decrypt(bytes.fromhex(hex_str))
          # Undecodable bytes are dropped instead of raising.
          decoded = decrypted.decode('utf-8', errors='ignore')
          # Strip control characters (mirrors the JS class [\u0000-\u001F\u007F-\u009F]).
          cleaned = re.sub(r'[\x00-\x1F\x7F-\x9F]', ' ', decoded)
          try:
              return json.loads(cleaned)
          except json.JSONDecodeError:
              return cleaned.strip()

      def zero_pad(self, data, block_size=16):
          """Right-pad ``data`` with NUL bytes up to a multiple of ``block_size``.

          Input whose length is already a multiple of ``block_size`` (including
          empty input) is returned without any extra padding.

          :param data: bytes to pad
          :param block_size: block length in bytes
          :return: the padded bytes
          """
          # ``-len % block_size`` is 0 for aligned input; the previous
          # ``block_size - len % block_size`` could never be 0 and therefore
          # always appended a whole extra block to aligned data.
          pad_len = -len(data) % block_size
          return data + b'\x00' * pad_len

      def encryption(self, payload, secret='a25vd2xlZGdl0000'.encode('utf-8')):
          """Encrypt ``payload`` with AES-CBC and zero padding.

          :param payload: text to encrypt
          :param secret: key bytes, also used as the IV
          :return: ciphertext as a hexadecimal string
          """
          padded_data = self.zero_pad(payload.encode('utf-8'))
          cipher = AES.new(secret, AES.MODE_CBC, secret)
          return cipher.encrypt(padded_data).hex()

      def get_base_path(self):
          """Return the ``records`` list of the caller's libraries.

          :return: the list, or ``[]`` when the reply code is not 0 or the
              decrypted payload is not a JSON object
          """
          url = self.serve_host + '/mgr/document/dcLibrary/knowledges/owner?size=1000&current=1'
          reply_data = requests.get(url, headers=self.headers).json()
          if reply_data.get('code') != 0:
              return []
          data = self.decryption(reply_data['data'])
          # decryption() falls back to plain text when the payload is not JSON.
          if not isinstance(data, dict):
              return []
          return data.get('records', [])

      def get_folder_by_id(self, _id):
          """Return the ``data`` list of the tree node ``_id``.

          NOTE: ``documentid`` is written into the shared ``self.headers`` and
          therefore stays set for every later request.

          :param _id: identifier of the folder to query
          :return: the list, or ``[]`` when the reply code is not 0 or the
              decrypted payload is not a JSON object
          """
          url = f'{self.serve_host}/mgr/document/dcLibrary/tree?id={_id}'
          self.headers['documentid'] = _id
          reply_data = requests.get(url, headers=self.headers).json()
          if reply_data.get('code') != 0:
              return []
          data = self.decryption(reply_data['data'])
          if not isinstance(data, dict):
              return []
          return data.get('data', [])

      def check_resp(self, resp):
          """Log in again when ``resp`` carries code ``-2``.

          Presumably ``-2`` is how the server reports an expired or invalid
          token; confirm against the server API.

          :param resp: response object whose JSON body has a ``code`` entry
          :return: ``True`` when a re-login happened (the caller should retry
              the request), otherwise ``False``
          """
          if resp.json()['code'] == -2:
              self.login()
              return True
          return False

      def download_file(self, file_info, storage_path):
          """Download a file and write a ``.metadata`` JSON file next to it.

          The local name is ``file_info['filePath']`` with ``/`` replaced by
          ``_``.

          :param file_info: mapping with ``id``, ``filePath``, ``bucketName``,
              ``name`` and ``updateTime`` entries
          :param storage_path: directory that receives both files
          :return: ``True`` on success, ``False`` when any exception was logged
          """
          try:
              resp = requests.get(
                  f'{self.serve_host}/mgr/document/dcLibrary/file/get/file/{file_info["id"]}',
                  headers=self.headers)
              local_file_name = file_info['filePath'].replace('/', '_')
              local_path = os.path.join(storage_path, local_file_name)
              with open(local_path, 'wb') as f:
                  f.write(resp.content)
              # Build the metadata before creating its file so a missing key
              # does not leave an empty ``.metadata`` file behind.
              metadata = {
                  'file_id': file_info["id"],
                  'bucket_name': file_info['bucketName'],
                  'md5': self.get_file_md5(local_path),
                  'file_name': file_info['name'],
                  'oss_path': file_info['filePath'],
                  'update_time': file_info['updateTime'],
              }
              with open(local_path + '.metadata', 'w', encoding='utf-8') as f:
                  f.write(json.dumps(metadata))
          except Exception as e:
              logger.exception(e)
              return False
          return True

      def upload_file(self, file_path):
          """Upload a local file back to the server using its ``.metadata`` file.

          The request is repeated once when :meth:`check_resp` had to log in
          again.

          :param file_path: path of the local file; ``<file_path>.metadata``
              must exist next to it
          :return: ``True`` when the server answers with code 0, else ``False``
          """
          try:
              file_metadata = self.load_metadata(file_path)
              file_id = file_metadata['file_id']
              url = f'{self.serve_host}/mgr/document/dcLibrary/saveByPython/{file_id}'
              # The context manager closes the handle that was leaked before.
              with open(file_path, 'rb') as fh:
                  files = [('file', (file_metadata['file_name'], fh, self._DOCX_MIME))]
                  resp = requests.post(url, headers=self.headers, files=files, data={})
                  if self.check_resp(resp):
                      # The first attempt consumed the stream; rewind it so
                      # the retry does not upload an empty file.
                      fh.seek(0)
                      resp = requests.post(url, headers=self.headers, files=files, data={})
              result = resp.json()
              if result['code'] == 0:
                  return True
              logger.error(result)
              return False
          except Exception as e:
              logger.exception(e)
              return False

      @staticmethod
      def get_file_md5(file_path):
          """Return the hexadecimal MD5 digest of the file at ``file_path``."""
          md5 = hashlib.md5()
          with open(file_path, 'rb') as f:
              # Read in chunks so that large files are not loaded at once.
              while chunk := f.read(8192):
                  md5.update(chunk)
          return md5.hexdigest()

      @staticmethod
      def load_metadata(local_path):
          """Load and return the JSON stored in ``<local_path>.metadata``."""
          with open(f'{local_path}.metadata', 'r', encoding='utf-8') as f:
              return json.loads(f.read())

      def create_cloud_file(self, local_file='D:/bussion/法律法规/道路交通安全法.docx',
                            folder_id='5b091f1007c880528bd913530987d5e5'):
          """Upload ``local_file`` as a new document into folder ``folder_id``.

          NOTE(review): this posts to ``TARGET_URL`` rather than
          ``self.serve_host``; confirm that the two are meant to be the same.

          :param local_file: path of the file to upload
          :param folder_id: identifier of the destination folder
          :return: ``True`` when the server answers with code 0, else ``False``
          """
          payload = {'module': '/jvs-knowledge-ui/jvs-knowledge-import/'}
          # The context manager closes the handle that was leaked before.
          with open(local_file, 'rb') as fh:
              files = [('file', (os.path.basename(local_file), fh, self._DOCX_MIME))]
              resp = requests.post(f'{TARGET_URL}/mgr/document/dcLibrary/upload/{folder_id}',
                                   headers=self.headers, data=payload, files=files)
          return resp.json().get('code') == 0

      def get_folder_tree(self):
          """Return the decrypted ``myFolder`` tree, or ``[]`` when code is not 0.

          NOTE(review): this queries ``TARGET_URL`` rather than
          ``self.serve_host``.
          """
          resp = requests.get(f'{TARGET_URL}/mgr/document/dcLibrary/tree/myFolder', headers=self.headers)
          body = resp.json()
          if body.get('code') == 0:
              return self.decryption(body['data'])
          return []

      def get_template_file_info(self, file_id):
          """Return the decrypted template record for ``file_id``.

          NOTE(review): this queries ``TARGET_URL`` rather than
          ``self.serve_host``.

          :param file_id: identifier of the template file
          :return: the decrypted payload, or ``False`` when the reply code is
              not 0 or an exception was logged
          """
          try:
              resp = requests.get(f'{TARGET_URL}/mgr/document/dcLibrary/template/get/{file_id}',
                                  headers=self.headers)
              body = resp.json()
              if body.get('code') == 0:
                  return self.decryption(body['data'])
              return False
          except Exception as e:
              logger.error('获取模板信息出错')
              logger.exception(e)
              return False

      def download_template(self, url, file_path):
          """Download ``url`` (without auth headers) and save it to ``file_path``.

          :param url: address of the template file
          :param file_path: destination path on disk
          :return: ``True`` on success, ``False`` when an exception was logged
          """
          try:
              resp = requests.get(url)
              with open(file_path, 'wb') as f:
                  f.write(resp.content)
              return True
          except Exception as e:
              logger.error('下载模板文件出错')
              logger.exception(e)
              return False

      def get_file_info(self, file_id):
          """Return the decrypted info record of the file ``file_id``.

          The request is repeated once when :meth:`check_resp` had to log in
          again.

          :param file_id: identifier of the file to look up
          :return: the decrypted payload, or ``False`` when the reply has no
              data or an exception was logged
          """
          # Log the headers with the bearer token masked so that credentials
          # do not end up in log files.
          logger.info({
              key: ('***' if key.lower() == 'authorization' else value)
              for key, value in self.headers.items()
          })
          url = f'{self.serve_host}/mgr/document/no/auth/dcLibrary/info/{file_id}'
          try:
              resp = requests.get(url, headers=self.headers)
              # Retry only after a re-login; the old ``not`` inverted this and
              # re-sent the request exactly when the token was still valid.
              if self.check_resp(resp):
                  resp = requests.get(url, headers=self.headers)
              encrypted = resp.json()['data']
              if not encrypted:
                  return False
              return self.decryption(encrypted)
          except Exception as e:
              logger.exception(e)
              return False
  if __name__ == '__main__':
      # Manual smoke test: log in against TARGET_URL, then decrypt one sample
      # payload and print the result.
      # NOTE(review): the credentials are hard-coded here; consider loading
      # them from configuration instead.
      sample_cipher_hex = (
          "87ed629e963fcf0f573ef6daeaa30a9cde5b2659ab015f8bbc67115ab70771f3ab48734220dc18715f66e1524560fd23"
      )
      demo_client = ServerClient(f'{TARGET_URL}', 'admin', 'jxkj123456')
      print(demo_client.decryption(sample_cipher_hex))