Files
d3i-szct/models/dcm_task_file_upload.py
T
2026-06-02 17:46:38 +08:00

462 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import random
from typing import Union, Optional, Callable

import pandas as pd
from sqlalchemy import select, delete
from tornado_swagger.model import register_swagger_model
from wtforms import StringField, IntegerField
from wtforms.validators import Length, NumberRange

import models
from models.common_model import CommonModel
from models.db_models import TD3iDcmTaskFileUpload
from paste.core.logging import echo_log
from paste.util.pagination import Pagination
from paste.web.form import ModelForm
class DcmTaskFileUploadForm(ModelForm):
    """
    Validation form for file-upload relation records.

    Declares one field per user-supplied column of ``TD3iDcmTaskFileUpload``
    (table ``t_d3i_dcm_task_file_upload`` according to the original notes).

    Integer fields are validated with ``NumberRange`` rather than ``Length``:
    ``Length`` evaluates ``len(field.data)``, which raises ``TypeError`` for
    any non-zero integer. ``NumberRange`` also rejects a missing value
    (``None``), so the "must not be empty" messages keep their meaning.
    """

    # Relation keys
    dcm_task_id = IntegerField(
        '任务ID', validators=[NumberRange(min=1, message='任务ID不能为空')])
    dcm_task_attachment_id = IntegerField(
        '附件ID', validators=[NumberRange(min=1, message='附件ID不能为空')])
    dcm_media_id = IntegerField(
        '数字城管附件ID', validators=[NumberRange(min=1, message='数字城管附件ID不能为空')])
    oa_media_id = IntegerField(
        'OA附件ID', validators=[NumberRange(min=1, message='OA附件ID不能为空')])

    # File information
    file_hash = StringField(
        '文件哈希值', validators=[Length(max=256, message='文件哈希值长度不能超过256字符')])

    # Status and audit columns
    status = IntegerField(
        '上传状态', validators=[NumberRange(min=0, max=1, message='状态只能是0或1')])
    created_at = IntegerField(
        '创建时间戳(毫秒)', validators=[NumberRange(min=1, message='创建时间不能为空')])
    created_by = StringField(
        '创建者', validators=[Length(max=64, message='创建者长度不能超过64字符')])

    def process(self, formdata=None, obj=None, **kwargs):
        """
        Strip surrounding whitespace from incoming string values, then
        delegate to the parent ``process``.

        ``formdata`` is modified in place: string values are stripped, and
        strings inside non-empty list values are stripped element-wise.

        :param formdata: submitted data exposing ``items()`` and item assignment
        :param obj: forwarded unchanged to the parent implementation
        :param kwargs: forwarded unchanged to the parent implementation
        """
        if formdata:
            # Iterate over a snapshot so rewriting values cannot disturb the
            # iteration, whatever mapping type the caller passed in.
            for name, values in list(formdata.items()):
                if isinstance(values, list) and values:
                    formdata[name] = [
                        item.strip() if isinstance(item, str) else item
                        for item in values
                    ]
                elif isinstance(values, str):
                    formdata[name] = values.strip()
        super().process(formdata, obj, **kwargs)
class DcmTaskFileUploadBase(TD3iDcmTaskFileUpload, CommonModel):
    """
    Shared query helpers for file-upload relation records.

    Combines the ``TD3iDcmTaskFileUpload`` table mapping with the generic
    helpers inherited from ``CommonModel`` (``query_first``, ``query_as_df``,
    ``query_count``, ``search_wheres``, ``sort_clauses`` ...).
    """

    # Column name -> camelCase name for the file-upload relation data.
    # NOTE(review): how this mapping is consumed is not visible in this file.
    FieldMapping = {
        'dcm_task_id': 'dcmTaskId',
        'dcm_task_attachment_id': 'dcmTaskAttachmentId',
        'dcm_media_id': 'dcmMediaId',
        'oa_media_id': 'oaMediaId',
        'file_hash': 'fileHash',
        'status': 'status',
        'created_at': 'createdAt',
        'created_by': 'createdBy',
    }

    @classmethod
    async def is_exist(cls, dcm_task_id: Union[str, int], dcm_task_attachment_id: Union[str, int]):
        """
        Look up the record with the given task ID and attachment ID.

        :param dcm_task_id: task ID to match exactly
        :param dcm_task_attachment_id: attachment ID to match exactly
        :return: whatever ``query_first`` yields for the query (callers in
            this file treat ``None`` as "no such record")
        """
        _query = select(cls).where(
            cls.dcm_task_id == dcm_task_id,
            cls.dcm_task_attachment_id == dcm_task_attachment_id,
        )
        return await cls.query_first(_query)

    @classmethod
    async def find_by_ids(cls, ids: list[Union[str, int]]):
        """
        Fetch all records whose primary key is in ``ids``.

        :param ids: primary-key values
        :return: the records returned by ``orm_execute_scalars(...).all()``
        """
        _query = select(cls).where(cls.id.in_(ids))
        _result = await cls.orm_execute_scalars(_query)
        return _result.all()

    @classmethod
    async def search_base(cls, is_paging=True, **kwargs):
        """
        Search file-upload records by keyword parameters.

        The filter conditions are produced by ``search_wheres(**kwargs)``;
        the original notes list these exact-match keys: ``dcm_task_id``,
        ``dcm_task_attachment_id``, ``dcm_media_id``, ``oa_media_id``,
        ``status``.

        :param is_paging: when true, count the rows and apply limit/offset
        :param kwargs: query parameters
        :key int page_number: page number (defaults to a random value in 1..100)
        :key int page_size: rows per page (defaults to 20)
        :key dict sort_clause: sort configuration, e.g. ``{'created_at': 'asc'}``;
            falls back to ordering by ``created_at``
        :return: ``(DataFrame, Pagination)``; the second item is ``None``
            when ``is_paging`` is false
        """
        # NOTE(review): a random default page is unusual, but the original
        # docstring documents it, so it is kept as-is.
        page_number = kwargs.get('page_number', random.randint(1, 100))
        page_size = kwargs.get('page_size', 20)
        kwargs.update({'page_number': page_number, 'page_size': page_size})

        _query = select(cls).where(*cls.search_wheres(**kwargs))

        _paging = None
        _data_query = _query
        if is_paging:
            _row_count = await cls.query_count(_query)
            _paging = Pagination(_row_count).paging(page_number, page_size)
            _data_query = _query.limit(page_size).offset(_paging.offset_size)

        _sort_clause = cls.sort_clauses(kwargs.get('sort_clause', {}))
        if _sort_clause:
            _data_query = _data_query.order_by(*_sort_clause)
        else:
            _data_query = _data_query.order_by(cls.created_at)

        _df = await cls.query_as_df(_data_query)
        if not _df.empty:
            # The ID columns are returned as strings.
            for _column in (cls.id, cls.dcm_task_id, cls.dcm_task_attachment_id):
                _df[_column.key] = _df[_column.key].astype(str)
            _df.replace(models.EmptyInDF + models.EmptyDatetimeInDF, '', inplace=True)
        return _df, _paging

    @classmethod
    async def search_by_attachment_ids(cls, attachment_ids: list[Union[str, int]]):
        """
        Fetch ``dcm_task_attachment_id``, ``oa_media_id`` and ``updated_at``
        for every record whose attachment ID is in ``attachment_ids``.

        :param attachment_ids: attachment IDs to look up
        :return: the DataFrame produced by ``query_as_df``
        """
        query = select(
            cls.dcm_task_attachment_id, cls.oa_media_id, cls.updated_at
        ).where(
            cls.dcm_task_attachment_id.in_(attachment_ids)
        )
        return await cls.query_as_df(query)

    @classmethod
    async def search(cls, **kwargs):
        """
        Search file-upload records and wrap the result in a paged payload.

        :param kwargs: forwarded to ``search_base``
        :return: dict with ``total``, ``rows`` (list of record dicts) and
            ``pagination`` (``page_number``, ``page_count``, ``page_size``)
        """
        _df, _paging = await cls.search_base(**kwargs)
        if _paging:
            _total = _paging.row_count
            _pagination = {
                'page_number': _paging.page_number,
                'page_count': _paging.page_count,
                'page_size': _paging.page_size,
            }
        else:
            # Without paging everything is reported as a single page.
            _total = len(_df)
            _pagination = {'page_number': 1, 'page_count': 1, 'page_size': 20}
        return {
            'total': _total,
            'rows': _df.to_dict('records'),
            'pagination': _pagination,
        }

    @classmethod
    async def exists_relation(cls, data_df: pd.DataFrame):
        """
        Split ``data_df`` into rows that already exist in the database and
        rows that do not, keyed on (``dcm_task_id``, ``dcm_task_attachment_id``).

        :param data_df: input frame; must contain the ``dcm_task_id`` and
            ``dcm_task_attachment_id`` columns
        :return: ``(exists_df, latest_df)``: ``exists_df`` holds the rows
            found in the database with their ``id`` filled in, ``latest_df``
            holds the remaining rows
        """
        if data_df.empty:
            return pd.DataFrame(), pd.DataFrame()

        task_key = cls.dcm_task_id.key
        attachment_key = cls.dcm_task_attachment_id.key
        id_key = cls.id.key

        pairs = data_df[[task_key, attachment_key]].drop_duplicates().values.tolist()
        if not pairs:
            return pd.DataFrame(), data_df.copy()

        # The two IN clauses select a superset (any listed task with any
        # listed attachment); exact pairs are matched in memory below.
        _query = select(cls.id, cls.dcm_task_id, cls.dcm_task_attachment_id).where(
            (cls.dcm_task_id.in_([pair[0] for pair in pairs])) &
            (cls.dcm_task_attachment_id.in_([pair[1] for pair in pairs]))
        )
        found_df = await cls.query_as_df(_query)
        if found_df.empty:
            return pd.DataFrame(), data_df.copy()

        key_to_id_map = dict(zip(
            zip(found_df[task_key], found_df[attachment_key]),
            found_df[id_key],
        ))

        # Plain lists instead of DataFrame.apply(axis=1): apply on an empty
        # selection does not return a Series, which made the id assignment
        # fail when the superset query hit but no exact pair matched.
        row_keys = list(zip(data_df[task_key], data_df[attachment_key]))
        hits = [row_key in key_to_id_map for row_key in row_keys]
        mask_exists = pd.Series(hits, index=data_df.index, dtype=bool)

        exists_df = data_df[mask_exists].copy()
        exists_df[id_key] = [
            key_to_id_map[row_key] for row_key, hit in zip(row_keys, hits) if hit
        ]
        latest_df = data_df[~mask_exists].copy()
        return exists_df, latest_df

    @classmethod
    async def fill_file_upload(cls, data_df: pd.DataFrame, index_field: str = 'dcm_task_attachment_id',
                               column_name: str = 'file_uploads',
                               preprocessing: Optional[Callable] = None):
        """
        Attach file-upload data to ``data_df`` in place.

        For every row, ``column_name`` receives the first upload record (as
        a dict) whose ``dcm_task_attachment_id`` equals the row's
        ``index_field`` value rendered as a string, or ``{}`` when there is
        none. The same ``{}`` placeholder is used when the query returns no
        uploads at all, so the column always holds dicts.

        :param pandas.DataFrame data_df: frame to fill (mutated in place)
        :param str index_field: column of ``data_df`` holding attachment IDs
        :param str column_name: name of the added column, ``file_uploads`` by default
        :param preprocessing: optional callable applied to the upload frame
            before matching; must return the frame to use
        :return: the upload DataFrame that was queried (after
            ``preprocessing``), or an empty DataFrame when ``data_df`` is
            empty. Note that this is not ``data_df`` itself.
        """
        if data_df.empty:
            return pd.DataFrame()
        _task_attachment_ids = data_df[index_field].unique().tolist()
        if not _task_attachment_ids:
            return pd.DataFrame()

        _query = select(cls).where(cls.dcm_task_attachment_id.in_(_task_attachment_ids))
        _upload_df: pd.DataFrame = await cls.query_as_df(_query)
        if _upload_df.empty:
            data_df[column_name] = [{} for _ in range(len(data_df))]
            return _upload_df

        attachment_key = cls.dcm_task_attachment_id.key
        _upload_df.replace(models.EmptyInDF + models.EmptyDatetimeInDF, '', inplace=True)
        for _column in (cls.id, cls.dcm_task_id, cls.dcm_task_attachment_id):
            _upload_df[_column.key] = _upload_df[_column.key].astype(str)
        _upload_df['index_id'] = _upload_df[cls.id.key]
        _upload_df.set_index(['index_id'], inplace=True)
        if callable(preprocessing):
            _upload_df = preprocessing(_upload_df)

        # Index the records once instead of running DataFrame.query per row;
        # setdefault keeps the first record for each attachment ID.
        first_by_attachment = {}
        for _record in _upload_df.to_dict('records'):
            first_by_attachment.setdefault(_record[attachment_key], _record)

        # Each row gets its own dict copy so rows never share one object.
        data_df[column_name] = data_df[index_field].apply(
            lambda value: dict(first_by_attachment.get(f'{value}', {}))
        )
        return _upload_df
@register_swagger_model
class DcmTaskFileUpload(DcmTaskFileUploadBase):
    """
    Business model for file-upload relation records.

    The YAML after the ``---`` marker is presumably read by tornado_swagger
    through ``@register_swagger_model``, so its text is kept verbatim.
    ---
    description: 文件上传关联表
    type: object
    properties:
        id:
            description: 主键ID
            type: integer
            example: 1001
            readOnly: true
        dcm_task_id:
            description: 任务唯一标志
            type: integer
            example: 2001
        dcm_task_attachment_id:
            description: 附件ID(数字城管附件)
            type: integer
            example: 3001
        dcm_media_id:
            description: 附件ID(数字城管媒体)
            type: integer
            example: 3002
        oa_media_id:
            description: 附件IDOA媒体)
            type: integer
            example: 3003
        file_hash:
            description: 文件哈希值
            type: string
            example: "a1b2c3d4e5..."
            maxLength: 256
        status:
            description: 上传状态(0:未上传/失败,1:成功)
            type: integer
            example: 1
        created_at:
            description: 创建时间,ISO格式的日期时间字符串
            type: string
            format: date-time
            example: "2024-01-15 10:30:00"
            readOnly: true
        created_by:
            description: 创建者用户名
            type: string
            example: "D3I"
            readOnly: true
        updated_at:
            description: 修改时间,ISO格式的日期时间字符串
            type: string
            format: date-time
            example: "2024-01-16 14:25:00"
            readOnly: true
        updated_by:
            description: 修改者用户名
            type: string
            example: "editor"
            readOnly: true
    """

    @classmethod
    async def create(cls, **kwargs):
        """
        Create a new file-upload relation record.

        Steps:
            1. Validate the data with ``DcmTaskFileUploadForm``.
            2. Check that no record with the same task ID and attachment ID exists.
            3. Build the record from the form data and run ``before_save``.
            4. Save it.

        :param kwargs: field values for the new record
        :return: the newly created record
        :raises AssertionError: when the record already exists
        :raises ValidationError: when form validation fails (per the
            original notes; raised inside ``validate_form``)
        """
        kwargs = {
            key: value.strip() if isinstance(value, str) else value
            for key, value in kwargs.items()
        }
        _form = DcmTaskFileUploadForm(formdata=kwargs)
        _form.validate_form()

        # Raised explicitly instead of via `assert`, so the duplicate check
        # still runs when Python is started with -O.
        _exist = await cls.is_exist(_form.dcm_task_id.data, _form.dcm_task_attachment_id.data)
        if _exist is not None:
            raise AssertionError("相同任务ID和附件ID的文件上传记录已存在,不能重复创建。")

        _record = cls().copy_from_dict(_form.data, skip_none=True).before_save()
        await _record.async_save()
        return _record

    @classmethod
    async def delete(cls, id: Union[str, int]):
        """
        Physically delete the record with the given ID.

        No soft delete is performed: the row is removed with a DELETE
        statement.

        :param id: primary key of the record to delete
        :return: the record as it was loaded before deletion
        :raises AssertionError: when no record with this ID exists
        """
        _record = await cls.async_find_by_id(id)
        if not _record:
            raise AssertionError(f"根据 ID {id} 未找到文件上传记录。")
        _del_query = delete(cls).where(cls.id == _record.id)
        await cls.raw_execute(_del_query)
        echo_log(f'已删除文件上传记录(ID{_record.id}.')
        return _record

    @classmethod
    async def modify(cls, id: Union[str, int], **kwargs):
        """
        Modify a file-upload record.

        Only ``status`` and ``created_by`` are applied. The relation keys
        (``dcm_task_id``, ``dcm_task_attachment_id``, ``dcm_media_id``,
        ``oa_media_id``) must not appear in ``kwargs``.

        :param id: primary key of the record
        :param kwargs: fields to update
        :return: the updated record
        :raises AssertionError: when the record does not exist or a
            protected field is supplied
        :raises ValidationError: when form validation fails (per the
            original notes; raised inside ``validate_form``)
        """
        kwargs = {
            key: value.strip() if isinstance(value, str) else value
            for key, value in kwargs.items()
        }
        # NOTE(review): the form's validators reject missing relation IDs,
        # while this method forbids supplying them. Confirm that
        # `validate_form` tolerates absent fields on this path.
        _form = DcmTaskFileUploadForm(formdata=kwargs)
        _form.validate_form()

        # Relation keys are immutable once the record exists.
        protected_fields = {'dcm_task_id', 'dcm_task_attachment_id', 'dcm_media_id', 'oa_media_id'}
        disallowed_fields = set(kwargs.keys()) & protected_fields
        if disallowed_fields:
            raise AssertionError(f"禁止修改核心字段: {disallowed_fields}")

        _record = await cls.async_find_by_id(id)
        if not _record:
            raise AssertionError(f'查无此文件上传记录(ID{id})。')

        allowed_fields = {'status', 'created_by'}
        update_data = {
            key: value
            for key, value in _form.data.items()
            if key in allowed_fields and value is not None
        }
        _record.copy_from_dict(update_data, skip_none=True).before_save()
        await _record.async_save()
        return _record

    @classmethod
    async def create_batch(cls, data_df: pd.DataFrame):
        """
        Insert every row of ``data_df`` as a new record in one transaction.

        :param data_df: DataFrame whose rows are all new records; the
            original notes expect the columns dcm_task_id,
            dcm_task_attachment_id, dcm_media_id, oa_media_id, file_hash,
            status, created_at, created_by
        :return: number of records created (0 for an empty frame)
        """
        if data_df.empty:
            return 0
        records = [
            cls().copy_from_dict(row, skip_none=True).before_save()
            for row in data_df.to_dict('records')
        ]
        session = cls.get_aio_session()
        try:
            session.add_all(records)
            await session.commit()
        except Exception:
            await session.rollback()
            raise  # re-raise the original exception unchanged
        finally:
            await session.close()
        echo_log(f"批量创建成功:创建 {len(records)} 条文件上传记录。")
        return len(records)

    @classmethod
    async def modify_batch(cls, data_df: pd.DataFrame):
        """
        Update existing records in bulk, matching rows by primary key.

        :param data_df: DataFrame that must contain an ``id`` column
        :return: number of rows submitted for update; 0 when the frame is
            empty or has no ``id`` column (the latter is only logged)
        """
        if data_df.empty:
            return 0
        if 'id' not in data_df.columns:
            echo_log("错误:modify_batch 要求输入数据必须包含 'id' 列(主键)")
            return 0
        update_data = data_df.to_dict('records')
        session = cls.get_aio_session()
        try:
            # bulk_update_mappings is a synchronous Session API, so it is
            # run through run_sync on the async session.
            await session.run_sync(
                lambda sync_session: sync_session.bulk_update_mappings(cls, update_data)
            )
            await session.commit()
        except Exception:
            await session.rollback()
            raise  # re-raise the original exception unchanged
        finally:
            await session.close()
        updated_count = len(update_data)
        echo_log(f"批量修改成功:更新 {updated_count} 条文件上传记录。")
        return updated_count

    @classmethod
    async def save_batch(cls, data_df: pd.DataFrame):
        """
        Save rows in bulk, creating new records and updating existing ones.

        Rows are split by ``exists_relation`` on
        (``dcm_task_id``, ``dcm_task_attachment_id``).

        :param data_df: frame to save; must contain ``dcm_task_id`` and
            ``dcm_task_attachment_id``
        :return: ``(created_count, updated_count)``
        """
        # Dispatch through `cls` so subclass overrides are honoured.
        _exists_df, _latest_df = await cls.exists_relation(data_df)
        _created_count = await cls.create_batch(_latest_df)
        _updated_count = await cls.modify_batch(_exists_df)
        return _created_count, _updated_count