import random from typing import Union, Optional, Callable import pandas as pd from sqlalchemy import select, delete from tornado_swagger.model import register_swagger_model from wtforms import StringField, TextAreaField, IntegerField from wtforms.validators import Length import models from models.common_model import CommonModel from models.db_models import TD3iDcmTaskProcessInfo from paste.core.logging import echo_log from paste.util.pagination import Pagination from paste.web.form import ModelForm class DcmTaskProcessInfoForm(ModelForm): """ 办理经过表单验证类(完全映射 TD3iDcmTaskProcessInfo 字段)。 用于验证和处理数字城管-部门待办任务办理经过的记录/更新表单数据。 字段完全映射数据库表 t_d3i_dcm_task_process_info 的字段结构。 """ # 基础信息 raw_id = IntegerField('原始主键ID') rec_id = IntegerField('记录ID') act_id = IntegerField('任务ID') act_def_id = IntegerField('流程节点定义ID') act_def_name = StringField('流程节点名称', validators=[Length(max=100, message='流程节点名称长度不能超过100字符')]) act_time_state_id = IntegerField('操作时间状态ID') act_limit_info = StringField('操作时限信息', validators=[Length(max=255, message='时限信息长度不能超过255字符')]) act_used_time_char = StringField('已用时间(字符串)', validators=[Length(max=50, message='已用时间描述长度不能超过50字符')]) act_remain_time_char = StringField('剩余时间(字符串)', validators=[Length(max=50, message='剩余时间描述长度不能超过50字符')]) act_deadline_time = IntegerField('操作截止时间戳(毫秒)') act_property_id = IntegerField('操作属性ID') # 操作信息 action_name = StringField('操作动作名称', validators=[Length(max=100, message='操作动作名称长度不能超过100字符')]) action_time = IntegerField('操作时间戳(毫秒)') title = StringField('操作标题', validators=[Length(max=100, message='标题长度不能超过100字符')]) detail = TextAreaField('操作详细意见', validators=[Length(max=65535, message='意见长度不能超过65535字符')]) backup_detail = TextAreaField('备用意见', validators=[Length(max=65535, message='备用意见长度不能超过65535字符')]) medias = TextAreaField('附件信息(JSON格式)', validators=[Length(max=65535, message='附件信息长度不能超过65535字符')]) # 单位与人员 unit_name = StringField('当前操作单位', validators=[Length(max=100, message='单位名称长度不能超过100字符')]) unit_contact = StringField('单位联系方式', validators=[Length(max=255, message='联系方式长度不能超过255字符')]) human_id = IntegerField('操作人ID') human_name = StringField('操作人名称(含单位)', validators=[Length(max=255, message='操作人名称长度不能超过255字符')]) role_name = StringField('当前角色名称', validators=[Length(max=100, message='角色名称长度不能超过100字符')]) # 项目信息 item_id = IntegerField('项目ID') item_type_id = IntegerField('任务类型ID') item_content = TextAreaField('任务内容摘要', validators=[Length(max=65535, message='内容摘要长度不能超过65535字符')]) item_process_info_list = TextAreaField('子流程列表(JSON)', validators=[Length(max=65535, message='子流程列表长度不能超过65535字符')]) sub_process_info = TextAreaField('子流程信息', validators=[Length(max=65535, message='子流程信息长度不能超过65535字符')]) # 捆绑信息 bundle_time_state_id = IntegerField('组合时间状态ID') bundle_limit_info = StringField('组合时限信息', validators=[Length(max=255, message='组合时限信息长度不能超过255字符')]) bundle_used_char = StringField('组合已用时间', validators=[Length(max=50, message='组合已用时间长度不能超过50字符')]) bundle_remain_char = StringField('组合剩余时间', validators=[Length(max=50, message='组合剩余时间长度不能超过50字符')]) bundle_deadline_time = IntegerField('组合截止时间戳(毫秒)') # 下一节点信息 show_unit_contact = IntegerField('是否显示单位联系方式') pre_unit_name = StringField('上一单位', validators=[Length(max=100, message='上一单位名称长度不能超过100字符')]) pre_action_name = StringField('上一操作名称', validators=[Length(max=100, message='上一操作名称长度不能超过100字符')]) pre_human_name = StringField('上一操作人', validators=[Length(max=255, message='上一操作人名称长度不能超过255字符')]) pre_act_opinion = TextAreaField('上一操作意见', validators=[Length(max=65535, message='上一操作意见长度不能超过65535字符')]) next_act_def_name = StringField('下一节点名称', validators=[Length(max=100, message='下一节点名称长度不能超过100字符')]) next_role_part_name = StringField('下一角色/单位', validators=[Length(max=255, message='下一角色/单位长度不能超过255字符')]) next_role_name = StringField('下一角色名称', validators=[Length(max=100, message='下一角色名称长度不能超过100字符')]) next_act_property_id = IntegerField('下一操作属性ID') # 最后节点标识 last_act_flag = IntegerField('是否为最后一节点(0否,1是)') def process(self, formdata=None, obj=None, **kwargs): if formdata: for name, values in formdata.items(): if isinstance(values, list) and values: formdata[name] = [v.strip() if isinstance(v, str) else v for v in values] elif isinstance(values, str): formdata[name] = values.strip() super().process(formdata, obj, **kwargs) class DcmTaskProcessInfoBase(TD3iDcmTaskProcessInfo, CommonModel): """ 办理经过基础类(完全映射 TD3iDcmTaskProcessInfo 字段)。 封装所有与任务办理经过相关的通用操作方法。 """ FieldMapping = { 'raw_id': 'id', 'act_id': 'actID', 'act_def_id': 'actDefID', 'act_def_name': 'actDefName', 'act_time_state_id': 'actTimeStateID', 'act_limit_info': 'actLimitInfo', 'act_used_time_char': 'actUsedTimeChar', 'act_remain_time_char': 'actRemainTimeChar', 'act_deadline_time': 'actDeadlineTime', 'act_property_id': 'actPropertyID', 'action_name': 'actionName', 'action_time': 'actionTime', 'title': 'title', 'detail': 'detail', 'backup_detail': 'backupDetail', 'medias': 'medias', 'unit_name': 'unitName', 'unit_contact': 'unitContact', 'human_id': 'humanID', 'human_name': 'humanName', 'role_name': 'roleName', 'item_id': 'itemID', 'item_type_id': 'itemTypeID', 'item_content': 'itemContent', 'item_process_info_list': 'itemProcessInfoList', 'sub_process_info': 'subProcessInfo', 'bundle_time_state_id': 'bundleTimeStateID', 'bundle_limit_info': 'bundleLimitInfo', 'bundle_used_char': 'bundleUsedChar', 'bundle_remain_char': 'bundleRemainChar', 'bundle_deadline_time': 'bundleDeadlineTime', 'show_unit_contact': 'showUnitContact', 'pre_unit_name': 'preUnitName', 'pre_action_name': 'preActionName', 'pre_human_name': 'preHumanName', 'pre_act_opinion': 'preActOpinion', 'next_act_def_name': 'nextActDefName', 'next_role_part_name': 'nextRolePartName', 'next_role_name': 'nextRoleName', 'next_act_property_id': 'nextActPropertyID', 'last_act_flag': 'lastActFlag', } """ 处理流程映射 """ @classmethod async def exist_other(cls, id: Union[str, int], rec_id: Union[str, int], act_id: Union[str, int]): """ 检查是否存在除当前记录外的其他同任务ID或同原始主键ID的记录。 :param act_id: 当前任务ID :param rec_id: 原始主键ID :return: 存在返回记录对象,不存在返回None """ _query = select(cls).where(cls.id != id, cls.rec_id == rec_id, cls.act_id == act_id) _record: cls = await cls.query_first(_query) return _record @classmethod async def find_by_ids(cls, ids: list[Union[str, int]]): """ 根据ID列表批量查找办理经过。 """ _query = select(cls).where(cls.id.in_(ids)) _record_list: list[cls] = (await cls.orm_execute_scalars(_query)).all() return _record_list @classmethod async def is_exist(cls, rec_id: Union[str, int], act_id: Union[str, int]): """ 检查办理经过是否已经存在(根据原始主键ID)。 """ _query = select(cls).where(cls.rec_id == rec_id, cls.act_id == act_id) _record: cls = await cls.query_first(_query) return _record @classmethod async def search_base(cls, is_paging=True, **kwargs): """ 按参数搜索办理流程记录的基础方法。 支持字段: - act_id, unit_name, human_name, role_name, last_act_flag - 支持模糊匹配:act_def_name, action_name, title, item_content :param is_paging: 是否分页 :param kwargs: 查询参数 :key int page_number: 页码(缺省随机1~100) :key int page_size: 每页数量(缺省20) :key dict sort_clause: 排序配置,如 {'action_time': 'desc'} :key int act_id: 精确匹配任务ID :key str unit_name: 精确匹配单位 :key str human_name: 精确匹配操作人 :key str role_name: 精确匹配角色 :key int last_act_flag: 精确匹配是否为最后一节点 :key str act_def_name: 模糊匹配流程节点名称 :key str action_name: 模糊匹配操作动作 :key str title: 模糊匹配标题 :key str item_content: 模糊匹配内容摘要 :key int action_time_start: 时间范围起始(毫秒) :key int action_time_end: 时间范围结束(毫秒) :return: (DataFrame, Pagination) """ page_number = kwargs.get('page_number', random.randint(1, 100)) page_size = kwargs.get('page_size', 20) kwargs.update({'page_number': page_number, 'page_size': page_size}) _name_likes = { cls.act_def_name.key: '%{}%', cls.action_name.key: '%{}%', cls.title.key: '%{}%', cls.item_content.key: '%{}%', } _query = select(cls).where( *cls.search_wheres(likes=_name_likes, **kwargs) ) if 'action_time_start' in kwargs and 'action_time_end' in kwargs: _query = _query.where( cls.action_time >= kwargs['action_time_start'], cls.action_time <= kwargs['action_time_end'] ) _paging = None if is_paging: _row_count = await cls.query_count(_query) _paging = Pagination(_row_count).paging(page_number, page_size) _data_query = _query.limit(page_size).offset(_paging.offset_size) else: _data_query = _query.where() _sort_clause = cls.sort_clauses(kwargs.get('sort_clause', {})) if _sort_clause: _data_query = _data_query.order_by(*_sort_clause) else: _data_query = _data_query.order_by(cls.action_time.desc()) _process_df = await cls.query_as_df(_data_query) if not _process_df.empty: _process_df.replace(models.EmptyInDF + models.EmptyDatetimeInDF, '', inplace=True) return _process_df, _paging @classmethod async def search(cls, **kwargs): """ 按参数搜索办理流程记录,返回分页格式数据。 """ _process_df, _paging = await cls.search_base(**kwargs) return { 'total': _paging.row_count, 'rows': _process_df.to_dict('records'), 'pagination': { 'page_number': _paging.page_number, 'page_count': _paging.page_count, 'page_size': _paging.page_size, }, } @classmethod async def exists_rec_id(cls, data_df: pd.DataFrame): """ 查找 data_df 中在数据库中已存在和不存在的记录。根据 raw_id 字段判断。 :param data_df: 输入的数据框架,必须包含 raw_id 列 :return: (exists_df: pd.DataFrame, latest_df: pd.DataFrame) - exists_df: 在数据库中存在的记录 - latest_df: 在数据库中不存在的记录 """ if data_df.empty: return pd.DataFrame(), pd.DataFrame() # 获取待查询的 (rec_id, act_id) 组合 pairs = data_df[[cls.rec_id.key, cls.act_id.key]].drop_duplicates().values.tolist() if not pairs: return pd.DataFrame(), data_df.copy() # 查询数据库中已存在的记录 _query = select(cls.id, cls.rec_id, cls.act_id).where( (cls.rec_id.in_([p[0] for p in pairs])) & (cls.act_id.in_([p[1] for p in pairs])) ) exists_df = await cls.query_as_df(_query) if exists_df.empty: return pd.DataFrame(), data_df.copy() # 构建 (rec_id, act_id) -> id 的映射 key_to_id_map = dict(zip(zip(exists_df[cls.rec_id.key], exists_df[cls.act_id.key]), exists_df[cls.id.key])) # 根据组合是否在数据库中划分数据 mask_exists = data_df.apply(lambda row: (row[cls.rec_id.key], row[cls.act_id.key]) in key_to_id_map, axis=1) exists_df = data_df[mask_exists].copy() exists_df[cls.id.key] = exists_df.apply(lambda row: key_to_id_map[(row[cls.rec_id.key], row[cls.act_id.key])], axis=1) latest_df = data_df[~mask_exists].copy() return exists_df, latest_df @classmethod async def fill_process_info(cls, data_df: pd.DataFrame, index_field: str = 'id', column_name: str = 'process_infos', preprocessing: Optional[Callable] = None): """ 填充办理过程数据到数据框架。 用于在查询结果中添加关联的办理过程信息。 :param pandas.DataFrame data_df: 待填充的数据框架 :param str index_field: 索引字段,一般是任务ID :param str column_name: 填充时,新增加的列名称,默认为`process_infos` :param preprocessing: 预处理,注意预处理必须要返回处理后的结果 :return: 办理过程数据框架(已填充) :rtype: pandas.DataFrame """ if data_df.empty: return pd.DataFrame() _task_ids = list(set(data_df[index_field].unique().tolist())) if not _task_ids: return pd.DataFrame() _query = select(cls).where(cls.dcm_task_id.in_(_task_ids)) _info_df: pd.DataFrame = await cls.query_as_df(_query) if not _info_df.empty: _info_df.replace(models.EmptyInDF+models.EmptyDatetimeInDF, '', inplace=True) # 整理输出数据类型 _info_df[cls.id.key] = _info_df[cls.id.key].astype(str) _info_df[cls.dcm_task_id.key] = _info_df[cls.dcm_task_id.key].astype(str) # 设置索引 _info_df['index_id'] = _info_df[cls.dcm_task_id.key] _info_df.set_index(['index_id'], inplace=True) # 对数据进行预处理 if isinstance(preprocessing, Callable): _info_df = preprocessing(_info_df) # 增加数据填充列 data_df[column_name] = data_df[index_field].apply( lambda x: _info_df.query(f"{cls.dcm_task_id.key}=='{x}'").to_dict('records') ) else: data_df[column_name] = [[] for _ in range(len(data_df))] return _info_df @register_swagger_model class DcmTaskProcessInfo(DcmTaskProcessInfoBase): """ 办理经过业务模型类(主业务类,完全继承 TD3iDcmTaskProcessInfo 字段)。 --- description: 数字城管-部门待办任务办理经过 type: object properties: id: description: 主键ID type: integer example: 1001 readOnly: true raw_id: description: 原始主键ID type: integer example: 2001 act_id: description: 任务ID type: integer example: 3001 act_def_id: description: 流程节点定义ID type: integer example: 101 act_def_name: description: 流程节点名称 type: string example: "受理" maxLength: 100 act_time_state_id: description: 操作时间状态ID type: integer example: 1 act_limit_info: description: 操作时限信息 type: string example: "24小时内" maxLength: 255 act_used_time_char: description: 已用时间(字符串) type: string example: "12小时" maxLength: 50 act_remain_time_char: description: 剩余时间(字符串) type: string example: "12小时" maxLength: 50 act_deadline_time: description: 操作截止时间戳(毫秒) type: integer example: 1714578000000 act_property_id: description: 操作属性ID type: integer example: 5 action_name: description: 操作动作名称(如批转、回退) type: string example: "批转" maxLength: 100 action_time: description: 操作时间戳(毫秒) type: integer example: 1714567890000 title: description: 操作标题 type: string example: "案件受理完成" maxLength: 100 detail: description: 操作详细意见 type: string example: "经核查,该案件属实,已转交市政工程处处理。" maxLength: 65535 backup_detail: description: 备用意见 type: string example: "系统自动记录" maxLength: 65535 medias: description: 附件信息(JSON格式) type: string example: "[{\"media_id\":3001,\"name\":\"photo.jpg\"}]" maxLength: 65535 unit_name: description: 当前操作单位 type: string example: "市政工程处" maxLength: 100 unit_contact: description: 单位联系方式 type: string example: "025-88888888" maxLength: 255 human_id: description: 操作人ID,-1为系统 type: integer example: 101 human_name: description: 操作人名称(含单位) type: string example: "张三(市政工程处)" maxLength: 255 role_name: description: 当前角色名称 type: string example: "审批员" maxLength: 100 item_id: description: 项目ID type: integer example: 4001 item_type_id: description: 任务类型ID type: integer example: 101 item_content: description: 任务内容摘要 type: string example: "中山路破损路面修复" maxLength: 65535 item_process_info_list: description: 子流程列表(JSON) type: string example: "[{\"node\":\"受理\",\"time\":1714567890000}]" maxLength: 65535 sub_process_info: description: 子流程信息 type: string example: "{\"sub1\":\"已处理\",\"sub2\":\"待验收\"}" maxLength: 65535 bundle_time_state_id: description: 组合时间状态ID type: integer example: 2 bundle_limit_info: description: 组合时限信息 type: string example: "48小时内" maxLength: 255 bundle_used_char: description: 组合已用时间 type: string example: "36小时" maxLength: 50 bundle_remain_char: description: 组合剩余时间 type: string example: "12小时" maxLength: 50 bundle_deadline_time: description: 组合截止时间戳(毫秒) type: integer example: 1714580000000 show_unit_contact: description: 是否显示单位联系方式 type: integer example: 1 pre_unit_name: description: 上一单位 type: string example: "街道办" maxLength: 100 pre_action_name: description: 上一操作名称 type: string example: "初审" maxLength: 100 pre_human_name: description: 上一操作人 type: string example: "李四(街道办)" maxLength: 255 pre_act_opinion: description: 上一操作意见 type: string example: "建议转交专业部门处理" maxLength: 65535 next_act_def_name: description: 下一节点名称 type: string example: "审批" maxLength: 100 next_role_part_name: description: 下一角色/单位 type: string example: "市政工程处-审批组" maxLength: 255 next_role_name: description: 下一角色名称 type: string example: "审批员" maxLength: 100 next_act_property_id: description: 下一操作属性ID type: integer example: 6 last_act_flag: description: 是否为最后一节点(0否,1是) type: integer example: 0 created_at: description: 创建时间,ISO格式的日期时间字符串 type: string format: date-time example: "2024-01-15 10:30:00" readOnly: true created_by: description: 创建者用户名 type: string example: "admin" readOnly: true updated_at: description: 修改时间,ISO格式的日期时间字符串 type: string format: date-time example: "2024-01-16 14:25:00" readOnly: true updated_by: description: 修改者用户名 type: string example: "editor" readOnly: true """ @classmethod async def create(cls, **kwargs): """ 创建办理经过。 业务流程: 1. 使用 D3iDcmTaskProcessInfoForm 验证表单数据 2. 设置创建者、更新者 3. 保存到数据库 4. 返回创建的流程记录对象 :param kwargs: 办理经过参数字典 :return: 新建流程记录对象 :rtype: DcmTaskProcessInfo :raises ValidationError: 当表单验证失败时抛出 """ # 处理字符串字段去除空格 for _k, _v in kwargs.items(): if isinstance(_v, str): kwargs[_k] = _v.strip() _form = DcmTaskProcessInfoForm(formdata=kwargs) _form.validate_form() # 检查是否存在同记录ID的任务 _task: cls = await cls.is_exist(_form.rec_id.data, _form.act_id.data) assert _task is None, "记录ID已存在,不能重复创建。" _record = cls().copy_from_dict(_form.data, skip_none=True).before_save() await _record.async_save() return _record @classmethod async def delete(cls, process_id: Union[str, int]): """ 软删除办理经过(设置 delete_flag=1)。 :param process_id: 要删除的办理经过ID :return: 删除的流程记录对象 :rtype: DcmTaskProcessInfo :raises AssertionError: 当记录不存在时抛出 """ _record: cls = await cls.async_find_by_id(process_id) assert _record, f"根据 ID {process_id} 未找到办理经过。" # 执行删除 _del_query = delete(cls).where(cls.id == _record.id) _del_count = (await cls.raw_execute(_del_query)).rowcount echo_log(f'已删除任务办理经过(记录ID:{_record.rec_id},ID:{_record.id}).') return _record @classmethod async def modify(cls, process_id: Union[str, int], **kwargs): """ 修改办理经过(仅允许修改非核心流程字段,如意见、标题、联系方式等)。 注意:不允许修改 act_id、action_time、act_def_name 等关键流程节点信息。 业务流程: 1. 将 process_id 加入参数 2. 处理字符串字段去除空格 3. 使用表单验证 4. 查询原记录 5. 验证存在性 6. 更新允许字段 7. 设置更新者 8. 保存 :param process_id: 要修改的办理经过ID :param kwargs: 需要更新的字段 :return: 修改后的流程记录对象 :rtype: DcmTaskProcessInfo :raises AssertionError: 当记录不存在时抛出 :raises ValidationError: 当表单验证失败时抛出 """ kwargs[cls.id.key] = process_id for _k, _v in kwargs.items(): if isinstance(_v, str): kwargs[_k] = _v.strip() _form = DcmTaskProcessInfoForm(formdata=kwargs) _form.validate_form() _record: cls = await cls.async_find_by_id(process_id) assert _record, f'查无此办理经过。' # 仅允许更新非核心流程字段 allowed_fields = { 'title', 'detail', 'backup_detail', 'medias', 'unit_contact', 'human_name', 'role_name', 'item_content', 'item_process_info_list', 'sub_process_info', 'show_unit_contact', 'pre_act_opinion', 'pre_human_name' } update_data = {k: v for k, v in _form.data.items() if k in allowed_fields and v is not None} _record.copy_from_dict(update_data, skip_none=True).before_save() await _record.async_save() return _record @classmethod async def create_batch(cls, data_df: pd.DataFrame): """ 批量创建新办理经过(传入数据应为全新记录,无需校验是否存在)。 :param data_df: 包含办理经过数据的 DataFrame,字段需与模型属性匹配(如 raw_id, act_id 等) :return: 成功创建的记录数量 :rtype: int """ if data_df.empty: return 0 # 一次性转为字典列表(C 层高效) records = data_df.to_dict('records') # 用列表推导式构造对象 records = [cls().copy_from_dict(record, skip_none=True).before_save() for record in records] # 批量插入 session = cls.get_aio_session() try: session.add_all(records) await session.commit() except Exception as e: await session.rollback() raise e finally: await session.close() echo_log(f"批量创建成功:创建 {len(records)} 条任务办理经过。") return len(records) @classmethod async def modify_batch(cls, data_df: pd.DataFrame): """ 批量修改已有办理经过。 :param data_df: 包含办理经过数据的 DataFrame :return: 成功更新的记录数量 :rtype: int """ if data_df.empty: return 0 # 必须包含 id 列 if 'id' not in data_df.columns: echo_log(f"错误:modify_batch 要求输入数据必须包含 '{cls.id.key}' 列(主键)") return 0 # 转换为字典列表 update_data = data_df.to_dict('records') # 使用 bulk_update_mappings session = cls.get_aio_session() try: await session.run_sync( lambda sync_session: sync_session.bulk_update_mappings(cls, update_data) ) await session.commit() updated_count = len(update_data) except Exception as e: await session.rollback() raise e finally: await session.close() echo_log(f"批量修改成功:更新 {updated_count} 条任务办理经过。") return updated_count @classmethod async def save_batch(cls, data_df: pd.DataFrame): """ 批量保存数据,自动处理新建和更新。 :param data_df: 要保存的数据框架 :return: 新建和更新的数量 """ # 筛选数据状态 _exists_df, _latest_df = await DcmTaskProcessInfo.exists_rec_id(data_df) # 保存到数据库 _created_count = await DcmTaskProcessInfo.create_batch(_latest_df) _updated_count = await DcmTaskProcessInfo.modify_batch(_exists_df) return _created_count, _updated_count