116 lines
3.3 KiB
Python
116 lines
3.3 KiB
Python
"""
|
||
安全模块。
|
||
"""
|
||
import asyncio
|
||
import json
|
||
|
||
from tornado.httpclient import HTTPResponse, HTTPRequest
|
||
|
||
import dock
|
||
from dock.govs import govs_api
|
||
from models.token import TokenModel
|
||
from paste.core import config
|
||
from paste.core.logging import echo_log
|
||
from paste.security import cryp_rsa
|
||
from paste.util import udict
|
||
from paste.web import requests
|
||
|
||
|
||
# Fixed (pinned) RSA public key in PEM form. login() passes it to
# cryp_rsa.rsa_encrypt_pkcs1_v1_5 to encrypt the account password; per the
# original note, the key is hard-coded so that login succeeds.
public_key = "\n".join((
    "-----BEGIN PUBLIC KEY-----",
    "MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQC0Jr1NzVUQMburkZT6Rkt0eaPm",
    "H8TN6E258l2tZMJgVCP/sL4oKjroKYmNPBkSSiLKFr9wwJqfesMeef6ChGRUXjG6",
    "DX0oxQRe0f5/UnyEm/NicJwz9xwkU34gbuo1VB/EA2QZ5dl1rj9iSsiqKLK6/QFl",
    "VuzslRdAXYZC79vprwIDAQAB",
    "-----END PUBLIC KEY-----",
))
|
||
|
||
|
||
async def login():
    """
    Log in to the government-service 12345 (GOVS) system and trigger a token refresh.

    Flow:
        1. Read the account name and plaintext password from configuration;
           the password is RSA-encrypted (PKCS1_v1_5) with the module-level
           ``public_key`` before being sent.
        2. Build a JSON POST request for ``{govs_api.ApiUrl}/system/sysLogin``
           with a random User-Agent and the proxy settings from
           ``govs_api.ProxyConfig``.
        3. Put the request on a one-item queue and run it through
           ``requests.async_concurrency`` with ``after_login`` as the
           ``after_request`` callback; that callback reads
           ``data.access_token`` from the response and stores the token.

    Args:
        None.

    Returns:
        None. The result of ``requests.async_concurrency`` is not returned;
        the token is persisted by the ``after_login`` callback instead.

    Raises:
        This function raises nothing itself. Whatever the configuration,
        encryption or request helpers raise propagates to the caller
        (their exception types are not visible from this module).
    """
    login_url = f"{govs_api.ApiUrl}/system/sysLogin"

    # Build the extra headers. Only the User-Agent string is used; the two
    # other values returned alongside it are intentionally discarded.
    user_agent, _, _ = dock.get_random_user_agent()
    extra_headers = {
        'Content-Type': 'application/json; charset=UTF-8',
        'User-Agent': user_agent,
    }

    # Build the request payload; the password is never sent in plaintext.
    request_body = {
        "username": config.get_config("dock.govs.account.username"),
        "password": cryp_rsa.rsa_encrypt_pkcs1_v1_5(
            public_key,
            config.get_config("dock.govs.account.password"),
        ),
        "tenantAccount": "suzhou",
        "rememberme": 1,
        "code": "",
        "uuid": "",
    }

    # Build the request object (JSON body, not form-encoded).
    request = dock.new_http_request(
        url=login_url,
        body=request_body,
        method='POST',
        timeout=dock.DEFAULT_TIMEOUT,
        use_form=False,
        extra_headers=extra_headers,
        **govs_api.ProxyConfig
    )

    # Run the single login request through the shared concurrency helper so
    # it gets the same retry handling as other requests.
    queue = asyncio.Queue()
    await queue.put(request)
    await requests.async_concurrency(
        queue, con_count=1, retry=dock.MAX_RETRY_COUNT,
        after_request=after_login
    )
|
||
|
||
|
||
async def after_login(response: HTTPResponse, retry_queue: asyncio.Queue[HTTPRequest]):
    """
    Handle the login response: store the access token or log the failure.

    The response body is decoded and parsed as JSON, and the value at
    ``data.access_token`` is looked up. A non-empty token is saved via
    ``TokenModel.refresh`` for platform ``GOVS``; otherwise the raw response
    body is logged as a login failure.

    Args:
        response: HTTP response of the login request.
        retry_queue: Queue of requests waiting to be retried. Its size is
            logged only when it actually holds pending requests.

    Returns:
        The parsed JSON response data.

    Raises:
        ValueError: If the response body is not valid JSON (raised by
            ``json.loads``).
    """
    response_body = response.body.decode()
    response_data = json.loads(response_body)

    # NOTE(review): the third argument looks like the fallback used when the
    # path is missing -- confirm against udict.get_by_path.
    access_token = udict.get_by_path(response_data, 'data.access_token', '')
    if access_token:
        await TokenModel.refresh(platform='GOVS', token=access_token)
        echo_log("成功刷新省12345登录令牌.")
    else:
        echo_log(f"省12345登录失败,无法刷新令牌,响应:{response_body}")

    # asyncio.Queue defines neither __bool__ nor __len__, so a bare
    # truthiness test is always true; check for pending items explicitly.
    if retry_queue is not None and not retry_queue.empty():
        echo_log(f"登录重试队列中有:{retry_queue.qsize()} 个请求在等待.")
    return response_data
|
||
|
||
|
||
async def get_token(platform: str = 'GOVS'):
    """
    Fetch the stored token for a platform.

    :param platform: platform to look up; defaults to ``GOVS`` (the
        provincial 12345 system)
    :return: the ``token`` attribute of the record returned by
        ``TokenModel.find_by_platform`` (the original note describes it as a
        cookies string -- TODO confirm)
    """
    return (await TokenModel.find_by_platform(platform)).token
|
||
|
||
|
||
if __name__ == "__main__":
    # Script entry point: perform a single login using the project's async
    # runner. The import stays local so it is only needed when run directly.
    from paste.core import aio_pool

    aio_pool.get_aio_runner()(login())