"""
Push the handling process (processing steps) of pending work orders.

Corresponding documented interface: 5. Push the work-order processing flow list.
"""
import asyncio
import io
import json
from typing import Optional, Union

import pandas as pd
from sqlalchemy import select, desc
from tornado.httpclient import HTTPResponse, HTTPRequest

import dock
import models
from dock.oa import oa_api_request
from dock.govs import govs_download_file
from models.govs_order_master import GovsOrderMaster
from models.govs_order_process import GovsOrderProcess
from models.govs_push_status import GovsPushStatus
from paste.core.logging import echo_log
from paste.util import udict
from paste.web import requests

GovsOrderProcessMapping = {
    GovsOrderProcess.id.key: 'id',
    GovsOrderProcess.master_id.key: 'parentId',
    GovsOrderProcess.action_name.key: 'processingSteps',
    GovsOrderProcess.handler_org_names.key: 'processingDepartment',
    GovsOrderProcess.handler_user_names.key: 'processor',
    GovsOrderProcess.deal_type.key: 'processingMethod',
    GovsOrderProcess.next_org_names.key: 'receivingDepartment',
    GovsOrderProcess.next_handler_user_names.key: 'receiver',
    GovsOrderProcess.adv_content.key: 'processingOpinion',
    GovsOrderProcess.deal_date.key: 'processingTime',
    GovsOrderProcess.plan_finish_time.key: 'plannedCompletionTime',
    GovsOrderProcess.remarks.key: 'remarks',
    GovsOrderProcess.attachment_dto_list.key: 'fileIdList'
}
"""
Mapping from GovsOrderProcess column keys to the field names used in the push payload.
"""

# Timestamp layout used for the date fields of the push payload.
_DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S'


def _format_datetime(value) -> str:
    """
    Format a datetime-like value for the push payload.

    :param value: the cell value to format
    :return: the value rendered with ``_DATETIME_FORMAT``, or ``''`` when the
        value is missing (per ``pd.notna``) or has no ``strftime`` method
    """
    if pd.notna(value) and hasattr(value, 'strftime'):
        return value.strftime(_DATETIME_FORMAT)
    return ''


async def after_push_order_process_request(response: HTTPResponse, retry_queue: asyncio.Queue[HTTPRequest]):
    """
    Handler executed after each work-order process push request gets a response.

    Logs the raw body, and when the response ``code`` is 200 records the push
    status for the task id stored on the request as ``govs_task_id``.

    :param response: the response object
    :param retry_queue: the retry queue
    """
    body = response.body.decode()
    echo_log(body)
    body_data = json.loads(body)
    code = udict.get_by_path(body_data, 'code')
    message = udict.get_by_path(body_data, 'msg')

    if code == 200:
        govs_task_id = getattr(response.request, "govs_task_id")
        await GovsPushStatus.set_push_order_process_status(govs_task_id)
        echo_log("推送工单办理过程成功.")
    else:
        echo_log(f"推送工单办理过程失败:{message}")

    # NOTE(review): reported for every response, not only failures — confirm
    # this is the intended nesting level.
    if retry_queue:
        echo_log(f"工单办理过程重试队列中有:{retry_queue.qsize()} 个请求在等待.")


async def done_push_order_process(response_list: list[HTTPResponse]):
    """
    Callback executed once all work-order process pushes have finished.

    :param response_list: the responses of the push requests
    :return: the set of distinct truthy ``govs_task_id`` values found on the
        requests of the given responses
    """
    return {
        task_id
        for response in response_list
        if (task_id := getattr(response.request, 'govs_task_id', None))
    }


async def done_attachment_download(response_list: list[HTTPResponse]):
    """
    Handler executed once all attachment downloads have finished.

    :param response_list: the attachment download responses
    :return: a list of dicts, each holding the file name (``file_name``, taken
        from the request, default ``"file"``) and an in-memory ``file_io``
        object wrapping the response body
    """
    return [
        {
            "file_name": getattr(response.request, "file_name", "file"),
            "file_io": io.BytesIO(response.body),
        }
        for response in response_list
    ]


async def done_attachment_upload(response_list: list[HTTPResponse]):
    """
    Handler executed once all attachments have been uploaded to OA.

    :param response_list: the attachment upload responses
    :return: the list of file ids (``fileUrl`` of the first ``atts`` entry)
        for the uploads that succeeded
    """
    file_ids = []
    for response in response_list:
        body = response.body.decode()
        echo_log(body)
        body_data = json.loads(body)
        nas = udict.get_by_path(body_data, 'n_a_s')
        # A truthy 'n_a_s' with an empty/missing 'atts' list is treated as a
        # failed upload instead of raising IndexError and aborting the callback.
        atts = body_data.get('atts') or []
        if nas and atts:
            file_ids.append(atts[0].get('fileUrl'))
            echo_log("省12345的办理过程的附件上传成功.")
        else:
            echo_log("省12345的办理过程的附件上传失败.")
    return file_ids


async def download_and_upload_attachment(attachment_list: list):
    """
    Download attachments from the provincial 12345 system and upload them to OA.

    :param attachment_list: attachment info dicts; each must provide
        ``filePath`` and ``attachName``
    :return: the list of file ids of the attachments uploaded to OA
    """
    if not attachment_list:
        return []

    download_queue = asyncio.Queue()
    for attachment in attachment_list:
        # NOTE(review): the first argument is a hard-coded id whose meaning is
        # not visible here — consider moving it to configuration.
        download_request = await govs_download_file.get_download_request(
            '1773611023340371969', attachment['filePath'])
        # The file name travels on the request so the done-callback can read it.
        setattr(download_request, 'file_name', attachment['attachName'])
        await download_queue.put(download_request)

    downloaded_attachments = await requests.async_concurrency(
        download_queue,
        con_count=dock.CONCURRENCY_COUNT,
        retry=dock.MAX_RETRY_COUNT,
        after_done=done_attachment_download
    )
    if not downloaded_attachments:
        return []

    oa_upload_queue = asyncio.Queue()
    for downloaded_attachment in downloaded_attachments:
        upload_request = await oa_api_request.get_upload_request(
            downloaded_attachment['file_io'], downloaded_attachment['file_name'], False)
        await oa_upload_queue.put(upload_request)

    uploaded_attachment_ids = await requests.async_concurrency(
        oa_upload_queue,
        con_count=dock.CONCURRENCY_COUNT,
        retry=dock.MAX_RETRY_COUNT,
        after_done=done_attachment_upload
    )
    return uploaded_attachment_ids


async def _resolve_oa_file_ids(raw_attachments):
    """
    Turn the raw ``fileIdList`` value of one process step into OA file ids.

    :param raw_attachments: the raw value; only a non-empty JSON string is
        processed, anything else yields an empty list
    :return: the OA file ids, or ``[]`` when there is nothing to transfer or
        the transfer failed (the error is logged, not raised)
    """
    if not raw_attachments or not isinstance(raw_attachments, str):
        return []
    try:
        attachment_list = json.loads(raw_attachments)
        # Download from the provincial 12345 system, upload to OA, collect ids.
        return await download_and_upload_attachment(attachment_list)
    except Exception as e:
        # Best effort: a failed attachment transfer must not block the push.
        echo_log(f'下载省12345的附件并上传到OA时遇到错误:{e}', is_log_exc=True)
        return []


async def push_order_process(fetch_size: int = 50,
                             task_id: Optional[Union[str, int, list[Union[str, int]]]] = None):
    """
    Push the handling process of 12345 work orders.

    :param fetch_size: number of work orders to push in this run (used only
        when ``task_id`` is not given)
    :param task_id: optional task id, or list of task ids, to push
    :return: the result of the concurrent push (built by
        ``done_push_order_process``), or an empty set when the query
        returned no work orders
    """
    # Select the target task ids: either the given task_id(s) or the newest
    # ``fetch_size`` records.
    task_query = select(GovsOrderMaster.id).order_by(desc(GovsOrderMaster.id))
    if task_id is None:
        task_query = task_query.limit(fetch_size)
        echo_log(f"本次推送前 {fetch_size} 条待办的详细数据...")
    elif isinstance(task_id, list):
        task_query = task_query.where(GovsOrderMaster.id.in_(task_id))
        echo_log(f"本次推送待办列表:{task_id} 的详细数据...")
    else:
        task_query = task_query.where(GovsOrderMaster.id == task_id)
        echo_log(f"本次推送待办:{task_id} 的详细数据...")

    govs_task_df = await GovsOrderProcess.query_as_df(task_query)
    if govs_task_df.empty:
        # Nothing matched; avoid masking an empty frame further below.
        echo_log("没有需要推送的待办过程数据.")
        return set()

    def preprocess(df: pd.DataFrame):
        """Prepare the process rows: stringify ids/dates and apply the payload mapping."""
        # Ids are sent as strings.
        df[GovsOrderProcess.id.key] = df[GovsOrderProcess.id.key].astype(str)
        df[GovsOrderProcess.master_id.key] = df[GovsOrderProcess.master_id.key].astype(str)
        # Dates are sent as formatted strings.
        for date_key in (GovsOrderProcess.deal_date.key, GovsOrderProcess.plan_finish_time.key):
            df[date_key] = df[date_key].apply(_format_datetime)
        # Rename to the payload field names and keep only the mapped columns.
        df = df.rename(columns=GovsOrderProcessMapping)
        df = df[list(GovsOrderProcessMapping.values())].copy()
        # Re-add the master id under its original key; it is stripped from
        # each item again before pushing (see remove_govs_task_id).
        df[GovsOrderProcess.master_id.key] = df['parentId']
        return df

    # Fill in the process steps of every work order.
    await GovsOrderProcess.fill_process_info(
        govs_task_df, column_name='processLogBOList', preprocessing=preprocess)

    has_process = govs_task_df['processLogBOList'].apply(len) > 0

    # Work orders without any process step: record their push status directly.
    # ``.copy()`` makes the filtered frames independent so the column
    # assignments below do not write to a slice of ``govs_task_df``.
    empty_govs_task_df = govs_task_df[~has_process].copy()
    empty_govs_task_df[GovsPushStatus.master_id.key] = empty_govs_task_df[GovsOrderMaster.id.key]
    empty_govs_task_df[GovsPushStatus.push_order_process_status.key] = 1
    empty_govs_task_df = empty_govs_task_df[
        [GovsPushStatus.master_id.key, GovsPushStatus.push_order_process_status.key]]
    await GovsPushStatus.save_batch(empty_govs_task_df)

    # Keep only the work orders that do have process steps.
    full_govs_task_df = govs_task_df[has_process].copy()

    def remove_govs_task_id(data_list):
        """Drop the work-order id field from every dict item of the process list (in place)."""
        for item in data_list:
            if isinstance(item, dict):
                item.pop(GovsOrderProcess.master_id.key, None)
        return data_list

    full_govs_task_df['processLogBOList'] = full_govs_task_df['processLogBOList'].apply(remove_govs_task_id)

    # Map to the field names expected by the push interface.
    mapped_df = full_govs_task_df.rename(columns={GovsOrderMaster.id.key: 'gdId'})
    mapped_df['gdId'] = mapped_df['gdId'].astype(str)
    # Replace the empty markers (models.EmptyInDF / models.EmptyDatetimeInDF)
    # with an empty string.
    mapped_df.replace(models.EmptyInDF + models.EmptyDatetimeInDF, '', inplace=True)

    echo_log("正在准备请求队列...")
    # Build the request queue and fill it with one push request per work order.
    push_queue = asyncio.Queue()
    for _h, row in mapped_df.iterrows():
        row_data = row.to_dict()
        for process in row_data['processLogBOList']:
            process['fileIdList'] = await _resolve_oa_file_ids(process.get('fileIdList'))

        push_request = await oa_api_request.get_push_gov_process_request(**row_data)
        # The task id travels on the request so the callbacks can read it.
        setattr(push_request, "govs_task_id", row.get('gdId'))
        await push_queue.put(push_request)

    # Submit the push requests concurrently.
    echo_log("开始推送待办过程数据...")
    pushed_task_ids = await requests.async_concurrency(
        push_queue,
        con_count=dock.CONCURRENCY_COUNT,
        retry=dock.MAX_RETRY_COUNT,
        after_request=after_push_order_process_request,
        after_done=done_push_order_process
    )
    echo_log("待办过程数据推送已经完成...")
    return pushed_task_ids


if __name__ == "__main__":
    from paste.core import aio_pool

    _runner = aio_pool.get_aio_runner()
    _runner(push_order_process(fetch_size=60))