import random
from typing import Union, Optional, Callable

import pandas as pd
from sqlalchemy import select, delete
from tornado_swagger.model import register_swagger_model
from wtforms import StringField, IntegerField
from wtforms.validators import AnyOf, Length, NumberRange

import models
from models.common_model import CommonModel
from models.db_models import TD3iDcmTaskFileUpload
from paste.core.logging import echo_log
from paste.util.pagination import Pagination
from paste.web.form import ModelForm


class DcmTaskFileUploadForm(ModelForm):
    """
    Form that validates file-upload relation data.

    Fields mirror the columns of TD3iDcmTaskFileUpload (table
    t_d3i_dcm_task_file_upload).

    Integer fields are validated with NumberRange / AnyOf instead of Length:
    wtforms' Length calls ``len()`` on the field data, which raises TypeError
    for any non-zero int.
    """

    # Relation identifiers. NumberRange(min=1) rejects None and values below 1,
    # i.e. the same inputs the previous Length(min=1) rejected, without crashing
    # on positive integers.
    dcm_task_id = IntegerField('任务ID', validators=[NumberRange(min=1, message='任务ID不能为空')])
    dcm_task_attachment_id = IntegerField('附件ID', validators=[NumberRange(min=1, message='附件ID不能为空')])
    dcm_media_id = IntegerField('数字城管附件ID', validators=[NumberRange(min=1, message='数字城管附件ID不能为空')])
    oa_media_id = IntegerField('OA附件ID', validators=[NumberRange(min=1, message='OA附件ID不能为空')])

    # File information.
    file_hash = StringField('文件哈希值', validators=[Length(max=256, message='文件哈希值长度不能超过256字符')])

    # Status and audit fields. status may be omitted (None); when present it must be 0 or 1.
    status = IntegerField('上传状态', validators=[AnyOf([None, 0, 1], message='状态只能是0或1')])
    created_at = IntegerField('创建时间戳(毫秒)', validators=[NumberRange(min=1, message='创建时间不能为空')])
    created_by = StringField('创建者', validators=[Length(max=64, message='创建者长度不能超过64字符')])

    def process(self, formdata=None, obj=None, **kwargs):
        """
        Strip surrounding whitespace from submitted string values, then delegate
        to the base form processing.

        Note: ``formdata`` is modified in place.
        """
        if formdata:
            for name, values in formdata.items():
                if isinstance(values, list) and values:
                    formdata[name] = [v.strip() if isinstance(v, str) else v for v in values]
                elif isinstance(values, str):
                    formdata[name] = values.strip()
        super().process(formdata, obj, **kwargs)


class DcmTaskFileUploadBase(TD3iDcmTaskFileUpload, CommonModel):
    """
    Base model for file-upload relation records.

    Holds the shared query helpers built on top of TD3iDcmTaskFileUpload.
    """

    # Mapping from column attribute names to their camelCase external names.
    FieldMapping = {
        'dcm_task_id': 'dcmTaskId',
        'dcm_task_attachment_id': 'dcmTaskAttachmentId',
        'dcm_media_id': 'dcmMediaId',
        'oa_media_id': 'oaMediaId',
        'file_hash': 'fileHash',
        'status': 'status',
        'created_at': 'createdAt',
        'created_by': 'createdBy',
    }

    @classmethod
    async def is_exist(cls, dcm_task_id: Union[str, int], dcm_task_attachment_id: Union[str, int]):
        """
        Return the first record with the given task ID and attachment ID, or the
        result of ``query_first`` when nothing matches (used as ``None`` by callers).
        """
        _query = select(cls).where(
            cls.dcm_task_id == dcm_task_id,
            cls.dcm_task_attachment_id == dcm_task_attachment_id,
        )
        _record: cls = await cls.query_first(_query)
        return _record

    @classmethod
    async def find_by_ids(cls, ids: list[Union[str, int]]):
        """Return all records whose primary key is in ``ids``."""
        _query = select(cls).where(cls.id.in_(ids))
        _records: list[cls] = (await cls.orm_execute_scalars(_query)).all()
        return _records

    @classmethod
    async def search_base(cls, is_paging=True, **kwargs):
        """
        Search file-upload records by keyword filters.

        Filters are built by ``search_wheres``; documented keys:
        dcm_task_id, dcm_task_attachment_id, dcm_media_id, oa_media_id, status
        (all exact matches).

        :param is_paging: apply LIMIT/OFFSET pagination when true
        :key page_number: page number (defaults to a random int in 1..100)
        :key page_size: rows per page (defaults to 20)
        :key dict sort_clause: ordering spec, e.g. ``{'created_at': 'asc'}``;
            falls back to ordering by ``created_at``
        :return: ``(DataFrame, Pagination or None)``; id-like columns are
            converted to ``str`` and empty markers replaced with ``''``
        """
        # NOTE(review): a random default page looks unintentional, but it is the
        # documented behaviour, so it is kept as-is.
        page_number = kwargs.get('page_number', random.randint(1, 100))
        page_size = kwargs.get('page_size', 20)
        kwargs.update({'page_number': page_number, 'page_size': page_size})

        _query = select(cls).where(*cls.search_wheres(**kwargs))

        _paging = None
        if is_paging:
            _row_count = await cls.query_count(_query)
            _paging = Pagination(_row_count).paging(page_number, page_size)
            _data_query = _query.limit(page_size).offset(_paging.offset_size)
        else:
            _data_query = _query

        _sort_clause = cls.sort_clauses(kwargs.get('sort_clause', {}))
        if _sort_clause:
            _data_query = _data_query.order_by(*_sort_clause)
        else:
            _data_query = _data_query.order_by(cls.created_at)

        _df = await cls.query_as_df(_data_query)
        if not _df.empty:
            # IDs are returned as strings (presumably to keep large integers
            # intact in JSON consumers).
            _df[cls.id.key] = _df[cls.id.key].astype(str)
            _df[cls.dcm_task_id.key] = _df[cls.dcm_task_id.key].astype(str)
            _df[cls.dcm_task_attachment_id.key] = _df[cls.dcm_task_attachment_id.key].astype(str)
            _df.replace(models.EmptyInDF + models.EmptyDatetimeInDF, '', inplace=True)
        return _df, _paging

    @classmethod
    async def search_by_attachment_ids(cls, attachment_ids: list[Union[str, int]]):
        """
        Return a DataFrame with columns dcm_task_attachment_id, oa_media_id and
        updated_at for every record whose attachment ID is in ``attachment_ids``.
        """
        query = select(
            cls.dcm_task_attachment_id,
            cls.oa_media_id,
            cls.updated_at,
        ).where(
            cls.dcm_task_attachment_id.in_(attachment_ids)
        )
        return await cls.query_as_df(query)

    @classmethod
    async def search(cls, **kwargs):
        """
        Run ``search_base`` and package the result as a paginated dict with keys
        ``total``, ``rows`` and ``pagination``.
        """
        _df, _paging = await cls.search_base(**kwargs)
        return {
            'total': _paging.row_count if _paging else len(_df),
            'rows': _df.to_dict('records'),
            'pagination': {
                'page_number': _paging.page_number if _paging else 1,
                'page_count': _paging.page_count if _paging else 1,
                'page_size': _paging.page_size if _paging else 20,
            },
        }

    @staticmethod
    def _relation_key(task_id, attachment_id) -> tuple[str, str]:
        """
        Build a type-insensitive lookup key for a (task ID, attachment ID) pair.

        Integral floats are converted to int first, so ``2001``, ``2001.0``
        (an int column promoted to float by pandas) and ``'2001'`` all yield
        the same key.
        """
        def _norm(value) -> str:
            if isinstance(value, float) and value.is_integer():
                value = int(value)
            return str(value)

        return _norm(task_id), _norm(attachment_id)

    @classmethod
    async def exists_relation(cls, data_df: pd.DataFrame):
        """
        Split ``data_df`` into rows that already exist in the database and rows
        that do not.

        Existence is decided by the (dcm_task_id, dcm_task_attachment_id) pair.

        :param data_df: must contain dcm_task_id and dcm_task_attachment_id columns
        :return: ``(exists_df, latest_df)``. ``exists_df`` holds the existing
            rows, with the ``id`` column set to the database primary key.
            ``latest_df`` holds the rows that are not in the database yet.
        """
        if data_df.empty:
            return pd.DataFrame(), pd.DataFrame()

        task_key = cls.dcm_task_id.key
        attachment_key = cls.dcm_task_attachment_id.key

        pairs = data_df[[task_key, attachment_key]].drop_duplicates().values.tolist()
        if not pairs:
            return pd.DataFrame(), data_df.copy()

        # Two independent IN filters select a superset (their cross product);
        # exact pairs are matched in Python below.
        _query = select(cls.id, cls.dcm_task_id, cls.dcm_task_attachment_id).where(
            cls.dcm_task_id.in_([p[0] for p in pairs]),
            cls.dcm_task_attachment_id.in_([p[1] for p in pairs]),
        )
        db_df = await cls.query_as_df(_query)
        if db_df.empty:
            return pd.DataFrame(), data_df.copy()

        key_to_id_map = {
            cls._relation_key(task_id, attachment_id): record_id
            for task_id, attachment_id, record_id in zip(
                db_df[task_key], db_df[attachment_key], db_df[cls.id.key]
            )
        }
        row_keys = [
            cls._relation_key(task_id, attachment_id)
            for task_id, attachment_id in zip(data_df[task_key], data_df[attachment_key])
        ]
        mask_exists = pd.Series([key in key_to_id_map for key in row_keys], index=data_df.index)

        exists_df = data_df[mask_exists].copy()
        # Plain list assignment also works when no row matched (empty frame);
        # the previous row-wise apply() returned a DataFrame there and raised.
        exists_df[cls.id.key] = [key_to_id_map[key] for key in row_keys if key in key_to_id_map]
        latest_df = data_df[~mask_exists].copy()
        return exists_df, latest_df

    @classmethod
    async def fill_file_upload(cls, data_df: pd.DataFrame, index_field: str = 'dcm_task_attachment_id',
                               column_name: str = 'file_uploads', preprocessing: Optional[Callable] = None):
        """
        Attach file-upload records to ``data_df`` in place.

        Each row gets the first upload record whose dcm_task_attachment_id
        equals the row's ``index_field`` value (compared as strings), or ``{}``
        if there is none. When no upload record is found at all, every row gets
        ``[]`` instead.

        :param pandas.DataFrame data_df: frame to fill; modified in place
        :param str index_field: column of ``data_df`` holding attachment IDs
        :param str column_name: name of the column to add (default ``file_uploads``)
        :param preprocessing: optional callable applied to the upload frame
            before matching
        :return: the upload-record DataFrame (not ``data_df``); an empty
            DataFrame when ``data_df`` is empty
        """
        if data_df.empty:
            return pd.DataFrame()

        _task_attachment_ids = data_df[index_field].unique().tolist()
        if not _task_attachment_ids:
            return pd.DataFrame()

        _query = select(cls).where(cls.dcm_task_attachment_id.in_(_task_attachment_ids))
        _upload_df: pd.DataFrame = await cls.query_as_df(_query)

        if _upload_df.empty:
            data_df[column_name] = [[] for _ in range(len(data_df))]
            return _upload_df

        _upload_df.replace(models.EmptyInDF + models.EmptyDatetimeInDF, '', inplace=True)
        _upload_df[cls.id.key] = _upload_df[cls.id.key].astype(str)
        _upload_df[cls.dcm_task_id.key] = _upload_df[cls.dcm_task_id.key].astype(str)
        _upload_df[cls.dcm_task_attachment_id.key] = _upload_df[cls.dcm_task_attachment_id.key].astype(str)
        _upload_df['index_id'] = _upload_df[cls.id.key]
        _upload_df.set_index(['index_id'], inplace=True)
        if callable(preprocessing):
            _upload_df = preprocessing(_upload_df)

        # Index the first record per attachment ID once, instead of running a
        # string-built DataFrame.query() per row (O(n*m), breaks on quotes).
        attachment_key = cls.dcm_task_attachment_id.key
        first_record_by_attachment = {}
        for record in _upload_df.to_dict('records'):
            first_record_by_attachment.setdefault(record[attachment_key], record)

        # Copy each dict so rows sharing an attachment ID do not share a mutable object.
        data_df[column_name] = [
            dict(first_record_by_attachment.get(str(value), {})) for value in data_df[index_field]
        ]
        return _upload_df


# The class docstring below is parsed at runtime by register_swagger_model
# (YAML after '---'), so its text is kept verbatim.
@register_swagger_model
class DcmTaskFileUpload(DcmTaskFileUploadBase):
    """
    文件上传关联业务模型类(主业务类,完全继承 TD3iDcmTaskFileUpload 字段)。
    ---
    description: 文件上传关联表
    type: object
    properties:
        id:
            description: 主键ID
            type: integer
            example: 1001
            readOnly: true
        dcm_task_id:
            description: 任务唯一标志
            type: integer
            example: 2001
        dcm_task_attachment_id:
            description: 附件ID(数字城管附件)
            type: integer
            example: 3001
        dcm_media_id:
            description: 附件ID(数字城管媒体)
            type: integer
            example: 3002
        oa_media_id:
            description: 附件ID(OA媒体)
            type: integer
            example: 3003
        file_hash:
            description: 文件哈希值
            type: string
            example: "a1b2c3d4e5..."
            maxLength: 256
        status:
            description: 上传状态(0:未上传/失败,1:成功)
            type: integer
            example: 1
        created_at:
            description: 创建时间,ISO格式的日期时间字符串
            type: string
            format: date-time
            example: "2024-01-15 10:30:00"
            readOnly: true
        created_by:
            description: 创建者用户名
            type: string
            example: "D3I"
            readOnly: true
        updated_at:
            description: 修改时间,ISO格式的日期时间字符串
            type: string
            format: date-time
            example: "2024-01-16 14:25:00"
            readOnly: true
        updated_by:
            description: 修改者用户名
            type: string
            example: "editor"
            readOnly: true
    """

    @classmethod
    async def create(cls, **kwargs):
        """
        Create and persist a new file-upload relation record.

        Steps: strip string values, validate with DcmTaskFileUploadForm, reject
        duplicates of (dcm_task_id, dcm_task_attachment_id), build the record
        from the validated data, then save it.

        :param kwargs: field values for the new record
        :return: the saved record
        :raises AssertionError: when a record with the same task ID and
            attachment ID already exists
        :raises ValidationError: presumably raised by ``validate_form`` on
            invalid input (confirm against ModelForm)
        """
        for _k, _v in kwargs.items():
            if isinstance(_v, str):
                kwargs[_k] = _v.strip()
        _form = DcmTaskFileUploadForm(formdata=kwargs)
        _form.validate_form()

        _exist = await cls.is_exist(_form.dcm_task_id.data, _form.dcm_task_attachment_id.data)
        assert _exist is None, "相同任务ID和附件ID的文件上传记录已存在,不能重复创建。"

        _record = cls().copy_from_dict(_form.data, skip_none=True).before_save()
        await _record.async_save()
        return _record

    @classmethod
    async def delete(cls, id: Union[str, int]):
        """
        Physically delete the record with the given primary key.

        Soft deletion is not supported; the row is removed with a DELETE statement.

        :param id: primary key of the record
        :return: the record as loaded before deletion
        :raises AssertionError: when no record has this ID
        """
        _record: cls = await cls.async_find_by_id(id)
        assert _record, f"根据 ID {id} 未找到文件上传记录。"

        _del_query = delete(cls).where(cls.id == _record.id)
        await cls.raw_execute(_del_query)
        echo_log(f'已删除文件上传记录(ID:{_record.id}).')
        return _record

    @classmethod
    async def modify(cls, id: Union[str, int], **kwargs):
        """
        Update a file-upload record.

        Only ``status`` and ``created_by`` are applied. The relation fields
        (dcm_task_id, dcm_task_attachment_id, dcm_media_id, oa_media_id) must
        not appear in ``kwargs``.

        :param id: primary key of the record
        :param kwargs: fields to update
        :return: the updated record
        :raises AssertionError: when a protected field is supplied or the
            record does not exist
        :raises ValidationError: presumably raised by ``validate_form`` on
            invalid input (confirm against ModelForm)
        """
        for _k, _v in kwargs.items():
            if isinstance(_v, str):
                kwargs[_k] = _v.strip()
        # NOTE(review): this form marks the relation IDs as required, so unless
        # ModelForm.validate_form only checks submitted fields, a partial
        # update may fail validation here -- confirm.
        _form = DcmTaskFileUploadForm(formdata=kwargs)
        _form.validate_form()

        protected_fields = {'dcm_task_id', 'dcm_task_attachment_id', 'dcm_media_id', 'oa_media_id'}
        disallowed_fields = set(kwargs.keys()) & protected_fields
        assert not disallowed_fields, f"禁止修改核心字段: {disallowed_fields}"

        _record: cls = await cls.async_find_by_id(id)
        assert _record, f'查无此文件上传记录(ID:{id})。'

        allowed_fields = {'status', 'created_by'}
        update_data = {k: v for k, v in _form.data.items() if k in allowed_fields and v is not None}
        _record.copy_from_dict(update_data, skip_none=True).before_save()
        await _record.async_save()
        return _record

    @classmethod
    async def create_batch(cls, data_df: pd.DataFrame):
        """
        Insert every row of ``data_df`` as a new record in one transaction.

        :param data_df: rows of brand-new records. Expected columns:
            dcm_task_id, dcm_task_attachment_id, dcm_media_id, oa_media_id,
            file_hash, status, created_at, created_by.
        :return: number of records created (0 for an empty frame)
        :raises Exception: any database error, re-raised after rollback
        """
        if data_df.empty:
            return 0

        # pandas stores missing cells as NaN/NaT. Convert them to None so they
        # are presumably skipped by copy_from_dict(skip_none=True) rather than
        # written as NaN.
        clean_df = data_df.astype(object).where(data_df.notna(), None)
        records = [
            cls().copy_from_dict(row, skip_none=True).before_save()
            for row in clean_df.to_dict('records')
        ]

        session = cls.get_aio_session()
        try:
            session.add_all(records)
            await session.commit()
        except Exception:
            await session.rollback()
            raise
        finally:
            await session.close()

        echo_log(f"批量创建成功:创建 {len(records)} 条文件上传记录。")
        return len(records)

    @classmethod
    async def modify_batch(cls, data_df: pd.DataFrame):
        """
        Bulk-update records from ``data_df`` via ``bulk_update_mappings``.

        :param data_df: rows to update; must contain the primary-key ``id`` column
        :return: number of rows submitted for update. Returns 0 for an empty
            frame, or when ``id`` is missing (an error is logged in that case).
        :raises Exception: any database error, re-raised after rollback
        """
        if data_df.empty:
            return 0
        if 'id' not in data_df.columns:
            echo_log("错误:modify_batch 要求输入数据必须包含 'id' 列(主键)")
            return 0

        update_data = data_df.to_dict('records')
        session = cls.get_aio_session()
        try:
            await session.run_sync(
                lambda sync_session: sync_session.bulk_update_mappings(cls, update_data)
            )
            await session.commit()
            updated_count = len(update_data)
        except Exception:
            await session.rollback()
            raise
        finally:
            await session.close()

        echo_log(f"批量修改成功:更新 {updated_count} 条文件上传记录。")
        return updated_count

    @classmethod
    async def save_batch(cls, data_df: pd.DataFrame):
        """
        Upsert rows: insert new (task, attachment) pairs and update existing ones.

        :param data_df: must contain dcm_task_id and dcm_task_attachment_id
        :return: ``(created_count, updated_count)``
        """
        _exists_df, _latest_df = await cls.exists_relation(data_df)
        _created_count = await cls.create_batch(_latest_df)
        _updated_count = await cls.modify_batch(_exists_df)
        return _created_count, _updated_count