Files
2026-06-02 17:46:38 +08:00

107 lines
4.3 KiB
Python

"""
数据抓取模块。
"""
import asyncio
from typing import Optional, Union
from sqlalchemy import select, desc
import dock
from dock.govs import govs_scrape_order_master, govs_scrape_order_detail, govs_scrape_order_process
from models.govs_order_master import GovsOrderMaster
from paste.core.logging import echo_log
from paste.web import requests
# Fixed value passed as the fifth positional argument of
# govs_scrape_order_process.get_task_request for every order.
# NOTE(review): its meaning is not visible in this module — confirm what this id
# identifies before changing it.
_PROCESS_REQUEST_FIXED_ARG = '1700467981117980074'


def _attach_order_meta(request, order_id, order_no, master_id, tenant_id):
    """Set order identifier attributes on *request* and return it.

    Presumably the ``after_task_request`` callbacks read these attributes back
    to associate a response with its order — TODO confirm against dock.govs.
    """
    for attr_name, attr_value in (
        ('order_id', order_id),
        ('order_no', order_no),
        ('master_id', master_id),
        ('tenant_id', tenant_id),
    ):
        setattr(request, attr_name, attr_value)
    return request


def _build_master_query(dept_page_tag, num_per_page, task_id):
    """Build the GovsOrderMaster SELECT that picks orders for detail scraping.

    Rows are ordered newest-first (descending ``id``). Also logs which subset
    is about to be scraped.

    :param dept_page_tag: 1 restricts to ``govs_sign == 0``; any value other
        than 0 or 1 restricts to ``govs_sign == 1``; 0 applies no sign filter.
    :param num_per_page: row limit applied when *task_id* is not given.
    :param task_id: a single id, or a list/tuple/set of ids; falsy means "no
        id filter".
    :return: the SQLAlchemy select statement.
    """
    query = select(
        GovsOrderMaster.id, GovsOrderMaster.order_id, GovsOrderMaster.order_no, GovsOrderMaster.tenant_id,
        GovsOrderMaster.master_id, GovsOrderMaster.area_code
    ).order_by(
        desc(GovsOrderMaster.id)
    )
    # Target orders in a specific sign state: tag 1 -> not yet signed,
    # any tag other than 0/1 -> already signed, tag 0 -> all orders.
    if dept_page_tag == 1:
        query = query.where(GovsOrderMaster.govs_sign == 0)
    elif dept_page_tag != 0:
        query = query.where(GovsOrderMaster.govs_sign == 1)
    if task_id:
        # Accept any common collection of ids, not just list; previously a
        # tuple/set fell into the scalar `==` branch and produced a bad filter.
        if isinstance(task_id, (list, tuple, set)):
            query = query.where(GovsOrderMaster.id.in_(list(task_id)))
            echo_log(f"开始抓取待办列表:{task_id} 的详细数据...")
        else:
            query = query.where(GovsOrderMaster.id == task_id)
            echo_log(f"开始抓取待办:{task_id} 的详细数据...")
    else:
        echo_log(f"开始抓取前 {num_per_page} 条待办的详细数据...")
        query = query.limit(num_per_page)
    return query


async def fetch_govs_task(
    dept_page_tag: int = 1,
    num_per_page: int = 60,
    task_id: Optional[Union[str, int, list, tuple, set]] = None,
):
    """Scrape the to-do order list, then scrape detail and process data per order.

    First runs the master-list scrape. It then reads orders back from
    GovsOrderMaster and enqueues one detail request and one process request per
    order. Finally, both queues are drained concurrently.

    :param dept_page_tag: 0 = all orders, 1 = orders pending sign-off,
        2 = orders pending assignment. Also selects the ``govs_sign`` filter
        applied when choosing orders for detail scraping.
    :param num_per_page: page size passed to the master-list scrape, and the
        number of newest orders to detail-scrape when *task_id* is not given.
    :param task_id: optional order id, or list/tuple/set of ids, to
        detail-scrape instead of the newest *num_per_page* orders.
    """
    echo_log("开始抓取待办数据...")
    task_request = await govs_scrape_order_master.get_task_request(
        dept_page_tag=dept_page_tag, num_per_page=num_per_page
    )
    request_queue = asyncio.Queue()
    await request_queue.put(task_request)
    await requests.async_concurrency(
        request_queue, retry=dock.MAX_RETRY_COUNT,
        after_request=govs_scrape_order_master.after_task_request
    )
    echo_log("待办数据抓取完成...")

    # Read orders back from the DB (after the master scrape above has run) so
    # details are fetched for the latest data.
    task_df = await GovsOrderMaster.query_as_df(
        _build_master_query(dept_page_tag, num_per_page, task_id)
    )

    detail_queue = asyncio.Queue()
    process_queue = asyncio.Queue()
    echo_log("正在准备请求队列...")
    for _, row in task_df.iterrows():
        order_id = row.get(GovsOrderMaster.order_id.key)
        order_no = row.get(GovsOrderMaster.order_no.key)
        tenant_id = int(row.get(GovsOrderMaster.tenant_id.key))
        master_id = int(row.get(GovsOrderMaster.master_id.key))
        area_code = row.get(GovsOrderMaster.area_code.key)

        detail_request = await govs_scrape_order_detail.get_task_request(order_id, master_id, tenant_id)
        await detail_queue.put(
            _attach_order_meta(detail_request, order_id, order_no, master_id, tenant_id)
        )

        process_request = await govs_scrape_order_process.get_task_request(
            order_id, order_no, master_id, tenant_id, _PROCESS_REQUEST_FIXED_ARG, area_code
        )
        await process_queue.put(
            _attach_order_meta(process_request, order_id, order_no, master_id, tenant_id)
        )

    echo_log("抓取待办详细数据...")
    # Drain the detail and process queues concurrently with each other.
    await asyncio.gather(
        requests.async_concurrency(
            detail_queue, con_count=dock.CONCURRENCY_COUNT, retry=dock.MAX_RETRY_COUNT,
            after_request=govs_scrape_order_detail.after_task_request
        ),
        requests.async_concurrency(
            process_queue, con_count=dock.CONCURRENCY_COUNT, retry=dock.MAX_RETRY_COUNT,
            after_request=govs_scrape_order_process.after_task_request
        ),
    )
if __name__ == "__main__":
    # Script entry: run a single scrape pass over orders pending sign-off,
    # limited to the 50 newest, via the project's async runner.
    from paste.core import aio_pool

    run_coroutine = aio_pool.get_aio_runner()
    pending_scrape = fetch_govs_task(dept_page_tag=1, num_per_page=50)
    run_coroutine(pending_scrape)