import asyncio import json from sqlalchemy import select from tornado.httpclient import HTTPResponse, HTTPRequest import dock from dock.dcm import dcm_api from models.dcm_task import DcmTask from paste.core.logging import echo_log from paste.util import udict from paste.web import requests async def get_operation_request( rec_id: int, act_id: int, task_num: str, other_task_num: str, task_list_id: int = 600058, task_list_name: str = "部门待办栏" ): """ 获取 DCM 企业待办的可用菜单列表。 向 DCM 的可用菜单列表接口发送 GET 请求,获取与指定关系 ID 和类型 ID 关联的可用菜单信息。 自动注入有效的 Cookie(如 JSESSIONID)至请求头,并解析返回的 JSON 数据。 Args: rec_id (int): 记录 ID act_id (int): 任务 ID task_num (str): 任务号 other_task_num (str): 第三方任务号 task_list_id: 任务列表类型 ID,默认为企业待办:600058 task_list_name: 任务列表名称,默认为:部门待办栏 """ api_url = f"/home/workflow/assign" request_body = { "recID": rec_id, "actID": act_id, "tasknum": task_num, "otherTaskNum": other_task_num, "taskListID": task_list_id, "taskListName": task_list_name, "recDispNum": "", "menuName": "assign", "menuDisplayName": "办理", } # 构造 API 请求 return await dcm_api.new_api_request(api_url, request_body, 'GET', timeout=30) async def after_operation_request(response: HTTPResponse, retry_queue: asyncio.Queue[HTTPRequest]): response_body = response.body.decode() response_data = json.loads(response_body) menus: list[dict] = udict.get_by_path(response_data, 'resultInfo.data.menuData.menus') operation: list[str] = [] if menus is None: menus = [] for menu in menus: if menu.get('menuName') != 'unassign' and menu.get('visible', False) and menu.get('displayProperty', 0) & 2 == 2: # 排除撤销办理,和不显示的菜单,仅返回可用的操作菜单 operation.append(menu.get('displayName')) setattr(response.request, "response_data", response_data) setattr(response.request, "operation", response_data) dcm_task_id = getattr(response.request, 'dcm_task_id') dcm_task = await DcmTask(id=dcm_task_id).async_find_first() dcm_task.operation = ','.join(operation) await dcm_task.async_save() rec_id = getattr(response.request, 'rec_id') echo_log(f"成功更新企业待办 {rec_id} 的可用操作:{dcm_task.operation}.") if retry_queue: echo_log(f"企业待办可用操作重试队列中有:{retry_queue.qsize()} 个请求在等待.") async def fetch_operation(dcm_task: DcmTask): """ 抓取列表菜单可用操作。 :param dcm_task: 数字城管待办工单 :return: """ # 取得请求对象 operation_request = await get_operation_request( dcm_task.rec_id, dcm_task.act_id, dcm_task.task_num, dcm_task.other_task_num ) setattr(operation_request, "dcm_task_id", dcm_task.id) request_queue = asyncio.Queue() await request_queue.put(operation_request) await requests.async_concurrency( request_queue, con_count=dock.CONCURRENCY_COUNT, retry=dock.MAX_RETRY_COUNT, after_request=after_operation_request ) operation = getattr(operation_request, "operation") return operation if __name__ == "__main__": from paste.core import aio_pool async def test(dcm_task_id): dcm_task = await DcmTask(id=dcm_task_id).async_find_first() assert dcm_task, f"未找到待办工单,工单ID:{dcm_task_id}" await fetch_operation(dcm_task) async def test_all(): dcm_task_list: list[DcmTask] = await DcmTask.query_all(select(DcmTask)) queue = asyncio.Queue() for dcm_task in dcm_task_list: request = await get_operation_request( dcm_task.rec_id, dcm_task.act_id, dcm_task.task_num, dcm_task.other_task_num ) setattr(request, "dcm_task_id", dcm_task.id) setattr(request, "rec_id", dcm_task.rec_id) await queue.put(request) await requests.async_concurrency(queue, after_request=after_operation_request) _runner = aio_pool.get_aio_runner() # _runner(test(2054174091304374276)) _runner(test_all())