Files
d3i-szct/dock/govs/govs_security.py
T
2026-06-02 17:46:38 +08:00

116 lines
3.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Security module: login to the provincial 12345 (GOVS) system and token lookup.
"""
import asyncio
import json
from tornado.httpclient import HTTPResponse, HTTPRequest
import dock
from dock.govs import govs_api
from models.token import TokenModel
from paste.core import config
from paste.core.logging import echo_log
from paste.security import cryp_rsa
from paste.util import udict
from paste.web import requests
# PEM-encoded RSA public key. login() passes it to
# cryp_rsa.rsa_encrypt_pkcs1_v1_5 to encrypt the account password before it is
# sent. The bare string that follows the assignment is the original note; in
# English it reads: "Fixed public key, to make sure login succeeds."
public_key = """-----BEGIN PUBLIC KEY-----
MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQC0Jr1NzVUQMburkZT6Rkt0eaPm
H8TN6E258l2tZMJgVCP/sL4oKjroKYmNPBkSSiLKFr9wwJqfesMeef6ChGRUXjG6
DX0oxQRe0f5/UnyEm/NicJwz9xwkU34gbuo1VB/EA2QZ5dl1rj9iSsiqKLK6/QFl
VuzslRdAXYZC79vprwIDAQAB
-----END PUBLIC KEY-----"""
"""
固定公钥,确保登录成功。
"""
async def login():
    """
    Submit a login request to the government-service 12345 (GOVS) system.

    Flow:
        1. Pick a random User-Agent via ``dock.get_random_user_agent()``.
        2. Read the account name and plaintext password from configuration and
           encrypt the password with ``cryp_rsa.rsa_encrypt_pkcs1_v1_5`` using
           the module-level ``public_key``.
        3. Build a POST request for ``{govs_api.ApiUrl}/system/sysLogin`` and
           put it on a fresh ``asyncio.Queue``.
        4. Run the queue through ``requests.async_concurrency`` with
           ``after_login`` as the ``after_request`` callback; that callback
           reads the token from ``data.access_token`` in the response.

    Args:
        None.

    Returns:
        None. This coroutine has no return statement; the login result is
        handled entirely inside ``after_login``.

    Raises:
        Nothing is caught here, so whatever the helpers called above raise
        propagates to the caller.
    """
    endpoint = f"{govs_api.ApiUrl}/system/sysLogin"

    # Only the User-Agent string is used; the other two values are discarded.
    agent, _browser_ver, _os_name = dock.get_random_user_agent()
    headers = {
        'Content-Type': 'application/json; charset=UTF-8',
        'User-Agent': agent,
    }

    # Credentials come from configuration; the password is sent RSA-encrypted.
    account = config.get_config("dock.govs.account.username")
    plain_password = config.get_config("dock.govs.account.password")
    encrypted_password = cryp_rsa.rsa_encrypt_pkcs1_v1_5(public_key, plain_password)
    payload = {
        "username": account,
        "password": encrypted_password,
        "tenantAccount": "suzhou",
        "rememberme": 1,
        "code": "",
        "uuid": "",
    }

    # use_form=False together with the JSON Content-Type header - presumably
    # dock.new_http_request serializes the body as JSON; confirm in dock.
    login_request = dock.new_http_request(
        url=endpoint,
        body=payload,
        method='POST',
        timeout=dock.DEFAULT_TIMEOUT,
        use_form=False,
        extra_headers=headers,
        **govs_api.ProxyConfig
    )

    # A single request is queued and processed with con_count=1.
    pending = asyncio.Queue()
    await pending.put(login_request)
    await requests.async_concurrency(
        pending,
        con_count=1,
        retry=dock.MAX_RETRY_COUNT,
        after_request=after_login,
    )
async def after_login(response: HTTPResponse, retry_queue: asyncio.Queue[HTTPRequest]):
    """
    Handle the login response: store the token on success, log on failure.

    The response body is decoded and parsed as JSON, then ``data.access_token``
    is looked up (defaulting to an empty string). A truthy token is saved with
    ``TokenModel.refresh`` for platform ``'GOVS'``; otherwise the raw response
    text is logged, followed by the retry queue size when a queue was given.

    Args:
        response: The HTTP response of the login request.
        retry_queue: Queue of requests awaiting retry. It is only inspected
            for its size here; nothing is enqueued by this callback.

    Returns:
        The parsed JSON response data, in both the success and failure case.

    Raises:
        Nothing is caught here; ``json.loads`` raises ``ValueError``
        (``json.JSONDecodeError``) when the body is not valid JSON.
    """
    raw_text = response.body.decode()
    parsed = json.loads(raw_text)
    access_token = udict.get_by_path(parsed, 'data.access_token', '')

    if not access_token:
        # Failure path: keep the raw body in the log for diagnosis.
        echo_log(f"省12345登录失败,无法刷新令牌,响应:{raw_text}")
        if retry_queue:
            echo_log(f"登录重试队列中有:{retry_queue.qsize()} 个请求在等待.")
        return parsed

    # Success path: persist the new token for the GOVS platform.
    await TokenModel.refresh(platform='GOVS', token=access_token)
    echo_log(f"成功刷新省12345登录令牌.")
    return parsed
async def get_token(platform: str = 'GOVS'):
    """
    Fetch the stored token for a platform.

    :param platform: Platform to look up; defaults to 'GOVS' (provincial 12345).
    :return: The ``token`` attribute of the record returned by
        ``TokenModel.find_by_platform``.
    """
    return (await TokenModel.find_by_platform(platform)).token
if __name__ == "__main__":
    # Manual entry point: run a single login when this file is executed
    # directly. aio_pool is imported here so it is only loaded in that case.
    from paste.core import aio_pool

    aio_pool.get_aio_runner()(login())