Files
d3i-szct/dock/dcm/dcm_scrape_allow_postpone.py
2026-06-02 17:46:38 +08:00

87 lines
3.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import json
from tornado.httpclient import HTTPResponse, HTTPRequest
import dock
from dock.dcm import dcm_api
from models.dcm_task import DcmTask
from paste.util import udict
from paste.web import requests
async def get_allow_postpone_request(
    rec_id: int, act_id: int, task_num: str, other_task_num: str,
    task_list_id: int = 600058,
    task_list_name: str = "部门待办栏"
):
    """
    Build the DCM request that checks whether a postponement may be applied for.

    The request targets the workflow item-type check endpoint with
    ``itemType=postpone`` and is created as a POST with a 30 second timeout.
    Construction is delegated to ``dcm_api.new_api_request``; any session or
    cookie handling happens there, not in this function.

    Args:
        rec_id: Record ID, sent as ``recID``.
        act_id: Task (activity) ID, sent as ``actID``.
        task_num: Task number, sent as ``tasknum``.
        other_task_num: Third-party task number, sent as ``otherTaskNum``.
        task_list_id: Task list type ID, sent as ``taskListID``. The default
            600058 is the enterprise to-do list.
        task_list_name: Task list name, sent as ``taskListName``.

    Returns:
        Whatever ``dcm_api.new_api_request`` returns -- presumably a request
        object ready to be queued for sending; confirm against that helper.
    """
    # Fields that identify the work order and the list it belongs to.
    task_fields = {
        "recID": rec_id,
        "actID": act_id,
        "tasknum": task_num,
        "otherTaskNum": other_task_num,
        "taskListID": task_list_id,
        "taskListName": task_list_name,
    }
    # Fixed menu context sent along with every check.
    menu_fields = {
        "recDispNum": "",
        "menuName": "assign",
        "menuDisplayName": "办理",
    }
    # Merging in this order keeps the key order of the outgoing body stable.
    payload = {**task_fields, **menu_fields}

    endpoint = "/home/workflow/wfitemtypecheck?itemType=postpone"
    return await dcm_api.new_api_request(endpoint, payload, "POST", timeout=30)
async def after_allow_postpone_request(response: HTTPResponse, retry_queue: asyncio.Queue[HTTPRequest]):
    """
    Parse a response body as JSON and attach it to the originating request.

    The decoded payload is stored on ``response.request`` under the attribute
    ``response_data`` so that the code which created the request can read the
    result afterwards.

    Args:
        response: The HTTP response; its ``body`` is decoded with the default
            codec (UTF-8) and parsed as JSON.
        retry_queue: Not used by this callback; it is part of the callback
            signature only.

    Raises:
        json.JSONDecodeError: If the body is not valid JSON.
    """
    parsed = json.loads(response.body.decode())
    response.request.response_data = parsed
async def fetch_allow_postpone(dcm_task: DcmTask):
    """
    Query DCM for whether a postponement may be applied for on a work order.

    Builds the check request from the task's ``rec_id``, ``act_id``,
    ``task_num`` and ``other_task_num``, runs it through
    ``requests.async_concurrency`` with ``after_allow_postpone_request`` as
    the completion callback, and then reads the parsed payload back off the
    request object.

    :param dcm_task: The DCM to-do work order to check.
    :return: A ``(success, message)`` pair taken from ``resultInfo.success``
        and ``resultInfo.message`` of the response payload.
    """
    postpone_request = await get_allow_postpone_request(
        dcm_task.rec_id,
        dcm_task.act_id,
        dcm_task.task_num,
        dcm_task.other_task_num,
    )

    # The concurrency helper consumes a queue, so wrap the single request.
    pending = asyncio.Queue()
    await pending.put(postpone_request)
    await requests.async_concurrency(
        pending,
        con_count=dock.CONCURRENCY_COUNT,
        retry=dock.MAX_RETRY_COUNT,
        after_request=after_allow_postpone_request,
    )

    # The callback stores the parsed JSON on the request as `response_data`.
    # NOTE(review): if the callback never ran (e.g. the request ultimately
    # failed), this attribute access raises AttributeError -- confirm how
    # async_concurrency reports failures.
    payload = postpone_request.response_data
    success: bool = udict.get_by_path(payload, 'resultInfo.success')
    message: str = udict.get_by_path(payload, 'resultInfo.message')
    return success, message
if __name__ == "__main__":
    # Manual smoke run: look up one work order by ID and run the postpone check.
    from paste.core import aio_pool

    async def _check_task(dcm_task_id):
        """Load the work order with the given ID and run the postpone check on it."""
        task = await DcmTask(id=dcm_task_id).async_find_first()
        assert task, f"未找到待办工单,工单ID{dcm_task_id}"
        await fetch_allow_postpone(task)

    run_coroutine = aio_pool.get_aio_runner()
    run_coroutine(_check_task(2054174091287597056))