""" 向 OA 平台上报数据已经完整推送。 对应文档接口:11、签收 """ import asyncio import json from typing import Optional, Union from sqlalchemy import select, desc from tornado.httpclient import HTTPResponse, HTTPRequest import dock from dock.oa import oa_api_request from models.dcm_push_status import DcmPushStatus from models.dcm_task import DcmTask from paste.core.logging import echo_log from paste.util import udict from paste.web import requests async def after_sign_task_request(response: HTTPResponse, retry_queue: asyncio.Queue[HTTPRequest]): """ 签收工单请求响应后的处理程序。 :param response: 响应对象 :param retry_queue: 重试队列 """ body = response.body.decode() echo_log(body) body_data = json.loads(body) code = udict.get_by_path(body_data, 'code') message = udict.get_by_path(body_data, 'msg') if code == 200: echo_log(f"签收工单成功.") else: echo_log(f"签收工单失败:{message}") if retry_queue: echo_log(f"签收工单重试队列中有:{retry_queue.qsize()} 个请求在等待.") async def sign_task(fetch_size: int = 50, task_id: Optional[Union[str, int, list[Union[str, int]]]] = None): """ 签收指定数量的工单任务 :param fetch_size: 本次签收的任务数量 :param task_id: 待办任务 ID 可选 """ # 根据条件获取目标任务 ID 列表(支持指定 task_id 或分页获取) task_query = select(DcmTask.id).join( DcmPushStatus, DcmPushStatus.dcm_task_id == DcmTask.id ).where( DcmPushStatus.push_task_status == 1, DcmPushStatus.push_task_detail_status == 1, DcmPushStatus.push_task_attachment_status == 1, DcmPushStatus.push_task_extend_info_status == 1, DcmPushStatus.push_task_file_upload_status == 1, DcmPushStatus.push_task_more_info_status == 1, DcmPushStatus.push_task_process_info_status == 1, ).order_by( desc(DcmTask.act_id) ) if task_id: if isinstance(task_id, list): task_query = task_query.where(DcmTask.id.in_(task_id)) echo_log(f"本次签收待办列表:{task_id} 的工单...") else: task_query = task_query.where(DcmTask.id == task_id) echo_log(f"本次签收待办:{task_id} 的工单...") else: task_query = task_query.limit(fetch_size) echo_log(f"本次签收前 {fetch_size} 条待办工单...") dcm_task_df = await DcmTask.query_as_df(task_query) # 格式化为字符串 dcm_task_df[DcmTask.id.key] = dcm_task_df[DcmTask.id.key].astype(str) echo_log(f"正在准备请求队列...") # 构建请求队列 sign_request_queue = asyncio.Queue() task_id_list = dcm_task_df[DcmTask.id.key].unique().tolist() for task_id in task_id_list: request = await oa_api_request.get_sign_task_request(task_id) await sign_request_queue.put(request) echo_log(f"开始签收工单...") await requests.async_concurrency( sign_request_queue, con_count=dock.CONCURRENCY_COUNT, retry=dock.MAX_RETRY_COUNT, after_request=after_sign_task_request ) echo_log(f"签收工单完成...") if __name__ == "__main__": from paste.core import aio_pool _runner = aio_pool.get_aio_runner() _runner(sign_task(task_id=2054174091228876801))