import asyncio
import os
import io
import json
import time
import logging
from datetime import datetime
from typing import Optional

from tornado.httpclient import HTTPResponse, HTTPRequest

import dock
import apps
from models.govs_create_reply import GovsReplyFormal
from models.govs_order_master import GovsOrderMaster
from dock.govs import govs_api, govs_upload_file
from dock.oa import oa_api_request, oa_result_notify, PushException
from paste.core.logging import echo_log
from paste.util.ufile import inspect_type
from paste.web import requests


def _format_contact_time(contact_time):
    """
    Render the contact time as ``YYYY-MM-DD HH:MM`` for the reply prefix.

    :param contact_time: value stored on the reply; an ISO-8601 string is
        expected (a trailing ``Z`` is accepted)
    :return: the formatted string, or ``contact_time`` unchanged when it
        cannot be parsed as an ISO-8601 string
    """
    try:
        # fromisoformat() on older Pythons rejects a trailing "Z", so spell
        # the UTC offset out explicitly before parsing.
        parsed = datetime.fromisoformat(contact_time.replace('Z', '+00:00'))
        return parsed.strftime('%Y-%m-%d %H:%M')
    except (AttributeError, TypeError, ValueError):
        # Not a string, or not ISO formatted: fall back to the raw value.
        return contact_time


def _parse_json_object(response: HTTPResponse):
    """
    Decode a response body as a JSON object.

    :param response: response whose body should contain a JSON object
    :return: the decoded dict, or ``None`` when the body is missing, is not
        valid UTF-8/JSON, or does not decode to a dict
    """
    try:
        data = json.loads(response.body.decode())
    except (AttributeError, ValueError):
        # ValueError covers both JSONDecodeError and UnicodeDecodeError;
        # AttributeError covers a body that is None.
        return None
    return data if isinstance(data, dict) else None


async def get_reply_request(govs_reply: GovsReplyFormal, govs_order: GovsOrderMaster,
                            file_list: Optional[list] = None):
    """
    Build the "reply and close" (答复办结) request object.

    This only builds the request; it is not sent here. Sending is left to the
    caller's scheduling code.

    :param govs_reply: the reply record
    :param govs_order: the work-order record
    :param file_list: descriptors of files already uploaded to the provincial
        12345 platform, or None when there are no attachments
    :return: whatever ``govs_api.new_api_request`` returns for this URL/body
        (the original documentation describes it as an HTTPRequest)
    """
    api_url = '/workflow/approveTask/orderApprove'

    # No prefix by default; it is only added when both the contact name and
    # the contact time are present.
    prefix = ""
    if govs_reply.contact_name and govs_reply.contact_time:
        formatted_contact_time = _format_contact_time(govs_reply.contact_time)
        prefix = f"您好,{govs_reply.contact_name}于{formatted_contact_time}通过{govs_reply.contact_type}方式联系您;"

    body = {
        "slaveDeptIsCompetent": [],
        "duties": "",
        "nextOrgIds": "",
        "adviceMasterOrgName": "",
        "adviceSlaveOrgNames": "",
        "searchValueList": [],
        "inforRetrieval": "",
        "nextProcessing": "",
        "assignedUnit": "",
        "duration": "",
        "dateChoose": "",
        "plannedDuration": "",
        "completionTime": "",
        "returnAuditorName": "",
        "handlingSuggestion": "",
        "remark": govs_reply.remarks,
        "fileList": file_list,
        "isContact": "是",
        "contactName": govs_reply.contact_name,
        "contactTime": govs_reply.contact_time,
        "contactType": govs_reply.contact_type,
        "nextFeedbackTime": "",
        "advice": prefix + (govs_reply.advice or ""),
        "answer": "",
        "applyReason": "",
        "applyBasis": "",
        "platformOpinion": "",
        "shortMessage": "",
        "distributor": "",
        "distributors": [],
        "positionSelection": "",
        "difficultReason": "",
        "directCompletionType": "",
        "applyType": "",
        "returnReason": "",
        "returnReasonName": "",
        "replyResult": "",
        "informPublic": govs_reply.is_contact,  # TODO: this mapping still needs to be confirmed
        "approveAttachmentIds": "",
        "formalReply": "",
        "flowMap": {
            "nextHandleName": "答复办结",
            "nextHandle": "答复办结"
        },
        "adviceMasterOrgId": "",
        "adviceSlaveOrgIdsList": [],
        "id": govs_order.next_task_id,
        "key": "",
        "nextHandler": "",
        "nextOrgId": "",
        "processInstanceId": govs_order.process_instance_id,
        "reason": prefix + (govs_reply.reason or ""),
        "taskHandlerId": "",
        "value": "",
        "vote": "",
        "assignedUnitList": [],
        "assignedUnitLabel": "",
        "nextOrgIdList": [],
        "nextOrgIdStr": "",
        "knowledgeQuote": "[]",
        "defineAuditorId": "",
        "defineAuditorName": "",
        "visitTypes": " ",
        "appeal1": "",
        "appeal2": "",
        "unreasonableDemands": "",
        "complainant": "",
        "pollutionType": "-",
        "pollutionType1": "",
        "pollutionType2": "",
        "involvedTargets": "",
        "problemCategory": "生态环境类",
        "defendantType": "",
        "reportingPurpose": "投诉举报",
        "industryType": "-",
        "industryType1": "",
        "industryType2": "",
        "complainants": [
            {
                "complainant": "",
                "region": "",
                "street": "",
                "detailedAddress": "",
                "show": True,
                "disabled": False
            }
        ],
        "inforAddress": "",
        "associatedDefendantType": "",
        "adminLawEnf": "",
        "approveResult": "",
        "approveContent": "",
        "nextOrgIdsName": [],
        "noticeOrgId": "",
        "completeType": "",
        "caseAccordTypeOneName": govs_order.case_accord_type_one_name,
        "caseAccordTypeTwoName": govs_order.case_accord_type_two_name,
        "caseAccordTypeThreeName": govs_order.case_accord_type_three_name,
        "caseAccordTypeFourName": "",
        "caseAccordTypeFiveName": "",
        "fileVos": file_list,
        "reasonableLabels": "-",
        "visitType": "",
        "actionName": govs_reply.action_name,
        "businessKey": govs_order.order_id,
        "masterId": govs_reply.master_id,
        "orderNo": govs_order.order_no
    }
    return await govs_api.new_api_request(api_url, body)


async def after_create_reply_request(response: HTTPResponse, retry_queue: asyncio.Queue[HTTPRequest]):
    """
    Hook run after the reply request has been submitted to the provincial
    12345 platform; it only logs the response body.

    :param response: the response object
    :param retry_queue: the retry queue (not used here)
    """
    echo_log(response.body.decode())
    echo_log('答复办结请求成功.')


async def done_file_download(response_list: list[HTTPResponse]):
    """
    Hook run once every attachment download has finished.

    :param response_list: the attachment download responses
    :return: one dict per response, holding ``file_name`` (URL basename plus
        the type reported by ``inspect_type``) and ``file_io`` (a BytesIO
        over the downloaded bytes)
    """
    return [
        {
            'file_name': f'{os.path.basename(response.request.url)}.{inspect_type(response.body)}',
            'file_io': io.BytesIO(response.body)
        }
        for response in response_list
    ]


async def done_file_upload(response_list: list[HTTPResponse]):
    """
    Hook run once every attachment upload has finished.

    Responses that cannot be decoded as a JSON object, or that do not carry
    the success message together with a ``data`` entry, are logged as failed
    uploads and left out of the result.

    :param response_list: the attachment upload responses
    :return: one dict per successful upload, holding ``file_name`` and
        ``path`` (the ``data`` value of the upload response)
    """
    uploaded_list = []
    for response in response_list:
        response_data = _parse_json_object(response)
        if response_data is None:
            # Empty / non-JSON body: treat it like any other failed upload
            # instead of aborting the whole batch.
            echo_log(f'文件上传到省12345失败,{response.body}')
            continue
        if response_data.get('msg') == '附件上传成功!' and 'data' in response_data:
            uploaded_list.append({
                # file_name is attached to the request by download_and_upload_files.
                'file_name': getattr(response.request, 'file_name', 'file.bin'),
                'path': response_data['data']
            })
        else:
            echo_log(f'文件上传到省12345失败,{response_data}')
    return uploaded_list


async def download_and_upload_files(file_id_str: str):
    """
    Download files from OA, upload them to the provincial 12345 platform and
    return the descriptors of the uploaded files.

    :param file_id_str: OA file ids separated by ASCII commas
    :return: the list produced by ``done_file_upload``; an empty list when
        ``file_id_str`` contains no ids
    """
    # Drop blank entries so inputs such as "," or "a,,b" do not produce
    # download requests for an empty file id.
    file_id_list = [file_id.strip() for file_id in file_id_str.split(',') if file_id.strip()]
    if not file_id_list:
        return []

    download_queue = asyncio.Queue()
    for file_id in file_id_list:
        download_request = await oa_api_request.get_download_request(file_id)
        await download_queue.put(download_request)
    file_info_list = await requests.async_concurrency(download_queue,
                                                      con_count=dock.CONCURRENCY_COUNT,
                                                      retry=dock.MAX_RETRY_COUNT,
                                                      after_done=done_file_download)

    upload_queue = asyncio.Queue()
    for file_info in file_info_list:
        upload_request = await govs_upload_file.get_upload_request(file_info['file_name'],
                                                                   file_info['file_io'])
        # Remember the name on the request so done_file_upload can report it.
        setattr(upload_request, 'file_name', file_info['file_name'])
        await upload_queue.put(upload_request)
    uploaded_list = await requests.async_concurrency(upload_queue,
                                                     con_count=dock.CONCURRENCY_COUNT,
                                                     retry=dock.MAX_RETRY_COUNT,
                                                     after_done=done_file_upload)
    return uploaded_list


async def create_reply(govs_reply: GovsReplyFormal, govs_order: GovsOrderMaster):
    """
    Push the "reply and close" request and report the outcome to OA.

    On success the reply is saved with ``status = 1`` and OA is notified of
    the success. On a ``PushException`` the reply is saved with ``status = 0``
    and OA is notified with the exception's message and return code. Any
    other exception is only logged.

    :param govs_reply: the reply record stored in the database
    :param govs_order: the work-order record stored in the database
    """
    try:
        # Really submit unless the active environment is 'dev', empty or unset.
        if apps.get_active_env() not in ('dev', '', None):
            if govs_reply.file_id_str:
                # Download the files named by the OA file ids and upload them
                # to the provincial 12345 platform.
                uploaded_list = await download_and_upload_files(govs_reply.file_id_str)
                # Offset the timestamp by the index so files built within the
                # same millisecond do not share a uid.
                base_uid = int(time.time() * 1000)
                file_info_list = [{
                    "name": info['file_name'],
                    "filePath": info['path'],
                    "orderId": govs_order.order_id,
                    "uid": base_uid + offset,
                    "status": "success"
                } for offset, info in enumerate(uploaded_list)]
            else:
                file_info_list = None

            reply_request = await get_reply_request(govs_reply, govs_order, file_info_list)
            queue = asyncio.Queue()
            await queue.put(reply_request)
            reply_response_list = await requests.async_concurrency(queue,
                                                                   con_count=dock.CONCURRENCY_COUNT,
                                                                   retry=dock.MAX_RETRY_COUNT,
                                                                   after_request=after_create_reply_request)

            # Exactly one response is expected for the single queued request.
            if len(reply_response_list) != 1:
                raise PushException("答复办结请求发生错误.", govs_reply.flow_token, 3)

            # A body that is not a JSON object is a failed push as well, so it
            # goes through the same PushException path and OA gets notified.
            return_response_data = _parse_json_object(reply_response_list[0])
            if (return_response_data is None
                    or return_response_data.get('code') != 200
                    or return_response_data.get('data') != 'ok'):
                raise PushException("答复办结请求发生错误.", govs_reply.flow_token, 3)
        else:
            echo_log("非生产环境,不实际提交.")

        # Persist the success status.
        govs_reply.status = 1
        await govs_reply.async_save()

        # The reply has been submitted: tell OA it succeeded.
        await oa_result_notify.push_result_notify(
            govs_reply.flow_token,
            '答复办结成功',
            1
        )
    except PushException as e:
        echo_log('答复办结发生错误.', logging.ERROR)
        echo_log(e, logging.ERROR, is_log_exc=True)

        # Persist the failure status.
        govs_reply.status = 0
        await govs_reply.async_save()

        # Tell OA the reply failed.
        await oa_result_notify.push_result_notify(
            e.flow_token,
            f"{e}",
            e.return_code
        )
    except Exception as e:
        # NOTE(review): this branch only logs; the status is not saved and OA
        # is not notified. Confirm whether that is intended.
        echo_log('答复办结发生错误.', logging.ERROR)
        echo_log(e, logging.ERROR, is_log_exc=True)


if __name__ == '__main__':
    async def push():
        # Manual driver: load one order and one reply by id and push the reply.
        task = await GovsOrderMaster.async_find_by_id(2060173985047351297)
        reply = await GovsReplyFormal.async_find_by_id(2061328980866371584)
        await create_reply(reply, task)

    from paste.core import aio_pool

    _runner = aio_pool.get_aio_runner()
    _runner(push())