import asyncio
import json

from tornado.httpclient import HTTPResponse, HTTPRequest

import dock
from dock.dcm import dcm_api
from models.dcm_task import DcmTask
from paste.util import udict
from paste.web import requests

# Sentinel used to tell "attribute never set" apart from a legitimately
# decoded JSON ``null`` (which json.loads turns into None).
_MISSING = object()


async def get_allow_postpone_request(
    rec_id: int,
    act_id: int,
    task_num: str,
    other_task_num: str,
    task_list_id: int = 600058,
    task_list_name: str = "部门待办栏",
):
    """Build the DCM "is a postponement application allowed" request.

    The request targets ``/home/workflow/wfitemtypecheck?itemType=postpone``
    and is built as a POST with a 30 second timeout through
    ``dcm_api.new_api_request``. Nothing is sent here; the caller is
    expected to dispatch the returned request object.

    NOTE(review): the previous docstring stated that a valid session cookie
    (e.g. JSESSIONID) is injected into the request headers. That is not done
    in this function; presumably ``dcm_api.new_api_request`` handles it --
    confirm.

    Args:
        rec_id: Record ID, sent as ``recID``.
        act_id: Task (activity) ID, sent as ``actID``.
        task_num: Task number, sent as ``tasknum``.
        other_task_num: Third-party task number, sent as ``otherTaskNum``.
        task_list_id: Task list type ID, sent as ``taskListID``. The default
            600058 was documented by the original author as the enterprise
            to-do list.
        task_list_name: Task list name, sent as ``taskListName``.

    Returns:
        Whatever ``dcm_api.new_api_request`` resolves to; the caller treats
        it as a request object that can be queued for dispatch.
    """
    # Plain string: the URL has no interpolated parts, so no f-prefix needed.
    api_url = "/home/workflow/wfitemtypecheck?itemType=postpone"
    request_body = {
        "recID": rec_id,
        "actID": act_id,
        "tasknum": task_num,
        "otherTaskNum": other_task_num,
        "taskListID": task_list_id,
        "taskListName": task_list_name,
        "recDispNum": "",
        "menuName": "assign",
        "menuDisplayName": "办理",
    }
    # Build the API request.
    return await dcm_api.new_api_request(api_url, request_body, 'POST', timeout=30)


async def after_allow_postpone_request(
    response: HTTPResponse,
    retry_queue: asyncio.Queue[HTTPRequest],
):
    """Decode the JSON response body and stash it on the originating request.

    Passed as the ``after_request`` callback to
    ``requests.async_concurrency``.

    Args:
        response: The HTTP response; its body is decoded as text and parsed
            as JSON.
        retry_queue: Unused here; kept because it is part of the callback
            signature.

    Raises:
        json.JSONDecodeError: If the body is not valid JSON.
    """
    response_body = response.body.decode()
    response_data = json.loads(response_body)
    # The parsed payload travels back to the caller as an ad-hoc attribute on
    # the request object. NOTE(review): this relies on ``response.request``
    # being the same object that was put on the queue -- confirm.
    setattr(response.request, "response_data", response_data)


async def fetch_allow_postpone(dcm_task: DcmTask) -> tuple[bool, str]:
    """Ask DCM whether a postponement may be requested for a to-do task.

    Args:
        dcm_task: The DCM to-do work order; its ``rec_id``, ``act_id``,
            ``task_num`` and ``other_task_num`` are sent to DCM.

    Returns:
        A ``(success, message)`` pair read from ``resultInfo.success`` and
        ``resultInfo.message`` of the JSON response.

    Raises:
        AttributeError: If no parsed response was recorded on the request.
    """
    # Build the request object.
    allow_postpone_request = await get_allow_postpone_request(
        dcm_task.rec_id,
        dcm_task.act_id,
        dcm_task.task_num,
        dcm_task.other_task_num,
    )

    request_queue = asyncio.Queue()
    await request_queue.put(allow_postpone_request)
    await requests.async_concurrency(
        request_queue,
        con_count=dock.CONCURRENCY_COUNT,
        retry=dock.MAX_RETRY_COUNT,
        after_request=after_allow_postpone_request,
    )

    # after_allow_postpone_request attaches the parsed body to the request.
    # If it never ran, fail with a message that says so instead of a bare
    # "AttributeError: response_data". The exception type is unchanged.
    response_data = getattr(allow_postpone_request, "response_data", _MISSING)
    if response_data is _MISSING:
        raise AttributeError(
            "no response_data was recorded on the allow-postpone request "
            f"(rec_id={dcm_task.rec_id}, act_id={dcm_task.act_id}); "
            "the request may have failed or exhausted its retries"
        )

    success: bool = udict.get_by_path(response_data, 'resultInfo.success')
    message: str = udict.get_by_path(response_data, 'resultInfo.message')
    return success, message


if __name__ == "__main__":
    from paste.core import aio_pool

    async def test(dcm_task_id):
        """Manual smoke test: load one work order by ID and run the fetch."""
        dcm_task = await DcmTask(id=dcm_task_id).async_find_first()
        assert dcm_task, f"未找到待办工单,工单ID:{dcm_task_id}"
        await fetch_allow_postpone(dcm_task)

    _runner = aio_pool.get_aio_runner()
    _runner(test(2054174091287597056))