import datetime import random from typing import Union import pandas as pd from sqlalchemy import select, delete from tornado_swagger.model import register_swagger_model from wtforms import StringField, TextAreaField, IntegerField, FloatField from wtforms.validators import Length import models from models.common_model import CommonModel from models.db_models import TD3iDcmTask from paste.core.logging import echo_log from paste.rbac.rbac_user import RbacUser from paste.util.pagination import Pagination from paste.web.form import ModelForm class DcmTaskForm(ModelForm): """ 专业表单验证类(已完全根据 TD3iDcmTask 字段重构)。 用于验证和处理数字城管-部门待办任务的创建/修改表单数据。 字段完全映射数据库表 t_dcm_department_task 的字段结构。 """ # 基础信息 rec_id = IntegerField('记录ID') rec_disp_num = StringField('显示编号', validators=[Length(max=50, message='显示编号长度不能超过50字符')]) rec_type_id = IntegerField('类型ID') rec_type_name = StringField('案件类型', validators=[Length(max=100, message='案件类型长度不能超过100字符')]) # 任务信息 act_id = IntegerField('任务ID') act_deadline_time = IntegerField('任务截止时间戳(毫秒)') act_warning_time = IntegerField('预警时间戳(毫秒)') act_property_id = IntegerField('任务属性ID') act_ard_state_name = StringField('阶段授权状态', validators=[Length(max=50, message='阶段授权状态长度不能超过50字符')]) act_time_state_id = IntegerField('阶段状态ID') # 业务信息 biz_id = IntegerField('业务ID') sys_id = IntegerField('系统ID') task_num = StringField('任务号', validators=[Length(max=50, message='任务号长度不能超过50字符')]) other_task_num = StringField('第三方任务号', validators=[Length(max=100, message='第三方任务号长度不能超过100字符')]) bundle_remain_char = StringField('剩余时间描述', validators=[Length(max=20, message='剩余时间描述长度不能超过20字符')]) bundle_deadline_time = IntegerField('捆绑截止时间戳') bundle_deadline_char = StringField('捆绑截止时间描述', validators=[Length(max=20, message='捆绑截止时间描述长度不能超过20字符')]) bundle_warning_time = IntegerField('捆绑预警时间戳') bundle_time_state_id = IntegerField('捆绑阶段红绿灯状态') # 事件信息 event_type_id = IntegerField('问题类型ID') max_event_type_id = IntegerField('最大事件类型ID') event_type_name = StringField('问题类型', validators=[Length(max=100, message='问题类型长度不能超过100字符')]) event_src_name = StringField('问题来源', validators=[Length(max=100, message='问题来源长度不能超过100字符')]) event_desc = TextAreaField('问题描述', validators=[Length(max=65535, message='问题描述长度不能超过65535字符')]) # 紧急程度与分类 urgency_level = IntegerField('紧急程度(0正常,1紧急)') main_type_id = IntegerField('大类ID') main_type_name = StringField('大类名称', validators=[Length(max=100, message='大类名称长度不能超过100字符')]) sub_type_id = IntegerField('小类ID') sub_type_name = StringField('小类名称', validators=[Length(max=100, message='小类名称长度不能超过100字符')]) # 地址与坐标 address = TextAreaField('地址描述', validators=[Length(max=65535, message='地址描述长度不能超过65535字符')]) district_name = StringField('所属区域', validators=[Length(max=50, message='所属区域长度不能超过50字符')]) coordinate_x = FloatField('经度') coordinate_y = FloatField('纬度') # 处理流程 proc_time_state_id = IntegerField('处理流程状态ID') deadline_time = IntegerField('处理截止时间戳') warning_time = IntegerField('处理预警时间戳') processing_deadline = StringField('处置时限描述', validators=[Length(max=50, message='处置时限描述长度不能超过50字符')]) new_inst_cond_name = StringField('立案条件', validators=[Length(max=200, message='立案条件长度不能超过200字符')]) case_closure_condition = StringField('结案条件', validators=[Length(max=200, message='结案条件长度不能超过200字符')]) # 回复与回访 reply_intime = IntegerField('是否两小时回复(0无需回复,1待回复,2已回复,3超时,4无需回复已恢复)') return_visit_flag = IntegerField('回访标识(0无需,1待回访,2已回访)') # 部门信息 first_depart_name = StringField('一级专业部门', validators=[Length(max=100, message='一级专业部门长度不能超过100字符')]) second_depart_name = StringField('二级专业部门', validators=[Length(max=100, message='二级专业部门长度不能超过100字符')]) # 举报人信息 reporter_name = StringField('举报人姓名', validators=[Length(max=100, message='举报人姓名长度不能超过100字符')]) reporter_contact = StringField('举报电话', validators=[Length(max=50, message='举报电话长度不能超过50字符')]) # 阅读与颜色 read_flag = IntegerField('是否已读(0未读,1已读)') back_color_bit_id = IntegerField('背景色ID') font_color_bit_id = IntegerField('字体色ID') # 部件与显示 part_code = StringField('部件编码', validators=[Length(max=100, message='部件编码长度不能超过100字符')]) display_style_id = IntegerField('显示样式ID') # 功能控制 func_forbid_reporter_info_flag = IntegerField('是否禁止举报人信息') def process(self, formdata=None, obj=None, **kwargs): """ 处理表单数据,在数据绑定前进行预处理。 主要功能: - 遍历所有表单字段 - 对字符串类型的值去除两端空白字符 - 调用父类的process方法继续处理 """ if formdata: for name, values in formdata.items(): if isinstance(values, list) and values: formdata[name] = [v.strip() if isinstance(v, str) else v for v in values] elif isinstance(values, str): formdata[name] = values.strip() super().process(formdata, obj, **kwargs) class DcmTaskBase(TD3iDcmTask, CommonModel): """ 专业基础类(已完全映射 TD3iDcmTask 字段)。 继承自数据库模型 TD3iDcmTask 和通用模型 CommonModel。 封装所有与部门待办任务相关的通用操作方法。 """ FieldMapping = { 'id': 'id', 'rec_id': 'rec_id', 'rec_disp_num': 'rec_disp_num', 'rec_type_id': 'rec_type_id', 'rec_type_name': 'rec_type_name', 'act_id': 'act_id', 'act_deadline_time': 'act_deadline_time', 'act_warning_time': 'act_warning_time', 'act_property_id': 'act_property_id', 'act_ard_state_name': 'act_ard_state_name', 'act_time_state_id': 'act_time_state_id', 'biz_id': 'biz_id', 'sys_id': 'sys_id', 'task_num': 'task_num', 'other_task_num': 'other_task_num', 'bundle_remain_char': 'bundle_remain_char', 'bundle_deadline_time': 'bundle_deadline_time', 'bundle_deadline_char': 'bundle_deadline_char', 'bundle_warning_time': 'bundle_warning_time', 'bundle_time_state_id': 'bundle_time_state_id', 'rollback_deadline': 'rollback_deadline', 'event_type_id': 'event_type_id', 'max_event_type_id': 'max_event_type_id', 'event_type_name': 'event_type_name', 'event_src_name': 'event_src_name', 'event_desc': 'event_desc', 'urgency_level': '紧急程度', 'main_type_id': 'main_type_id', 'main_type_name': 'main_type_name', 'sub_type_id': 'sub_type_id', 'sub_type_name': 'sub_type_name', 'address': 'address', 'district_name': 'district_name', 'coordinate_x': 'coordinate_x', 'coordinate_y': 'coordinate_y', 'proc_time_state_id': 'proc_time_state_id', 'deadline_time': 'deadline_time', 'warning_time': 'warning_time', 'processing_deadline': '处置时限', 'new_inst_cond_name': 'new_inst_cond_name', 'case_closure_condition': '结案条件', 'reply_intime': 'reply_intime', 'return_visit_flag': 'return_visit_flag', 'first_depart_name': 'first_depart_name', 'second_depart_name': 'second_depart_name', 'reporter_name': 'reporter_name', 'reporter_contact': 'reporter_contact', 'read_flag': 'read_flag', 'back_color_bit_id': 'back_color_bit_id', 'font_color_bit_id': 'font_color_bit_id', 'part_code': 'part_code', 'display_style_id': 'display_style_id', 'func_forbid_reporter_info_flag': 'func_forbid_reporter_info_flag', } """ 任务数据映射 """ @classmethod async def exist_other(cls, id: Union[str, int], rec_id: str): """ 检查是否存在除当前任务外的其他同任务号或同显示编号的任务。 :param id: 当前任务ID :param task_num: 任务号 :param rec_disp_num: 显示编号 :return: 存在返回任务对象,不存在返回None """ _query = select(cls).where(cls.id != id, cls.rec_id == rec_id) _task: cls = await cls.query_first(_query) return _task @classmethod async def find_by_ids(cls, ids: list[Union[str, int]]): """ 根据ID列表批量查找任务数据。 """ _query = select(cls).where(cls.id.in_(ids)) _task_list: list[cls] = (await cls.orm_execute_scalars(_query)).all() return _task_list @classmethod async def is_exist(cls, rec_id: str): """ 检查任务是否已经存在(根据任务号或显示编号)。 """ _query = select(cls).where(cls.rec_id == rec_id) _task: cls = await cls.query_first(_query) return _task @classmethod async def search_base(cls, is_paging=True, **kwargs): """ 按参数搜索任务数据的基础方法。 支持字段: - task_num, rec_disp_num, event_type_name, district_name, urgency_level, read_flag 等 - 支持模糊匹配:event_type_name, rec_type_name, event_src_name, first_depart_name, second_depart_name - 支持精确匹配:biz_id, sys_id, urgency_level, read_flag, rec_type_id, act_time_state_id 等 :param is_paging: 是否分页 :param kwargs: 查询参数 :key int page_number: 页码(缺省随机1~100) :key int page_size: 每页数量(缺省20) :key dict sort_clause: 排序配置,如 {'task_num': 'asc'} :key str task_num: 精确匹配任务号 :key str rec_disp_num: 精确匹配显示编号 :key str event_type_name: 模糊匹配问题类型 :key str district_name: 精确匹配区域 :key int urgency_level: 精确匹配紧急程度 :key int read_flag: 精确匹配是否已读 :key int biz_id: 精确匹配业务ID :key int sys_id: 精确匹配系统ID :key int rec_type_id: 精确匹配类型ID :key int act_time_state_id: 精确匹配阶段状态ID :key int deadline_time: 精确匹配处理截止时间戳 :key int act_deadline_time: 精确匹配任务截止时间戳 :return: (DataFrame, Pagination) """ page_number = kwargs.get('page_number', random.randint(1, 100)) page_size = kwargs.get('page_size', 20) kwargs.update({'page_number': page_number, 'page_size': page_size}) # 模糊查询字段 _name_likes = { cls.event_type_name.key: '%{}%', cls.rec_type_name.key: '%{}%', cls.event_src_name.key: '%{}%', cls.first_depart_name.key: '%{}%', cls.second_depart_name.key: '%{}%', } _query = select(cls).where( *cls.search_wheres(likes=_name_likes, **kwargs) ).group_by(cls.id) _paging = None if is_paging: _row_count = await cls.query_count(_query) _paging = Pagination(_row_count).paging(page_number, page_size) _data_query = _query.limit(page_size).offset(_paging.offset_size) else: _data_query = _query.where() _sort_clause = cls.sort_clauses(kwargs.get('sort_clause', {})) if _sort_clause: _data_query = _data_query.order_by(*_sort_clause) else: _data_query = _data_query.order_by(cls.task_num, cls.rec_disp_num) _task_df = await cls.query_as_df(_data_query) if not _task_df.empty: _task_df.replace(models.EmptyInDF + models.EmptyDatetimeInDF, '', inplace=True) _task_df[cls.id.key] = _task_df[cls.id.key].astype(str) return _task_df, _paging @classmethod async def search(cls, **kwargs): """ 按参数搜索任务数据,返回分页格式数据。 """ _task_df, _paging = await cls.search_base(**kwargs) return { 'total': _paging.row_count, 'rows': _task_df.to_dict('records'), 'pagination': { 'page_number': _paging.page_number, 'page_count': _paging.page_count, 'page_size': _paging.page_size, }, } @classmethod async def exists_rec_id(cls, data_df: pd.DataFrame): """ 查找 data_df 中在数据库中已存在和不存在的记录。根据 rec_id 字段判断。 :param data_df: 输入的数据框架,必须包含 rec_id 列 :return: (exists_df: pd.DataFrame, latest_df: pd.DataFrame) - exists_df: 在数据库中存在的记录 - latest_df: 在数据库中不存在的记录 """ if data_df.empty: return pd.DataFrame(), pd.DataFrame() # 获取待查询的 rec_id 列表(去重) rec_ids = data_df[cls.rec_id.key].unique().tolist() if not rec_ids: return pd.DataFrame(), data_df.copy() # 查询数据库中已存在的 rec_id _query = select(cls.id, cls.rec_id).where(cls.rec_id.in_(rec_ids)) rec_ids_df = await cls.query_as_df(_query) if rec_ids_df.empty: return pd.DataFrame(), data_df.copy() # 构建 rec_id -> id 的映射字典 rec_id_to_id_map = dict(zip(rec_ids_df[cls.rec_id.key], rec_ids_df[cls.id.key])) # 根据 rec_id 是否在数据库中,划分数据 mask_exists = data_df[cls.rec_id.key].isin(rec_ids_df[cls.rec_id.key]) # 数据库已经有的记录 exists_df = data_df[mask_exists].copy() # 自动补充从数据库查到的 id 字段 exists_df[cls.id.key] = exists_df[cls.rec_id.key].map(rec_id_to_id_map) # 新的数据 latest_df = data_df[~mask_exists].copy() return exists_df, latest_df @register_swagger_model class DcmTask(DcmTaskBase): """ 部门待办任务类(主业务类,完全继承 TD3iDcmTask 字段)。 --- description: 数字城管-部门待办任务 type: object properties: id: description: 主键ID type: integer example: 1001 readOnly: true rec_id: description: 记录ID type: integer example: 2001 rec_disp_num: description: 显示编号 type: string example: "D20240501001" maxLength: 50 rec_type_id: description: 类型ID type: integer example: 101 rec_type_name: description: 案件类型 type: string example: "市容环境" maxLength: 100 act_id: description: 任务ID type: integer example: 3001 act_deadline_time: description: 任务截止时间戳(毫秒) type: integer example: 1714567890000 act_warning_time: description: 预警时间戳(毫秒) type: integer example: 1714560000000 act_property_id: description: 任务属性ID type: integer example: 5 act_ard_state_name: description: 阶段授权状态 type: string example: "已授权" maxLength: 50 act_time_state_id: description: 阶段状态ID type: integer example: 1 biz_id: description: 业务ID type: integer example: 10 sys_id: description: 系统ID type: integer example: 1 task_num: description: 任务号 type: string example: "TASK20240501001" maxLength: 50 other_task_num: description: 第三方任务号 type: string example: "THIRD-2024-001" maxLength: 100 bundle_remain_char: description: 剩余时间描述 type: string example: "3天" maxLength: 20 bundle_deadline_time: description: 捆绑截止时间戳 type: integer example: 1714578000000 bundle_deadline_char: description: 捆绑截止时间描述 type: string example: "3天" maxLength: 20 bundle_warning_time: description: 捆绑预警时间戳 type: integer example: 1714570000000 bundle_time_state_id: description: 捆绑阶段红绿灯状态 type: integer example: 0 rollback_deadline: description: 拒绝超时截止时间戳 type: integer example: 1714580000000 event_type_id: description: 问题类型ID type: integer example: 1001 max_event_type_id: description: 最大事件类型ID type: integer example: 1002 event_type_name: description: 问题类型 type: string example: "道路破损" maxLength: 100 event_src_name: description: 问题来源 type: string example: "市民举报" maxLength: 100 event_desc: description: 问题描述 type: string example: "中山路与解放路交叉口路面大面积破损" maxLength: 65535 urgency_level: description: 紧急程度(0正常,1紧急) type: integer example: 1 main_type_id: description: 大类ID type: integer example: 101 main_type_name: description: 大类名称 type: string example: "市容环境" maxLength: 100 sub_type_id: description: 小类ID type: integer example: 10101 sub_type_name: description: 小类名称 type: string example: "道路破损" maxLength: 100 address: description: 地址描述 type: string example: "中山路与解放路交叉口" maxLength: 65535 district_name: description: 所属区域 type: string example: "鼓楼区" maxLength: 50 coordinate_x: description: 经度 type: number format: decimal example: 118.789012 coordinate_y: description: 纬度 type: number format: decimal example: 32.045678 proc_time_state_id: description: 处理流程状态ID type: integer example: 2 deadline_time: description: 处理截止时间戳 type: integer example: 1714578000000 warning_time: description: 处理预警时间戳 type: integer example: 1714570000000 processing_deadline: description: 处置时限描述 type: string example: "24小时" maxLength: 50 new_inst_cond_name: description: 立案条件 type: string example: "破损面积大于0.5㎡" maxLength: 200 case_closure_condition: description: 结案条件 type: string example: "修复完成并验收" maxLength: 200 reply_intime: description: 是否两小时回复(0无需回复,1待回复,2已回复,3超时,4无需回复已恢复) type: integer example: 2 return_visit_flag: description: 回访标识(0无需,1待回访,2已回访) type: integer example: 1 first_depart_name: description: 一级专业部门 type: string example: "市政工程处" maxLength: 100 second_depart_name: description: 二级专业部门 type: string example: "道路养护科" maxLength: 100 reporter_name: description: 举报人姓名 type: string example: "张三" maxLength: 100 reporter_contact: description: 举报电话 type: string example: "13800138000" maxLength: 50 read_flag: description: 是否已读(0未读,1已读) type: integer example: 1 back_color_bit_id: description: 背景色ID type: integer example: 10 font_color_bit_id: description: 字体色ID type: integer example: 20 part_code: description: 部件编码 type: string example: "P0012345" maxLength: 100 display_style_id: description: 显示样式ID type: integer example: 5 func_forbid_reporter_info_flag: description: 是否禁止举报人信息 type: integer example: 0 operation: description: 操作(工单上的操作按钮) type: string example: "批转" created_at: description: 创建时间,ISO格式的日期时间字符串 type: string format: date-time example: "2024-01-15 10:30:00" readOnly: true created_by: description: 创建者用户名 type: string example: "admin" readOnly: true updated_at: description: 修改时间,ISO格式的日期时间字符串 type: string format: date-time example: "2024-01-16 14:25:00" readOnly: true updated_by: description: 修改者用户名 type: string example: "editor" readOnly: true """ @classmethod async def create(cls, user: RbacUser = None, **kwargs): """ 创建新的任务。 业务流程: 1. 使用 DcmDepartmentTaskForm 验证表单数据完整性 2. 检查任务是否已存在(根据 task_num 或 rec_disp_num) 3. 创建新任务对象 4. 设置创建者和更新者为当前用户 5. 保存到数据库 6. 返回创建的任务对象 :param RbacUser user: 操作用户对象 :param kwargs: 任务参数字典 :return: 新建任务对象 :rtype: DcmTask :raises AssertionError: 当任务已存在时抛出 :raises ValidationError: 当表单验证失败时抛出 """ # 处理字符串字段去除空格 for _k, _v in kwargs.items(): if isinstance(_v, str): kwargs[_k] = _v.strip() _task_form = DcmTaskForm(formdata=kwargs) _task_form.validate_form() # 检查是否存在同任务号或同显示编号的任务 _task: cls = await cls.is_exist(_task_form.rec_id.data) assert _task is None, "任务号或显示编号已存在,不能重复创建。" # 创建任务对象 _task = cls().copy_from_dict(_task_form.data, skip_none=True).before_save() if user: _task.created_by = user.username _task.updated_by = user.username await _task.async_save() return _task @classmethod async def delete(cls, task_id: Union[str, int]): """ 删除任务。 注意:任务删除需根据业务规则判断是否允许(如是否已处理、是否有附件等)。 业务流程: 1. 根据ID查找任务 2. 验证任务存在性 3. 执行删除操作 :param task_id: 要删除的任务ID :return: 删除的任务对象 :rtype: DcmTask :raises AssertionError: 当任务不存在时抛出 """ _task: cls = await cls.async_find_by_id(task_id) assert _task, f"根据 ID {task_id} 未找到任务。" # 执行删除 _del_query = delete(cls).where(cls.id == _task.id) _del_count = (await cls.raw_execute(_del_query)).rowcount echo_log(f'已删除任务(任务号:{_task.task_num},ID:{_task.id}).') return _task @classmethod async def modify(cls, task_id: Union[str, int], user: RbacUser=None, **kwargs): """ 修改已有任务信息。 注意:修改任务号或显示编号时需检查是否与其他任务重复。 业务流程: 1. 将 task_id 添加到参数中 2. 处理字符串字段去除首尾空格 3. 使用 DcmDepartmentTaskForm 验证表单数据 4. 检查是否有其他任务使用了相同的 task_num 或 rec_disp_num 5. 查询原任务对象 6. 验证任务存在性 7. 更新字段并设置更新者 8. 保存到数据库 9. 返回更新后的任务对象 :param task_id: 要修改的任务ID :param RbacUser user: 操作用户对象 :param kwargs: 需要更新的字段 :return: 修改后的任务对象 :rtype: DcmTask :raises AssertionError: 当任务不存在或信息重复时抛出 :raises ValidationError: 当表单验证失败时抛出 """ # 处理字符串字段去除空格 for _k, _v in kwargs.items(): if isinstance(_v, str): kwargs[_k] = _v.strip() # 表单验证 _task_form = DcmTaskForm(formdata=kwargs) _task_form.validate_form() # 检查是否与其他任务重复(排除自身) _other = await cls.exist_other(task_id, _task_form.rec_id.data) assert _other is None, "待办任务号或显示编号已存在,不能重复修改。" # 查询原任务 _task: cls = await cls.async_find_by_id(task_id) assert _task, f'查无此待办信息。' # 更新字段 _task.copy_from_dict(_task_form.data, skip_none=True).before_save() _task.updated_by = user.username await _task.async_save() return _task @classmethod async def create_batch(cls, data_df: pd.DataFrame, user: RbacUser = None): """ 批量创建新任务(传入数据应为全新记录,无需校验是否存在)。 :param data_df: 包含任务数据的 DataFrame,字段需与模型属性匹配(如 rec_id, task_num 等) :param user: 操作用户对象,用于设置 created_by / updated_by :return: 成功创建的任务数量 :rtype: int """ if data_df.empty: return 0 # 向量化设置用户字段(无循环) if user: data_df['created_by'] = user.username data_df['updated_by'] = user.username # 一次性转为字典列表(C 层高效) records = data_df.to_dict('records') # 用列表推导式构造对象 tasks = [cls().copy_from_dict(record, skip_none=True).before_save() for record in records] # 批量插入 session = cls.get_aio_session() try: session.add_all(tasks) await session.commit() except Exception as e: await session.rollback() raise e finally: await session.close() echo_log(f"批量创建成功:创建 {len(tasks)} 条新待办。") return len(tasks) @classmethod async def modify_batch(cls, data_df: pd.DataFrame, user: RbacUser = None): """ 批量修改已有任务。 :param data_df: 包含任务数据的 DataFrame :param user: 操作用户对象,用于设置 updated_by :return: 成功更新的任务数量 :rtype: int """ if data_df.empty: return 0 # 必须包含 id 列 if 'id' not in data_df.columns: echo_log(f"错误:modify_batch 要求输入数据必须包含 '{cls.id.key}' 列(主键)") return 0 # 手动添加更新时间戳 data_df['updated_at'] = datetime.datetime.now() # 添加更新者信息 if user: data_df['updated_by'] = user.username # 转换为字典列表 update_data = data_df.to_dict('records') # 使用 bulk_update_mappings session = cls.get_aio_session() try: await session.run_sync( lambda sync_session: sync_session.bulk_update_mappings(cls, update_data) ) await session.commit() updated_count = len(update_data) except Exception as e: await session.rollback() raise e finally: await session.close() echo_log(f"批量修改成功:更新 {updated_count} 条待办。") return updated_count @classmethod async def save_batch(cls, data_df: pd.DataFrame, user: RbacUser = None): """ 批量保存数据,自动处理新建和更新。 :param data_df: 要保存的数据框架 :param user: 用户 :return: 新建和更新的数量 """ # 筛选数据状态 _exists_df, _latest_df = await DcmTask.exists_rec_id(data_df) # 保存到数据库 _created_count = await DcmTask.create_batch(_latest_df, user) _updated_count = await DcmTask.modify_batch(_exists_df, user) return _created_count, _updated_count