import asyncio import json from typing import Union import pandas as pd from dateutil import parser from tornado.httpclient import HTTPResponse, HTTPRequest import dock import models from dock.govs import govs_api from models.govs_order_attachment import GovsOrderAttachment from models.govs_order_detail import GovsOrderDetail from models.govs_order_user import GovsOrderUser from paste.core.logging import echo_log from paste.util import udict from paste.web import requests async def get_task_request(order_id: str, master_id: Union[str, int], tenant_id: Union[str, int]): """ 获取省12345任务详情数据。 通过 POST 请求向省12345的任务详情接口提交表单数据,获取任务详情数据。 自动注入有效的 Cookie(如 JSESSIONID)至请求头,并解析返回的 JSON 响应。 Args: order_id (str): 待办任务ID master_id (int): 关联订单主表ID tenant_id (int): 租户ID """ api_url = f"/orderreceive/orderMaster/queryOrderDetail" request_body = { "orderId": order_id, "masterId": master_id, "tenantId": tenant_id } # 构造 API 请求 return await govs_api.new_api_request(api_url, request_body) async def after_task_request(response: HTTPResponse, retry_queue: asyncio.Queue[HTTPRequest]): """ 任务请求响应后的处理程序。 :param response: 响应对象 :param retry_queue: 重试队列 """ order_id = getattr(response.request, 'order_id') order_no = getattr(response.request, 'order_no') master_id = getattr(response.request, 'master_id') tenant_id = getattr(response.request, 'tenant_id') response_body = response.body.decode() response_data = json.loads(response_body) order_detail_data = udict.get_by_path(response_data, 'result') mapped_df = pd.DataFrame([order_detail_data]) # 更换映射方向,用于将源数据列名改为与数据库表对应 forward_mapping = {dict_f: table_f for table_f, dict_f in GovsOrderDetail.FieldMapping.items()} mapped_df = mapped_df.rename(columns=forward_mapping) # 把数组和字典转换为json字符串 mapped_df[GovsOrderDetail.order_custom_form_fields.key] = mapped_df[ GovsOrderDetail.order_custom_form_fields.key].apply( lambda x: json.dumps(x, ensure_ascii=False) if x is not None else None ) mapped_df[GovsOrderDetail.order_phone_dto.key] = mapped_df[GovsOrderDetail.order_phone_dto.key].apply( lambda x: json.dumps(x, ensure_ascii=False) if x is not None else None ) mapped_df[GovsOrderDetail.order_user.key] = mapped_df[GovsOrderDetail.order_user.key].apply( lambda x: json.dumps(x, ensure_ascii=False) if x is not None else None ) mapped_df[GovsOrderDetail.order_attachment_list.key] = mapped_df[GovsOrderDetail.order_attachment_list.key].apply( lambda x: json.dumps(x, ensure_ascii=False) if x is not None else None ) mapped_df[GovsOrderDetail.pre_process_list.key] = mapped_df[GovsOrderDetail.pre_process_list.key].apply( lambda x: json.dumps(x, ensure_ascii=False) if x is not None else None ) mapped_df[GovsOrderDetail.tripartite_call_records_list.key] = mapped_df[ GovsOrderDetail.tripartite_call_records.key].apply( lambda x: json.dumps(x, ensure_ascii=False) if x is not None else None ) mapped_df[GovsOrderDetail.plan_finish_time.key] = mapped_df[GovsOrderDetail.plan_finish_time.key].apply( lambda x: parser.parse(x).strftime('%Y-%m-%d %H:%M:%S') if isinstance(x, str) and x.strip() else None ) mapped_df[GovsOrderDetail.order_finish_time.key] = mapped_df[GovsOrderDetail.order_finish_time.key].apply( lambda x: parser.parse(x).strftime('%Y-%m-%d %H:%M:%S') if isinstance(x, str) and x.strip() else None ) mapped_df[GovsOrderDetail.plan_sign_time.key] = mapped_df[GovsOrderDetail.plan_sign_time.key].apply( lambda x: parser.parse(x).strftime('%Y-%m-%d %H:%M:%S') if isinstance(x, str) and x.strip() else None ) # 这里把空数据都换成 None,以便存入数据库时是 null mapped_df.replace(models.EmptyInDF + models.EmptyDatetimeInDF, None, inplace=True) _created, _updated = await GovsOrderDetail.save_batch(mapped_df) # 存储用户信息 user_data = udict.get_by_path(response_data, 'result.orderUser') if user_data: user_df = pd.DataFrame([user_data]) # 更换映射方向,用于将源数据列名改为与数据库表对应 forward_mapping = {dict_f: table_f for table_f, dict_f in GovsOrderUser.FieldMapping.items()} user_mapped_df = user_df.rename(columns=forward_mapping) # 比较字段转字符串 user_mapped_df[GovsOrderUser.id.key] = user_mapped_df[GovsOrderUser.id.key].astype(str) user_mapped_df[GovsOrderUser.master_id.key] = user_mapped_df[GovsOrderUser.master_id.key].astype(str) # 转换日期时间 user_mapped_df[GovsOrderUser.created_at.key] = user_mapped_df[GovsOrderUser.created_at.key].apply( lambda x: parser.parse(x).strftime('%Y-%m-%d %H:%M:%S') if isinstance(x, str) and x.strip() else None ) user_mapped_df[GovsOrderUser.updated_at.key] = user_mapped_df[GovsOrderUser.updated_at.key].apply( lambda x: parser.parse(x).strftime('%Y-%m-%d %H:%M:%S') if isinstance(x, str) and x.strip() else None ) # 这里把空数据都换成 None,以便存入数据库时是 null user_mapped_df.replace(models.EmptyInDF + models.EmptyDatetimeInDF, None, inplace=True) await GovsOrderUser.save_batch(user_mapped_df) # 存储附件信息 attachment_list = udict.get_by_path(response_data, 'result.orderAttachmentList') if attachment_list: attachment_df = pd.DataFrame(attachment_list) # 更换映射方向,用于将源数据列名改为与数据库表对应 forward_mapping = {dict_f: table_f for table_f, dict_f in GovsOrderAttachment.FieldMapping.items()} attachment_mapped_df = attachment_df.rename(columns=forward_mapping) attachment_mapped_df[GovsOrderAttachment.master_id.key] = master_id attachment_mapped_df[GovsOrderAttachment.order_id.key] = order_id # 比较字段转字符串 attachment_mapped_df[GovsOrderAttachment.id.key] = attachment_mapped_df[GovsOrderAttachment.id.key].astype(str) attachment_mapped_df[GovsOrderAttachment.master_id.key] = attachment_mapped_df[GovsOrderAttachment.master_id.key].astype(str) # 这里把空数据都换成 None,以便存入数据库时是 null attachment_mapped_df.replace(models.EmptyInDF + models.EmptyDatetimeInDF, None, inplace=True) await GovsOrderAttachment.save_batch(attachment_mapped_df) # 输出数据创建状态 echo_log(f"成功创建租户:{tenant_id} 的待办工单:{master_id}({order_id},{order_no}) 详情.") if retry_queue: echo_log(f"待办工单详情重试队列中有:{retry_queue.qsize()} 个请求在等待.") if __name__ == "__main__": from paste.core import aio_pool async def scrape(order_id: Union[str, int], master_id: Union[str, int], tenant_id: Union[str, int], order_no: Union[str, int], ): task_request = await get_task_request(order_id, master_id, tenant_id) setattr(task_request, 'order_id', order_id) setattr(task_request, 'master_id', master_id) setattr(task_request, 'tenant_id', tenant_id) setattr(task_request, 'order_no', order_no) request_queue = asyncio.Queue() await request_queue.put(task_request) await requests.async_concurrency( request_queue, retry=dock.MAX_RETRY_COUNT, after_request=after_task_request ) _runner = aio_pool.get_aio_runner() _runner(scrape('DH050826052517663', '2058851271599333378', '1773611023340371969', 'DH050826052517663*3'))