""" 安全模块。 """ import asyncio import json from tornado.httpclient import HTTPResponse, HTTPRequest import dock from dock.govs import govs_api from models.token import TokenModel from paste.core import config from paste.core.logging import echo_log from paste.security import cryp_rsa from paste.util import udict from paste.web import requests public_key = """-----BEGIN PUBLIC KEY----- MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQC0Jr1NzVUQMburkZT6Rkt0eaPm H8TN6E258l2tZMJgVCP/sL4oKjroKYmNPBkSSiLKFr9wwJqfesMeef6ChGRUXjG6 DX0oxQRe0f5/UnyEm/NicJwz9xwkU34gbuo1VB/EA2QZ5dl1rj9iSsiqKLK6/QFl VuzslRdAXYZC79vprwIDAQAB -----END PUBLIC KEY-----""" """ 固定公钥,确保登录成功。 """ async def login(): """ 登录政务服务 12345 系统并获取认证 Token。 流程: 1. 密码需要加密,密码明文使用PKCS1_v1_5进行RSA加密。 2. 从响应的['data']['access_token']内获取token。 Args: 无参数。 Returns: tuple: 包含两个元素的元组: - dict: DCM 接口返回的完整 JSON 响应数据 Raises: AssertionError: 登录失败(`resultInfo.success` 为 False) ValueError: 响应体非合法 JSON HTTPError: 网络请求失败(由 `async_request` 抛出) """ login_url = f"{govs_api.ApiUrl}/system/sysLogin" # 构建扩展头 user_agent, browser_ver, os_name = dock.get_random_user_agent() extra_headers = { 'Content-Type': 'application/json; charset=UTF-8', 'User-Agent': user_agent, } # 构造请求 request_body = { "username": config.get_config("dock.govs.account.username"), "password": cryp_rsa.rsa_encrypt_pkcs1_v1_5(public_key, config.get_config("dock.govs.account.password")), "tenantAccount": "suzhou", "rememberme": 1, "code": "", "uuid": "", } # 构造请求对象 request = dock.new_http_request( url=login_url, body=request_body, method='POST', timeout=dock.DEFAULT_TIMEOUT, use_form=False, extra_headers=extra_headers, ** govs_api.ProxyConfig ) queue = asyncio.Queue() await queue.put(request) await requests.async_concurrency( queue, con_count=1, retry=dock.MAX_RETRY_COUNT, after_request=after_login ) async def after_login(response: HTTPResponse, retry_queue: asyncio.Queue[HTTPRequest]): response_body = response.body.decode() response_data = json.loads(response_body) success = udict.get_by_path(response_data, 'data.access_token', '') if success: await TokenModel.refresh(platform='GOVS', token=success) echo_log(f"成功刷新省12345登录令牌.") else: echo_log(f"省12345登录失败,无法刷新令牌,响应:{response_body}") if retry_queue: echo_log(f"登录重试队列中有:{retry_queue.qsize()} 个请求在等待.") return response_data async def get_token(platform: str = 'GOVS'): """ 取得可用 Token。 :param platform: 要查询的平台,默认是:GOVS,省12345 :return: Cookies 字符串 """ _token = await TokenModel.find_by_platform(platform) return _token.token if __name__ == "__main__": from paste.core import aio_pool _runner = aio_pool.get_aio_runner() _runner(login())