""" 市12345对接 API 基础功能。 """ from tornado.httpclient import AsyncHTTPClient import dock from paste.core import config ApiUrl = "http://2.46.12.176:8091/sz12345" """ 对接 API 根目录。 """ ProxyConfig = config.get_config('dock.govc.proxy') """ 代理服务器配置。 """ if ProxyConfig and ProxyConfig.get('proxy_host', None) and ProxyConfig.get('proxy_port', None): # 切换到底层实现,以便代理服务器生效 AsyncHTTPClient.configure("tornado.curl_httpclient.CurlAsyncHTTPClient") async def new_api_request(api_url: str, request_body: dict, method: str = 'POST', timeout: float = dock.DEFAULT_TIMEOUT, use_form: bool = True, headers: dict = None): """ 构造一个 API 请求对象 :param api_url: API 地址,以斜杠开头的 URI 地址,非完整 URL :param request_body: 请求体,即所有请求参数 :param method: 请求提交方式 :param timeout: 超时时长 :param use_form: 是否使用表单(Form)方式提交 :param headers: 头数据,最高优先级 :return: HTTPRequest 对象 """ # Cookie from dock.govc import govc_security _, cookie_header = await govc_security.get_cookies() # 构建扩展头 user_agent, browser_ver, os_name = dock.get_random_user_agent() extra_headers = { 'Cookie': cookie_header, 'EPTOKEN': dock.get_cookie_value(cookie_header, 'EPTOKEN'), 'Host': '2.46.12.176:8091', 'Origin': 'http://2.46.12.176:8091', 'Referer': 'http://2.46.12.176:8091/sz12345/bmfw/bmfwlogin/login', 'User-Agent': user_agent, } if headers is not None: extra_headers = {**extra_headers, **headers} # 构造请求对象 request = dock.new_http_request( url=f"{ApiUrl}{api_url}", body=request_body, method=method, timeout=timeout, use_form=use_form, extra_headers=extra_headers, ** ProxyConfig ) return request