Files
d3i-szct/dock/dcm/dcm_scrape_operation.py
2026-06-02 17:46:38 +08:00

119 lines
4.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import json
from sqlalchemy import select
from tornado.httpclient import HTTPResponse, HTTPRequest
import dock
from dock.dcm import dcm_api
from models.dcm_task import DcmTask
from paste.core.logging import echo_log
from paste.util import udict
from paste.web import requests
async def get_operation_request(
    rec_id: int, act_id: int, task_num: str, other_task_num: str,
    task_list_id: int = 600058,
    task_list_name: str = "部门待办栏"
):
    """Build the DCM API request used to look up a task's available operations.

    Assembles the parameters for the ``/home/workflow/assign`` endpoint and
    hands them to ``dcm_api.new_api_request`` as a ``GET`` request. Nothing
    is sent from here; the caller queues the returned request.

    NOTE(review): the original description said a valid session cookie
    (e.g. JSESSIONID) is injected into the request headers. That is not
    visible in this function — presumably ``dcm_api.new_api_request``
    does it; confirm there.

    Args:
        rec_id: Record ID of the work order.
        act_id: Task (activity) ID.
        task_num: Task number.
        other_task_num: Third-party task number.
        task_list_id: Task-list type ID. Defaults to 600058, the
            enterprise to-do list.
        task_list_name: Task-list display name. Defaults to "部门待办栏".

    Returns:
        Whatever ``dcm_api.new_api_request`` returns; other code in this
        module treats it as a request object that can be annotated with
        attributes and put on a queue.
    """
    # Identifiers that locate the work order inside the task list.
    payload = {
        "recID": rec_id,
        "actID": act_id,
        "tasknum": task_num,
        "otherTaskNum": other_task_num,
        "taskListID": task_list_id,
        "taskListName": task_list_name,
    }
    # Fixed fields selecting the "assign" menu entry.
    payload.update(
        recDispNum="",
        menuName="assign",
        menuDisplayName="办理",
    )
    # timeout=30 is passed through as-is (presumably seconds — TODO confirm).
    return await dcm_api.new_api_request(
        "/home/workflow/assign", payload, 'GET', timeout=30
    )
async def after_operation_request(response: HTTPResponse, retry_queue: asyncio.Queue[HTTPRequest]):
    """Extract the usable operations from a menu response and persist them.

    Parses the JSON body, filters the menus found at
    ``resultInfo.data.menuData.menus``, then:

    * stores the parsed payload on the originating request as
      ``response_data`` and the filtered operation names as ``operation``;
    * writes the comma-joined names to ``DcmTask.operation`` for the task
      whose id was attached to the request as ``dcm_task_id``.

    Args:
        response: Completed HTTP response. Its ``request`` must carry a
            ``dcm_task_id`` attribute; a ``rec_id`` attribute is optional
            and only used for logging.
        retry_queue: Queue of requests waiting to be retried; only its size
            is read, for logging.

    Raises:
        AttributeError: If the request has no ``dcm_task_id`` attribute.
        json.JSONDecodeError: If the response body is not valid JSON.
    """
    response_data = json.loads(response.body.decode())
    menus: list[dict] = udict.get_by_path(response_data, 'resultInfo.data.menuData.menus') or []

    # Keep only usable operation menus: skip "unassign" (undo handling) and
    # menus that are not shown. A menu counts as shown when it is visible and
    # has bit value 2 set in displayProperty (exact flag meaning is not
    # documented here — TODO confirm).
    operation: list[str] = [
        menu.get('displayName')
        for menu in menus
        if menu.get('menuName') != 'unassign'
        and menu.get('visible', False)
        and menu.get('displayProperty', 0) & 2 == 2
    ]

    request = response.request
    setattr(request, "response_data", response_data)
    # Expose the filtered names, not the raw payload (already available as
    # response_data), so readers of `operation` get the operation list.
    setattr(request, "operation", operation)

    dcm_task_id = getattr(request, 'dcm_task_id')
    dcm_task = await DcmTask(id=dcm_task_id).async_find_first()
    if not dcm_task:
        # The task row may be gone by the time the response arrives; there is
        # nothing to update, so report it instead of failing on None.
        echo_log(f"未找到待办工单,工单ID{dcm_task_id},无法更新可用操作.")
        return

    dcm_task.operation = ','.join(operation)
    await dcm_task.async_save()

    # rec_id is not attached by every caller; fall back to the task's own.
    rec_id = getattr(request, 'rec_id', dcm_task.rec_id)
    echo_log(f"成功更新企业待办 {rec_id} 的可用操作:{dcm_task.operation}.")
    # asyncio.Queue defines no __len__/__bool__, so this is effectively a
    # "queue was provided" check rather than a "queue is non-empty" check.
    if retry_queue:
        echo_log(f"企业待办可用操作重试队列中有:{retry_queue.qsize()} 个请求在等待.")
async def fetch_operation(dcm_task: DcmTask):
    """Fetch the available menu operations for one DCM to-do work order.

    Builds the operation request for the task, runs it through
    ``requests.async_concurrency`` with ``after_operation_request`` as the
    completion callback, and returns what that callback stored on the
    request.

    Args:
        dcm_task: The DCM (digital city management) to-do work order.

    Returns:
        The value of the request's ``operation`` attribute, as set by
        ``after_operation_request``.

    Raises:
        AttributeError: If ``operation`` was never set on the request,
            e.g. because the callback did not run for it.
    """
    operation_request = await get_operation_request(
        dcm_task.rec_id, dcm_task.act_id, dcm_task.task_num, dcm_task.other_task_num
    )
    # after_operation_request reads both attributes back from the request:
    # dcm_task_id to load the row to update, rec_id for its log message.
    setattr(operation_request, "dcm_task_id", dcm_task.id)
    setattr(operation_request, "rec_id", dcm_task.rec_id)

    # async_concurrency consumes a queue, so wrap the single request in one.
    request_queue = asyncio.Queue()
    await request_queue.put(operation_request)
    await requests.async_concurrency(
        request_queue, con_count=dock.CONCURRENCY_COUNT, retry=dock.MAX_RETRY_COUNT,
        after_request=after_operation_request
    )
    return getattr(operation_request, "operation")
if __name__ == "__main__":
    from paste.core import aio_pool

    async def test(dcm_task_id):
        """Refresh the available operations of a single work order by id."""
        task = await DcmTask(id=dcm_task_id).async_find_first()
        assert task, f"未找到待办工单,工单ID{dcm_task_id}"
        await fetch_operation(task)

    async def test_all():
        """Refresh the available operations of every stored work order."""
        all_tasks: list[DcmTask] = await DcmTask.query_all(select(DcmTask))
        pending = asyncio.Queue()
        for task in all_tasks:
            req = await get_operation_request(
                task.rec_id, task.act_id, task.task_num, task.other_task_num
            )
            # Tag the request so the callback can find the task row and log
            # the record id.
            for attr_name, attr_value in (
                ("dcm_task_id", task.id),
                ("rec_id", task.rec_id),
            ):
                setattr(req, attr_name, attr_value)
            await pending.put(req)
        await requests.async_concurrency(pending, after_request=after_operation_request)

    _runner = aio_pool.get_aio_runner()
    # To refresh one work order instead, run: _runner(test(<dcm task id>))
    _runner(test_all())