Files
2026-06-02 17:46:38 +08:00

205 lines
7.8 KiB
Python

"""
待办工单附件文件上传。
对应文档接口:3、文件上传接口
"""
import asyncio
import hashlib
import io
import json
from typing import Union, Optional
from urllib.parse import urlparse
import pandas as pd
from sqlalchemy import select, desc, exists
from tornado.httpclient import HTTPResponse, HTTPRequest
import apps
import dock
from dock.oa import oa_api_request
from models.dcm_push_status import DcmPushStatus
from models.dcm_task import DcmTask
from models.dcm_task_attachment import DcmTaskAttachment
from models.dcm_task_file_upload import DcmTaskFileUpload
from paste.core.logging import echo_log
from paste.util import udict
from paste.web import requests
file_url_column_name = '_file_url'
"""
文件路径列名。
"""
async def done_attachment_download(response_list: list[HTTPResponse]):
"""
下载全部完成后处理程序。
:param response_list: 响应列表
:return:
"""
echo_log(f"文件下载全部完成...")
async def after_attachment_upload(response: HTTPResponse, retry_queue: asyncio.Queue[HTTPRequest]):
"""
附件上传请求后处理程序。
:param response: 响应对象
:param retry_queue: 重试队列
"""
task_file_upload_id = getattr(response.request, 'dcm_task_file_upload_id')
dcm_task_id = getattr(response.request, 'dcm_task_id')
body = response.body.decode()
echo_log(body)
body_data = json.loads(body)
nas = udict.get_by_path(body_data, 'n_a_s')
if nas:
oa_media_id = body_data.get('atts', [])[0].get('fileUrl')
file_upload: DcmTaskFileUpload = await DcmTaskFileUpload.async_find_by_id(task_file_upload_id)
file_upload.oa_media_id = oa_media_id
file_upload.status = 1
await file_upload.async_save()
dcm_task_id = getattr(response.request, "dcm_task_id")
await DcmPushStatus.set_push_task_file_upload_status(dcm_task_id)
echo_log(f"待办:{dcm_task_id} 的附件上传成功.")
else:
echo_log(f"待办:{dcm_task_id} 的附件上传失败.")
async def after_attachment_download(response: HTTPResponse, retry_queue: asyncio.Queue[HTTPRequest]):
"""
附件下载请求后处理程序。
:param response: 请求对象
:param retry_queue: 重试队列
:return:
"""
dcm_task_id = getattr(response.request, 'dcm_task_id')
dcm_task_attachment_id = getattr(response.request, 'dcm_task_attachment_id')
dcm_media_id = getattr(response.request, 'dcm_media_id')
# 计算文件哈希
file_content = response.body
file_hash = hashlib.md5(file_content).hexdigest()
echo_log(f"待办:{dcm_task_id} 的附件:{dcm_task_attachment_id} 已经下载完成,附件 HASH 值:{file_hash}.")
echo_log(f"附件下载重试队列中有:{retry_queue.qsize()} 项在等待.")
# 保存文件上传记录数据
file_upload_data = {
DcmTaskFileUpload.dcm_task_id.key: dcm_task_id,
DcmTaskFileUpload.dcm_task_attachment_id.key: dcm_task_attachment_id,
DcmTaskFileUpload.dcm_media_id.key: dcm_media_id,
DcmTaskFileUpload.file_hash.key: file_hash,
}
file_upload = await DcmTaskFileUpload.is_exist(dcm_task_id, dcm_task_attachment_id)
if file_upload:
file_upload.copy_from_dict(file_upload_data)
else:
file_upload = DcmTaskFileUpload(**file_upload_data)
await file_upload.async_save()
file_obj = io.BytesIO(response.body)
file_name = urlparse(response.request.url).path.split('/')[-1]
upload_request = await oa_api_request.get_upload_request(file_obj, file_name=file_name)
setattr(upload_request, 'dcm_task_file_upload_id', file_upload.id)
setattr(upload_request, "dcm_task_id", dcm_task_id)
upload_queue = asyncio.Queue()
await upload_queue.put(upload_request)
echo_log(f"开始推送待办:{dcm_task_id} 的附件数据...")
await requests.async_concurrency(
upload_queue, con_count=1, retry=dock.MAX_RETRY_COUNT,
after_request=after_attachment_upload
)
async def upload_with_attachment(fetch_size: int = 50,
task_id: Optional[Union[str, int, list[Union[str, int]]]] = None):
"""
推送附件数据。
:param fetch_size: 本次推送数量
:param task_id: 待办任务 ID 可选
:return:
"""
from dock.dcm import dcm_api
# 根据条件获取目标任务 ID 列表(支持指定 task_id 或分页获取)
task_query = select(DcmTask.id).order_by(desc(DcmTask.act_id))
if task_id:
if isinstance(task_id, list):
task_query = task_query.where(DcmTask.id.in_(task_id))
echo_log(f"本次上传待办列表:{task_id} 的附件...")
else:
task_query = task_query.where(DcmTask.id == task_id)
echo_log(f"本次上传待办:{task_id} 的附件...")
else:
task_query = task_query.limit(fetch_size)
echo_log(f"本次上传前 {fetch_size} 条附件...")
task_id_list = (await DcmTask.orm_execute_scalars(task_query)).all()
task_id_list = [f"{id}" for id in task_id_list]
# 查询属于这些任务的所有附件信息
query = select(
DcmTaskAttachment.id, DcmTaskAttachment.dcm_task_id,
DcmTaskAttachment.media_id, DcmTaskAttachment.media_url
).where(
DcmTaskAttachment.dcm_task_id.in_(task_id_list)
)
# 生产环境过滤掉已成功上传的附件(避免重复上传)
if apps.get_active_env() == 'prod':
# 生产环境,不上传已经成功上传的附件文件
query = query.where(~exists().where(
(DcmTaskFileUpload.dcm_task_attachment_id == DcmTaskAttachment.id) &
(DcmTaskFileUpload.status == 1)
))
else:
echo_log(f"非生产环境,上传所有附件.")
attachment_df = await DcmTaskAttachment.query_as_df(query)
# 格式化为字符串
attachment_df[DcmTaskAttachment.id.key] = attachment_df[DcmTaskAttachment.id.key].astype(str)
attachment_df[DcmTaskAttachment.dcm_task_id.key] = attachment_df[DcmTaskAttachment.dcm_task_id.key].astype(str)
# 增加文件下载路径列
attachment_df[file_url_column_name] = attachment_df.apply(
lambda x: f"/{x.get(DcmTaskAttachment.media_url.key).lstrip('/')}"
if pd.notna(x.media_url) and str(x.media_url).strip() else "",
axis=1
)
# 处理无待办工单上传状态
empty_task_ids = set(task_id_list) - set(attachment_df[DcmTaskAttachment.dcm_task_id.key].unique().tolist())
empty_task_data = [
{
DcmPushStatus.dcm_task_id.key: dcm_task_id,
DcmPushStatus.push_task_file_upload_status.key: 1
}
for dcm_task_id in list(empty_task_ids)
]
empty_task_df = pd.DataFrame(empty_task_data)
await DcmPushStatus.save_batch(empty_task_df)
echo_log(f"正在准备下载队列...")
# 创建下载请求,填充请求队列
attachment_download_queue = asyncio.Queue()
for _h, _row in attachment_df.iterrows():
download_request = await dcm_api.new_api_request(_row.get(file_url_column_name), {}, method='GET')
setattr(download_request, 'dcm_task_id', _row.get(DcmTaskAttachment.dcm_task_id.key))
setattr(download_request, 'dcm_task_attachment_id', _row.get(DcmTaskAttachment.id.key))
setattr(download_request, 'dcm_media_id', _row.get(DcmTaskAttachment.media_id.key))
await attachment_download_queue.put(download_request)
# 提交附件下载请求
await requests.async_concurrency(
attachment_download_queue, con_count=dock.CONCURRENCY_COUNT, retry=dock.MAX_RETRY_COUNT,
after_request=after_attachment_download, after_done=done_attachment_download
)
if __name__ == "__main__":
from paste.core import aio_pool
_runner = aio_pool.get_aio_runner()
_runner(upload_with_attachment(task_id=2059900525595328517))