# Files
# d3i-szct/dock/govs/govs_scrape_order_master.py
# 2026-06-02 17:46:38 +08:00
#
# 157 lines
# 5.8 KiB
# Python

import asyncio
import json
from typing import Union
import numpy as np
import pandas as pd
from tornado.httpclient import HTTPResponse, HTTPRequest
import dock
import models
from dock.govs import govs_api
from models.govs_order_master import GovsOrderMaster
from paste.core.logging import echo_log
from paste.util import udict
from paste.web import requests
async def get_task_request(order_id: Union[str, int] = '', dept_page_tag: int = 1,
                           current_page: int = 1, num_per_page: int = 200):
    """
    Build the API request for the department to-do order list endpoint.

    Assembles the query payload for
    ``/orderhandler/taskQuery/getDeptAllToDoOrderProcess`` with every filter
    left empty except ``orderId`` and ``deptPageTag``, then hands it to
    ``govs_api.new_api_request``.

    Args:
        order_id: Order identifier to filter by; sent as a string in
            ``orderId``. The default ``''`` sends an empty filter.
        dept_page_tag: Value sent as ``deptPageTag``.
        current_page: Page number, sent as ``pageNum``.
        num_per_page: Page size, sent as ``pageSize``. Defaults to 200.

    Returns:
        Whatever ``govs_api.new_api_request`` returns — presumably a
        ready-to-send HTTP request object; confirm in ``govs_api``.
    """
    endpoint = "/orderhandler/taskQuery/getDeptAllToDoOrderProcess"

    # Key order and spelling are kept exactly as the payload has always been
    # sent. NOTE(review): "secordOrderStatus" looks like a typo but is
    # presumably the field name the API expects — do not "fix" it without
    # checking the API.
    query_filters = {
        "deptPageTag": dept_page_tag,
        "orderId": str(order_id),
        "keyWord": "",
        "andOrFlag": "0",
        "serviceObjectType": [],
        "callNumber": "",
        "orderSource": [],
        "orderSourceDetailList": [],
        "signedStatus": [],
        "firstOrderStatus": [],
        "secordOrderStatus": [],
        "status": [],
        "overDue": "",
        "existQuotoInfo": [],
        "isSupervise": [],
        "planFinishTime": "",
        "caseIsUrgent": [],
        "areaCodeCity": "",
        "areaCodeArea": "",
        "areaCodeStreet": "",
        "addressDetail": "",
        "infoProtect": [],
        "firstLevelAffiliations": [],
        "secondLevelAffiliations": [],
        "thirdLevelAffiliations": [],
        "fourthLevelAffiliations": [],
        "fifthLevelAffiliations": [],
        "caseAccordTypeOneNames": [],
        "caseAccordTypeTwoNames": [],
        "caseAccordTypeThreeNames": [],
        "caseAccordTypeFourNames": [],
        "caseAccordTypeFiveNames": [],
        "creatorId": "",
        "assigneeUserId": "",
        "callTimeEnd": "",
        "callTimeFrom": "",
        "caseLabels": [],
        "contactNumber": "",
        "createBy": "",
        "deptName": "",
        "deptType": "",
        "fileExist": [],
        "hotspot": [],
        "claimStatus": "",
        "orderSourceDetail": "",
        "orderType": [],
        "orgName": [],
        "sortField": "",
        "sortRule": "",
        "actionName": "",
        "returnReasonNameList": [],
        "createDateFrom": "",
        "createDateEnd": "",
        "planBackTimeStart": "",
        "planBackTimeEnd": "",
        "planFinishTimeStart": "",
        "planFinishTimeEnd": "",
    }

    # Paging parameters sit next to the filter object, not inside it.
    payload = {
        "data": query_filters,
        "pageSize": num_per_page,
        "pageNum": current_page,
    }
    return await govs_api.new_api_request(endpoint, payload)
def _json_or_none(value):
    """Serialize *value* to a JSON string, mapping missing values to ``None``.

    pandas fills a cell with float NaN when the source record lacked that
    key. ``json.dumps`` would turn NaN into the literal text ``NaN``, so NaN
    is treated as missing, exactly like ``None``.
    """
    # ``value != value`` is true only for NaN.
    if value is None or (isinstance(value, float) and value != value):
        return None
    return json.dumps(value, ensure_ascii=False)


async def after_task_request(response: HTTPResponse, retry_queue: asyncio.Queue[HTTPRequest]):
    """
    Handle the response of a task-list request and persist its orders.

    Decodes the JSON body, takes each entry's ``orderMasterDTO`` (enriched
    with the entry's ``nextTaskId`` and ``claimStatus``), renames the columns
    to the ``GovsOrderMaster`` table fields, normalizes a few columns and
    saves the batch through ``GovsOrderMaster.save_batch``.

    :param response: HTTP response whose body is the JSON payload.
    :param retry_queue: Queue of requests waiting to be retried; only
        inspected here to log how many are pending.
    """
    payload = json.loads(response.body.decode())

    # Treat a response without ``data.list`` as an empty page rather than
    # failing on iteration (applies if get_by_path yields None for a missing
    # path).
    records: list[dict] = udict.get_by_path(payload, 'data.list') or []

    order_master_list: list[dict] = []
    skipped = 0
    for record in records:
        order_master_dto = record.get('orderMasterDTO')
        if not isinstance(order_master_dto, dict):
            # Without the DTO there is nothing to persist; skip this record
            # instead of letting one bad entry abort the whole page.
            skipped += 1
            continue
        # These two fields live on the outer record, not in the DTO.
        order_master_dto['nextTaskId'] = record.get('nextTaskId')
        order_master_dto['claimStatus'] = record.get('claimStatus')
        order_master_list.append(order_master_dto)

    if skipped:
        echo_log(f"企业待办响应中有 {skipped} 条记录缺少 orderMasterDTO,已跳过.")

    if order_master_list:
        mapped_df = pd.DataFrame(order_master_list)

        # FieldMapping is table-field -> source-field; invert it so the
        # source column names are renamed to the database column names.
        forward_mapping = {dict_f: table_f for table_f, dict_f in GovsOrderMaster.FieldMapping.items()}
        mapped_df = mapped_df.rename(columns=forward_mapping)

        # Store structured values as JSON strings; missing values stay None.
        for column in (GovsOrderMaster.attachment_list.key, GovsOrderMaster.back_count.key):
            mapped_df[column] = mapped_df[column].apply(_json_or_none)

        # govs_sign is 1 when claim_status equals '已签收', otherwise 0.
        mapped_df[GovsOrderMaster.govs_sign.key] = np.where(
            mapped_df[GovsOrderMaster.claim_status.key] == '已签收', 1, 0
        )

        # Replace empty markers with None so they are stored as NULL.
        mapped_df.replace(models.EmptyInDF + models.EmptyDatetimeInDF, None, inplace=True)

        # Persist the batch; the two results are logged as the created and
        # updated counts.
        _created, _updated = await GovsOrderMaster.save_batch(mapped_df)
        echo_log(f"成功创建企业待办:{_created}条,更新:{_updated}条.")
    else:
        echo_log('未获取到企业待办数据')

    # asyncio.Queue defines neither __bool__ nor __len__, so a bare truth
    # test is always true; check emptiness explicitly so the message is only
    # logged when requests are actually waiting.
    if retry_queue is not None and not retry_queue.empty():
        echo_log(f"企业待办重试队列中有:{retry_queue.qsize()} 个请求在等待.")
if __name__ == "__main__":
    # Manual debugging entry point: fetch one hard-coded order and push it
    # through the normal request / response pipeline.
    from paste.core import aio_pool

    async def scrape():
        """Queue a single task-list request and process it concurrently."""
        first_request = await get_task_request(
            order_id='DH058726052903006',
            dept_page_tag=0,
        )

        pending_requests = asyncio.Queue()
        await pending_requests.put(first_request)

        # after_task_request is invoked with each response; the retry count
        # comes from dock.MAX_RETRY_COUNT.
        await requests.async_concurrency(
            pending_requests,
            retry=dock.MAX_RETRY_COUNT,
            after_request=after_task_request,
        )

    run_coroutine = aio_pool.get_aio_runner()
    run_coroutine(scrape())