307 lines
12 KiB
Python
307 lines
12 KiB
Python
import asyncio
|
|
import os
|
|
import io
|
|
import json
|
|
import time
|
|
import logging
|
|
from datetime import datetime
|
|
|
|
from tornado.httpclient import HTTPResponse, HTTPRequest
|
|
|
|
import dock
|
|
import apps
|
|
from models.govs_create_reply import GovsReplyFormal
|
|
from models.govs_order_master import GovsOrderMaster
|
|
from dock.govs import govs_api, govs_upload_file
|
|
from dock.oa import oa_api_request, oa_result_notify, PushException
|
|
from paste.core.logging import echo_log
|
|
from paste.util.ufile import inspect_type
|
|
from paste.web import requests
|
|
|
|
|
|
async def get_reply_request(govs_reply: GovsReplyFormal, govs_order: GovsOrderMaster, file_list: list = None):
|
|
"""
|
|
创建答复办结请求对象。方法仅创建请求对象,并未实际提交请求,具体由调度方法处理。
|
|
|
|
:param govs_reply: 答复办结对象
|
|
:param govs_order: 工单对象
|
|
:param file_list: 已上传到省12345的文件列表数据
|
|
:return: HTTPRequest 对象
|
|
"""
|
|
api_url = '/workflow/approveTask/orderApprove'
|
|
# 默认不加前缀
|
|
prefix = ""
|
|
|
|
# 只有 contact_name 和 contact_time 都存在时才拼接前缀
|
|
if govs_reply.contact_name and govs_reply.contact_time:
|
|
# 格式化时间
|
|
try:
|
|
dt = datetime.fromisoformat(govs_reply.contact_time.replace('Z', '+00:00'))
|
|
formatted_contact_time = dt.strftime('%Y-%m-%d %H:%M')
|
|
except Exception:
|
|
formatted_contact_time = govs_reply.contact_time
|
|
prefix = f"您好,{govs_reply.contact_name}于{formatted_contact_time}通过{govs_reply.contact_type}方式联系您;"
|
|
body = {
|
|
"slaveDeptIsCompetent": [],
|
|
"duties": "",
|
|
"nextOrgIds": "",
|
|
"adviceMasterOrgName": "",
|
|
"adviceSlaveOrgNames": "",
|
|
"searchValueList": [],
|
|
"inforRetrieval": "",
|
|
"nextProcessing": "",
|
|
"assignedUnit": "",
|
|
"duration": "",
|
|
"dateChoose": "",
|
|
"plannedDuration": "",
|
|
"completionTime": "",
|
|
"returnAuditorName": "",
|
|
"handlingSuggestion": "",
|
|
"remark": govs_reply.remarks,
|
|
"fileList": file_list,
|
|
"isContact": "是",
|
|
"contactName": govs_reply.contact_name,
|
|
"contactTime": govs_reply.contact_time,
|
|
"contactType": govs_reply.contact_type,
|
|
"nextFeedbackTime": "",
|
|
"advice": prefix + (govs_reply.advice or ""),
|
|
"answer": "",
|
|
"applyReason": "",
|
|
"applyBasis": "",
|
|
"platformOpinion": "",
|
|
"shortMessage": "",
|
|
"distributor": "",
|
|
"distributors": [],
|
|
"positionSelection": "",
|
|
"difficultReason": "",
|
|
"directCompletionType": "",
|
|
"applyType": "",
|
|
"returnReason": "",
|
|
"returnReasonName": "",
|
|
"replyResult": "",
|
|
"informPublic": govs_reply.is_contact, # 这个还要确认一遍
|
|
"approveAttachmentIds": "",
|
|
"formalReply": "",
|
|
"flowMap": {
|
|
"nextHandleName": "答复办结",
|
|
"nextHandle": "答复办结"
|
|
},
|
|
"adviceMasterOrgId": "",
|
|
"adviceSlaveOrgIdsList": [],
|
|
"id": govs_order.next_task_id,
|
|
"key": "",
|
|
"nextHandler": "",
|
|
"nextOrgId": "",
|
|
"processInstanceId": govs_order.process_instance_id,
|
|
"reason": prefix + (govs_reply.reason or ""),
|
|
"taskHandlerId": "",
|
|
"value": "",
|
|
"vote": "",
|
|
"assignedUnitList": [],
|
|
"assignedUnitLabel": "",
|
|
"nextOrgIdList": [],
|
|
"nextOrgIdStr": "",
|
|
"knowledgeQuote": "[]",
|
|
"defineAuditorId": "",
|
|
"defineAuditorName": "",
|
|
"visitTypes": " ",
|
|
"appeal1": "",
|
|
"appeal2": "",
|
|
"unreasonableDemands": "",
|
|
"complainant": "",
|
|
"pollutionType": "-",
|
|
"pollutionType1": "",
|
|
"pollutionType2": "",
|
|
"involvedTargets": "",
|
|
"problemCategory": "生态环境类",
|
|
"defendantType": "",
|
|
"reportingPurpose": "投诉举报",
|
|
"industryType": "-",
|
|
"industryType1": "",
|
|
"industryType2": "",
|
|
"complainants": [
|
|
{
|
|
"complainant": "",
|
|
"region": "",
|
|
"street": "",
|
|
"detailedAddress": "",
|
|
"show": True,
|
|
"disabled": False
|
|
}
|
|
],
|
|
"inforAddress": "",
|
|
"associatedDefendantType": "",
|
|
"adminLawEnf": "",
|
|
"approveResult": "",
|
|
"approveContent": "",
|
|
"nextOrgIdsName": [],
|
|
"noticeOrgId": "",
|
|
"completeType": "",
|
|
"caseAccordTypeOneName": govs_order.case_accord_type_one_name,
|
|
"caseAccordTypeTwoName": govs_order.case_accord_type_two_name,
|
|
"caseAccordTypeThreeName": govs_order.case_accord_type_three_name,
|
|
"caseAccordTypeFourName": "",
|
|
"caseAccordTypeFiveName": "",
|
|
"fileVos": file_list,
|
|
"reasonableLabels": "-",
|
|
"visitType": "",
|
|
"actionName": govs_reply.action_name,
|
|
"businessKey": govs_order.order_id,
|
|
"masterId": govs_reply.master_id,
|
|
"orderNo": govs_order.order_no
|
|
}
|
|
return await govs_api.new_api_request(api_url, body)
|
|
|
|
|
|
async def after_create_reply_request(response: HTTPResponse, retry_queue: asyncio.Queue[HTTPRequest]):
|
|
"""
|
|
提交省12345后的处理程序。
|
|
|
|
:param response: 响应对象
|
|
:param retry_queue: 重试队列
|
|
"""
|
|
echo_log(response.body.decode())
|
|
echo_log('答复办结请求成功.')
|
|
|
|
|
|
async def done_file_download(response_list: list[HTTPResponse]):
|
|
"""
|
|
所有附件下载完成执行的处理程序。
|
|
|
|
:param response_list: 附件下载响应列表
|
|
:return: 返回附件字典列表,每个元素包含文件名和io对象
|
|
"""
|
|
file_info_list = []
|
|
for response in response_list:
|
|
file_type = inspect_type(response.body)
|
|
basename = os.path.basename(response.request.url)
|
|
file_io = io.BytesIO(response.body)
|
|
file_info_list.append({
|
|
'file_name': f'{basename}.{file_type}',
|
|
'file_io': file_io
|
|
})
|
|
return file_info_list
|
|
|
|
|
|
async def done_file_upload(response_list: list[HTTPResponse]):
|
|
"""
|
|
文件上传完成后的处理程序
|
|
|
|
:param response_list: 附件上传响应列表
|
|
:return: 返回上次后的文件信息列表,包含文件名、文件路径
|
|
"""
|
|
uploaded_list = []
|
|
for response in response_list:
|
|
response_body = response.body.decode()
|
|
response_data = json.loads(response_body)
|
|
if response_data['msg'] == '附件上传成功!':
|
|
uploaded_list.append({
|
|
'file_name': getattr(response.request, 'file_name', 'file.bin'),
|
|
'path': response_data['data']
|
|
})
|
|
else:
|
|
echo_log(f'文件上传到省12345失败,{response_data}')
|
|
return uploaded_list
|
|
|
|
|
|
async def download_and_upload_files(file_id_str: str):
|
|
"""
|
|
从OA下载文件,上传到省12345,返回上传后的文件信息列表
|
|
|
|
:param file_id_str: 英文逗号分隔的OA文件id
|
|
"""
|
|
file_id_list = file_id_str.strip(',').split(',')
|
|
download_queue = asyncio.Queue()
|
|
for file_id in file_id_list:
|
|
download_request = await oa_api_request.get_download_request(file_id)
|
|
await download_queue.put(download_request)
|
|
file_info_list = await requests.async_concurrency(download_queue, con_count=dock.CONCURRENCY_COUNT,
|
|
retry=dock.MAX_RETRY_COUNT, after_done=done_file_download)
|
|
upload_queue = asyncio.Queue()
|
|
for file_info in file_info_list:
|
|
upload_request = await govs_upload_file.get_upload_request(file_info['file_name'], file_info['file_io'])
|
|
setattr(upload_request, 'file_name', file_info['file_name'])
|
|
await upload_queue.put(upload_request)
|
|
uploaded_list = await requests.async_concurrency(upload_queue, con_count=dock.CONCURRENCY_COUNT,
|
|
retry=dock.MAX_RETRY_COUNT, after_done=done_file_upload)
|
|
return uploaded_list
|
|
|
|
|
|
async def create_reply(govs_reply: GovsReplyFormal, govs_order: GovsOrderMaster):
|
|
"""
|
|
推送答复办结请求。
|
|
|
|
:param govs_reply: 保存在数据库的答复办结对象
|
|
:param govs_order: 数据库中的工单对象
|
|
"""
|
|
try:
|
|
# 仅生产环境真实提交,其他环境不实际提交
|
|
if apps.get_active_env() not in ('dev', '', None):
|
|
if govs_reply.file_id_str:
|
|
# 根据OA传过来的文件id,下载并上传到省12345
|
|
file_info_list = await download_and_upload_files(govs_reply.file_id_str)
|
|
file_info_list = [{
|
|
"name": info['file_name'],
|
|
"filePath": info['path'],
|
|
"orderId": govs_order.order_id,
|
|
"uid": int(time.time() * 1000),
|
|
"status": "success"
|
|
} for info in file_info_list]
|
|
else:
|
|
file_info_list = None
|
|
reply_request = await get_reply_request(govs_reply, govs_order, file_info_list)
|
|
queue = asyncio.Queue()
|
|
await queue.put(reply_request)
|
|
reply_response_list = await requests.async_concurrency(queue, con_count=dock.CONCURRENCY_COUNT,
|
|
retry=dock.MAX_RETRY_COUNT,
|
|
after_request=after_create_reply_request)
|
|
# 检查答复办结响应是否成功
|
|
if len(reply_response_list) != 1:
|
|
raise PushException("答复办结请求发生错误.", govs_reply.flow_token, 3)
|
|
return_response = reply_response_list[0]
|
|
return_response_data = return_response.body.decode()
|
|
return_response_data = json.loads(return_response_data)
|
|
if return_response_data.get('code') != 200 or return_response_data.get('data') != 'ok':
|
|
raise PushException("答复办结请求发生错误.", govs_reply.flow_token, 3)
|
|
else:
|
|
echo_log(f"非生产环境,不实际提交.")
|
|
# 保存成功状态
|
|
govs_reply.status = 1
|
|
await govs_reply.async_save()
|
|
# 答复办结请求提交后,通知答复办结成功
|
|
await oa_result_notify.push_result_notify(
|
|
govs_reply.flow_token,
|
|
'答复办结成功',
|
|
1
|
|
)
|
|
except PushException as e:
|
|
# 任何异常都意味着失败,通知 OA
|
|
echo_log(f'答复办结发生错误.', logging.ERROR)
|
|
echo_log(e, logging.ERROR, is_log_exc=True)
|
|
|
|
# 保存失败状态
|
|
govs_reply.status = 0
|
|
await govs_reply.async_save()
|
|
|
|
# 答复办结发生异常,通知答复办结失败
|
|
await oa_result_notify.push_result_notify(
|
|
e.flow_token, f"{e}", e.return_code
|
|
)
|
|
except Exception as e:
|
|
# 其他异常都意味着失败
|
|
echo_log(f'答复办结发生错误.', logging.ERROR)
|
|
echo_log(e, logging.ERROR, is_log_exc=True)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
async def push():
|
|
task = await GovsOrderMaster.async_find_by_id(2060173985047351297)
|
|
reply = await GovsReplyFormal.async_find_by_id(2061328980866371584)
|
|
await create_reply(reply, task)
|
|
|
|
|
|
from paste.core import aio_pool
|
|
|
|
_runner = aio_pool.get_aio_runner()
|
|
_runner(push())
|