Files
d3i-szct/dock/govc/govc_scrape.py
T
2026-06-03 09:58:49 +08:00

181 lines
8.8 KiB
Python

"""
数据抓取模块。
"""
import asyncio
from typing import Optional, Union
from sqlalchemy import select, desc
import dock
from dock.govc import govc_scrape_dept_feedback, govc_scrape_return_visit, govc_scrape_finish_info, govc_scrape_detail, \
govc_scrape_process, govc_scrape_requester, govc_scrape_delay_info, govc_scrape_contact_info, \
govc_scrape_order_status, govc_scrape_history_order, govc_scrape_supervision, govc_scrape_order
from models.govc_task import GovcTask
from paste.core.logging import echo_log
from paste.web import requests
def _build_detail_query(fetch_size: int, task_id):
    """Build the SELECT choosing which GovcTask rows get their detail data scraped.

    Rows are ordered by descending id. When ``task_id`` is truthy the query is
    narrowed to that id (or to those ids, for a list/tuple/set) and no limit is
    applied; otherwise only the first ``fetch_size`` rows are selected.
    """
    query = select(
        GovcTask.id, GovcTask.pvi_guid, GovcTask.c_guid
    ).order_by(
        desc(GovcTask.id)
    )
    if task_id:
        if isinstance(task_id, (list, tuple, set)):
            query = query.where(GovcTask.id.in_(list(task_id)))
            echo_log(f"开始抓取待办列表:{task_id} 的详细数据...")
        else:
            query = query.where(GovcTask.id == task_id)
            echo_log(f"开始抓取待办:{task_id} 的详细数据...")
    else:
        echo_log(f"开始抓取前 {fetch_size} 条待办的详细数据...")
        query = query.limit(fetch_size)
    return query


async def _enqueue(queue: asyncio.Queue, request, task_id, **extra_attrs):
    """Set ``task_id`` (then any ``extra_attrs``) as attributes on ``request`` and put it on ``queue``.

    NOTE(review): the attributes are presumably read back by the per-module
    ``after_*`` callbacks to map a response to its task — confirm there.
    """
    setattr(request, 'task_id', task_id)
    for attr_name, attr_value in extra_attrs.items():
        setattr(request, attr_name, attr_value)
    await queue.put(request)


async def fetch_govc_task(fetch_size: int = 60,
                          task_id: Optional[Union[str, int, list, tuple, set]] = None):
    """Scrape the to-do (task) list, then scrape detail data for selected tasks.

    Steps:
      1. Send the single task-list request from ``govc_scrape_order`` and run it
         through ``requests.async_concurrency`` with ``after_task_request``.
      2. Select task rows from ``GovcTask`` (see ``_build_detail_query``).
      3. For each row, build one request per detail endpoint (three supervision
         requests, for types 'zx', 'bm' and 'bmhj'), tag it with the row's task
         id and push it onto that endpoint's queue.
      4. Drain all detail queues concurrently, each with its endpoint's
         ``after_*`` callback.

    :param fetch_size: passed to the task-list request; when ``task_id`` is not
        given, also the number of rows (highest ids first) whose details are scraped.
    :param task_id: optional task id, or list/tuple/set of ids, restricting the
        detail scrape. Falsy values (None, 0, "", empty list) mean "use fetch_size".
    """
    echo_log("开始抓取待办数据...")
    task_request = await govc_scrape_order.get_task_request(
        fetch_size=fetch_size
    )
    request_queue = asyncio.Queue()
    await request_queue.put(task_request)
    await requests.async_concurrency(
        request_queue, retry=dock.MAX_RETRY_COUNT,
        after_request=govc_scrape_order.after_task_request
    )
    echo_log("待办数据抓取完成...")

    # Read task rows only after the list refresh above, so details are scraped for the latest data.
    task_df = await GovcTask.query_as_df(_build_detail_query(fetch_size, task_id))

    # One request queue per detail endpoint.
    feedback_queue = asyncio.Queue()
    finish_info_queue = asyncio.Queue()
    contact_info_queue = asyncio.Queue()
    delay_info_queue = asyncio.Queue()
    detail_queue = asyncio.Queue()
    status_queue = asyncio.Queue()
    process_queue = asyncio.Queue()
    requester_queue = asyncio.Queue()
    return_visit_queue = asyncio.Queue()
    history_order_queue = asyncio.Queue()
    supervision_queue = asyncio.Queue()

    echo_log("正在准备请求队列...")
    for _, row in task_df.iterrows():
        tid = row.get(GovcTask.id.key)
        pvi_guid = row.get(GovcTask.pvi_guid.key)
        c_guid = row.get(GovcTask.c_guid.key)
        await _enqueue(feedback_queue,
                       await govc_scrape_dept_feedback.get_feedback_request(pvi_guid, c_guid), tid)
        await _enqueue(finish_info_queue,
                       await govc_scrape_finish_info.get_finish_info_request(pvi_guid, c_guid), tid)
        await _enqueue(contact_info_queue,
                       await govc_scrape_contact_info.get_contact_request(pvi_guid, c_guid), tid)
        await _enqueue(delay_info_queue,
                       await govc_scrape_delay_info.get_delay_request(pvi_guid, c_guid), tid)
        await _enqueue(detail_queue,
                       await govc_scrape_detail.get_detail_request(c_guid), tid)
        await _enqueue(status_queue,
                       await govc_scrape_order_status.get_fetch_status_request(pvi_guid, c_guid), tid)
        await _enqueue(process_queue,
                       await govc_scrape_process.get_process_request(pvi_guid, c_guid), tid)
        await _enqueue(requester_queue,
                       await govc_scrape_requester.get_requster_request(c_guid), tid)
        await _enqueue(return_visit_queue,
                       await govc_scrape_return_visit.get_return_visit_request(pvi_guid, c_guid), tid)
        await _enqueue(history_order_queue,
                       await govc_scrape_history_order.get_history_order_request(c_guid), tid)
        for supervise_type in ('zx', 'bm', 'bmhj'):
            await _enqueue(
                supervision_queue,
                await govc_scrape_supervision.get_supervision_request(pvi_guid, c_guid, supervise_type),
                tid, supervise_type=supervise_type,
            )

    echo_log("抓取待办详细数据...")
    # (queue, after_request callback) pairs; every queue is drained concurrently below.
    pipelines = (
        (feedback_queue, govc_scrape_dept_feedback.after_feedback_request),
        (finish_info_queue, govc_scrape_finish_info.after_finish_info_request),
        (contact_info_queue, govc_scrape_contact_info.after_contact_request),
        (delay_info_queue, govc_scrape_delay_info.after_delay_request),
        (detail_queue, govc_scrape_detail.after_detail_request),
        (status_queue, govc_scrape_order_status.after_fetch_status_request),
        (process_queue, govc_scrape_process.after_process_request),
        # Fix: this pipeline previously consumed the task-list ``request_queue``,
        # so the requester requests built above were never sent.
        (requester_queue, govc_scrape_requester.after_requester_request),
        (return_visit_queue, govc_scrape_return_visit.after_return_visit_request),
        (history_order_queue, govc_scrape_history_order.after_history_order_request),
        # NOTE(review): the supervision callback is named ``after_return_visit_request``;
        # looks copy-pasted — confirm it is the intended handler in govc_scrape_supervision.
        (supervision_queue, govc_scrape_supervision.after_return_visit_request),
    )
    await asyncio.gather(*(
        requests.async_concurrency(
            queue, con_count=dock.CONCURRENCY_COUNT, retry=dock.MAX_RETRY_COUNT,
            after_request=after_request
        )
        for queue, after_request in pipelines
    ))
if __name__ == "__main__":
    # Manual entry point: scrape the task list and details for 10 tasks
    # using the project's asyncio runner.
    from paste.core import aio_pool

    run_coroutine = aio_pool.get_aio_runner()
    run_coroutine(fetch_govc_task(10))