""" 待办工单明细推送。 对应文档接口:5、推送工单详情 """ import asyncio import json from typing import Optional, Union import pandas as pd from sqlalchemy import select, desc from tornado.httpclient import HTTPResponse, HTTPRequest import dock import models from dock.oa import oa_api_request from models.govs_order_master import GovsOrderMaster from models.govs_order_detail import GovsOrderDetail from models.govs_order_user import GovsOrderUser from models.govs_push_status import GovsPushStatus from paste.core.logging import echo_log from paste.util import udict from paste.web import requests GovsOrderDetailMapping = { GovsOrderMaster.id.key: 'gdId', GovsOrderDetail.order_id.key: 'workOrderNo', GovsOrderUser.customer_name.key: 'name', GovsOrderUser.customer_sex.key: 'gender', GovsOrderUser.customer_credentials_type.key: 'documentType', GovsOrderUser.customer_credentials_no.key: 'idNumber', GovsOrderDetail.call_number.key: 'incomingCallNumber', GovsOrderDetail.call_time.key: 'callTime', GovsOrderDetail.order_source_for_view.key: 'workOrderStatus', GovsOrderUser.customer_connect_phone.key: 'contactPhoneNumber', GovsOrderDetail.belong_platform_name.key: 'acceptancePlatform', GovsOrderDetail.form_type.key: 'formType', GovsOrderDetail.case_is_visit.key: 'whetherToFollowUp', GovsOrderDetail.case_is_urgent.key: 'urgencyLevel', GovsOrderDetail.info_protect.key: 'informationProtection', GovsOrderDetail.relate_order_count.key: 'relatedWorkOrderNo', GovsOrderDetail.service_object_type.key: 'serviceObjectType', } """ 数据推送映射关系。 """ async def after_push_order_detail_request(response: HTTPResponse, retry_queue: asyncio.Queue[HTTPRequest]): """ 工单推送请求响应后的处理程序。 :param response: 响应对象 :param retry_queue: 重试队列 """ body = response.body.decode() echo_log(body) body_data = json.loads(body) code = udict.get_by_path(body_data, 'code') message = udict.get_by_path(body_data, 'msg') if code == 200: govs_task_id = getattr(response.request, "govs_task_id") await GovsPushStatus.set_push_order_detail_status(govs_task_id) echo_log(f"推送工单详情成功.") else: echo_log(f"推送工单详情失败:{message}") if retry_queue: echo_log(f"工单详情重试队列中有:{retry_queue.qsize()} 个请求在等待.") async def done_push_order_detail(response_list: list[HTTPResponse]): """ 推送完成工单详情后的回调 """ unique_task_ids = set() for response in response_list: task_id = getattr(response.request, 'govs_task_id', None) if task_id: unique_task_ids.add(task_id) return unique_task_ids async def push_order_detail(fetch_size: int = 50, task_id: Optional[Union[str, int, list[Union[str, int]]]] = None): """ 推送12345工单详情。 :param fetch_size: 本次推送数量 :param task_id: 待办任务 ID 可选 """ # 根据条件获取目标任务 ID 列表(支持指定 task_id 或分页获取) task_query = select(GovsOrderMaster.id).order_by(desc(GovsOrderMaster.id)) if task_id is not None: if isinstance(task_id, list): task_query = task_query.where(GovsOrderMaster.id.in_(task_id)) echo_log(f"本次推送待办列表:{task_id} 的详细数据...") else: task_query = task_query.where(GovsOrderMaster.id == task_id) echo_log(f"本次推送待办:{task_id} 的详细数据...") else: task_query = task_query.limit(fetch_size) echo_log(f"本次推送前 {fetch_size} 条待办的详细数据...") task_id_list = (await GovsOrderMaster.orm_execute_scalars(task_query)).all() task_id_list = [f"{id}" for id in task_id_list] # 查询属于这些任务的所有详细数据 detail_query = select( GovsOrderMaster.id, GovsOrderMaster.order_id, GovsOrderUser.customer_name, GovsOrderUser.customer_sex, GovsOrderDetail.call_number, GovsOrderUser.customer_connect_phone, GovsOrderDetail.belong_platform_name, GovsOrderDetail.area_code_area, GovsOrderDetail.area_code_city, GovsOrderDetail.area_code_street, GovsOrderDetail.address_detail, GovsOrderDetail.form_type, GovsOrderDetail.case_accord_type_one_name, GovsOrderDetail.case_accord_type_two_name, GovsOrderDetail.case_accord_type_three_name, GovsOrderDetail.case_is_visit, GovsOrderDetail.order_source_for_view, GovsOrderDetail.order_source, GovsOrderDetail.order_source_detail, GovsOrderDetail.case_is_urgent, GovsOrderDetail.info_protect, GovsOrderDetail.relate_order_count, GovsOrderDetail.service_object_type, GovsOrderUser.customer_credentials_type, GovsOrderUser.customer_credentials_no, GovsOrderDetail.call_time ).join( GovsOrderUser, GovsOrderMaster.id == GovsOrderUser.master_id ).join( GovsOrderDetail, GovsOrderMaster.id == GovsOrderDetail.master_id ).where( GovsOrderMaster.id.in_(task_id_list) ) govs_task_df = await GovsOrderMaster.query_as_df(detail_query) # 格式化为字符串 govs_task_df[GovsOrderMaster.id.key] = govs_task_df[GovsOrderMaster.id.key].astype(str) govs_task_df[GovsOrderDetail.relate_order_count.key] = govs_task_df[GovsOrderDetail.relate_order_count.key].astype( str) # 拼接诉求地址 govs_task_df['appealAddress'] = (govs_task_df[GovsOrderDetail.area_code_city.key] + '/' + govs_task_df[GovsOrderDetail.area_code_area.key] + '/' + govs_task_df[GovsOrderDetail.area_code_street.key] + '/' + govs_task_df[GovsOrderDetail.address_detail.key]) # 拼接诉求归口 govs_task_df['appealCategory'] = (govs_task_df[GovsOrderDetail.case_accord_type_one_name.key] + '/' + govs_task_df[GovsOrderDetail.case_accord_type_two_name.key] + '/' + govs_task_df[GovsOrderDetail.case_accord_type_three_name.key]) # 拼接诉求来源 govs_task_df['appealSource'] = (govs_task_df[GovsOrderDetail.order_source.key] + '/' + govs_task_df[GovsOrderDetail.order_source_detail.key]) # 日期转换为字符串 govs_task_df[GovsOrderDetail.call_time.key] = govs_task_df[GovsOrderDetail.call_time.key].apply( lambda x: x.strftime('%Y-%m-%d %H:%M:%S') if pd.notna(x) and hasattr(x, 'strftime') else x ) # 处理无待办工单详情状态 empty_task_ids = set(task_id_list) - set(govs_task_df[GovsOrderMaster.id.key].unique().tolist()) empty_task_data = [ { GovsPushStatus.master_id.key: govs_task_id, GovsPushStatus.push_order_status.key: 1 } for govs_task_id in list(empty_task_ids) ] empty_task_df = pd.DataFrame(empty_task_data) await GovsPushStatus.save_batch(empty_task_df) # 处理数据映射,适应接口推送 mapped_df = govs_task_df.rename(columns=GovsOrderDetailMapping) # 仅保留需要的列 mapped_df = mapped_df[ list(GovsOrderDetailMapping.values()) + ['appealAddress', 'appealCategory', 'appealSource']] # 这里把空数据都换成 None,以便存入数据库时是 null mapped_df.replace(models.EmptyInDF + models.EmptyDatetimeInDF, '', inplace=True) echo_log(f"正在准备请求队列...") # 构建请求队列 govs_push_queue = asyncio.Queue() # 向队列中填充请求对象 for _h, row in mapped_df.iterrows(): push_request = await oa_api_request.get_push_govs_order_detail_request(**row.to_dict()) setattr(push_request, "govs_task_id", row.get('gdId')) await govs_push_queue.put(push_request) # 并发提交推送请求 echo_log(f"开始推送工单详情数据...") pushed_task_ids = await requests.async_concurrency( govs_push_queue, con_count=dock.CONCURRENCY_COUNT, retry=dock.MAX_RETRY_COUNT, after_request=after_push_order_detail_request, after_done=done_push_order_detail ) echo_log(f"工单详情推送已经完成...") return pushed_task_ids if __name__ == "__main__": from paste.core import aio_pool _runner = aio_pool.get_aio_runner() _runner(push_order_detail(fetch_size=60))