Files
d3i-szct/dock/govs/govs_create_reply.py
2026-06-02 17:46:38 +08:00

307 lines
12 KiB
Python

import asyncio
import os
import io
import json
import time
import logging
from datetime import datetime
from tornado.httpclient import HTTPResponse, HTTPRequest
import dock
import apps
from models.govs_create_reply import GovsReplyFormal
from models.govs_order_master import GovsOrderMaster
from dock.govs import govs_api, govs_upload_file
from dock.oa import oa_api_request, oa_result_notify, PushException
from paste.core.logging import echo_log
from paste.util.ufile import inspect_type
from paste.web import requests
async def get_reply_request(govs_reply: GovsReplyFormal, govs_order: GovsOrderMaster, file_list: list = None):
"""
创建答复办结请求对象。方法仅创建请求对象,并未实际提交请求,具体由调度方法处理。
:param govs_reply: 答复办结对象
:param govs_order: 工单对象
:param file_list: 已上传到省12345的文件列表数据
:return: HTTPRequest 对象
"""
api_url = '/workflow/approveTask/orderApprove'
# 默认不加前缀
prefix = ""
# 只有 contact_name 和 contact_time 都存在时才拼接前缀
if govs_reply.contact_name and govs_reply.contact_time:
# 格式化时间
try:
dt = datetime.fromisoformat(govs_reply.contact_time.replace('Z', '+00:00'))
formatted_contact_time = dt.strftime('%Y-%m-%d %H:%M')
except Exception:
formatted_contact_time = govs_reply.contact_time
prefix = f"您好,{govs_reply.contact_name}{formatted_contact_time}通过{govs_reply.contact_type}方式联系您;"
body = {
"slaveDeptIsCompetent": [],
"duties": "",
"nextOrgIds": "",
"adviceMasterOrgName": "",
"adviceSlaveOrgNames": "",
"searchValueList": [],
"inforRetrieval": "",
"nextProcessing": "",
"assignedUnit": "",
"duration": "",
"dateChoose": "",
"plannedDuration": "",
"completionTime": "",
"returnAuditorName": "",
"handlingSuggestion": "",
"remark": govs_reply.remarks,
"fileList": file_list,
"isContact": "",
"contactName": govs_reply.contact_name,
"contactTime": govs_reply.contact_time,
"contactType": govs_reply.contact_type,
"nextFeedbackTime": "",
"advice": prefix + (govs_reply.advice or ""),
"answer": "",
"applyReason": "",
"applyBasis": "",
"platformOpinion": "",
"shortMessage": "",
"distributor": "",
"distributors": [],
"positionSelection": "",
"difficultReason": "",
"directCompletionType": "",
"applyType": "",
"returnReason": "",
"returnReasonName": "",
"replyResult": "",
"informPublic": govs_reply.is_contact, # 这个还要确认一遍
"approveAttachmentIds": "",
"formalReply": "",
"flowMap": {
"nextHandleName": "答复办结",
"nextHandle": "答复办结"
},
"adviceMasterOrgId": "",
"adviceSlaveOrgIdsList": [],
"id": govs_order.next_task_id,
"key": "",
"nextHandler": "",
"nextOrgId": "",
"processInstanceId": govs_order.process_instance_id,
"reason": prefix + (govs_reply.reason or ""),
"taskHandlerId": "",
"value": "",
"vote": "",
"assignedUnitList": [],
"assignedUnitLabel": "",
"nextOrgIdList": [],
"nextOrgIdStr": "",
"knowledgeQuote": "[]",
"defineAuditorId": "",
"defineAuditorName": "",
"visitTypes": " ",
"appeal1": "",
"appeal2": "",
"unreasonableDemands": "",
"complainant": "",
"pollutionType": "-",
"pollutionType1": "",
"pollutionType2": "",
"involvedTargets": "",
"problemCategory": "生态环境类",
"defendantType": "",
"reportingPurpose": "投诉举报",
"industryType": "-",
"industryType1": "",
"industryType2": "",
"complainants": [
{
"complainant": "",
"region": "",
"street": "",
"detailedAddress": "",
"show": True,
"disabled": False
}
],
"inforAddress": "",
"associatedDefendantType": "",
"adminLawEnf": "",
"approveResult": "",
"approveContent": "",
"nextOrgIdsName": [],
"noticeOrgId": "",
"completeType": "",
"caseAccordTypeOneName": govs_order.case_accord_type_one_name,
"caseAccordTypeTwoName": govs_order.case_accord_type_two_name,
"caseAccordTypeThreeName": govs_order.case_accord_type_three_name,
"caseAccordTypeFourName": "",
"caseAccordTypeFiveName": "",
"fileVos": file_list,
"reasonableLabels": "-",
"visitType": "",
"actionName": govs_reply.action_name,
"businessKey": govs_order.order_id,
"masterId": govs_reply.master_id,
"orderNo": govs_order.order_no
}
return await govs_api.new_api_request(api_url, body)
async def after_create_reply_request(response: HTTPResponse, retry_queue: asyncio.Queue[HTTPRequest]):
"""
提交省12345后的处理程序。
:param response: 响应对象
:param retry_queue: 重试队列
"""
echo_log(response.body.decode())
echo_log('答复办结请求成功.')
async def done_file_download(response_list: list[HTTPResponse]):
"""
所有附件下载完成执行的处理程序。
:param response_list: 附件下载响应列表
:return: 返回附件字典列表,每个元素包含文件名和io对象
"""
file_info_list = []
for response in response_list:
file_type = inspect_type(response.body)
basename = os.path.basename(response.request.url)
file_io = io.BytesIO(response.body)
file_info_list.append({
'file_name': f'{basename}.{file_type}',
'file_io': file_io
})
return file_info_list
async def done_file_upload(response_list: list[HTTPResponse]):
"""
文件上传完成后的处理程序
:param response_list: 附件上传响应列表
:return: 返回上次后的文件信息列表,包含文件名、文件路径
"""
uploaded_list = []
for response in response_list:
response_body = response.body.decode()
response_data = json.loads(response_body)
if response_data['msg'] == '附件上传成功!':
uploaded_list.append({
'file_name': getattr(response.request, 'file_name', 'file.bin'),
'path': response_data['data']
})
else:
echo_log(f'文件上传到省12345失败,{response_data}')
return uploaded_list
async def download_and_upload_files(file_id_str: str):
"""
从OA下载文件,上传到省12345,返回上传后的文件信息列表
:param file_id_str: 英文逗号分隔的OA文件id
"""
file_id_list = file_id_str.strip(',').split(',')
download_queue = asyncio.Queue()
for file_id in file_id_list:
download_request = await oa_api_request.get_download_request(file_id)
await download_queue.put(download_request)
file_info_list = await requests.async_concurrency(download_queue, con_count=dock.CONCURRENCY_COUNT,
retry=dock.MAX_RETRY_COUNT, after_done=done_file_download)
upload_queue = asyncio.Queue()
for file_info in file_info_list:
upload_request = await govs_upload_file.get_upload_request(file_info['file_name'], file_info['file_io'])
setattr(upload_request, 'file_name', file_info['file_name'])
await upload_queue.put(upload_request)
uploaded_list = await requests.async_concurrency(upload_queue, con_count=dock.CONCURRENCY_COUNT,
retry=dock.MAX_RETRY_COUNT, after_done=done_file_upload)
return uploaded_list
async def create_reply(govs_reply: GovsReplyFormal, govs_order: GovsOrderMaster):
"""
推送答复办结请求。
:param govs_reply: 保存在数据库的答复办结对象
:param govs_order: 数据库中的工单对象
"""
try:
# 仅生产环境真实提交,其他环境不实际提交
if apps.get_active_env() not in ('dev', '', None):
if govs_reply.file_id_str:
# 根据OA传过来的文件id,下载并上传到省12345
file_info_list = await download_and_upload_files(govs_reply.file_id_str)
file_info_list = [{
"name": info['file_name'],
"filePath": info['path'],
"orderId": govs_order.order_id,
"uid": int(time.time() * 1000),
"status": "success"
} for info in file_info_list]
else:
file_info_list = None
reply_request = await get_reply_request(govs_reply, govs_order, file_info_list)
queue = asyncio.Queue()
await queue.put(reply_request)
reply_response_list = await requests.async_concurrency(queue, con_count=dock.CONCURRENCY_COUNT,
retry=dock.MAX_RETRY_COUNT,
after_request=after_create_reply_request)
# 检查答复办结响应是否成功
if len(reply_response_list) != 1:
raise PushException("答复办结请求发生错误.", govs_reply.flow_token, 3)
return_response = reply_response_list[0]
return_response_data = return_response.body.decode()
return_response_data = json.loads(return_response_data)
if return_response_data.get('code') != 200 or return_response_data.get('data') != 'ok':
raise PushException("答复办结请求发生错误.", govs_reply.flow_token, 3)
else:
echo_log(f"非生产环境,不实际提交.")
# 保存成功状态
govs_reply.status = 1
await govs_reply.async_save()
# 答复办结请求提交后,通知答复办结成功
await oa_result_notify.push_result_notify(
govs_reply.flow_token,
'答复办结成功',
1
)
except PushException as e:
# 任何异常都意味着失败,通知 OA
echo_log(f'答复办结发生错误.', logging.ERROR)
echo_log(e, logging.ERROR, is_log_exc=True)
# 保存失败状态
govs_reply.status = 0
await govs_reply.async_save()
# 答复办结发生异常,通知答复办结失败
await oa_result_notify.push_result_notify(
e.flow_token, f"{e}", e.return_code
)
except Exception as e:
# 其他异常都意味着失败
echo_log(f'答复办结发生错误.', logging.ERROR)
echo_log(e, logging.ERROR, is_log_exc=True)
if __name__ == '__main__':
async def push():
task = await GovsOrderMaster.async_find_by_id(2060173985047351297)
reply = await GovsReplyFormal.async_find_by_id(2061328980866371584)
await create_reply(reply, task)
from paste.core import aio_pool
_runner = aio_pool.get_aio_runner()
_runner(push())