# Source file: d3i-szct/dock/govs/govs_phase_wise_completion.py
# Snapshot: 2026-06-02 17:46:38 +08:00 (282 lines, 11 KiB, Python)

import asyncio
import os
import io
import json
import time
import logging
from tornado.httpclient import HTTPResponse, HTTPRequest
import dock
import apps
from models.govs_phase_wise_completion import GovsPhaseWiseCompletion
from models.govs_order_master import GovsOrderMaster
from dock.govs import govs_api, govs_upload_file
from dock.oa import oa_api_request, oa_result_notify, PushException
from paste.core.logging import echo_log
from paste.util.ufile import inspect_type
from paste.web import requests
async def get_phase_request(phase_wise_completion: GovsPhaseWiseCompletion, govs_order: GovsOrderMaster,
                            file_list: list = None):
    """
    Build the request object for a phase-wise (interim) completion.

    This only assembles the payload and passes it to
    ``govs_api.new_api_request``; the request is not submitted here, that is
    left to the dispatching caller.

    :param phase_wise_completion: phase-wise completion record
    :param govs_order: work-order record
    :param file_list: data describing files already uploaded to the
        provincial 12345 platform; placed under both ``fileList`` and
        ``fileVos``
    :return: the result of ``govs_api.new_api_request`` (an HTTPRequest object)
    """
    endpoint = '/orderhandler/remAndSup/savePeriodicCompletion'

    # Constant sub-structures of the form, hoisted out of the main literal.
    flow_map = {
        'nextHandleName': '阶段性办结',
        'nextHandle': '阶段性办结',
    }
    blank_complainant = {
        'complainant': '',
        'region': '',
        'street': '',
        'detailedAddress': '',
        'show': True,
        'disabled': False,
    }

    # Key order mirrors the form layout and is kept exactly as-is; most
    # fields are sent as empty placeholders.
    payload = {
        'slaveDeptIsCompetent': [],
        'duties': '',
        'nextOrgIds': '',
        'adviceMasterOrgName': '',
        'adviceSlaveOrgNames': '',
        'searchValueList': [],
        'inforRetrieval': '',
        'nextProcessing': '',
        'assignedUnit': '',
        'duration': '',
        'dateChoose': '',
        'plannedDuration': '',
        'completionTime': '',
        'returnAuditorName': '',
        'handlingSuggestion': '',
        'remark': phase_wise_completion.remark,
        'fileList': file_list,
        'isContact': phase_wise_completion.is_contact,
        'contactName': phase_wise_completion.contact_name,
        'contactTime': phase_wise_completion.contact_time,
        'contactType': phase_wise_completion.contact_type,
        'nextFeedbackTime': phase_wise_completion.next_feedback_time,
        'advice': phase_wise_completion.advice,
        'answer': '',
        'applyReason': '',
        'applyBasis': '',
        'platformOpinion': '',
        'shortMessage': '',
        'distributor': '',
        'distributors': [],
        'positionSelection': '',
        'difficultReason': '',
        'directCompletionType': '',
        'applyType': '',
        'returnReason': '',
        'returnReasonName': '',
        'replyResult': '',
        'informPublic': '',
        'approveAttachmentIds': '',
        'formalReply': '',
        'flowMap': flow_map,
        'adviceMasterOrgId': '',
        'adviceSlaveOrgIdsList': [],
        'id': '',
        'key': '',
        'nextHandler': '',
        'nextOrgId': '',
        'processInstanceId': govs_order.process_instance_id,
        'reason': phase_wise_completion.reason,
        'taskHandlerId': '',
        'value': '',
        'vote': '',
        'assignedUnitList': [],
        'assignedUnitLabel': '',
        'nextOrgIdList': [],
        'nextOrgIdStr': '',
        'knowledgeQuote': '[]',
        'defineAuditorId': '',
        'defineAuditorName': '',
        'visitTypes': ' ',
        'appeal1': '',
        'appeal2': '',
        'unreasonableDemands': '',
        'complainant': '',
        'pollutionType': '',
        'pollutionType1': '',
        'pollutionType2': '',
        'involvedTargets': '',
        'problemCategory': '生态环境类',
        'defendantType': '',
        'reportingPurpose': '投诉举报',
        'industryType': '',
        'industryType1': '',
        'industryType2': '',
        'complainants': [blank_complainant],
        'inforAddress': '',
        'associatedDefendantType': '',
        'adminLawEnf': '',
        'approveResult': '',
        'approveContent': '',
        'nextOrgIdsName': [],
        'noticeOrgId': '',
        'completeType': '',
        'caseAccordTypeOneName': govs_order.case_accord_type_one_name,
        'caseAccordTypeTwoName': govs_order.case_accord_type_two_name,
        'caseAccordTypeThreeName': govs_order.case_accord_type_three_name,
        'caseAccordTypeFourName': '',
        'caseAccordTypeFiveName': '',
        'fileVos': file_list,
        'actionName': phase_wise_completion.action_name,
        'orderId': govs_order.order_id,
        'taskId': govs_order.next_task_id,
        'submitType': '0',
        'masterId': phase_wise_completion.master_id,
        'orderNo': govs_order.order_no,
    }
    request = await govs_api.new_api_request(endpoint, payload)
    return request
async def after_phase_request(response: HTTPResponse, retry_queue: asyncio.Queue[HTTPRequest]):
    """
    Handler run after the request has been submitted to the provincial 12345 platform.

    Logs the decoded response body, then a fixed success message.
    ``retry_queue`` is accepted but not used by this handler.

    :param response: response object
    :param retry_queue: retry queue
    """
    body_text = response.body.decode()
    echo_log(body_text)
    echo_log('阶段性办结请求成功.')
async def done_file_download(response_list: list[HTTPResponse]):
    """
    Handler run once every attachment download has finished.

    :param response_list: list of attachment download responses
    :return: list of attachment dicts, one per response, each holding the
        file name (``file_name``) and an in-memory io object (``file_io``)
    """

    def _describe(resp):
        # The file name is the last path component of the request URL plus
        # the type reported by inspect_type for the downloaded bytes.
        extension = inspect_type(resp.body)
        stem = os.path.basename(resp.request.url)
        stream = io.BytesIO(resp.body)
        return {
            'file_name': f'{stem}.{extension}',
            'file_io': stream,
        }

    return [_describe(resp) for resp in response_list]
async def done_file_upload(response_list: list[HTTPResponse]):
    """
    Handler run once every file upload has finished.

    A response counts as successful only when its body is a JSON object whose
    ``msg`` equals the platform's success message and which carries a ``data``
    entry (used as the uploaded file path). Every other response, including
    a body that is not valid JSON or lacks those keys, is logged as a failed
    upload and skipped, so one malformed response no longer aborts the whole
    batch with a KeyError / JSONDecodeError.

    :param response_list: list of attachment upload responses
    :return: list of uploaded-file dicts with the file name (``file_name``)
        and the uploaded path (``path``)
    """
    uploaded_list = []
    for response in response_list:
        try:
            response_data = json.loads(response.body.decode())
        except ValueError:
            # json.JSONDecodeError and UnicodeDecodeError are both ValueError
            # subclasses; handle them like any other failed upload.
            echo_log(f'文件上传到省12345失败,{response.body!r}')
            continue

        succeeded = (
            isinstance(response_data, dict)
            and response_data.get('msg') == '附件上传成功!'
            and 'data' in response_data
        )
        if not succeeded:
            echo_log(f'文件上传到省12345失败,{response_data}')
            continue

        uploaded_list.append({
            # 'file_name' is attached to the request object by the uploader;
            # fall back to a generic name when it is absent.
            'file_name': getattr(response.request, 'file_name', 'file.bin'),
            'path': response_data['data']
        })
    return uploaded_list
async def download_and_upload_files(file_id_str: str):
    """
    Download files from OA, upload them to the provincial 12345 platform and
    return the list of uploaded-file info.

    Ids are trimmed and blank entries are dropped, so inputs such as
    ``"1,,2"``, ``"1, 2"`` or ``","`` no longer produce a download request for
    an empty or padded id. When no usable id remains an empty list is returned
    without issuing any request.

    :param file_id_str: OA file ids separated by ASCII commas
    :return: the result of the upload stage (``done_file_upload``), or ``[]``
        when there is nothing to transfer
    """
    file_id_list = [
        file_id.strip()
        for file_id in (file_id_str or '').split(',')
        if file_id.strip()
    ]
    if not file_id_list:
        return []

    # Stage 1: download every file from OA.
    download_queue = asyncio.Queue()
    for file_id in file_id_list:
        download_request = await oa_api_request.get_download_request(file_id)
        await download_queue.put(download_request)
    file_info_list = await requests.async_concurrency(download_queue, con_count=dock.CONCURRENCY_COUNT,
                                                      retry=dock.MAX_RETRY_COUNT, after_done=done_file_download)

    # Stage 2: upload the downloaded content to the provincial 12345 platform.
    upload_queue = asyncio.Queue()
    for file_info in file_info_list:
        upload_request = await govs_upload_file.get_upload_request(file_info['file_name'], file_info['file_io'])
        # Remember the file name on the request so the upload handler can
        # pair each response with its file.
        setattr(upload_request, 'file_name', file_info['file_name'])
        await upload_queue.put(upload_request)
    uploaded_list = await requests.async_concurrency(upload_queue, con_count=dock.CONCURRENCY_COUNT,
                                                     retry=dock.MAX_RETRY_COUNT, after_done=done_file_upload)
    return uploaded_list
async def create_phase_wise_completion(phase_wise_completion: GovsPhaseWiseCompletion, govs_order: GovsOrderMaster):
    """
    Push a phase-wise (interim) completion request and report the outcome to OA.

    On success the record's ``status`` is set to 1, saved, and OA is notified.
    A ``PushException`` sets ``status`` to 0, saves, and notifies OA with the
    exception's message and return code. Any other exception is only logged.

    NOTE(review): in the lines visible here the generic ``Exception`` branch
    neither saves a failure status nor notifies OA; confirm that is intended.

    :param phase_wise_completion: phase-wise completion record stored in the database
    :param govs_order: work-order record from the database
    """
    try:
        # The real submission happens for every environment except 'dev' or
        # an unset/empty one; those only log.
        if apps.get_active_env() not in ('dev', '', None):
            if phase_wise_completion.file_id_str:
                # Download the files named by the OA file ids and upload them
                # to the provincial 12345 platform.
                uploaded_files = await download_and_upload_files(phase_wise_completion.file_id_str)
                # One base timestamp plus the index keeps every uid distinct;
                # calling time.time() per element can repeat within the same
                # millisecond.
                uid_base = int(time.time() * 1000)
                file_info_list = [{
                    "name": info['file_name'],
                    "filePath": info['path'],
                    "orderId": govs_order.order_id,
                    "uid": uid_base + offset,
                    "status": "success"
                } for offset, info in enumerate(uploaded_files)]
            else:
                file_info_list = None
            phase_request = await get_phase_request(phase_wise_completion, govs_order, file_info_list)
            queue = asyncio.Queue()
            await queue.put(phase_request)
            phase_response_list = await requests.async_concurrency(queue, con_count=dock.CONCURRENCY_COUNT,
                                                                   retry=dock.MAX_RETRY_COUNT,
                                                                   after_request=after_phase_request)
            # Check that the phase-wise completion response indicates success.
            if len(phase_response_list) != 1:
                raise PushException("阶段性办结请求发生错误.", phase_wise_completion.flow_token, 3)
            try:
                return_response_data = json.loads(phase_response_list[0].body.decode())
            except ValueError as err:
                # An undecodable / non-JSON body is a failed push as well, so
                # report it through the same PushException path instead of
                # letting it fall into the log-only generic handler.
                raise PushException("阶段性办结请求发生错误.", phase_wise_completion.flow_token, 3) from err
            if not isinstance(return_response_data, dict) or return_response_data.get('code') != 200:
                raise PushException("阶段性办结请求发生错误.", phase_wise_completion.flow_token, 3)
        else:
            echo_log("非生产环境,不实际提交.")
        # Persist the success status.
        phase_wise_completion.status = 1
        await phase_wise_completion.async_save()
        # Request submitted: notify OA that the phase-wise completion succeeded.
        await oa_result_notify.push_result_notify(
            phase_wise_completion.flow_token,
            '阶段性办结成功',
            1
        )
    except PushException as e:
        # A PushException means the push failed: log it, record the failure
        # and notify OA.
        echo_log('阶段性办结发生错误.', logging.ERROR)
        echo_log(e, logging.ERROR, is_log_exc=True)
        # Persist the failure status.
        phase_wise_completion.status = 0
        await phase_wise_completion.async_save()
        # Notify OA that the phase-wise completion failed.
        await oa_result_notify.push_result_notify(
            e.flow_token, f"{e}", e.return_code
        )
    except Exception as e:
        # Any other exception also means failure; it is logged here.
        echo_log('阶段性办结发生错误.', logging.ERROR)
        echo_log(e, logging.ERROR, is_log_exc=True)