Files
d3i-szct/models/dcm_task_attachment.py
2026-06-02 17:46:38 +08:00

734 lines
27 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import random
from typing import Union, Optional, Callable
import pandas as pd
from sqlalchemy import select, delete
from tornado_swagger.model import register_swagger_model
from wtforms import StringField, TextAreaField, IntegerField
from wtforms.validators import Length
import models
from models.common_model import CommonModel
from models.db_models import TD3iDcmTaskAttachment
from paste.core.logging import echo_log
from paste.util.pagination import Pagination
from paste.web.form import ModelForm
class DcmTaskAttachmentForm(ModelForm):
    """
    Validation form for DCM (digital city management) department task attachments.

    Declares one field per column of table ``t_d3i_dcm_task_attachment``
    (model ``TD3iDcmTaskAttachment``). String fields carry ``Length`` validators
    that enforce the maximum lengths listed below. The attachment model's
    ``create``/``modify`` build this form from their keyword arguments.
    """
    # Relation info
    relation_type_id = IntegerField('关联类型ID')
    relation_id = IntegerField('主关联ID')
    relation_main_id = IntegerField('主关联ID(可空)')
    relation_sub_id = IntegerField('子关联ID(可空)')
    # Media info
    act_def_name = StringField('流程节点名称', validators=[Length(max=255, message='流程节点名称长度不能超过255字符')])
    media_id = IntegerField('媒体唯一ID')
    media_path = StringField('服务器存储路径', validators=[Length(max=512, message='存储路径长度不能超过512字符')])
    media_type = StringField('媒体类型', validators=[Length(max=50, message='媒体类型长度不能超过50字符')])
    media_name = StringField('原始文件名', validators=[Length(max=255, message='原始文件名长度不能超过255字符')])
    media_usage = StringField('使用场景', validators=[Length(max=100, message='使用场景长度不能超过100字符')])
    media_server_name = StringField('媒体服务器名称', validators=[Length(max=100, message='媒体服务器名称长度不能超过100字符')])
    media_property = IntegerField('媒体属性')
    media_uploaded_name = StringField('上传时的原始文件名', validators=[Length(max=255, message='上传文件名长度不能超过255字符')])
    media_shot = StringField('截图标识或路径', validators=[Length(max=255, message='截图路径长度不能超过255字符')])
    media_label_type_id = IntegerField('标签类型ID')
    media_url = StringField('内部访问URL', validators=[Length(max=512, message='内部URL长度不能超过512字符')])
    media_default_url = StringField('外部可访问URL', validators=[Length(max=512, message='外部URL长度不能超过512字符')])
    display_order = IntegerField('显示顺序')
    store_type_id = IntegerField('存储类型ID')
    # Image info
    special_item_image_type = StringField('特殊图片类型', validators=[Length(max=100, message='特殊图片类型长度不能超过100字符')])
    height = IntegerField('图片高度')
    width = IntegerField('图片宽度')
    # Upload and status flags
    send_flag = IntegerField('发送标志')
    public_flag = IntegerField('公开标志(0私有,1公开)')
    unit_name = StringField('所属单位', validators=[Length(max=255, message='单位名称长度不能超过255字符')])
    gen_thumb = IntegerField('是否生成缩略图(0否,1是)')
    can_delete = IntegerField('是否可删除(0否,1是)')
    # Timestamps and people
    upload_time = IntegerField('上传时间戳(毫秒)')
    create_human_id = IntegerField('创建人ID')
    human_name = StringField('创建人姓名', validators=[Length(max=255, message='创建人姓名长度不能超过255字符')])
    create_time = IntegerField('创建时间戳(毫秒)')
    update_time = IntegerField('更新时间戳(毫秒)')
    delete_reason = TextAreaField('删除原因', validators=[Length(max=65535, message='删除原因长度不能超过65535字符')])
    delete_flag = IntegerField('删除标记(0未删,1已删)')
    delete_human_id = IntegerField('删除人ID')
    delete_time = IntegerField('删除时间戳(毫秒)')

    def process(self, formdata=None, obj=None, **kwargs):
        """
        Trim surrounding whitespace from submitted values, then delegate to the base ``process``.

        The trimming happens in place on ``formdata``. Plain strings are stripped.
        Non-empty lists have each string element stripped, and other elements are kept as-is.
        """
        if formdata:
            for key, raw in formdata.items():
                # Only existing keys are reassigned, so mutating during iteration is safe.
                if isinstance(raw, str):
                    formdata[key] = raw.strip()
                elif isinstance(raw, list) and raw:
                    formdata[key] = [item.strip() if isinstance(item, str) else item for item in raw]
        super().process(formdata, obj, **kwargs)
class DcmTaskAttachmentBase(TD3iDcmTaskAttachment, CommonModel):
    """
    Attachment base class (maps every field of ``TD3iDcmTaskAttachment``).

    Bundles the query helpers shared by the attachment business model:
    - single and batch lookups;
    - keyword search with optional pagination;
    - splitting an incoming batch into already-stored and new rows;
    - back-filling attachments onto a task DataFrame.
    """

    # Attachment data mapping: model attribute name -> camelCase field name.
    # NOTE(review): 'specialitemImageType' (lowercase "i") is kept verbatim; it presumably
    # mirrors an upstream field name -- confirm before changing it.
    FieldMapping = {
        'store_type_id': 'storeTypeID',
        'relation_type_id': 'relationTypeID',
        'relation_id': 'relationID',
        'relation_main_id': 'relationMainID',
        'relation_sub_id': 'relationSubID',
        'media_type': 'mediaType',
        'media_name': 'mediaName',
        'media_usage': 'mediaUsage',
        'create_time': 'createTime',
        'update_time': 'updateTime',
        'display_order': 'displayOrder',
        'delete_reason': 'deleteReason',
        'delete_flag': 'deleteFlag',
        'create_human_id': 'createHumanID',
        'delete_human_id': 'deleteHumanID',
        'delete_time': 'deleteTime',
        'media_path': 'mediaPath',
        'media_server_name': 'mediaServerName',
        'media_property': 'mediaProperty',
        'special_item_image_type': 'specialitemImageType',
        'media_uploaded_name': 'mediaUploadedName',
        'height': 'height',
        'width': 'width',
        'send_flag': 'sendFlag',
        'media_shot': 'mediaShot',
        'public_flag': 'publicFlag',
        'media_label_type_id': 'mediaLabelTypeID',
        'media_url': 'mediaURL',
        'media_default_url': 'mediaDefaultURL',
        'human_name': 'humanName',
        'unit_name': 'unitName',
        'act_def_name': 'actDefName',
        'upload_time': 'uploadTime',
        'gen_thumb': 'genThumb',
        'can_delete': 'canDelete',
        'media_id': 'mediaID',
    }

    @classmethod
    async def exist_other(cls, id: Union[str, int], relation_id: Union[str, int], media_id: Union[str, int]):
        """
        Find an attachment, other than ``id``, with the same relation_id and media_id.

        :param id: ID of the attachment to exclude (typically the one being edited)
        :param relation_id: relation (main) ID to match
        :param media_id: media ID to match
        :return: the first matching attachment, or ``None`` when there is none
        """
        _query = select(cls).where(cls.id != id, cls.relation_id == relation_id, cls.media_id == media_id)
        _attachment: cls = await cls.query_first(_query)
        return _attachment

    @classmethod
    async def find_by_ids(cls, ids: list[Union[str, int]]):
        """
        Load every attachment whose primary key is in ``ids``.

        :param ids: primary keys to look up
        :return: list of matching attachments
        """
        _query = select(cls).where(cls.id.in_(ids))
        _attachment_list: list[cls] = (await cls.orm_execute_scalars(_query)).all()
        return _attachment_list

    @classmethod
    async def is_exist(cls, relation_id: Union[str, int], media_id: Union[str, int]):
        """
        Check whether an attachment with this relation_id and media_id already exists.

        :param relation_id: relation (main) ID to match
        :param media_id: media ID to match
        :return: the first matching attachment, or ``None`` when there is none
        """
        _query = select(cls).where(cls.relation_id == relation_id, cls.media_id == media_id)
        _attachment: cls = await cls.query_first(_query)
        return _attachment

    @classmethod
    async def search_base(cls, is_paging=True, **kwargs):
        """
        Search attachments by keyword filters (shared implementation).

        Filters are built by ``search_wheres`` from ``kwargs``.
        ``media_name``, ``act_def_name`` and ``media_usage`` are matched with ``LIKE '%value%'``.
        The other filters are exact matches.

        :param is_paging: when True, count the matches and return only the requested page
        :param kwargs: query parameters
        :key int page_number: page number (default 1)
        :key int page_size: rows per page (default 20)
        :key dict sort_clause: ordering spec, e.g. ``{'display_order': 'asc'}``;
            ordering falls back to ``display_order`` when it yields nothing
        :key int relation_type_id: exact match on relation type
        :key int relation_id: exact match on relation (main) ID
        :key str media_type: exact match on media type
        :key str unit_name: exact match on unit
        :key int delete_flag: exact match on delete flag
        :key str media_name: fuzzy match on original file name
        :key str act_def_name: fuzzy match on workflow node name
        :key str media_usage: fuzzy match on usage scenario
        :return: ``(DataFrame, Pagination)``; the pagination is ``None`` when ``is_paging`` is False
        """
        # Default to the first page. The previous random 1..100 default returned an
        # arbitrary (often empty) page to callers that omitted page_number.
        page_number = kwargs.get('page_number', 1)
        page_size = kwargs.get('page_size', 20)
        kwargs.update({'page_number': page_number, 'page_size': page_size})
        # Fields matched with LIKE; the value is substituted into the pattern.
        _name_likes = {
            cls.media_name.key: '%{}%',
            cls.act_def_name.key: '%{}%',
            cls.media_usage.key: '%{}%',
        }
        _query = select(cls).where(
            *cls.search_wheres(likes=_name_likes, **kwargs)
        )
        _paging = None
        _data_query = _query
        if is_paging:
            _row_count = await cls.query_count(_query)
            _paging = Pagination(_row_count).paging(page_number, page_size)
            _data_query = _query.limit(page_size).offset(_paging.offset_size)
        _sort_clause = cls.sort_clauses(kwargs.get('sort_clause', {}))
        if _sort_clause:
            _data_query = _data_query.order_by(*_sort_clause)
        else:
            _data_query = _data_query.order_by(cls.display_order)
        _attachment_df = await cls.query_as_df(_data_query)
        if not _attachment_df.empty:
            # Normalise the project's "empty" sentinel values to ''.
            _attachment_df.replace(models.EmptyInDF + models.EmptyDatetimeInDF, '', inplace=True)
        return _attachment_df, _paging

    @classmethod
    async def search(cls, **kwargs):
        """
        Search attachments and return a JSON-ready paged result.

        :param kwargs: forwarded to ``search_base``
        :return: ``{'total': int, 'rows': list[dict], 'pagination': dict | None}``;
            ``pagination`` is ``None`` only when the caller passed ``is_paging=False``
        """
        _attachment_df, _paging = await cls.search_base(**kwargs)
        _rows = _attachment_df.to_dict('records')
        if _paging is None:
            # Unpaged search: every match was returned, so there is no page metadata.
            return {'total': len(_rows), 'rows': _rows, 'pagination': None}
        return {
            'total': _paging.row_count,
            'rows': _rows,
            'pagination': {
                'page_number': _paging.page_number,
                'page_count': _paging.page_count,
                'page_size': _paging.page_size,
            },
        }

    @classmethod
    async def exists_relation(cls, data_df: pd.DataFrame):
        """
        Split ``data_df`` into rows already stored in the database and rows that are new.

        A row is "stored" when its ``(relation_id, media_id)`` pair matches an existing attachment.

        :param data_df: input frame; must contain ``relation_id`` and ``media_id`` columns
        :return: ``(exists_df, latest_df)``
            - exists_df: the stored rows, with ``id`` set to the stored primary key
            - latest_df: the rows not found in the database
        """
        if data_df.empty:
            return pd.DataFrame(), pd.DataFrame()
        _rel_key = cls.relation_id.key
        _media_key = cls.media_id.key
        _id_key = cls.id.key
        # Distinct (relation_id, media_id) pairs to look up.
        pairs = data_df[[_rel_key, _media_key]].drop_duplicates().values.tolist()
        if not pairs:
            return pd.DataFrame(), data_df.copy()
        # The two IN clauses select a superset of the wanted pairs (any listed relation_id
        # combined with any listed media_id); exact pair matching is done in memory below.
        _query = select(cls.id, cls.relation_id, cls.media_id).where(
            cls.relation_id.in_([p[0] for p in pairs]),
            cls.media_id.in_([p[1] for p in pairs]),
        )
        _db_df = await cls.query_as_df(_query)
        if _db_df.empty:
            return pd.DataFrame(), data_df.copy()
        # (relation_id, media_id) -> primary key.
        # NOTE(review): keys only match when both sides use the same Python types
        # (e.g. int vs str will not match) -- confirm the input dtypes.
        key_to_id_map = dict(zip(
            zip(_db_df[_rel_key], _db_df[_media_key]),
            _db_df[_id_key],
        ))
        _row_keys = list(zip(data_df[_rel_key], data_df[_media_key]))
        # Positional boolean mask; this avoids index alignment and, unlike DataFrame.apply,
        # still works when no row matches. apply() on an empty frame returns a DataFrame,
        # which cannot be assigned to a single column.
        mask_exists = pd.Series([key in key_to_id_map for key in _row_keys], dtype=bool).to_numpy()
        exists_df = data_df[mask_exists].copy()
        exists_df[_id_key] = [key_to_id_map[key] for key, hit in zip(_row_keys, mask_exists) if hit]
        latest_df = data_df[~mask_exists].copy()
        return exists_df, latest_df

    @classmethod
    async def fill_attachment(cls, data_df: pd.DataFrame, index_field: str = 'id',
                              column_name: str = 'attachments', is_full: bool = True,
                              preprocessing: Optional[Callable] = None):
        """
        Attach each task's attachments to ``data_df`` as a list-of-dicts column.

        ``data_df`` is modified in place: ``column_name`` is added (or overwritten) with a
        list of attachment records per row, or ``[]`` when the row has none.

        :param pandas.DataFrame data_df: frame to fill
        :param str index_field: column of ``data_df`` holding the task ID matched
            against the attachment's ``dcm_task_id``
        :param str column_name: name of the column to add, default ``attachments``
        :param is_full: also join the file-upload table to include ``oa_media_id``
        :param preprocessing: optional callable applied to the attachment frame before
            grouping; it must return the processed frame
        :return: the attachment frame (after preprocessing), indexed by attachment id as a
            string; an empty DataFrame when ``data_df`` is empty or no attachments are found
        :rtype: pandas.DataFrame
        """
        if data_df.empty:
            return pd.DataFrame()
        _task_ids = data_df[index_field].unique().tolist()
        if not _task_ids:
            return pd.DataFrame()
        _task_key = cls.dcm_task_id.key
        _query = select(cls).where(cls.dcm_task_id.in_(_task_ids))
        if is_full:
            # Also select the OA media ID produced by the file-upload step.
            # NOTE(review): this is an inner JOIN, so attachments without an upload record
            # are excluded from the result -- confirm that is intended.
            from models.dcm_task_file_upload import DcmTaskFileUpload
            _query = _query.add_columns(
                DcmTaskFileUpload.oa_media_id
            ).join(
                DcmTaskFileUpload, DcmTaskFileUpload.dcm_task_attachment_id == cls.id
            )
        _atta_df: pd.DataFrame = await cls.query_as_df(_query)
        if _atta_df.empty:
            data_df[column_name] = [[] for _ in range(len(data_df))]
            return _atta_df
        _atta_df.replace(models.EmptyInDF + models.EmptyDatetimeInDF, '', inplace=True)
        # Output IDs as strings.
        _atta_df[cls.id.key] = _atta_df[cls.id.key].astype(str)
        _atta_df[_task_key] = _atta_df[_task_key].astype(str)
        # Index by attachment ID.
        _atta_df['index_id'] = _atta_df[cls.id.key]
        _atta_df.set_index(['index_id'], inplace=True)
        if callable(preprocessing):
            _atta_df = preprocessing(_atta_df)
        # Group once instead of running a string-built DataFrame.query per row. The old
        # approach was O(rows x attachments) and broke on IDs containing quotes. Lookups
        # use str(x) to match the astype(str) task IDs above.
        _records_by_task = {
            _key: _group.to_dict('records')
            for _key, _group in _atta_df.groupby(_task_key, sort=False)
        }
        # Hand each row its own record dicts so rows sharing a task ID do not share objects.
        data_df[column_name] = data_df[index_field].apply(
            lambda x: [dict(_rec) for _rec in _records_by_task.get(str(x), [])]
        )
        return _atta_df
# Attachment business model. NOTE(review): the YAML after "---" in the class docstring is
# presumably the Swagger schema read via register_swagger_model -- keep it valid YAML
# and do not translate it.
@register_swagger_model
class DcmTaskAttachment(DcmTaskAttachmentBase):
    """
    附件业务模型类(主业务类,完全继承 TD3iDcmTaskAttachment 字段)。
    ---
    description: 数字城管-部门待办任务附件
    type: object
    properties:
        id:
            description: 主键ID
            type: integer
            example: 1001
            readOnly: true
        relation_type_id:
            description: 关联类型ID
            type: integer
            example: 1
        relation_id:
            description: 主关联ID
            type: integer
            example: 2001
        relation_main_id:
            description: 主关联ID(可空)
            type: integer
            example: 2001
        relation_sub_id:
            description: 子关联ID(可空)
            type: integer
            example: 2002
        act_def_name:
            description: 流程节点名称
            type: string
            example: "受理"
            maxLength: 255
        media_id:
            description: 媒体唯一ID
            type: integer
            example: 3001
        media_path:
            description: 服务器存储路径
            type: string
            example: "/uploads/2024/05/01/photo.jpg"
            maxLength: 512
        media_type:
            description: 媒体类型(IMAGE, VIDEO等)
            type: string
            example: "IMAGE"
            maxLength: 50
        media_name:
            description: 原始文件名
            type: string
            example: "IMG_20240501.jpg"
            maxLength: 255
        media_usage:
            description: 使用场景
            type: string
            example: "上报"
            maxLength: 100
        media_server_name:
            description: 媒体服务器名称
            type: string
            example: "oss-1"
            maxLength: 100
        media_property:
            description: 媒体属性
            type: integer
            example: 1
        media_uploaded_name:
            description: 上传时的原始文件名
            type: string
            example: "DSC_001.jpg"
            maxLength: 255
        media_shot:
            description: 截图标识或路径
            type: string
            example: "/thumbs/photo.jpg"
            maxLength: 255
        media_label_type_id:
            description: 标签类型ID
            type: integer
            example: 5
        media_url:
            description: 内部访问URL
            type: string
            example: "http://internal/oss/123"
            maxLength: 512
        media_default_url:
            description: 外部可访问URL
            type: string
            example: "https://public.example.com/oss/123"
            maxLength: 512
        display_order:
            description: 显示顺序
            type: integer
            example: 1
        store_type_id:
            description: 存储类型ID
            type: integer
            example: 1
        special_item_image_type:
            description: 特殊图片类型
            type: string
            example: "现场照片"
            maxLength: 100
        height:
            description: 图片高度
            type: integer
            example: 1080
        width:
            description: 图片宽度
            type: integer
            example: 1920
        send_flag:
            description: 发送标志
            type: integer
            example: 1
        public_flag:
            description: 公开标志(0私有,1公开)
            type: integer
            example: 1
        unit_name:
            description: 所属单位
            type: string
            example: "市政工程处"
            maxLength: 255
        gen_thumb:
            description: 是否生成缩略图(0否,1是)
            type: integer
            example: 1
        can_delete:
            description: 是否可删除(0否,1是)
            type: integer
            example: 1
        upload_time:
            description: 上传时间戳(毫秒)
            type: integer
            example: 1714567890000
        create_human_id:
            description: 创建人ID
            type: integer
            example: 101
        human_name:
            description: 创建人姓名
            type: string
            example: "张三"
            maxLength: 255
        create_time:
            description: 创建时间戳(毫秒)
            type: integer
            example: 1714567890000
        update_time:
            description: 更新时间戳(毫秒)
            type: integer
            example: 1714567900000
        delete_reason:
            description: 删除原因
            type: string
            example: "重复上传"
            maxLength: 65535
        delete_flag:
            description: 删除标记(0未删,1已删)
            type: integer
            example: 0
        delete_human_id:
            description: 删除人ID
            type: integer
            example: 102
        delete_time:
            description: 删除时间戳(毫秒)
            type: integer
            example: 1714567910000
        created_at:
            description: 创建时间,ISO格式的日期时间字符串
            type: string
            format: date-time
            example: "2024-01-15 10:30:00"
            readOnly: true
        created_by:
            description: 创建者用户名
            type: string
            example: "admin"
            readOnly: true
        updated_at:
            description: 修改时间,ISO格式的日期时间字符串
            type: string
            format: date-time
            example: "2024-01-16 14:25:00"
            readOnly: true
        updated_by:
            description: 修改者用户名
            type: string
            example: "editor"
            readOnly: true
    """

    @classmethod
    async def create(cls, **kwargs):
        """
        Create a new attachment.

        Flow:
        1. strip surrounding whitespace from string arguments
        2. validate with ``DcmTaskAttachmentForm``
        3. reject duplicates: an attachment with the same relation_id + media_id must not exist
        4. build the object from the form data (None values skipped), call ``before_save``
        5. save it and return it

        :param kwargs: attachment fields
        :return: the new attachment
        :rtype: DcmTaskAttachment
        :raises AssertionError: when an attachment with the same relation_id + media_id exists
        :raises ValidationError: when form validation fails (raised by ``validate_form``,
            per the original contract)
        """
        for _k, _v in kwargs.items():
            if isinstance(_v, str):
                kwargs[_k] = _v.strip()
        _form = DcmTaskAttachmentForm(formdata=kwargs)
        _form.validate_form()
        # is_exist matches on (relation_id, media_id), so pass media_id here,
        # not relation_type_id.
        _attachment: cls = await cls.is_exist(_form.relation_id.data, _form.media_id.data)
        assert _attachment is None, "相同关联ID和类型的附件已存在,不能重复创建。"
        _attachment = cls().copy_from_dict(_form.data, skip_none=True).before_save()
        await _attachment.async_save()
        return _attachment

    @classmethod
    async def delete(cls, attachment_id: Union[str, int]):
        """
        Physically delete an attachment row.

        NOTE: this issues a SQL ``DELETE``; it does not set ``delete_flag`` (no soft delete).

        :param attachment_id: ID of the attachment to delete
        :return: the attachment object as loaded before deletion
        :rtype: DcmTaskAttachment
        :raises AssertionError: when no attachment has this ID
        """
        _attachment: cls = await cls.async_find_by_id(attachment_id)
        assert _attachment, f"根据 ID {attachment_id} 未找到附件。"
        # `delete` here resolves to the module-level sqlalchemy.delete, not this method.
        await cls.raw_execute(delete(cls).where(cls.id == _attachment.id))
        echo_log(f'已删除任务附件(记录ID{_attachment.rec_id}ID{_attachment.id}.')
        return _attachment

    @classmethod
    async def modify(cls, attachment_id: Union[str, int], **kwargs):
        """
        Update the metadata of an existing attachment.

        Only the fields in ``allowed_fields`` below are written. Core storage fields
        (media_id, media_path, relation IDs, ...) are never changed here.

        Flow:
        1. strip surrounding whitespace from string arguments
        2. validate with ``DcmTaskAttachmentForm``
        3. when both relation_id and media_id are supplied, ensure no *other*
           attachment already uses that pair
        4. load the attachment and assert it exists
        5. copy the allowed, non-None fields, call ``before_save`` and save

        :param attachment_id: ID of the attachment to modify
        :param kwargs: fields to update
        :return: the updated attachment
        :rtype: DcmTaskAttachment
        :raises AssertionError: when the attachment is missing or the pair is already used
        :raises ValidationError: when form validation fails (raised by ``validate_form``,
            per the original contract)
        """
        for _k, _v in kwargs.items():
            if isinstance(_v, str):
                kwargs[_k] = _v.strip()
        _form = DcmTaskAttachmentForm(formdata=kwargs)
        _form.validate_form()
        _relation_id = _form.relation_id.data
        _media_id = _form.media_id.data
        # exist_other matches on (relation_id, media_id). Skip the check for partial updates
        # that omit these keys; otherwise it would compare against NULLs.
        if _relation_id is not None and _media_id is not None:
            _other = await cls.exist_other(attachment_id, _relation_id, _media_id)
            assert _other is None, "相同关联ID和类型的附件已存在,不能重复修改。"
        _attachment: cls = await cls.async_find_by_id(attachment_id)
        assert _attachment, f'查无此附件信息。'
        # Only non-core (metadata) fields may be updated.
        allowed_fields = {
            'display_order', 'public_flag', 'unit_name', 'media_usage',
            'media_label_type_id', 'media_shot', 'media_property',
            'gen_thumb', 'can_delete', 'delete_reason', 'delete_flag',
            'act_def_name', 'human_name'
        }
        update_data = {k: v for k, v in _form.data.items() if k in allowed_fields and v is not None}
        _attachment.copy_from_dict(update_data, skip_none=True).before_save()
        await _attachment.async_save()
        return _attachment

    @classmethod
    async def create_batch(cls, data_df: pd.DataFrame):
        """
        Insert new attachments in bulk. The rows are assumed new, so no existence check is made.

        :param data_df: one row per attachment; columns must match model attribute names
            (relation_id, media_id, ...)
        :return: number of attachments created
        :rtype: int
        """
        if data_df.empty:
            return 0
        # DataFrames represent missing values as NaN/NaT, which ``skip_none`` does not skip.
        # Normalise them to None so those attributes are simply left unset.
        _clean_df = data_df.astype(object).where(pd.notna(data_df), None)
        records = _clean_df.to_dict('records')
        attachments = [cls().copy_from_dict(record, skip_none=True).before_save() for record in records]
        session = cls.get_aio_session()
        try:
            session.add_all(attachments)
            await session.commit()
        except Exception:
            await session.rollback()
            raise
        finally:
            await session.close()
        echo_log(f"批量创建成功:创建 {len(attachments)} 条附件。")
        return len(attachments)

    @classmethod
    async def modify_batch(cls, data_df: pd.DataFrame):
        """
        Update existing attachments in bulk via ``Session.bulk_update_mappings``.

        :param data_df: one row per attachment; must contain the primary-key column
        :return: number of rows submitted for update, or 0 when the input is empty or has
            no primary-key column
        :rtype: int
        """
        if data_df.empty:
            return 0
        _id_key = cls.id.key
        if _id_key not in data_df.columns:
            echo_log(f"错误:modify_batch 要求输入数据必须包含 '{cls.id.key}' 列(主键)")
            return 0
        # NOTE(review): NaN values are passed through unchanged; presumably callers supply
        # complete rows -- confirm before relying on partial frames.
        update_data = data_df.to_dict('records')
        session = cls.get_aio_session()
        try:
            await session.run_sync(
                lambda sync_session: sync_session.bulk_update_mappings(cls, update_data)
            )
            await session.commit()
        except Exception:
            await session.rollback()
            raise
        finally:
            await session.close()
        updated_count = len(update_data)
        echo_log(f"批量修改成功:更新 {updated_count} 条附件。")
        return updated_count

    @classmethod
    async def save_batch(cls, data_df: pd.DataFrame):
        """
        Save a batch, inserting new rows and updating existing ones.

        Rows are classified by ``exists_relation`` on (relation_id, media_id).

        :param data_df: rows to save
        :return: ``(created_count, updated_count)``
        """
        _exists_df, _latest_df = await cls.exists_relation(data_df)
        _created_count = await cls.create_batch(_latest_df)
        _updated_count = await cls.modify_batch(_exists_df)
        return _created_count, _updated_count