import asyncio
import datetime
import io
import json
import logging

from tornado.httpclient import HTTPResponse, HTTPRequest

import apps
import dock
from dock.dcm import dcm_api, dcm_push_conv_rollback, dcm_send_sms, dcm_push_upload
from dock.oa import oa_api_request, oa_result_notify, PushException
from models.dcm_rollback import DcmRollback
from models.dcm_task import DcmTask
from paste.core.logging import echo_log
from paste.util import ufile, udict
from paste.web import requests


async def get_rollback_request(dcm_rollback: DcmRollback, dcm_task: DcmTask, media_ids: str, media_types: str,
                               add_num: int, extra_opinion: str):
    """
    Build the rollback request object.

    This only creates the request; it is not submitted here. Submission is
    left to the caller's scheduling code.

    :param dcm_rollback: rollback record
    :param dcm_task: work-order (task) record
    :param media_ids: MediaID string of the uploaded attachments, comma separated
    :param media_types: MediaType string of the uploaded attachments, comma separated
    :param add_num: number of uploaded attachments
    :param extra_opinion: extendInfo text of the convenience-service form;
        when non-empty it is appended to the rollback opinion after a space
    :return: the request object produced by ``dcm_api.new_api_request``
    """
    api_url = '/home/workflow/rollback'

    opinion = dcm_rollback.opinion
    if extra_opinion:
        # Format rather than concatenate: a missing (None) opinion or a
        # non-string extendInfo value must not raise TypeError here.
        opinion = f"{opinion or ''} {extra_opinion}"

    body_dict = {
        'actID': int(dcm_rollback.act_id),
        'opinion': opinion,
        'transInfo': dcm_rollback.trans_info,
        'saveOldActFlag': dcm_rollback.save_old_act_flag,
        'rollbackReasonID': dcm_rollback.rollback_reason_id,
        'transListNum': 0,
        'mediaIDs': media_ids,
        'mediaUsage': '回退',
        'relationTypeID': 10,
        'relationID': dcm_task.rec_id,
        'relationMainID': 45,
        'relationSubID': dcm_task.act_id,
        'tempUsage': 'rollback_tmp',
        'mediaTypes': media_types,
        'addNum': add_num
    }
    return await dcm_api.new_api_request(api_url, body_dict)


async def on_attachment_download_error(request: HTTPRequest, exc: Exception, retry_queue: asyncio.Queue = None):
    """
    Error handler for attachment downloads.

    Does nothing unless this was the last attempt (judged from the ``retry``
    and ``max_retry`` attributes on the request). On the last attempt the
    error is logged and a ``PushException`` is appended to the ``exc_list``
    attribute of the rollback record attached to the request.

    :param request: the request that failed
    :param exc: the exception that was raised
    :param retry_queue: unused here
    :return: None
    """
    retry = getattr(request, 'retry', 0)
    max_retry = getattr(request, 'max_retry', 0)
    if retry < max_retry - 1:
        # Not the last attempt yet: leave it to the retry.
        return

    message = f"下载回退附件时发生错误,下载地址:{request.url}"
    echo_log(message)
    echo_log(exc, logging.ERROR, is_log_exc=True)

    # Record the failure on the rollback record so push_rollback can report it.
    dcm_rollback: DcmRollback = getattr(request, 'dcm_rollback')
    exc_list = getattr(dcm_rollback, 'exc_list')
    exc_list.append(PushException(message, dcm_rollback.flow_token, 3))


async def done_attachment_download(response_list: list[HTTPResponse]) -> dict[str, io.IOBase]:
    """
    Handler run once all attachment downloads have finished.

    Each response body is wrapped in a ``BytesIO`` and stored under a
    generated name ``CT<yyyymmdd><4-digit sequence>[.<extension>]``. The
    extension comes from ``ufile.inspect_type``: the first element when it
    returns a tuple, the value itself when it returns a string. When no
    non-empty extension is available the name has no suffix.

    :param response_list: download responses
    :return: dict mapping generated file name to an in-memory file object
    """
    date_str = datetime.datetime.now().strftime("%Y%m%d")
    files_body: dict[str, io.IOBase] = {}
    for index, response in enumerate(response_list, start=1):
        detected = ufile.inspect_type(response.body)
        if isinstance(detected, tuple):
            # Guard the empty tuple, which would otherwise raise IndexError.
            extension = detected[0] if detected else ''
        elif isinstance(detected, str):
            extension = detected
        else:
            extension = ''

        file_name = f"CT{date_str}{index:04d}"
        if extension:
            file_name = f"{file_name}.{extension}"
        files_body[file_name] = io.BytesIO(response.body)
    return files_body


async def after_push_rollback_request(response: HTTPResponse, retry_queue: asyncio.Queue[HTTPRequest]):
    """
    Handler run after the rollback request has been submitted.

    Only logs the decoded response body and a fixed message.

    :param response: response object
    :param retry_queue: retry queue (unused here)
    """
    echo_log(response.body.decode())
    echo_log('回退请求成功.')


async def _download_attachments(dcm_rollback: DcmRollback) -> dict[str, io.IOBase]:
    """
    Download the attachments listed on the rollback record.

    :param dcm_rollback: rollback record; ``attachments`` is split on commas
    :return: file dict from ``done_attachment_download``; empty when the
        record has no attachments
    :raises PushException: when the download error handler recorded a failure
    """
    # on_attachment_download_error appends failures to this list.
    setattr(dcm_rollback, 'exc_list', [])
    files_body: dict[str, io.IOBase] = {}

    if dcm_rollback.attachments:
        # Build and fill the download request queue.
        attachment_download_queue = asyncio.Queue()
        for attachment_id in dcm_rollback.attachments.split(','):
            download_request = await oa_api_request.get_download_request(attachment_id)
            # The error handler reads the rollback record back off the request.
            setattr(download_request, 'dcm_rollback', dcm_rollback)
            await attachment_download_queue.put(download_request)

        # Run the downloads.
        files_body = await requests.async_concurrency(
            attachment_download_queue,
            con_count=dock.CONCURRENCY_COUNT,
            retry=dock.MAX_RETRY_COUNT,
            after_done=done_attachment_download,
            on_error=on_attachment_download_error
        )

    # Any recorded download failure aborts the rollback.
    exc_list: list[PushException] = getattr(dcm_rollback, 'exc_list', [])
    if exc_list:
        message = ";".join(str(exc) for exc in exc_list)
        raise PushException(message, dcm_rollback.flow_token, 3)
    return files_body


def _check_rollback_response(response_list, flow_token) -> None:
    """
    Validate the responses of the rollback submission.

    :param response_list: responses returned for the rollback request queue
    :param flow_token: flow token placed on the raised exception
    :raises PushException: when there is not exactly one response, the body
        cannot be decoded/parsed as JSON, or the payload does not report success
    """
    # A missing/empty list is a failure too; len(None) would raise TypeError.
    if not response_list or len(response_list) != 1:
        raise PushException("回退请求发生错误.", flow_token, 3)

    try:
        response_data = json.loads(response_list[0].body.decode())
    except ValueError as err:
        # UnicodeDecodeError and json.JSONDecodeError are both ValueError
        # subclasses; an unreadable answer cannot confirm success.
        raise PushException("回退请求发生错误.", flow_token, 3) from err

    if udict.get_by_path(response_data, 'resultInfo.message', '') != '回退成功!' \
            or udict.get_by_path(response_data, 'resultInfo.success') is not True:
        raise PushException("回退请求发生错误.", flow_token, 3)


async def push_rollback(dcm_rollback: DcmRollback, dcm_task: DcmTask):
    """
    Push a rollback request.

    Steps: optionally submit the convenience-service rollback (tickets whose
    source is the 12345 hotline), download the attachments, and stop there
    when the active environment is 'dev', '' or None. Otherwise upload the
    attachments, submit the rollback request, validate the response, save
    status 1, send the success notification and optionally send an SMS.

    A ``PushException`` saves status 0 and sends a failure notification.
    Any other exception is only logged.

    :param dcm_rollback: rollback record stored in the database
    :param dcm_task: pending work order
    """
    try:
        # Tickets from the 12345 hotline submit the convenience-service rollback first.
        extra_opinion = ''
        if dcm_task.event_src_name == '12345热线':
            conv_response_list = await dcm_push_conv_rollback.push_conv_rollback_request(dcm_rollback, dcm_task)
            if conv_response_list:
                conv_response = json.loads(conv_response_list[0].body.decode())
                extra_opinion = udict.get_by_path(conv_response, 'resultInfo.data.extendInfo', '')

        echo_log("正在准备下载队列...")
        files_body = await _download_attachments(dcm_rollback)
        echo_log("附件下载完成,正在准备提交数字城管...")

        # Only submit for real outside these environments.
        if apps.get_active_env() in ('dev', '', None):
            echo_log("非生产环境,不实际提交.")
            return

        # Upload the attachments to obtain the media ids.
        media_ids, media_types, add_num = await dcm_push_upload.push_upload(
            dcm_task,
            dcm_push_upload.RollbackTmp,
            files_body
        )

        # Build the rollback request and queue it.
        rollback_request = await get_rollback_request(
            dcm_rollback,
            dcm_task,
            media_ids,
            media_types,
            add_num,
            extra_opinion
        )
        setattr(rollback_request, 'dcm_rollback', dcm_rollback)
        rollback_push_queue = asyncio.Queue()
        await rollback_push_queue.put(rollback_request)

        # Submit the rollback request.
        rollback_push_response_list = await requests.async_concurrency(
            rollback_push_queue,
            con_count=dock.CONCURRENCY_COUNT,
            retry=dock.MAX_RETRY_COUNT,
            after_request=after_push_rollback_request
        )

        # Fail unless exactly one response confirms success.
        _check_rollback_response(rollback_push_response_list, dcm_rollback.flow_token)

        # Persist the success status.
        dcm_rollback.status = 1
        await dcm_rollback.async_save()

        # Notify that the rollback succeeded.
        await oa_result_notify.push_result_notify(
            dcm_rollback.flow_token,
            '回退成功',
            1
        )

        # Send the SMS when requested.
        if dcm_rollback.send_message in (1, '1'):
            await dcm_send_sms.send_sms({
                'actDefID': '45',
                'partStr': '254,role,市受理员',
                'itemType': 'rollback',
                'recID': dcm_task.rec_id,
                'actID': dcm_task.act_id,
                'actPropertyID': '7'
            })
    except PushException as e:
        # A PushException means the rollback failed: record it and notify.
        echo_log('回退发生错误.', logging.ERROR)
        echo_log(e, logging.ERROR, is_log_exc=True)

        # Persist the failure status.
        dcm_rollback.status = 0
        await dcm_rollback.async_save()

        # Notify that the rollback failed.
        await oa_result_notify.push_result_notify(
            e.flow_token,
            str(e),
            e.return_code
        )
    except Exception as e:
        # NOTE(review): unexpected errors are only logged; the status is not
        # updated and no notification is sent. This branch can also fire after
        # the rollback already succeeded (e.g. while sending the SMS), so it is
        # left unchanged - confirm whether a failure notice is wanted here.
        echo_log('回退发生错误.', logging.ERROR)
        echo_log(e, logging.ERROR, is_log_exc=True)


if __name__ == "__main__":
    from paste.core import aio_pool

    async def push():
        # Manual run against two fixed record ids.
        dcm_task = await DcmTask(id=2054174091237265413).async_find_first()
        task_rollback = await DcmRollback(id=2058802518821048320).async_find_first()
        await push_rollback(task_rollback, dcm_task)

    _runner = aio_pool.get_aio_runner()
    _runner(push())