"""
Basic building blocks for the OA integration API.
"""
import asyncio
import datetime
import json
from typing import Optional

import apps
import dock
from paste.core import config
from paste.core.logging import echo_log

ApiUrl = config.get_config(f'dock.oa.env.{apps.get_active_env()}.api_url')
""" Root URL of the OA integration API (prefixed to every request path). """

TokenPlatform = config.get_config(f'dock.oa.env.{apps.get_active_env()}.token_platform')
""" OA token platform. """


async def new_api_request(api_url: str, request_body: dict, method: str = 'POST',
                          timeout: float = dock.DEFAULT_TIMEOUT, use_form: bool = False,
                          headers: Optional[dict] = None):
    """
    Build an API request object.

    :param api_url: API path; a URI beginning with a slash, not a full URL.
        It is appended to ``ApiUrl``.
    :param request_body: request body, i.e. all request parameters
    :param method: HTTP method used to submit the request
    :param timeout: timeout duration
    :param use_form: whether to submit the body as a form
    :param headers: extra headers; they override the default headers built here
    :return: the request object produced by ``dock.new_http_request``
    """
    # Token.
    # Imported at call time, as in the original; presumably this avoids a
    # circular import between the modules -- TODO confirm.
    from dock.oa import oa_security
    token = await oa_security.get_token(TokenPlatform)

    # Build the extended headers. Only the user-agent string is needed; the
    # other two values of the triple are intentionally ignored.
    user_agent, _browser_ver, _os_name = dock.get_random_user_agent()
    extra_headers = {
        'Content-Type': 'application/json',
        'Token': token,
        'User-Agent': user_agent,
    }
    # NOTE(review): Content-Type stays 'application/json' even when
    # use_form=True unless the caller overrides it via ``headers`` -- confirm
    # that dock.new_http_request adjusts it for form submissions.
    if headers is not None:
        # Caller-supplied headers have the highest priority.
        extra_headers.update(headers)

    # Log the request body. Serialization is best-effort: a body that cannot
    # be JSON-encoded is logged via str() instead. Only the serialization is
    # guarded, so a logging failure is not retried or masked.
    try:
        body_text = json.dumps(request_body)
    except Exception:
        body_text = str(request_body)
    echo_log(body_text)

    # Build the request object.
    request = dock.new_http_request(
        url=f"{ApiUrl}{api_url}",
        body=request_body,
        method=method,
        timeout=timeout,
        use_form=use_form,
        extra_headers=extra_headers,
    )

    return request


# asyncio.Lock serializes coroutines that update the counter. It is not a
# thread lock and gives no protection across OS threads.
_lock = asyncio.Lock()

# In-memory cache of per-day counters, shaped {date string "YYYYMMDD": counter}.
_cache = {}


async def generate_serial_number() -> str:
    """
    Return the next serial number for the current day.

    The serial number is the local date formatted as ``YYYYMMDD`` followed by
    a per-day counter that starts at 1 and is zero-padded to at least five
    digits. Counters live only in process memory, so they restart after a
    process restart.

    :return: serial number string
    """
    async with _lock:
        # Read the clock inside the lock so the date key and the counter are
        # taken in the same critical section.
        now = datetime.datetime.now()
        today = now.strftime("%Y%m%d")

        if today not in _cache:
            # A new day has started: drop counters older than yesterday so the
            # cache stays bounded in a long-running process. Yesterday is kept
            # so a small backwards clock adjustment around midnight continues
            # the old counter instead of restarting it.
            cutoff = (now - datetime.timedelta(days=1)).strftime("%Y%m%d")
            for stale_day in [day for day in _cache if day < cutoff]:
                del _cache[stale_day]

        counter = _cache.get(today, 0) + 1
        _cache[today] = counter

    return f"{today}{counter:05d}"