""" 安全模块。 """ import asyncio import base64 import io import json import re from typing import Optional import ddddocr from PIL import Image, ImageFilter, ImageEnhance from gmssl import sm2 from tornado.httpclient import HTTPResponse, HTTPRequest import dock from dock.govc import govc_api from models.token import TokenModel from paste.core import config from paste.core.logging import echo_log from paste.util import udict from paste.web import requests async def fetch_captcha(): verify_code_img: Optional[str] = None def after_request(response: HTTPResponse, retry_queue: asyncio.Queue[HTTPRequest]): nonlocal verify_code_img response_body = response.body.decode() response_data = json.loads(response_body) controls = udict.get_by_path(response_data, 'controls') verify_code_img = udict.get_by_path(controls[0], 'data.src') # 请求路径 pub_key_url = '/rest/bmfw/bmfwlogin/loginaction/page_Refresh?isCommondto=true' # 构建请求参数 common_dto_str = json.dumps([ { "id": "verifyCode", "type": "verifycode", "action": "loginaction.pageLoad", "mapClass": "com.epoint.basic.faces.verifycode.VerifyCode", "width": "35%", "height": "43px", "charLength": 4, "ignorecase": True, "value": "random" }, { "id": "_common_hidden_viewdata", "type": "hidden", "value": "{\"pageUrl\":\"E86A24CDBC744150F0A28F52940E2E9342802C4870563C30D121F50510AD3184\"}" } ], separators=(',', ':')) cmd_params_str = json.dumps({ "pageUrl": "http://2.46.12.176:8091/sz12345/bmfw/bmfwlogin/login" }, separators=(',', ':')) body_data = { "commonDto": common_dto_str, "cmdParams": cmd_params_str } verify_code_request = dock.new_http_request( f"{govc_api.ApiUrl}{pub_key_url}", body_data, use_form=True, **govc_api.ProxyConfig ) request_queue = asyncio.Queue() await request_queue.put(verify_code_request) await requests.async_concurrency( request_queue, con_count=1, retry=1, after_request=after_request ) return verify_code_img def enhance_captcha(img_bytes): """ 利用 PIL 对图像进行:去噪 + 增强对比度处理。 :param img_bytes: 图像字节数据 :return: 增强后的图像数据 """ img = Image.open(io.BytesIO(img_bytes)) # 去噪 img = img.filter(ImageFilter.MedianFilter()) # 增强对比度 enhancer = ImageEnhance.Contrast(img) img = enhancer.enhance(1.5) # 1.5倍对比度 # 转回字节流 buf = io.BytesIO() img.save(buf, format='PNG') return buf.getvalue() async def read_verify_code(): """ 读取验证码。 :return: 验证码 """ verify_code = "" def validate_captcha(code: str) -> bool: """ 验证验证码是否符合要求。 """ return bool(re.fullmatch(r'[A-Za-z0-9]{4}', code)) while not validate_captcha(verify_code): base64_str = await fetch_captcha() img_data = base64_str.split(',')[1] img_data += '=' * (-len(img_data) % 4) img_bytes = base64.b64decode(img_data) # 利用 PIL 预处理图像 img_bytes = enhance_captcha(img_bytes) ocr = ddddocr.DdddOcr(show_ad=False) verify_code = ocr.classification(img_bytes) echo_log(verify_code) return verify_code async def fetch_public_key(): sm2_pub_key: Optional[str] = None def after_request(response: HTTPResponse, retry_queue: asyncio.Queue[HTTPRequest]): nonlocal sm2_pub_key response_body = response.body.decode() response_data = json.loads(response_body) sm2_pub_key = udict.get_by_path(response_data, 'custom.sm2PubKey') pub_key_url = '/rest/loginaction/autoLoad?isCommondto=true' body_data = { "commonDto": json.dumps([]) # 空数组,表示没有额外数据 } public_key_request = dock.new_http_request( f"{govc_api.ApiUrl}{pub_key_url}", body_data, **govc_api.ProxyConfig ) request_queue = asyncio.Queue() await request_queue.put(public_key_request) await requests.async_concurrency( request_queue, con_count=1, retry=dock.MAX_RETRY_COUNT, after_request=after_request ) return sm2_pub_key def js_escape(s: str) -> str: """模拟 JavaScript escape""" result = [] for ch in s: code = ord(ch) if (65 <= code <= 90) or (97 <= code <= 122) or (48 <= code <= 57): result.append(ch) elif code == 32: result.append("%20") elif code in [42, 45, 46, 47, 64, 95]: result.append(ch) elif code <= 0xFF: result.append(f"%{code:02X}") else: result.append(f"%u{code:04X}") return ''.join(result) def unicode_escape_to_utf8(escaped: str) -> str: """将 %uXXXX 转换成 UTF-8 的 %XX 形式""" import re def repl(m): code = int(m.group(1), 16) utf8_bytes = code.to_bytes((code.bit_length() + 7) // 8, 'big') return ''.join(f'%{b:02X}' for b in utf8_bytes) return re.sub(r'%u([0-9A-Fa-f]{4})', repl, escaped) def sm2_encrypt(plain_text: str, public_key_hex: str) -> str: escaped = js_escape(plain_text) encoded = unicode_escape_to_utf8(escaped) # SM2 加密(模拟前端 sm2Encrypt 内部逻辑) # 前端:CryptoJS.enc.Utf8.parse(encoded) → Base64.stringify → CryptoJS.enc.Utf8.parse → SM2 加密 utf8_bytes = encoded.encode('utf-8') base64_str = base64.b64encode(utf8_bytes).decode('ascii') # 再次做 UTF-8 编码作为加密输入 final_bytes = base64_str.encode('utf-8') # C1C3C2 加密 sm2_crypt = sm2.CryptSM2(public_key=public_key_hex, private_key="") encrypted = sm2_crypt.encrypt(final_bytes) return '04' + encrypted.hex() async def build_login_common_dto( username: str, password: str ) -> tuple[str, str]: """ 构造登录请求的 commonDto 参数(符合服务器要求) Args: username: 用户名 password: 密码 Returns: (commonDto, cmdParams) 元组 """ # 获取公钥 pub_key = await fetch_public_key() if not pub_key: raise Exception("获取 SM2 公钥失败") # 使用 SM2 加密 encrypted_username = sm2_encrypt(username, pub_key) encrypted_password = sm2_encrypt(password, pub_key) # # 读取验证码 # verify_code = await read_verify_code() # 构造 commonDto common_dto_data = [ { "id": "_common_hidden_viewdata", "type": "hidden", "value": "" } ] # 构造 cmdParams # 格式: [加密用户名, 加密密码, loginType, false, verifyCodeRandom] cmd_params = [ encrypted_username, # 加密后的用户名 encrypted_password, # 加密后的密码 "0", # 固定值 False, # 固定值 f"#undefined #verifyCode", # 验证码随机串 ] # 转为 JSON 字符串并将双引号替换为单引号 common_dto_str = json.dumps(common_dto_data, separators=(',', ':')) cmd_params_str = json.dumps(cmd_params, separators=(',', ':')) return common_dto_str, cmd_params_str async def login(): """ 登录政务服务 12345 系统并获取认证 Token。 流程: 1. 从市12345平台获取公钥。 2. 模拟前端的编码和加密过程。 3. 提交请求完成登陆。 Args: 无参数。 Returns: tuple: 包含两个元素的元组: - dict: DCM 接口返回的完整 JSON 响应数据 Raises: AssertionError: 登录失败(`resultInfo.success` 为 False) ValueError: 响应体非合法 JSON HTTPError: 网络请求失败(由 `async_request` 抛出) """ login_url = f"{govc_api.ApiUrl}/rest/bmfw/bmfwlogin/loginaction/login?isCommondto=true" # 构建扩展头 user_agent, browser_ver, os_name = dock.get_random_user_agent() extra_headers = { 'Host': '2.46.12.176:8091', 'Referer': 'http://2.46.12.176:8091/sz12345/bmfw/bmfwlogin/login', 'User-Agent': user_agent, 'X-Requested-With': 'XMLHttpRequest', } # 构造 commonDto common_dto, cmd_params = await build_login_common_dto( config.get_config("dock.govc.account.username"), config.get_config("dock.govc.account.password"), ) # 构造请求 request_body = { "commonDto": common_dto, "cmdParams": cmd_params, } # 构造请求对象 request = dock.new_http_request( url=login_url, body=request_body, method='POST', timeout=dock.DEFAULT_TIMEOUT, use_form=True, extra_headers=extra_headers, **govc_api.ProxyConfig ) async def after_request(response: HTTPResponse, retry_queue: asyncio.Queue[HTTPRequest]): cookies_data = dock.get_cookies(response) cookies = "; ".join([f"{k}={v}" for k, v in cookies_data.items()]) await first_login(cookies) queue = asyncio.Queue() await queue.put(request) await requests.async_concurrency( queue, con_count=1, retry=dock.MAX_RETRY_COUNT, after_request=after_request ) async def first_login(cookies: str): grace_url = f"{govc_api.ApiUrl}/rest/szbmfw/szdesktop/szdeptindexaction/getIsFirstLogin?isCommondto=true" # 构建扩展头 user_agent, browser_ver, os_name = dock.get_random_user_agent() extra_headers = { 'Cookie': cookies, 'Host': '2.46.12.176:8091', 'Referer': 'http://2.46.12.176:8091/sz12345/bmfw/bmfwlogin/login', 'User-Agent': user_agent, 'X-Requested-With': 'XMLHttpRequest', } async def after_request(response: HTTPResponse, retry_queue: asyncio.Queue[HTTPRequest]): # 读取 epoint_user_loginid response_body = response.body.decode() response_data = json.loads(response_body) controls: list[dict] = response_data.get('controls', []) epoint_user_loginid = json.loads(controls[0].get('value', '')).get('epoint_user_loginid', '') # 读取并组合 cookies cookies_data = dock.get_cookies(response) full_cookies = "; ".join([f"{k}={v}" for k, v in cookies_data.items()]) full_cookies = f"{full_cookies}; {cookies}" # 组合 token token = json.dumps( { 'epoint_user_loginid': epoint_user_loginid, 'cookies': full_cookies, }, separators=(',', ':') ) await TokenModel.refresh(platform='GOVC', token=token) echo_log(f"成功刷新市12345登录令牌.") grace_request = dock.new_http_request( grace_url, {}, 'GET', extra_headers=extra_headers, **govc_api.ProxyConfig ) request_queue = asyncio.Queue() await request_queue.put(grace_request) await requests.async_concurrency( request_queue, con_count=1, retry=dock.MAX_RETRY_COUNT, after_request=after_request ) async def get_cookies(platform: str = 'GOVC'): """ 取得可用 Cookies。 :param platform: 要查询的平台,默认是:GOVC,市12345 :return: epoint_user_loginid, cookies """ _token_str = await TokenModel.find_by_platform(platform) _token = json.loads(_token_str.token) return _token.get('epoint_user_loginid', ''), _token.get('cookies', '') if __name__ == "__main__": from paste.core import aio_pool _runner = aio_pool.get_aio_runner() _runner(login())