import asyncio
import json
from typing import Any, Optional, Union

import pandas as pd
from dateutil import parser
from tornado.httpclient import HTTPResponse, HTTPRequest

import dock
import models
from dock.govs import govs_api
from models.govs_order_process import GovsOrderProcess
from paste.core.logging import echo_log
from paste.util import udict
from paste.web import requests

# Textual layout every time column is normalized to before saving.
_DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S'


async def get_task_request(order_id: str,
                           order_no: str,
                           master_id: Union[str, int],
                           tenant_id: Union[str, int],
                           dept_id: Union[str, int],
                           area_code: str,
                           sort: str = ""):
    """Build the API request that queries the processing steps of one order.

    The arguments are packed into a payload and handed, together with the
    endpoint path, to ``govs_api.new_api_request``; nothing is sent here.

    NOTE(review): the previous docstring described a POST to the provincial
    12345 service with a session cookie injected into the headers. That
    behaviour would live inside ``govs_api.new_api_request`` and is not
    visible from this module -- confirm there.

    Args:
        order_id: ID of the pending order.
        order_no: Number of the pending order.
        master_id: ID of the related master order record.
        tenant_id: Tenant ID.
        dept_id: Department ID.
        area_code: Area code forwarded as ``areaCode``.
        sort: Sort expression forwarded as ``sort``; empty by default.

    Returns:
        Whatever ``govs_api.new_api_request`` returns. The ``__main__`` driver
        in this file tags it with extra attributes and puts it on a request
        queue.
    """
    api_url = "/orderreceive/orderMaster/queryOrderProcess"
    request_body = {
        "orderId": order_id,
        "orderNo": order_no,
        "masterId": master_id,
        "tenantId": tenant_id,
        "deptId": dept_id,
        "areaCode": area_code,
        "sort": sort,
    }
    # Build the API request.
    return await govs_api.new_api_request(api_url, request_body)


def _to_json_text(value: Any) -> Optional[str]:
    """Serialize a nested value to JSON text; ``None`` is passed through."""
    if value is None:
        return None
    return json.dumps(value, ensure_ascii=False)


def _normalize_datetime(value: Any) -> Optional[str]:
    """Re-format a non-blank date string as ``_DATETIME_FORMAT``.

    Anything that is not a non-blank string yields ``None``.
    """
    if isinstance(value, str) and value.strip():
        return parser.parse(value).strftime(_DATETIME_FORMAT)
    return None


def _build_process_frame(records: Any,
                         master_id: Union[str, int],
                         tenant_id: Union[str, int]) -> pd.DataFrame:
    """Turn the raw ``result`` records into a frame ready for ``save_batch``.

    Args:
        records: Non-empty value taken from the ``result`` entry of the
            response; it is passed straight to ``pd.DataFrame``.
        master_id: Value written (as a string) to the master-id column.
        tenant_id: Value written to the tenant-id column.

    Returns:
        A new DataFrame whose columns use the table field names, with rows
        lacking an id or order id removed.

    Raises:
        KeyError: If a column this function reads is absent after renaming.
    """
    # FieldMapping is table-field -> source-field; invert it so the source
    # columns can be renamed to the table's field names.
    source_to_table = {
        source_field: table_field
        for table_field, source_field in GovsOrderProcess.FieldMapping.items()
    }
    frame = pd.DataFrame(records).rename(columns=source_to_table)

    id_key = GovsOrderProcess.id.key
    order_id_key = GovsOrderProcess.order_id.key

    # Drop rows whose id or order id is missing or empty. This has to run
    # BEFORE the id is cast to str: the cast turns missing values into the
    # literal strings "None"/"nan", which a later notna() check cannot catch.
    for required_key in (id_key, order_id_key):
        required = frame[required_key]
        frame = frame[required.notna() & (required != "")]
    # Work on an independent copy so the column assignments below never write
    # through to (or warn about) the unfiltered frame.
    frame = frame.copy()

    frame[GovsOrderProcess.tenant_id.key] = tenant_id
    # Comparison fields are stored as strings.
    frame[GovsOrderProcess.master_id.key] = str(master_id)
    frame[id_key] = frame[id_key].astype(str)

    # Nested values (dicts / lists) are stored as JSON text.
    json_keys = (
        GovsOrderProcess.child_order_processes.key,
        GovsOrderProcess.handler_user_ids.key,
        GovsOrderProcess.handler_org_ids.key,
        GovsOrderProcess.next_handler_user_ids.key,
        GovsOrderProcess.attachment_dto_list.key,
    )
    for key in json_keys:
        frame[key] = frame[key].apply(_to_json_text)

    # Time fields are normalized to one textual format; blanks become None.
    datetime_keys = (
        GovsOrderProcess.plan_sign_time.key,
        GovsOrderProcess.plan_finish_time.key,
        GovsOrderProcess.plan_back_time.key,
        GovsOrderProcess.contact_time.key,
        GovsOrderProcess.origin_plan_finish_time.key,
        GovsOrderProcess.origin_plan_sign_time.key,
    )
    for key in datetime_keys:
        frame[key] = frame[key].apply(_normalize_datetime)

    # Replace every "empty" marker with None so it is stored as NULL.
    # NOTE(review): how replace() treats an explicit value=None has changed
    # between pandas releases -- confirm against the pinned pandas version.
    frame.replace(models.EmptyInDF + models.EmptyDatetimeInDF, None, inplace=True)
    return frame


async def after_task_request(response: HTTPResponse, retry_queue: asyncio.Queue[HTTPRequest]) -> None:
    """Handle the response of an order-process request and save its records.

    The identifiers are read back from attributes that the caller attached to
    the request object (``order_id``, ``order_no``, ``master_id``,
    ``tenant_id``). The ``result`` entry of the JSON body is converted to a
    DataFrame and passed to ``GovsOrderProcess.save_batch``. When ``result``
    is empty or missing nothing is saved and the counts are reported as 0.

    Args:
        response: Response object whose ``request`` carries the attributes
            listed above.
        retry_queue: Retry queue; when truthy its size is logged.

    Raises:
        AttributeError: If the request lacks one of the expected attributes.
        KeyError: If an expected column is absent from a non-empty result.
    """
    request = response.request
    order_id = getattr(request, 'order_id')
    order_no = getattr(request, 'order_no')
    master_id = getattr(request, 'master_id')
    tenant_id = getattr(request, 'tenant_id')

    response_data = json.loads(response.body.decode())
    records = udict.get_by_path(response_data, 'result')

    if records:
        process_frame = _build_process_frame(records, master_id, tenant_id)
        created, updated = await GovsOrderProcess.save_batch(process_frame)
    else:
        # An empty payload would produce a frame without any columns, so
        # there is nothing to transform or save.
        created, updated = 0, 0

    # Report how many records were created / updated.
    echo_log(
        f"成功创建租户:{tenant_id} 的待办工单:{master_id}({order_id},{order_no}) 处理流程:{created}条,更新:{updated}条.")
    if retry_queue:
        echo_log(f"待办工单处理流程重试队列中有:{retry_queue.qsize()} 个请求在等待.")


if __name__ == "__main__":
    from paste.core import aio_pool


    async def scrape(order_id: str,
                     order_no: str,
                     master_id: Union[str, int],
                     tenant_id: Union[str, int],
                     dept_id: Union[str, int],
                     area_code: str,
                     sort: str = ""):
        """Build one order-process request and run it through the request pipeline."""
        task_request = await get_task_request(order_id, order_no, master_id, tenant_id, dept_id, area_code, sort)
        # Attach the identifiers so after_task_request can read them back
        # from response.request.
        setattr(task_request, 'order_id', order_id)
        setattr(task_request, 'order_no', order_no)
        setattr(task_request, 'master_id', master_id)
        setattr(task_request, 'tenant_id', tenant_id)

        request_queue = asyncio.Queue()
        await request_queue.put(task_request)
        await requests.async_concurrency(
            request_queue,
            retry=dock.MAX_RETRY_COUNT,
            after_request=after_task_request
        )


    _runner = aio_pool.get_aio_runner()
    _runner(scrape(
        'DH050826052517663',
        'DH050826052517663*3',
        '2058851271599333378',
        '1773611023340371969',
        '1700467981117980074',
        '320500',
    ))