""" 安全模块。 """ import asyncio import json from tornado.httpclient import HTTPResponse, HTTPRequest import dock from dock.dcm import dcm_api from models.token import TokenModel from paste.core import config from paste.core.logging import echo_log from paste.util import udict from paste.web import requests async def login(): """ 登录 DCM 系统并获取认证 Cookie 和响应数据。 流程: 1. 访问 `/main.htm` 获取服务端下发的初始 Cookie(如 JSESSIONID)。 2. 使用该 Cookie 构造请求头,向 `/login/validpassword` 发送 POST 请求。 3. 请求体包含用户名、密码、浏览器及操作系统信息(从配置和随机生成获取)。 4. 验证响应中 `resultInfo.success` 字段,若为 False 则抛出断言错误。 Args: 无参数。 Returns: tuple: 包含两个元素的元组: - str: 请求头中的 Cookie 字符串(如 "JSESSIONID=abc; ...") - dict: DCM 接口返回的完整 JSON 响应数据 Raises: AssertionError: 登录失败(`resultInfo.success` 为 False) ValueError: 响应体非合法 JSON HTTPError: 网络请求失败(由 `async_request` 抛出) """ home_url = f"{dcm_api.ApiUrl}/main.htm" login_url = f"{dcm_api.ApiUrl}/login/validpassword" # 获取初始 Cookie initial_cookies = await dock.scrape_cookies(home_url) cookie_header = "; ".join([f"{k}={v}" for k, v in initial_cookies.items()]) echo_log(cookie_header) # 构建扩展头 user_agent, browser_ver, os_name = dock.get_random_user_agent() extra_headers = { 'Cookie': cookie_header, 'User-Agent': user_agent, } # 构造请求 request_body = { "u": config.get_config("dock.dcm.account.u"), "p": config.get_config("dock.dcm.account.p"), "ip": "", "browserVersion": browser_ver, "osVersion": os_name, "validCode": "", "validWay": 0, } # 构造请求对象 request = dock.new_http_request( url=login_url, body=request_body, method='POST', timeout=dock.DEFAULT_TIMEOUT, use_form=True, extra_headers=extra_headers, ) setattr(request, 'cookie_header', cookie_header) queue = asyncio.Queue() await queue.put(request) await requests.async_concurrency( queue, con_count=1, retry=dock.MAX_RETRY_COUNT, after_request=after_login ) async def after_login(response: HTTPResponse, retry_queue: asyncio.Queue[HTTPRequest]): response_body = response.body.decode() response_data = json.loads(response_body) success = udict.get_by_path(response_data, 'resultInfo.success', False) if success: cookie_header = getattr(response.request, 'cookie_header') await TokenModel.refresh(platform='数字城管', token=cookie_header) echo_log(f"成功刷新数字城管登录令牌.") else: echo_log(f"数字城管登录失败,无法刷新令牌,响应:{response_body}") if retry_queue: echo_log(f"登录重试队列中有:{retry_queue.qsize()} 个请求在等待.") return response_data async def get_cookies(platform: str = '数字城管'): """ 取得可用 Cookies。目前固定,后期改为从数据库读取。 :param platform: 要查询的平台,默认是:数字城管 :return: Cookies 字符串 """ _token = await TokenModel.find_by_platform(platform) return _token.token if __name__ == "__main__": from paste.core import aio_pool _runner = aio_pool.get_aio_runner() _runner(login())