import asyncio
import json
from typing import Union

import numpy as np
import pandas as pd
from tornado.httpclient import HTTPResponse, HTTPRequest

import dock
import models
from dock.govs import govs_api
from models.govs_order_master import GovsOrderMaster
from paste.core.logging import echo_log
from paste.util import udict
from paste.web import requests


async def get_task_request(order_id: Union[str, int] = '', dept_page_tag: int = 1, current_page: int = 1,
                           num_per_page: int = 200):
    """
    Build the request for one page of the provincial 12345 department to-do task list.

    The payload targets ``/orderhandler/taskQuery/getDeptAllToDoOrderProcess``. Every
    filter except ``deptPageTag`` and ``orderId`` is sent empty; paging is controlled
    by ``pageNum`` / ``pageSize``. The HTTP method and encoding are decided by
    ``govs_api.new_api_request`` (presumably POST — confirm there).

    Args:
        order_id: Value sent, stringified, as the ``orderId`` filter. Defaults to ``''``
            (no order filter).
        dept_page_tag: Value sent as ``deptPageTag`` (paging flag); defaults to 1.
        current_page: Page number sent as ``pageNum``; defaults to 1.
        num_per_page: Page size sent as ``pageSize``; defaults to 200.

    Returns:
        Whatever ``govs_api.new_api_request`` returns for this URL and payload. The
        ``__main__`` block enqueues it for ``requests.async_concurrency``, so it is
        presumably a not-yet-sent request object.
    """
    api_url = "/orderhandler/taskQuery/getDeptAllToDoOrderProcess"
    request_body = {
        "data": {
            "deptPageTag": dept_page_tag,
            "orderId": f"{order_id}",
            "keyWord": "",
            "andOrFlag": "0",
            "serviceObjectType": [],
            "callNumber": "",
            "orderSource": [],
            "orderSourceDetailList": [],
            "signedStatus": [],
            "firstOrderStatus": [],
            "secordOrderStatus": [],
            "status": [],
            "overDue": "",
            "existQuotoInfo": [],
            "isSupervise": [],
            "planFinishTime": "",
            "caseIsUrgent": [],
            "areaCodeCity": "",
            "areaCodeArea": "",
            "areaCodeStreet": "",
            "addressDetail": "",
            "infoProtect": [],
            "firstLevelAffiliations": [],
            "secondLevelAffiliations": [],
            "thirdLevelAffiliations": [],
            "fourthLevelAffiliations": [],
            "fifthLevelAffiliations": [],
            "caseAccordTypeOneNames": [],
            "caseAccordTypeTwoNames": [],
            "caseAccordTypeThreeNames": [],
            "caseAccordTypeFourNames": [],
            "caseAccordTypeFiveNames": [],
            "creatorId": "",
            "assigneeUserId": "",
            "callTimeEnd": "",
            "callTimeFrom": "",
            "caseLabels": [],
            "contactNumber": "",
            "createBy": "",
            "deptName": "",
            "deptType": "",
            "fileExist": [],
            "hotspot": [],
            "claimStatus": "",
            "orderSourceDetail": "",
            "orderType": [],
            "orgName": [],
            "sortField": "",
            "sortRule": "",
            "actionName": "",
            "returnReasonNameList": [],
            "createDateFrom": "",
            "createDateEnd": "",
            "planBackTimeStart": "",
            "planBackTimeEnd": "",
            "planFinishTimeStart": "",
            "planFinishTimeEnd": "",
        },
        "pageSize": num_per_page,
        "pageNum": current_page,
    }
    # Build the API request.
    return await govs_api.new_api_request(api_url, request_body)


def _collect_order_masters(list_data: list[dict]) -> list[dict]:
    """Return each row's ``orderMasterDTO`` with ``nextTaskId`` and ``claimStatus`` copied onto it.

    The DTO dicts are mutated in place. Rows whose ``orderMasterDTO`` is not a dict
    are skipped (and counted in a log line) instead of raising ``TypeError``.
    """
    order_master_list: list[dict] = []
    skipped = 0
    for row in list_data:
        order_master_dto = row.get('orderMasterDTO')
        if not isinstance(order_master_dto, dict):
            skipped += 1
            continue
        order_master_dto['nextTaskId'] = row.get('nextTaskId')
        order_master_dto['claimStatus'] = row.get('claimStatus')
        order_master_list.append(order_master_dto)
    if skipped:
        echo_log(f"企业待办中有 {skipped} 条缺少 orderMasterDTO 的数据已跳过.")
    return order_master_list


def _to_json_or_none(value):
    """Serialize ``value`` to a JSON string; map ``None`` and pandas' NaN fill to ``None``."""
    # pd.DataFrame(records) fills keys that are missing from some records with float NaN;
    # json.dumps(nan) would produce the invalid JSON text 'NaN', so treat it as missing.
    if value is None or (isinstance(value, float) and np.isnan(value)):
        return None
    return json.dumps(value, ensure_ascii=False)


def _build_order_master_frame(order_master_list: list[dict]) -> pd.DataFrame:
    """Turn raw order dicts into a DataFrame whose columns match the GovsOrderMaster table."""
    mapped_df = pd.DataFrame(order_master_list)
    # FieldMapping is iterated as (table_field, source_field); invert it so source
    # column names are renamed to their table column names.
    forward_mapping = {dict_f: table_f for table_f, dict_f in GovsOrderMaster.FieldMapping.items()}
    mapped_df = mapped_df.rename(columns=forward_mapping)

    # Array-valued columns are stored as JSON text. A column is absent entirely when
    # no record in this page carried the field, so guard against KeyError.
    for column in (GovsOrderMaster.attachment_list.key, GovsOrderMaster.back_count.key):
        if column in mapped_df.columns:
            mapped_df[column] = mapped_df[column].apply(_to_json_or_none)

    # govs_sign is 1 when claim_status is '已签收', otherwise 0.
    mapped_df[GovsOrderMaster.govs_sign.key] = np.where(
        mapped_df[GovsOrderMaster.claim_status.key] == '已签收', 1, 0
    )
    # Replace empty placeholder values with None so they are stored as NULL.
    mapped_df.replace(models.EmptyInDF + models.EmptyDatetimeInDF, None, inplace=True)
    return mapped_df


async def after_task_request(response: HTTPResponse, retry_queue: asyncio.Queue[HTTPRequest]):
    """
    Handle a task-list response by upserting its orders via ``GovsOrderMaster.save_batch``.

    :param response: response whose JSON body carries the task rows under ``data.list``
    :param retry_queue: queue of requests awaiting retry; only its size is reported here
    """
    response_data = json.loads(response.body.decode())
    # Treat a null/absent ``data.list`` (if get_by_path yields None) as "no rows"
    # rather than failing on iteration.
    list_data: list[dict] = udict.get_by_path(response_data, 'data.list') or []

    order_master_list = _collect_order_masters(list_data)
    if order_master_list:
        mapped_df = _build_order_master_frame(order_master_list)
        _created, _updated = await GovsOrderMaster.save_batch(mapped_df)
        echo_log(f"成功创建企业待办:{_created}条,更新:{_updated}条.")
    else:
        echo_log('未获取到企业待办数据')

    # asyncio.Queue defines neither __bool__ nor __len__, so a bare `if retry_queue:`
    # is always true; test emptiness explicitly.
    if retry_queue is not None and not retry_queue.empty():
        echo_log(f"企业待办重试队列中有:{retry_queue.qsize()} 个请求在等待.")


if __name__ == "__main__":
    from paste.core import aio_pool

    async def scrape():
        """Ad-hoc entry point: fetch one task-list page for a single order and persist it."""
        task_request = await get_task_request(dept_page_tag=0, order_id='DH058726052903006')
        request_queue = asyncio.Queue()
        await request_queue.put(task_request)
        await requests.async_concurrency(
            request_queue, retry=dock.MAX_RETRY_COUNT, after_request=after_task_request
        )

    _runner = aio_pool.get_aio_runner()
    _runner(scrape())