Files
d3i-szct/dock/dcm/dcm_scrape.py
T
2026-06-02 17:46:38 +08:00

161 lines
6.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
数据抓取模块。
"""
import asyncio
import logging
from typing import Optional, Union
from sqlalchemy import select, desc
import dock
from dock.dcm.dcm_scrape_operation import get_operation_request, after_operation_request
from dock.dcm.dcm_scrape_task import get_task_request, after_task_request
from dock.dcm.dcm_scrape_attachment import get_attachment_request, after_attachment_request
from dock.dcm.dcm_scrape_process_info import get_process_info_request, after_process_info_request
from dock.dcm.dcm_scrape_form_data import get_form_data_request, after_form_data_request
from dock.dcm.dcm_scrape_more_info import get_more_info_request, after_more_info_request
from dock.dcm.dcm_scrape_extend_info import get_extend_info_request, after_extend_info_request
from models.dcm_task import DcmTask
from paste.core.logging import echo_log
from paste.web import requests
async def _enqueue_detail_request(queue, request, dcm_task_id, rec_id):
    """
    Log a detail request's URL, tag it with its owning task, and queue it.

    :param queue: asyncio.Queue collecting the requests of one detail kind
    :param request: request object returned by one of the get_*_request builders
    :param dcm_task_id: DcmTask.id value of the row the request belongs to
    :param rec_id: rec_id value of that row
    """
    echo_log(request.url)
    # The identifiers travel on the request object itself; presumably the
    # after_*_request callbacks read them back -- confirm in those modules.
    request.dcm_task_id = dcm_task_id
    request.rec_id = rec_id
    await queue.put(request)


async def fetch_dcm_task(
        fetch_size: int = 60,
        task_id: Optional[Union[str, int, list, tuple, set]] = None,
):
    """
    Scrape the to-do task list, then the detail data of the selected tasks.

    The task list is always requested first. DcmTask rows are then read back
    (ordered by act_id, descending) and, for every selected row, six kinds of
    detail requests are queued and executed concurrently: operation,
    attachment, process info, form data, more info and extend info.

    :param fetch_size: passed as num_per_page to the task-list request; also
        the number of rows selected for detail scraping when task_id is falsy
    :param task_id: optional DcmTask.id, or a list/tuple/set of ids, that
        restricts detail scraping to those tasks. Any falsy value (None, 0,
        "", an empty collection) selects the first fetch_size rows instead.
    :raises Exception: any error raised while running the detail requests is
        logged and re-raised unchanged
    """
    echo_log("开始抓取待办数据...")
    task_request = await get_task_request(num_per_page=fetch_size)
    request_queue = asyncio.Queue()
    await request_queue.put(task_request)
    await requests.async_concurrency(
        request_queue, retry=dock.MAX_RETRY_COUNT, after_request=after_task_request
    )
    echo_log("待办数据抓取完成...")

    # Read the task rows back so that details are scraped for the latest data.
    query = select(
        DcmTask.id, DcmTask.rec_id, DcmTask.act_id, DcmTask.task_num, DcmTask.other_task_num,
    ).order_by(
        desc(DcmTask.act_id)
    )
    if task_id:
        if isinstance(task_id, (list, tuple, set)):
            # Materialise as a list so every accepted collection type is
            # handed to in_() in the same form.
            query = query.where(DcmTask.id.in_(list(task_id)))
            echo_log(f"开始抓取待办列表:{task_id} 的详细数据...")
        else:
            query = query.where(DcmTask.id == task_id)
            echo_log(f"开始抓取待办:{task_id} 的详细数据...")
    else:
        echo_log(f"开始抓取前 {fetch_size} 条待办的详细数据...")
        query = query.limit(fetch_size)
    task_df = await DcmTask.query_as_df(query)

    # One queue per detail kind, paired below with the callback that handles
    # its responses.
    operation_queue = asyncio.Queue()
    attachment_queue = asyncio.Queue()
    process_info_queue = asyncio.Queue()
    form_data_queue = asyncio.Queue()
    more_info_queue = asyncio.Queue()
    extend_info_queue = asyncio.Queue()
    detail_jobs = (
        (operation_queue, after_operation_request),
        (attachment_queue, after_attachment_request),
        (process_info_queue, after_process_info_request),
        (form_data_queue, after_form_data_request),
        (more_info_queue, after_more_info_request),
        (extend_info_queue, after_extend_info_request),
    )

    # Fill the queues: six requests per task row, built in a fixed order.
    echo_log("正在准备请求队列...")
    for _, row in task_df.iterrows():
        dcm_task_id = row.get(DcmTask.id.key)
        rec_id = int(row.get(DcmTask.rec_id.key))
        act_id = int(row.get(DcmTask.act_id.key))
        task_num = f"{row.get(DcmTask.task_num.key)}"
        other_task_num = f"{row.get(DcmTask.other_task_num.key)}"

        req = await get_operation_request(rec_id, act_id, task_num, other_task_num)
        await _enqueue_detail_request(operation_queue, req, dcm_task_id, rec_id)

        req = await get_attachment_request(rec_id)
        await _enqueue_detail_request(attachment_queue, req, dcm_task_id, rec_id)

        req = await get_process_info_request(rec_id)
        await _enqueue_detail_request(process_info_queue, req, dcm_task_id, rec_id)

        req = await get_form_data_request(rec_id, act_id)
        await _enqueue_detail_request(form_data_queue, req, dcm_task_id, rec_id)

        req = await get_more_info_request(rec_id)
        await _enqueue_detail_request(more_info_queue, req, dcm_task_id, rec_id)

        req = await get_extend_info_request(rec_id)
        await _enqueue_detail_request(extend_info_queue, req, dcm_task_id, rec_id)

    total = sum(queue.qsize() for queue, _ in detail_jobs)
    echo_log(f"可用操作请求:{operation_queue.qsize()};附件请求:{attachment_queue.qsize()};处理过程请求:{process_info_queue.qsize()}")
    echo_log(f"详细数据请求:{form_data_queue.qsize()};更多信息请求:{more_info_queue.qsize()};扩展信息请求:{extend_info_queue.qsize()}")
    echo_log(f"共计:{total} 个.")

    echo_log("抓取待办详细数据...")
    try:
        # Run all six detail kinds side by side; each call works through its
        # own queue with the shared concurrency and retry settings.
        await asyncio.gather(*[
            requests.async_concurrency(
                queue, con_count=dock.CONCURRENCY_COUNT, retry=dock.MAX_RETRY_COUNT,
                after_request=callback,
            )
            for queue, callback in detail_jobs
        ])
    except Exception as e:
        # Log with traceback, then let the caller decide how to react.
        echo_log(f"抓取任务异常: {e}", level=logging.ERROR, is_log_exc=True)
        raise
async def fetch_single_dcm_task(dcm_task: DcmTask):
    """
    Scrape one specific to-do task together with its detail data.

    Delegates to fetch_dcm_task, passing the task's id as task_id.

    :param dcm_task: the DcmTask object whose id selects the task to scrape
    """
    target_id = dcm_task.id
    await fetch_dcm_task(task_id=target_id)
if __name__ == "__main__":
    # Script entry point: run one scrape with fetch_size=30 through the
    # runner obtained from paste.core.aio_pool.
    from paste.core import aio_pool

    run_async = aio_pool.get_aio_runner()
    run_async(fetch_dcm_task(fetch_size=30))