Files
d3i-szct/dock/govs/govs_scrape_order_detail.py
T
2026-06-02 17:46:38 +08:00

160 lines
7.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import json
from typing import Union
import pandas as pd
from dateutil import parser
from tornado.httpclient import HTTPResponse, HTTPRequest
import dock
import models
from dock.govs import govs_api
from models.govs_order_attachment import GovsOrderAttachment
from models.govs_order_detail import GovsOrderDetail
from models.govs_order_user import GovsOrderUser
from paste.core.logging import echo_log
from paste.util import udict
from paste.web import requests
async def get_task_request(order_id: str, master_id: Union[str, int], tenant_id: Union[str, int]):
    """
    Build the request used to fetch the detail of one provincial-12345 order.

    The order identifiers are packed into a request body and handed, together
    with the detail endpoint path, to ``govs_api.new_api_request``; whatever
    that coroutine returns is returned unchanged.

    NOTE(review): the original author notes describe the request as a POST
    form submission with a valid session cookie (e.g. JSESSIONID) injected
    into the headers. That behaviour is implemented in ``govs_api`` and is
    not visible from this function - confirm there.

    Args:
        order_id: ID of the pending order (sent as ``orderId``).
        master_id: ID of the related order master record (sent as ``masterId``).
        tenant_id: Tenant ID (sent as ``tenantId``).

    Returns:
        The result of ``govs_api.new_api_request`` for the detail endpoint.
    """
    endpoint = "/orderreceive/orderMaster/queryOrderDetail"
    payload = dict(orderId=order_id, masterId=master_id, tenantId=tenant_id)
    # Delegate the actual request construction to the shared API helper.
    detail_request = await govs_api.new_api_request(endpoint, payload)
    return detail_request
def _json_or_none(value):
    """Serialize ``value`` to JSON text (non-ASCII kept as-is); ``None`` stays ``None``."""
    return json.dumps(value, ensure_ascii=False) if value is not None else None


def _normalize_datetime(value):
    """Reformat a non-blank date string as ``YYYY-mm-dd HH:MM:SS``; anything else becomes ``None``."""
    if isinstance(value, str) and value.strip():
        return parser.parse(value).strftime('%Y-%m-%d %H:%M:%S')
    return None


def _rename_to_table_columns(frame, field_mapping):
    """Rename source-data columns to table field names by inverting ``field_mapping``.

    ``field_mapping`` is iterated as ``table_field -> source_key`` pairs, so the
    inverse is what ``DataFrame.rename`` needs.
    """
    forward_mapping = {dict_f: table_f for table_f, dict_f in field_mapping.items()}
    return frame.rename(columns=forward_mapping)


def _nullify_empty(frame):
    """Replace, in place, every value listed in the project's empty markers with ``None``.

    The markers come from ``models.EmptyInDF + models.EmptyDatetimeInDF``
    (presumably placeholder values for empty cells - defined outside this file),
    so that they are stored as NULL in the database.
    """
    frame.replace(models.EmptyInDF + models.EmptyDatetimeInDF, None, inplace=True)


async def after_task_request(response: HTTPResponse, retry_queue: asyncio.Queue[HTTPRequest]):
    """
    Handle the response of an order-detail request and persist its content.

    The JSON body is parsed and three things are saved from it:

    1. ``result`` as a single ``GovsOrderDetail`` row (nested structures are
       stored as JSON text, time fields are normalised);
    2. ``result.orderUser`` as a ``GovsOrderUser`` row, when present;
    3. ``result.orderAttachmentList`` as ``GovsOrderAttachment`` rows, when
       present, tagged with the order's ``master_id`` / ``order_id``.

    The order identifiers are read from attributes that the caller attached to
    ``response.request`` (``order_id``, ``order_no``, ``master_id``,
    ``tenant_id``).

    There is no guard for a response without ``result`` or without the
    expected columns; the column lookups raise in that case.

    :param response: response object whose request carries the order identifiers
    :param retry_queue: retry queue; only used to log how many requests are waiting
    """
    request = response.request
    order_id = getattr(request, 'order_id')
    order_no = getattr(request, 'order_no')
    master_id = getattr(request, 'master_id')
    tenant_id = getattr(request, 'tenant_id')

    response_data = json.loads(response.body.decode())

    # ---- order detail -------------------------------------------------
    order_detail_data = udict.get_by_path(response_data, 'result')
    mapped_df = _rename_to_table_columns(
        pd.DataFrame([order_detail_data]), GovsOrderDetail.FieldMapping
    )

    # Lists and dicts cannot be stored directly; keep them as JSON strings.
    for column in (
        GovsOrderDetail.order_custom_form_fields.key,
        GovsOrderDetail.order_phone_dto.key,
        GovsOrderDetail.order_user.key,
        GovsOrderDetail.order_attachment_list.key,
        GovsOrderDetail.pre_process_list.key,
    ):
        mapped_df[column] = mapped_df[column].apply(_json_or_none)

    # NOTE(review): unlike every conversion above, this one reads from
    # ``tripartite_call_records`` but writes to ``tripartite_call_records_list``.
    # Kept exactly as in the original because the model is not visible here;
    # confirm against GovsOrderDetail which attribute is intended.
    mapped_df[GovsOrderDetail.tripartite_call_records_list.key] = mapped_df[
        GovsOrderDetail.tripartite_call_records.key
    ].apply(_json_or_none)

    for column in (
        GovsOrderDetail.plan_finish_time.key,
        GovsOrderDetail.order_finish_time.key,
        GovsOrderDetail.plan_sign_time.key,
    ):
        mapped_df[column] = mapped_df[column].apply(_normalize_datetime)

    _nullify_empty(mapped_df)
    # The two returned values are not used; the unpacking is kept from the original.
    _created, _updated = await GovsOrderDetail.save_batch(mapped_df)

    # ---- order user ---------------------------------------------------
    user_data = udict.get_by_path(response_data, 'result.orderUser')
    if user_data:
        user_mapped_df = _rename_to_table_columns(
            pd.DataFrame([user_data]), GovsOrderUser.FieldMapping
        )
        # Fields used for comparison are stored as strings.
        for column in (GovsOrderUser.id.key, GovsOrderUser.master_id.key):
            user_mapped_df[column] = user_mapped_df[column].astype(str)
        for column in (GovsOrderUser.created_at.key, GovsOrderUser.updated_at.key):
            user_mapped_df[column] = user_mapped_df[column].apply(_normalize_datetime)
        _nullify_empty(user_mapped_df)
        await GovsOrderUser.save_batch(user_mapped_df)

    # ---- order attachments --------------------------------------------
    attachment_list = udict.get_by_path(response_data, 'result.orderAttachmentList')
    if attachment_list:
        attachment_mapped_df = _rename_to_table_columns(
            pd.DataFrame(attachment_list), GovsOrderAttachment.FieldMapping
        )
        # Every attachment row is tagged with the identifiers of its order.
        attachment_mapped_df[GovsOrderAttachment.master_id.key] = master_id
        attachment_mapped_df[GovsOrderAttachment.order_id.key] = order_id
        # Fields used for comparison are stored as strings.
        for column in (GovsOrderAttachment.id.key, GovsOrderAttachment.master_id.key):
            attachment_mapped_df[column] = attachment_mapped_df[column].astype(str)
        _nullify_empty(attachment_mapped_df)
        await GovsOrderAttachment.save_batch(attachment_mapped_df)

    # Report the outcome.
    echo_log(f"成功创建租户:{tenant_id} 的待办工单:{master_id}({order_id}{order_no}) 详情.")
    # asyncio.Queue defines neither __len__ nor __bool__, so this check only
    # filters out a missing (None) queue; an empty queue still logs a size of 0.
    if retry_queue:
        echo_log(f"待办工单详情重试队列中有:{retry_queue.qsize()} 个请求在等待.")
if __name__ == "__main__":
    # Manual entry point: scrape the detail of a single order.
    from paste.core import aio_pool

    async def scrape(order_id: Union[str, int], master_id: Union[str, int], tenant_id: Union[str, int],
                     order_no: Union[str, int], ):
        """Build one detail request, tag it with the order identifiers and run it.

        The identifiers are attached to the request object as attributes
        because ``after_task_request`` reads them back from ``response.request``.
        """
        task_request = await get_task_request(order_id, master_id, tenant_id)
        for attr_name, attr_value in (
            ('order_id', order_id),
            ('master_id', master_id),
            ('tenant_id', tenant_id),
            ('order_no', order_no),
        ):
            setattr(task_request, attr_name, attr_value)

        # A queue holding just this one request is handed to the concurrent runner.
        request_queue = asyncio.Queue()
        await request_queue.put(task_request)
        await requests.async_concurrency(
            request_queue,
            retry=dock.MAX_RETRY_COUNT,
            after_request=after_task_request,
        )

    _runner = aio_pool.get_aio_runner()
    # Hard-coded identifiers of one order, used for this manual run.
    _runner(scrape('DH050826052517663', '2058851271599333378', '1773611023340371969', 'DH050826052517663*3'))