Files
d3i-szct/dock/govs/govs_scrape_order_process.py
2026-06-02 17:46:38 +08:00

155 lines
7.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import json
from typing import Union
import pandas as pd
from dateutil import parser
from tornado.httpclient import HTTPResponse, HTTPRequest
import dock
import models
from dock.govs import govs_api
from models.govs_order_process import GovsOrderProcess
from paste.core.logging import echo_log
from paste.util import udict
from paste.web import requests
async def get_task_request(order_id: str, order_no: str, master_id: Union[str, int],
                           tenant_id: Union[str, int], dept_id: Union[str, int], area_code: str,
                           sort: str = ""):
    """
    Build the API request that queries the processing history of an order
    on the provincial 12345 platform.

    The arguments are packed into the form payload for the
    ``/orderreceive/orderMaster/queryOrderProcess`` endpoint and handed to
    ``govs_api.new_api_request``; whatever that helper returns is returned
    unchanged. Session/cookie handling is presumably performed inside that
    helper - it is not visible in this function.

    Args:
        order_id (str): Pending task ID, sent as ``orderId``.
        order_no (str): Pending task number, sent as ``orderNo``.
        master_id (str | int): ID of the related order master record, sent as ``masterId``.
        tenant_id (str | int): Tenant ID, sent as ``tenantId``.
        dept_id (str | int): Department ID, sent as ``deptId``.
        area_code (str): Area code, sent as ``areaCode``.
        sort (str): Sort option, sent as ``sort``; defaults to an empty string.

    Returns:
        The awaited result of ``govs_api.new_api_request`` for this endpoint and payload.
    """
    endpoint = "/orderreceive/orderMaster/queryOrderProcess"
    payload = dict(
        orderId=order_id,
        orderNo=order_no,
        masterId=master_id,
        tenantId=tenant_id,
        deptId=dept_id,
        areaCode=area_code,
        sort=sort,
    )
    # Delegate the actual request construction to the shared API helper.
    return await govs_api.new_api_request(endpoint, payload)
def _dump_json_or_none(value):
    """Serialise ``value`` to a JSON string (non-ASCII kept as-is); ``None`` stays ``None``."""
    if value is None:
        return None
    return json.dumps(value, ensure_ascii=False)


def _format_datetime_or_none(value):
    """Re-format a non-blank date string as ``YYYY-MM-DD HH:MM:SS``; anything else becomes ``None``."""
    if isinstance(value, str) and value.strip():
        return parser.parse(value).strftime('%Y-%m-%d %H:%M:%S')
    return None


def _prepare_process_frame(list_data, master_id, tenant_id):
    """Turn the raw ``result`` records into a DataFrame shaped for ``GovsOrderProcess.save_batch``."""
    task_df = pd.DataFrame(list_data)

    # FieldMapping maps table column -> source key; invert it so the source
    # columns can be renamed to the table column names.
    forward_mapping = {dict_f: table_f for table_f, dict_f in GovsOrderProcess.FieldMapping.items()}
    mapped_df = task_df.rename(columns=forward_mapping)
    mapped_df[GovsOrderProcess.master_id.key] = master_id
    mapped_df[GovsOrderProcess.tenant_id.key] = tenant_id

    # Drop rows whose id or order_id is missing/blank. This has to happen
    # BEFORE the string cast below: astype(str) turns None/NaN into the
    # strings "None"/"nan", which would slip through the filter.
    id_col = GovsOrderProcess.id.key
    order_id_col = GovsOrderProcess.order_id.key
    has_id = mapped_df[id_col].notna() & (mapped_df[id_col] != "")
    has_order_id = mapped_df[order_id_col].notna() & (mapped_df[order_id_col] != "")
    # Explicit copy so the column assignments below never write into a slice
    # of the unfiltered frame.
    mapped_df = mapped_df[has_id & has_order_id].copy()

    # Comparison fields are stored as strings.
    mapped_df[id_col] = mapped_df[id_col].astype(str)
    mapped_df[GovsOrderProcess.master_id.key] = mapped_df[GovsOrderProcess.master_id.key].astype(str)

    # Nested structures are stored as JSON text.
    json_columns = (
        GovsOrderProcess.child_order_processes.key,
        GovsOrderProcess.handler_user_ids.key,
        GovsOrderProcess.handler_org_ids.key,
        GovsOrderProcess.next_handler_user_ids.key,
        GovsOrderProcess.attachment_dto_list.key,
    )
    for column in json_columns:
        mapped_df[column] = mapped_df[column].apply(_dump_json_or_none)

    # Time fields are normalised to a single textual format; blank or
    # non-string values become None.
    datetime_columns = (
        GovsOrderProcess.plan_sign_time.key,
        GovsOrderProcess.plan_finish_time.key,
        GovsOrderProcess.plan_back_time.key,
        GovsOrderProcess.contact_time.key,
        GovsOrderProcess.origin_plan_finish_time.key,
        GovsOrderProcess.origin_plan_sign_time.key,
    )
    for column in datetime_columns:
        mapped_df[column] = mapped_df[column].apply(_format_datetime_or_none)

    # Replace every "empty" marker with None so it is stored as NULL.
    # NOTE(review): passing value=None explicitly is only honoured by
    # pandas >= 1.4 (older versions fall back to pad-filling) - confirm the
    # pinned pandas version.
    mapped_df.replace(models.EmptyInDF + models.EmptyDatetimeInDF, None, inplace=True)
    return mapped_df


async def after_task_request(response: HTTPResponse, retry_queue: asyncio.Queue[HTTPRequest]):
    """
    Handle the response of an order-process request and persist its records.

    The ``result`` list of the JSON body is loaded into a DataFrame, renamed
    to the ``GovsOrderProcess`` column names, cleaned (rows without id or
    order_id dropped, nested values dumped to JSON, time fields normalised,
    empty markers replaced by None) and saved via
    ``GovsOrderProcess.save_batch``. When the response carries no usable
    records the save is skipped and only a log line is written.

    :param response: response object; its ``request`` must carry the
        ``order_id``, ``order_no``, ``master_id`` and ``tenant_id``
        attributes attached when the request was queued
    :param retry_queue: retry queue; only used to log how many requests are waiting
    :raises AttributeError: if one of the context attributes is missing on the request
    :raises json.JSONDecodeError: if the response body is not valid JSON
    :raises KeyError: if an expected column is absent from the returned records
    """
    request = response.request
    order_id = getattr(request, 'order_id')
    order_no = getattr(request, 'order_no')
    master_id = getattr(request, 'master_id')
    tenant_id = getattr(request, 'tenant_id')

    response_data = json.loads(response.body.decode())
    list_data = udict.get_by_path(response_data, 'result')

    # An empty/missing result would otherwise produce a column-less frame and
    # fail with KeyError on the first column access.
    mapped_df = _prepare_process_frame(list_data, master_id, tenant_id) if list_data else None

    if mapped_df is None or mapped_df.empty:
        echo_log(
            f"租户:{tenant_id} 的待办工单:{master_id}({order_id}{order_no}) 没有可保存的处理流程数据,已跳过.")
    else:
        _created, _updated = await GovsOrderProcess.save_batch(mapped_df)
        # Report how many records were created / updated.
        echo_log(
            f"成功创建租户:{tenant_id} 的待办工单:{master_id}({order_id}{order_no}) 处理流程:{_created}条,更新:{_updated}条.")

    if retry_queue:
        echo_log(f"待办工单处理流程重试队列中有:{retry_queue.qsize()} 个请求在等待.")
if __name__ == "__main__":
    from paste.core import aio_pool

    async def scrape(order_id: str, order_no: str, master_id: Union[str, int],
                     tenant_id: Union[str, int], dept_id: Union[str, int], area_code: str,
                     sort: str = ""):
        """Build one order-process request, queue it and run it through the concurrent requester."""
        task_request = await get_task_request(order_id, order_no, master_id, tenant_id, dept_id, area_code, sort)

        # Attach the order context to the request; after_task_request reads
        # these attributes back from response.request.
        request_context = (
            ('order_id', order_id),
            ('order_no', order_no),
            ('master_id', master_id),
            ('tenant_id', tenant_id),
        )
        for attr_name, attr_value in request_context:
            setattr(task_request, attr_name, attr_value)

        pending_requests = asyncio.Queue()
        await pending_requests.put(task_request)

        await requests.async_concurrency(
            pending_requests,
            retry=dock.MAX_RETRY_COUNT,
            after_request=after_task_request,
        )

    # Sample arguments for a manual run:
    # order_id, order_no, master_id, tenant_id, dept_id, area_code.
    sample_args = (
        'DH050826052517663', 'DH050826052517663*3', '2058851271599333378',
        '1773611023340371969', '1700467981117980074', '320500',
    )
    _runner = aio_pool.get_aio_runner()
    _runner(scrape(*sample_args))