Files
d3i-szct/dock/dcm/dcm_push_rollback.py
2026-06-02 17:46:38 +08:00

236 lines
9.1 KiB
Python

import asyncio
import datetime
import io
import json
import logging
from tornado.httpclient import HTTPResponse, HTTPRequest
import apps
import dock
from dock.dcm import dcm_api, dcm_push_conv_rollback, dcm_send_sms, dcm_push_upload
from dock.oa import oa_api_request, oa_result_notify, PushException
from models.dcm_rollback import DcmRollback
from models.dcm_task import DcmTask
from paste.core.logging import echo_log
from paste.util import ufile, udict
from paste.web import requests
async def get_rollback_request(dcm_rollback: DcmRollback, dcm_task: DcmTask, media_ids: str, media_types: str,
add_num: int, extra_opinion: str):
"""
创建回退请求对象。方法仅创建请求对象,并未实际提交请求,具体由调度方法处理。
:param dcm_rollback: 回退对象
:param dcm_task: 工单对象
:param media_ids: 上传的附件 MediaID 字符串,多个用英文逗号分隔
:param media_types: 上传的附件的MediaType字符串,多个用英文逗号分隔
:param add_num: 上传的附件数
:param extra_opinion: 便民表单的extendInfo文本
:return: HTTPRequest 对象
"""
api_url = '/home/workflow/rollback'
body_dict = {
'actID': int(dcm_rollback.act_id),
'opinion': dcm_rollback.opinion if not extra_opinion else dcm_rollback.opinion + ' ' + extra_opinion,
'transInfo': dcm_rollback.trans_info,
'saveOldActFlag': dcm_rollback.save_old_act_flag,
'rollbackReasonID': dcm_rollback.rollback_reason_id,
'transListNum': 0,
'mediaIDs': media_ids,
'mediaUsage': '回退',
'relationTypeID': 10,
'relationID': dcm_task.rec_id,
'relationMainID': 45,
'relationSubID': dcm_task.act_id,
'tempUsage': 'rollback_tmp',
'mediaTypes': media_types,
'addNum': add_num
}
rollback_request = await dcm_api.new_api_request(api_url, body_dict)
return rollback_request
async def on_attachment_download_error(request: HTTPRequest, exc: Exception, retry_queue: asyncio.Queue = None):
"""
下载附件时的出错处理。
:param request: 请求对象
:param exc: 异常对象
:param retry_queue:
:return:
"""
retry = getattr(request, 'retry', 0)
max_retry = getattr(request, 'max_retry', 0)
if retry < max_retry - 1:
# 非最后一次尝试,不处理
return
message = f"下载回退附件时发生错误,下载地址:{request.url}"
echo_log(message)
echo_log(exc, logging.ERROR, is_log_exc=True)
# 保存异常
dcm_rollback: DcmRollback = getattr(request, 'dcm_rollback')
exc_list = getattr(dcm_rollback, 'exc_list')
exc_list.append(PushException(message, dcm_rollback.flow_token, 3))
async def done_attachment_download(response_list: list[HTTPResponse]):
"""
所有附件下载完成执行的处理程序。
:param response_list: 附件下载响应列表
:return: 返回附件字典
"""
# 遍历取出附件数据
date_str = datetime.datetime.now().strftime("%Y%m%d")
files_body: dict[str, io.IOBase] = {}
for i, response in enumerate(response_list):
_type = ufile.inspect_type(response.body)
if isinstance(_type, tuple):
_fn = f"CT{date_str}{i + 1:04d}.{_type[0]}"
elif isinstance(_type, str) and _type:
_fn = f"CT{date_str}{i + 1:04d}.{_type}"
else:
_fn = f"CT{date_str}{i + 1:04d}"
file_obj = io.BytesIO(response.body)
files_body[_fn] = file_obj
return files_body
async def after_push_rollback_request(response: HTTPResponse, retry_queue: asyncio.Queue[HTTPRequest]):
"""
提交数字城管后的处理程序。
:param response: 响应对象
:param retry_queue: 重试队列
"""
echo_log(response.body.decode())
echo_log('回退请求成功.')
async def push_rollback(dcm_rollback: DcmRollback, dcm_task: DcmTask):
"""
推送回退请求。
:param dcm_rollback: 保存在数据库的回退对象
:param dcm_task: 待办工单
"""
try:
# 如果来源是12345热线,提交便民回退
extra_opinion = ''
if dcm_task.event_src_name == '12345热线':
conv_response_list = await dcm_push_conv_rollback.push_conv_rollback_request(dcm_rollback, dcm_task)
if conv_response_list:
conv_response = conv_response_list[0].body.decode()
conv_response = json.loads(conv_response)
extra_opinion = udict.get_by_path(conv_response, 'resultInfo.data.extendInfo', '')
echo_log(f"正在准备下载队列...")
setattr(dcm_rollback, 'exc_list', [])
files_body = {}
if dcm_rollback.attachments:
# 创建并填充下载请求队列
attachment_download_queue = asyncio.Queue()
attachment_id_list = dcm_rollback.attachments.split(',')
for attachment_id in attachment_id_list:
download_request = await oa_api_request.get_download_request(attachment_id)
setattr(download_request, 'dcm_rollback', dcm_rollback)
await attachment_download_queue.put(download_request)
# 启动下载,得到所有的附件
files_body: dict[str, io.IOBase] = await requests.async_concurrency(
attachment_download_queue, con_count=dock.CONCURRENCY_COUNT, retry=dock.MAX_RETRY_COUNT,
after_done=done_attachment_download, on_error=on_attachment_download_error
)
# 读取异常有则报错
exc_list: list[PushException] = getattr(dcm_rollback, 'exc_list', [])
if exc_list:
message = ";".join([f"{exc}" for exc in exc_list])
raise PushException(message, dcm_rollback.flow_token, 3)
echo_log(f"附件下载完成,正在准备提交数字城管...")
# 仅生产环境真实提交,其他环境不实际提交
if apps.get_active_env() in ('dev', '', None):
echo_log(f"非生产环境,不实际提交.")
return
# 上传附件,同步得到 MediaIds
media_ids, media_types, add_num = await dcm_push_upload.push_upload(
dcm_task, dcm_push_upload.RollbackTmp, files_body
)
# 创建并填充数字城管请求队列
rollback_request = await get_rollback_request(
dcm_rollback, dcm_task, media_ids, media_types, add_num, extra_opinion
)
setattr(rollback_request, 'dcm_rollback', dcm_rollback)
rollback_push_queue = asyncio.Queue()
await rollback_push_queue.put(rollback_request)
# 提交数字城管-回退请求
rollback_push_response_list = await requests.async_concurrency(
rollback_push_queue, con_count=dock.CONCURRENCY_COUNT, retry=dock.MAX_RETRY_COUNT,
after_request=after_push_rollback_request
)
# 检查是否推送成功,失败直接报错
if len(rollback_push_response_list) != 1:
raise PushException("回退请求发生错误.", dcm_rollback.flow_token, 3)
# 检查响应的内容,验证是否推送成功
response_data = rollback_push_response_list[0].body.decode()
response_data = json.loads(response_data)
if udict.get_by_path(response_data, 'resultInfo.message', '') != '回退成功!' or udict.get_by_path(
response_data, 'resultInfo.success') is not True:
raise PushException("回退请求发生错误.", dcm_rollback.flow_token, 3)
# 保存成功状态
dcm_rollback.status = 1
await dcm_rollback.async_save()
# 回退请求提交后,通知回退成功
await oa_result_notify.push_result_notify(
dcm_rollback.flow_token,
'回退成功',
1
)
# 如果需要发送短信,后台发送
if dcm_rollback.send_message in (1, '1'):
await dcm_send_sms.send_sms({
'actDefID': '45',
'partStr': '254,role,市受理员',
'itemType': 'rollback',
'recID': dcm_task.rec_id,
'actID': dcm_task.act_id,
'actPropertyID': '7'
})
except PushException as e:
# 任何异常都意味着失败,通知 OA
echo_log(f'回退发生错误.', logging.ERROR)
echo_log(e, logging.ERROR, is_log_exc=True)
# 保存失败状态
dcm_rollback.status = 0
await dcm_rollback.async_save()
# 回退发生异常,通知回退失败
await oa_result_notify.push_result_notify(
e.flow_token, f"{e}", e.return_code
)
except Exception as e:
# 其他异常都意味着失败
echo_log(f'回退发生错误.', logging.ERROR)
echo_log(e, logging.ERROR, is_log_exc=True)
if __name__ == "__main__":
from paste.core import aio_pool
async def push():
dcm_task = await DcmTask(id=2054174091237265413).async_find_first()
task_rollback = await DcmRollback(id=2058802518821048320).async_find_first()
await push_rollback(task_rollback, dcm_task)
_runner = aio_pool.get_aio_runner()
_runner(push())