Files
d3i-szct/dock/oa_govs/govs_push_process.py
2026-06-02 17:46:38 +08:00

261 lines
11 KiB
Python

"""
待办工单办理过程推送。
对应文档接口:5、推送工单处理流程列表
"""
import asyncio
import io
import json
from typing import Optional, Union
import pandas as pd
from sqlalchemy import select, desc
from tornado.httpclient import HTTPResponse, HTTPRequest
import dock
import models
from dock.oa import oa_api_request
from dock.govs import govs_download_file
from models.govs_order_master import GovsOrderMaster
from models.govs_order_process import GovsOrderProcess
from models.govs_push_status import GovsPushStatus
from paste.core.logging import echo_log
from paste.util import udict
from paste.web import requests
GovsOrderProcessMapping = {
GovsOrderProcess.id.key: 'id',
GovsOrderProcess.master_id.key: 'parentId',
GovsOrderProcess.action_name.key: 'processingSteps',
GovsOrderProcess.handler_org_names.key: 'processingDepartment',
GovsOrderProcess.handler_user_names.key: 'processor',
GovsOrderProcess.deal_type.key: 'processingMethod',
GovsOrderProcess.next_org_names.key: 'receivingDepartment',
GovsOrderProcess.next_handler_user_names.key: 'receiver',
GovsOrderProcess.adv_content.key: 'processingOpinion',
GovsOrderProcess.deal_date.key: 'processingTime',
GovsOrderProcess.plan_finish_time.key: 'plannedCompletionTime',
GovsOrderProcess.remarks.key: 'remarks',
GovsOrderProcess.attachment_dto_list.key: 'fileIdList'
}
"""
数据推送映射关系。
"""
async def after_push_order_process_request(response: HTTPResponse, retry_queue: asyncio.Queue[HTTPRequest]):
"""
工单推送请求响应后的处理程序。
:param response: 响应对象
:param retry_queue: 重试队列
"""
body = response.body.decode()
echo_log(body)
body_data = json.loads(body)
code = udict.get_by_path(body_data, 'code')
message = udict.get_by_path(body_data, 'msg')
if code == 200:
govs_task_id = getattr(response.request, "govs_task_id")
await GovsPushStatus.set_push_order_process_status(govs_task_id)
echo_log(f"推送工单办理过程成功.")
else:
echo_log(f"推送工单办理过程失败:{message}")
if retry_queue:
echo_log(f"工单办理过程重试队列中有:{retry_queue.qsize()} 个请求在等待.")
async def done_push_order_process(response_list: list[HTTPResponse]):
"""
推送完成工单办理过程后的回调
"""
unique_task_ids = set()
for response in response_list:
task_id = getattr(response.request, 'govs_task_id', None)
if task_id:
unique_task_ids.add(task_id)
return unique_task_ids
async def done_attachment_download(response_list: list[HTTPResponse]):
"""
所有附件下载完成执行的处理程序。
:param response_list: 附件下载响应列表
:return: 返回附件字典列表,每个元素包含文件名和io对象
"""
downloaded_attachments = []
for response in response_list:
file_name = getattr(response.request, "file_name", "file")
file_io = io.BytesIO(response.body)
downloaded_attachments.append({
"file_name": file_name, "file_io": file_io
})
return downloaded_attachments
async def done_attachment_upload(response_list: list[HTTPResponse]):
"""
所有附件上传到OA完成后执行的处理程序
:param response_list: 附件下载响应列表
:return: 返回文件id列表
"""
file_ids = []
for response in response_list:
body = response.body.decode()
echo_log(body)
body_data = json.loads(body)
nas = udict.get_by_path(body_data, 'n_a_s')
if nas:
oa_media_id = body_data.get('atts', [])[0].get('fileUrl')
file_ids.append(oa_media_id)
echo_log(f"省12345的办理过程的附件上传成功.")
else:
echo_log(f"省12345的办理过程的附件上传失败.")
return file_ids
async def download_and_upload_attachment(attachment_list: list):
"""
从省12345下载附件,并上传到OA
:param attachment_list: 附件信息列表
:return: 返回上传OA成功的文件id列表
"""
download_queue = asyncio.Queue()
for attachment in attachment_list:
download_request = await govs_download_file.get_download_request('1773611023340371969', attachment['filePath'])
setattr(download_request, 'file_name', attachment['attachName'])
await download_queue.put(download_request)
downloaded_attachments = await requests.async_concurrency(download_queue, con_count=dock.CONCURRENCY_COUNT,
retry=dock.MAX_RETRY_COUNT,
after_done=done_attachment_download)
oa_upload_queue = asyncio.Queue()
for downloaded_attachment in downloaded_attachments:
upload_request = await oa_api_request.get_upload_request(downloaded_attachment['file_io'],
downloaded_attachment['file_name'], False)
await oa_upload_queue.put(upload_request)
uploaded_attachment_ids = await requests.async_concurrency(oa_upload_queue, con_count=dock.CONCURRENCY_COUNT,
retry=dock.MAX_RETRY_COUNT,
after_done=done_attachment_upload)
return uploaded_attachment_ids
async def push_order_process(fetch_size: int = 50,
task_id: Optional[Union[str, int, list[Union[str, int]]]] = None):
"""
推送12345工单办理过程。
:param fetch_size: 本次推送数量
:param task_id: 待办任务 ID 可选
"""
# 根据条件获取目标任务 ID 列表(支持指定 task_id 或分页获取)
task_query = select(GovsOrderMaster.id).order_by(desc(GovsOrderMaster.id))
if task_id is not None:
if isinstance(task_id, list):
task_query = task_query.where(GovsOrderMaster.id.in_(task_id))
echo_log(f"本次推送待办列表:{task_id} 的详细数据...")
else:
task_query = task_query.where(GovsOrderMaster.id == task_id)
echo_log(f"本次推送待办:{task_id} 的详细数据...")
else:
task_query = task_query.limit(fetch_size)
echo_log(f"本次推送前 {fetch_size} 条待办的详细数据...")
govs_task_df = await GovsOrderProcess.query_as_df(task_query)
# 预处理数据方法
def preprocess(df: pd.DataFrame):
# 格式化为字符串
df[GovsOrderProcess.id.key] = df[GovsOrderProcess.id.key].astype(str)
df[GovsOrderProcess.master_id.key] = df[GovsOrderProcess.master_id.key].astype(
str)
# 日期转换为字符串
df[GovsOrderProcess.deal_date.key] = df[GovsOrderProcess.deal_date.key].apply(
lambda x: x.strftime('%Y-%m-%d %H:%M:%S') if pd.notna(x) and hasattr(x, 'strftime') else ''
)
df[GovsOrderProcess.plan_finish_time.key] = df[GovsOrderProcess.plan_finish_time.key].apply(
lambda x: x.strftime('%Y-%m-%d %H:%M:%S') if pd.notna(x) and hasattr(x, 'strftime') else ''
)
# 更名,并仅保留需要的列
df = df.rename(columns=GovsOrderProcessMapping)
df = df[list(GovsOrderProcessMapping.values())]
df[GovsOrderProcess.master_id.key] = df['parentId']
return df
# 填充处理过程
await GovsOrderProcess.fill_process_info(govs_task_df, column_name='processLogBOList', preprocessing=preprocess)
# 处理无待办工单处理过程状态
empty_govs_task_df = govs_task_df[govs_task_df['processLogBOList'].apply(lambda x: len(x) == 0)]
empty_govs_task_df[GovsPushStatus.master_id.key] = empty_govs_task_df[GovsOrderMaster.id.key]
empty_govs_task_df[GovsPushStatus.push_order_process_status.key] = 1
empty_govs_task_df = empty_govs_task_df[
[GovsPushStatus.master_id.key, GovsPushStatus.push_order_process_status.key]]
await GovsPushStatus.save_batch(empty_govs_task_df)
# 过滤空数组
full_govs_task_df = govs_task_df[govs_task_df['processLogBOList'].apply(lambda x: len(x) > 0)]
# 删除 processLogBOList内的工单id字段
def remove_govs_task_id(data_list):
for item in data_list:
if isinstance(item, dict) and GovsOrderProcess.master_id.key in item:
del item[GovsOrderProcess.master_id.key]
return data_list
# 执行替换
full_govs_task_df['processLogBOList'] = full_govs_task_df['processLogBOList'].apply(remove_govs_task_id)
# 处理数据映射,适应接口推送
mapped_df = full_govs_task_df.rename(columns={GovsOrderMaster.id.key: 'gdId'})
mapped_df['gdId'] = mapped_df['gdId'].astype(str)
# 这里把空数据都换成 None,以便存入数据库时是 null
mapped_df.replace(models.EmptyInDF + models.EmptyDatetimeInDF, '', inplace=True)
echo_log(f"正在准备请求队列...")
# 构建请求队列
push_queue = asyncio.Queue()
# 向队列中填充请求对象
for _h, row in mapped_df.iterrows():
row_data = row.to_dict()
process_list = row_data['processLogBOList']
for i in range(len(process_list)):
attachment_list = process_list[i].get('fileIdList')
if not attachment_list:
process_list[i]['fileIdList'] = []
else:
if isinstance(attachment_list, str):
try:
attachment_list = json.loads(attachment_list)
# 从省12345下载附件,然后上传到OA,获取id列表
oa_file_ids = await download_and_upload_attachment(attachment_list)
process_list[i]['fileIdList'] = oa_file_ids
except Exception as e:
echo_log(f'下载省12345的附件并上传到OA时遇到错误:{e}', is_log_exc=True)
process_list[i]['fileIdList'] = []
else:
process_list[i]['fileIdList'] = []
push_request = await oa_api_request.get_push_gov_process_request(**row_data)
setattr(push_request, "govs_task_id", row.get('gdId'))
await push_queue.put(push_request)
# 并发提交推送请求
echo_log(f"开始推送待办过程数据...")
pushed_task_ids = await requests.async_concurrency(
push_queue, con_count=dock.CONCURRENCY_COUNT, retry=dock.MAX_RETRY_COUNT,
after_request=after_push_order_process_request, after_done=done_push_order_process
)
echo_log(f"待办过程数据推送已经完成...")
return pushed_task_ids
if __name__ == "__main__":
from paste.core import aio_pool
_runner = aio_pool.get_aio_runner()
_runner(push_order_process(fetch_size=60))