# coding: utf-8 import datetime import random from typing import Union import pandas as pd from sqlalchemy import select, delete from tornado_swagger.model import register_swagger_model from wtforms import StringField, TextAreaField, IntegerField, DateTimeField from wtforms.validators import Length import models from models.db_models import TD3iGovsPhaseWiseCompletion from models.common_model import CommonModel from paste.core.logging import echo_log from paste.rbac.rbac_user import RbacUser from paste.util.pagination import Pagination from paste.web.form import ModelForm class GovsPhaseWiseCompletionForm(ModelForm): """阶段性办结表单验证类""" id = IntegerField('主键') master_id = IntegerField('主表ID') master_id = StringField('代签收唯一标志', validators=[Length(max=64)]) is_contact = StringField('联系诉求人情况', validators=[Length(max=10)]) contact_name = StringField('联系人员', validators=[Length(max=100)]) contact_time = StringField('联系时间', validators=[Length(max=64)]) contact_type = StringField('联系情况', validators=[Length(max=255)]) next_feedback_time = StringField('下一次反馈时间', validators=[Length(max=64)]) advice = TextAreaField('处理意见') reason = TextAreaField('处理意见1') remark = StringField('备注', validators=[Length(max=500)]) file_id_str = TextAreaField('OA文件id') action_name = StringField('操作名称', validators=[Length(max=255)]) case_accord_type_one_name = StringField('诉求归口一级名称', validators=[Length(max=255)]) case_accord_type_two_name = StringField('诉求归口二级名称', validators=[Length(max=255)]) case_accord_type_three_name = StringField('诉求归口三级名称', validators=[Length(max=255)]) order_id = StringField('工单ID', validators=[Length(max=64)]) task_id = StringField('任务ID', validators=[Length(max=64)]) save_status = IntegerField('提交状态') oa_feedback_status = IntegerField('OA反馈状态') flow_token = StringField('流令牌', validators=[Length(max=256)]) created_at = DateTimeField('创建时间') created_by = StringField('创建者', validators=[Length(max=64)]) updated_at = DateTimeField('更新时间') updated_by = StringField('更新者', validators=[Length(max=64)]) def process(self, formdata=None, obj=None, **kwargs): if formdata: for name, values in formdata.items(): if isinstance(values, list) and values: formdata[name] = [v.strip() if isinstance(v, str) else v for v in values] elif isinstance(values, str): formdata[name] = values.strip() super().process(formdata, obj=None, **kwargs) class GovsPhaseWiseCompletionBase(TD3iGovsPhaseWiseCompletion, CommonModel): """阶段性办结业务基类""" FieldMapping = { # 主键 & 关联ID 'id': 'id', 'master_id': 'masterId', 'gd_id': 'gdId', 'order_id': 'orderId', 'task_id': 'taskId', 'action_name': 'actionName', # 联系诉求人信息 'is_contact': 'isContact', 'contact_name': 'contactName', 'contact_time': 'contactTime', 'contact_type': 'contactType', 'next_feedback_time': 'nextFeedbackTime', # 处理意见 & 备注 'advice': 'advice', 'reason': 'reason', 'remark': 'remark', 'file_id_str': 'fileIdStr', # 诉求归口分类 'case_accord_type_one_name': 'caseAccordTypeOneName', 'case_accord_type_two_name': 'caseAccordTypeTwoName', 'case_accord_type_three_name': 'caseAccordTypeThreeName', # 令牌 'flow_token': 'flowToken' } @classmethod async def exist_other(cls, id: Union[str, int], master_id: Union[str, int] = None, order_id: str = None): """检查是否存在除当前记录外的同唯一标识阶段性办结记录""" _query = select(cls).where(cls.id != id) if master_id: _query = _query.where(cls.master_id == master_id) if order_id: _query = _query.where(cls.order_id == order_id) _task: cls = await cls.query_first(_query) return _task @classmethod async def find_by_ids(cls, ids: list[Union[str, int]]): """根据ID列表批量查询阶段性办结记录""" _query = select(cls).where(cls.id.in_(ids)) _list: list[cls] = (await cls.orm_execute_scalars(_query)).all() return _list @classmethod async def is_exist(cls, master_id: Union[str, int] = None, order_id: str = None): """检查阶段性办结记录是否已存在""" _query = select(cls) if master_id: _query = _query.where(cls.master_id == master_id) if order_id: _query = _query.where(cls.order_id == order_id) _task: cls = await cls.query_first(_query) return _task @classmethod async def search_base(cls, is_paging=True, **kwargs): """阶段性办结基础搜索(分页/不分页)""" page_number = kwargs.get('page_number', random.randint(1, 100)) page_size = kwargs.get('page_size', 20) kwargs.update({'page_number': page_number, 'page_size': page_size}) # 模糊查询字段配置 _name_likes = { cls.master_id.key: '%{}%', cls.order_id.key: '%{}%', cls.master_id.key: '%{}%', cls.contact_name.key: '%{}%', } _query = select(cls).where( *cls.search_wheres(likes=_name_likes, **kwargs) ).group_by(cls.id) _paging = None if is_paging: _row_count = await cls.query_count(_query) _paging = Pagination(_row_count).paging(page_number, page_size) _data_query = _query.limit(page_size).offset(_paging.offset_size) else: _data_query = _query.where() # 排序处理 _sort_clause = cls.sort_clauses(kwargs.get('sort_clause', {})) if _sort_clause: _data_query = _data_query.order_by(*_sort_clause) else: _data_query = _data_query.order_by(cls.created_at.desc()) _df = await cls.query_as_df(_data_query) if not _df.empty: _df.replace(models.EmptyInDF + models.EmptyDatetimeInDF, '', inplace=True) _df[cls.id.key] = _df[cls.id.key].astype(str) return _df, _paging @classmethod async def search(cls, **kwargs): """分页搜索阶段性办结记录,返回标准分页结构""" _df, _paging = await cls.search_base(**kwargs) return { 'total': _paging.row_count, 'rows': _df.to_dict('records'), 'pagination': { 'page_number': _paging.page_number, 'page_count': _paging.page_count, 'page_size': _paging.page_size, }, } @classmethod async def exists_master_id(cls, data_df: pd.DataFrame): """根据 master_id 区分已有数据/新增数据(批量保存用)""" if data_df.empty: return pd.DataFrame(), pd.DataFrame() master_ids = data_df[cls.master_id.key].unique().tolist() if not master_ids: return pd.DataFrame(), data_df.copy() _query = select(cls.id, cls.master_id).where(cls.master_id.in_(master_ids)) existing_df = await cls.query_as_df(_query) if existing_df.empty: return pd.DataFrame(), data_df.copy() master_id_to_id_map = dict(zip(existing_df[cls.master_id.key], existing_df[cls.id.key])) mask_exists = data_df[cls.master_id.key].isin(existing_df[cls.master_id.key]) exists_df = data_df[mask_exists].copy() exists_df[cls.id.key] = exists_df[cls.master_id.key].map(master_id_to_id_map) latest_df = data_df[~mask_exists].copy() return exists_df, latest_df @classmethod async def find_by_master_id(cls, master_id: Union[str, int]): """根据主表ID查询单条阶段性办结记录""" _query = select(cls).where(cls.master_id == master_id) return await cls.query_first(_query) @classmethod async def find_by_order_id(cls, order_id: str): """根据工单ID查询阶段性办结记录""" _query = select(cls).where(cls.order_id == order_id) return await cls.query_first(_query) @classmethod async def find_by_master_ids(cls, master_ids: list[Union[str, int]]): """根据主表ID列表批量查询阶段性办结记录""" _query = select(cls).where(cls.master_id.in_(master_ids)) _list: list[cls] = (await cls.orm_execute_scalars(_query)).all() return _list @register_swagger_model class GovsPhaseWiseCompletion(GovsPhaseWiseCompletionBase): """阶段性办结业务操作类""" @classmethod async def create(cls, user: RbacUser = None, **kwargs): """新增阶段性办结记录""" for _k, _v in kwargs.items(): if isinstance(_v, str): kwargs[_k] = _v.strip() _form = GovsPhaseWiseCompletionForm(formdata=kwargs) _form.validate_form() _existing = await cls.is_exist( master_id=_form.master_id.data, order_id=_form.order_id.data ) assert _existing is None, "该任务已存在阶段性办结记录,无法重复创建。" _phase_info = cls().copy_from_dict(_form.data, skip_none=True).before_save() if user: _phase_info.created_by = user.username _phase_info.updated_by = user.username await _phase_info.async_save() return _phase_info @classmethod async def delete(cls, phase_id: Union[str, int]): """删除阶段性办结记录""" _phase_info: cls = await cls.async_find_by_id(phase_id) assert _phase_info, f"根据 ID {phase_id} 未找到阶段性办结记录。" _del_query = delete(cls).where(cls.id == _phase_info.id) await cls.raw_execute(_del_query) echo_log(f'已删除阶段性办结记录(工单ID:{_phase_info.master_id},ID:{_phase_info.id}).') return _phase_info @classmethod async def modify(cls, phase_id: Union[str, int], user: RbacUser = None, **kwargs): """修改阶段性办结记录""" for _k, _v in kwargs.items(): if isinstance(_v, str): kwargs[_k] = _v.strip() _form = GovsPhaseWiseCompletionForm(formdata=kwargs) _form.validate_form() _phase_info: cls = await cls.async_find_by_id(phase_id) assert _phase_info, f'查无此阶段性办结记录。' _phase_info.copy_from_dict(_form.data, skip_none=True).before_save() if user: _phase_info.updated_by = user.username await _phase_info.async_save() return _phase_info @classmethod async def create_batch(cls, data_df: pd.DataFrame, user: RbacUser = None): """批量新增阶段性办结记录""" if data_df.empty: return 0 if user: data_df['created_by'] = user.username data_df['updated_by'] = user.username records = data_df.to_dict('records') phase_list = [cls().copy_from_dict(record, skip_none=True).before_save() for record in records] session = cls.get_aio_session() try: session.add_all(phase_list) await session.commit() except Exception as e: await session.rollback() raise e finally: await session.close() echo_log(f"批量创建成功:创建 {len(phase_list)} 条阶段性办结记录。") return len(phase_list) @classmethod async def modify_batch(cls, data_df: pd.DataFrame, user: RbacUser = None): """批量修改阶段性办结记录""" if data_df.empty: return 0 if 'id' not in data_df.columns: echo_log(f"错误:modify_batch 要求输入数据必须包含 '{cls.id.key}' 列") return 0 data_df['updated_at'] = datetime.datetime.now() if user: data_df['updated_by'] = user.username update_data = data_df.to_dict('records') session = cls.get_aio_session() try: await session.run_sync( lambda sync_session: sync_session.bulk_update_mappings(cls, update_data) ) await session.commit() updated_count = len(update_data) except Exception as e: await session.rollback() raise e finally: await session.close() echo_log(f"批量修改成功:更新 {updated_count} 条阶段性办结记录。") return updated_count @classmethod async def save_batch(cls, data_df: pd.DataFrame, user: RbacUser = None): """批量保存(自动区分新增/更新)""" _exists_df, _latest_df = await cls.exists_master_id(data_df) _created_count = await cls.create_batch(_latest_df, user) _updated_count = await cls.modify_batch(_exists_df, user) return _created_count, _updated_count