Files
d3i-szct/models/govs_create_reply.py
T
2026-06-02 17:46:38 +08:00

349 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# coding: utf-8
import datetime
import random
from typing import Union
import pandas as pd
from sqlalchemy import select, delete
from tornado_swagger.model import register_swagger_model
from wtforms import StringField, TextAreaField, IntegerField, DateTimeField
from wtforms.validators import Length
import models
from models.db_models import TD3iGovsReplyFormal
from models.common_model import CommonModel
from paste.core.logging import echo_log
from paste.rbac.rbac_user import RbacUser
from paste.util.pagination import Pagination
from paste.web.form import ModelForm
class GovsReplyFormalForm(ModelForm):
    """Validation form for a reply/closure ("答复办结") record.

    Declares one WTForms field per attribute accepted when creating or
    modifying a record, and strips surrounding whitespace from incoming
    string values before the fields are processed.
    """

    id = IntegerField('主键')
    # NOTE(review): the original declared ``master_id`` twice -- first as
    # ``IntegerField('主表ID')`` and then as the StringField below. In a class
    # body the later assignment wins, so the IntegerField was dead code and has
    # been removed; the effective field is unchanged. The label of the
    # surviving field suggests it may have been meant for a different
    # attribute (possibly ``gd_id``) -- confirm against the table definition.
    master_id = StringField('代签收唯一标志', validators=[Length(max=64)])
    is_contact = StringField('是否联系服务对象', validators=[Length(max=10)])
    contact_name = StringField('联系人员', validators=[Length(max=100)])
    contact_time = StringField('联系时间', validators=[Length(max=64)])
    contact_type = StringField('联系情况', validators=[Length(max=255)])
    advice = TextAreaField('处理意见(面向群众公开)')
    # NOTE(review): this label has no closing bracket; kept as-is because it is
    # a runtime string.
    reason = TextAreaField('处理意见(面向群众公开2')
    remarks = StringField('备注', validators=[Length(max=500)])
    file_id_str = TextAreaField('OA文件id')
    save_id = StringField('提交数据ID', validators=[Length(max=64)])
    process_instance_id = StringField('流程实例ID', validators=[Length(max=64)])
    business_key = StringField('业务键', validators=[Length(max=64)])
    order_no = StringField('工单号', validators=[Length(max=64)])
    action_name = StringField('操作名称', validators=[Length(max=255)])
    case_accord_type_one_name = StringField('诉求归口一级名称', validators=[Length(max=255)])
    case_accord_type_two_name = StringField('诉求归口二级名称', validators=[Length(max=255)])
    case_accord_type_three_name = StringField('诉求归口三级名称', validators=[Length(max=255)])
    save_status = IntegerField('提交状态')
    oa_feedback_status = IntegerField('OA反馈状态')
    flow_token = StringField('流令牌', validators=[Length(max=256)])
    created_at = DateTimeField('创建时间')
    created_by = StringField('创建者', validators=[Length(max=64)])
    updated_at = DateTimeField('更新时间')
    updated_by = StringField('更新者', validators=[Length(max=64)])

    def process(self, formdata=None, obj=None, **kwargs):
        """Strip whitespace from string inputs, then run the normal processing.

        Args:
            formdata: Mapping of field name to a string or a list of values.
                It is updated in place with the stripped values.
            obj: Optional object whose attributes supply field values; it is
                forwarded to the parent implementation.
            **kwargs: Passed through unchanged to the parent ``process``.
        """
        if formdata:
            # Iterate over a snapshot so values can be reassigned safely
            # while walking the mapping.
            for name, values in list(formdata.items()):
                if isinstance(values, list) and values:
                    formdata[name] = [v.strip() if isinstance(v, str) else v for v in values]
                elif isinstance(values, str):
                    formdata[name] = values.strip()
        # The original always passed ``obj=None`` here, silently discarding
        # whatever object the caller supplied.
        super().process(formdata, obj=obj, **kwargs)
class GovsReplyFormalBase(TD3iGovsReplyFormal, CommonModel):
    """Read-side helpers (lookups and searching) for reply/closure records."""

    # Maps snake_case attribute names to camelCase names. Where this mapping
    # is consumed is not visible in this file.
    FieldMapping = {
        # Primary key and related identifiers
        'id': 'id',
        'master_id': 'masterId',
        'gd_id': 'gdId',
        'order_no': 'orderNo',
        'process_instance_id': 'processInstanceId',
        'business_key': 'businessKey',
        'save_id': 'saveId',
        'action_name': 'actionName',
        # Contact with the service recipient
        'is_contact': 'isContact',
        'contact_name': 'contactName',
        'contact_time': 'contactTime',
        'contact_type': 'contactType',
        # Handling opinion and remarks
        'advice': 'advice',
        'reason': 'reason',
        'remarks': 'remarks',
        'file_id_str': 'fileIdStr',
        # Appeal category names (levels one to three)
        'case_accord_type_one_name': 'caseAccordTypeOneName',
        'case_accord_type_two_name': 'caseAccordTypeTwoName',
        'case_accord_type_three_name': 'caseAccordTypeThreeName',
        # Flow token
        'flow_token': 'flowToken',
    }

    @classmethod
    async def exist_other(cls, id: Union[str, int], master_id: Union[str, int] = None, order_no: str = None,
                          business_key: str = None):
        """Find a record other than ``id`` that carries the same identifiers.

        Every truthy identifier is added as an AND-ed equality filter.

        Returns:
            Whatever ``query_first`` yields for the filtered query, or
            ``None`` when no identifier was supplied.
        """
        # Without any identifier the query would match an arbitrary other row,
        # which says nothing about duplication.
        if not (master_id or order_no or business_key):
            return None
        _query = select(cls).where(cls.id != id)
        if master_id:
            _query = _query.where(cls.master_id == master_id)
        if order_no:
            _query = _query.where(cls.order_no == order_no)
        if business_key:
            _query = _query.where(cls.business_key == business_key)
        return await cls.query_first(_query)

    @classmethod
    async def find_by_ids(cls, ids: list[Union[str, int]]):
        """Return all records whose primary key is in ``ids``."""
        if not ids:
            # Nothing to look up; skip the database round trip.
            return []
        _query = select(cls).where(cls.id.in_(ids))
        return (await cls.orm_execute_scalars(_query)).all()

    @classmethod
    async def is_exist(cls, master_id: Union[str, int] = None, order_no: str = None, business_key: str = None):
        """Find a record matching all of the supplied identifiers.

        Every truthy identifier is added as an AND-ed equality filter.

        Returns:
            Whatever ``query_first`` yields for the filtered query, or
            ``None`` when no identifier was supplied.
        """
        # Without any identifier the unfiltered query would return the first
        # row of the table and wrongly report the record as existing.
        if not (master_id or order_no or business_key):
            return None
        _query = select(cls)
        if master_id:
            _query = _query.where(cls.master_id == master_id)
        if order_no:
            _query = _query.where(cls.order_no == order_no)
        if business_key:
            _query = _query.where(cls.business_key == business_key)
        return await cls.query_first(_query)

    @classmethod
    async def search_base(cls, is_paging=True, **kwargs):
        """Search records, optionally paginated.

        Args:
            is_paging: When true, count the matches and apply limit/offset.
            **kwargs: Search criteria forwarded to ``search_wheres``. The keys
                ``page_number`` (default 1), ``page_size`` (default 20) and
                ``sort_clause`` are also read here.

        Returns:
            A ``(DataFrame, pagination)`` tuple; ``pagination`` is ``None``
            when ``is_paging`` is false.
        """
        # Default to the first page. The original fell back to a random page
        # number, which returned an arbitrary page when the caller omitted it.
        page_number = kwargs.get('page_number', 1)
        page_size = kwargs.get('page_size', 20)
        kwargs.update({'page_number': page_number, 'page_size': page_size})
        # Columns matched with a "contains" LIKE pattern.
        _name_likes = {
            cls.master_id.key: '%{}%',
            cls.order_no.key: '%{}%',
            cls.contact_name.key: '%{}%',
        }
        _query = select(cls).where(
            *cls.search_wheres(likes=_name_likes, **kwargs)
        ).group_by(cls.id)
        _paging = None
        if is_paging:
            _row_count = await cls.query_count(_query)
            _paging = Pagination(_row_count).paging(page_number, page_size)
            _data_query = _query.limit(page_size).offset(_paging.offset_size)
        else:
            _data_query = _query
        # Explicit sort if one was requested, otherwise newest first.
        _sort_clause = cls.sort_clauses(kwargs.get('sort_clause', {}))
        if _sort_clause:
            _data_query = _data_query.order_by(*_sort_clause)
        else:
            _data_query = _data_query.order_by(cls.created_at.desc())
        _df = await cls.query_as_df(_data_query)
        if not _df.empty:
            # Replace the project's "empty" markers with '' and expose the
            # primary key as a string.
            _df.replace(models.EmptyInDF + models.EmptyDatetimeInDF, '', inplace=True)
            _df[cls.id.key] = _df[cls.id.key].astype(str)
        return _df, _paging

    @classmethod
    async def search(cls, **kwargs):
        """Run a paginated search and return it as a plain dict.

        Returns:
            ``{'total', 'rows', 'pagination': {'page_number', 'page_count',
            'page_size'}}``.
        """
        # This method needs the pagination object, so paging is forced on even
        # if the caller's kwargs say otherwise (that used to end in an
        # AttributeError on ``None``).
        _df, _paging = await cls.search_base(**{**kwargs, 'is_paging': True})
        return {
            'total': _paging.row_count,
            'rows': _df.to_dict('records'),
            'pagination': {
                'page_number': _paging.page_number,
                'page_count': _paging.page_count,
                'page_size': _paging.page_size,
            },
        }

    @classmethod
    async def exists_master_id(cls, data_df: pd.DataFrame):
        """Split ``data_df`` into rows that already exist and rows that are new.

        Existence is decided by looking up each ``master_id`` in the table.

        Returns:
            ``(exists_df, latest_df)``. ``exists_df`` holds the rows whose
            ``master_id`` was found, with the ``id`` column set to the stored
            primary key; ``latest_df`` holds the remaining rows. Both are
            copies (or empty frames), never views of ``data_df``.
        """
        if data_df.empty:
            return pd.DataFrame(), pd.DataFrame()
        master_ids = data_df[cls.master_id.key].unique().tolist()
        if not master_ids:
            return pd.DataFrame(), data_df.copy()
        _query = select(cls.id, cls.master_id).where(cls.master_id.in_(master_ids))
        existing_df = await cls.query_as_df(_query)
        if existing_df.empty:
            return pd.DataFrame(), data_df.copy()
        # master_id -> stored primary key, used to fill ``id`` on existing rows.
        master_id_to_id_map = dict(zip(existing_df[cls.master_id.key], existing_df[cls.id.key]))
        mask_exists = data_df[cls.master_id.key].isin(existing_df[cls.master_id.key])
        exists_df = data_df[mask_exists].copy()
        exists_df[cls.id.key] = exists_df[cls.master_id.key].map(master_id_to_id_map)
        latest_df = data_df[~mask_exists].copy()
        return exists_df, latest_df

    @classmethod
    async def find_by_master_id(cls, master_id: Union[str, int]):
        """Return the first record with the given ``master_id``."""
        _query = select(cls).where(cls.master_id == master_id)
        return await cls.query_first(_query)

    @classmethod
    async def find_by_business_key(cls, business_key: str):
        """Return the first record with the given ``business_key``."""
        _query = select(cls).where(cls.business_key == business_key)
        return await cls.query_first(_query)

    @classmethod
    async def find_by_master_ids(cls, master_ids: list[Union[str, int]]):
        """Return all records whose ``master_id`` is in ``master_ids``."""
        if not master_ids:
            # Nothing to look up; skip the database round trip.
            return []
        _query = select(cls).where(cls.master_id.in_(master_ids))
        return (await cls.orm_execute_scalars(_query)).all()
@register_swagger_model
class GovsReplyFormal(GovsReplyFormalBase):
    """Write-side operations (create, modify, delete, batch) for reply/closure records."""

    @classmethod
    async def create(cls, user: RbacUser = None, **kwargs):
        """Validate ``kwargs`` and insert a new record.

        Args:
            user: Optional acting user; its ``username`` is written to
                ``created_by`` and ``updated_by``.
            **kwargs: Field values; string values are stripped first.

        Returns:
            The saved record.

        Raises:
            AssertionError: If ``is_exist`` finds a record with the same
                ``master_id`` / ``order_no`` / ``business_key``.
        """
        kwargs = {_k: _v.strip() if isinstance(_v, str) else _v for _k, _v in kwargs.items()}
        _form = GovsReplyFormalForm(formdata=kwargs)
        _form.validate_form()
        _existing = await cls.is_exist(
            master_id=_form.master_id.data,
            order_no=_form.order_no.data,
            business_key=_form.business_key.data
        )
        # Raised explicitly (same exception type as before) so the check still
        # runs when Python is started with -O, which strips ``assert``.
        if _existing is not None:
            raise AssertionError("该任务已存在答复办结记录,无法重复创建。")
        _reply_info = cls().copy_from_dict(_form.data, skip_none=True).before_save()
        if user:
            _reply_info.created_by = user.username
            _reply_info.updated_by = user.username
        await _reply_info.async_save()
        return _reply_info

    @classmethod
    async def delete(cls, reply_id: Union[str, int]):
        """Delete the record with primary key ``reply_id``.

        Returns:
            The record as it was loaded before deletion.

        Raises:
            AssertionError: If no record with that id is found.
        """
        _reply_info = await cls.async_find_by_id(reply_id)
        # Explicit raise instead of ``assert`` so the check survives -O.
        if not _reply_info:
            raise AssertionError(f"根据 ID {reply_id} 未找到答复办结记录。")
        _del_query = delete(cls).where(cls.id == _reply_info.id)
        await cls.raw_execute(_del_query)
        echo_log(f'已删除答复办结记录(工单ID{_reply_info.master_id}ID{_reply_info.id}.')
        return _reply_info

    @classmethod
    async def modify(cls, reply_id: Union[str, int], user: RbacUser = None, **kwargs):
        """Validate ``kwargs`` and apply them to an existing record.

        Args:
            reply_id: Primary key of the record to change.
            user: Optional acting user; its ``username`` is written to
                ``updated_by``.
            **kwargs: Field values; string values are stripped first. Fields
                whose form value is ``None`` are left untouched
                (``skip_none=True``).

        Returns:
            The saved record.

        Raises:
            AssertionError: If no record with that id is found.
        """
        kwargs = {_k: _v.strip() if isinstance(_v, str) else _v for _k, _v in kwargs.items()}
        _form = GovsReplyFormalForm(formdata=kwargs)
        _form.validate_form()
        _reply_info = await cls.async_find_by_id(reply_id)
        # Explicit raise instead of ``assert`` so the check survives -O.
        if not _reply_info:
            raise AssertionError('查无此答复办结记录。')
        _reply_info.copy_from_dict(_form.data, skip_none=True).before_save()
        if user:
            _reply_info.updated_by = user.username
        await _reply_info.async_save()
        return _reply_info

    @classmethod
    async def create_batch(cls, data_df: pd.DataFrame, user: RbacUser = None):
        """Insert one record per row of ``data_df`` in a single transaction.

        Args:
            data_df: Rows to insert; the caller's frame is not modified.
            user: Optional acting user; its ``username`` is written to
                ``created_by`` and ``updated_by`` of every row.

        Returns:
            Number of records created (0 for an empty frame).
        """
        if data_df.empty:
            return 0
        if user:
            # ``assign`` returns a new frame, so the audit columns are not
            # written into the DataFrame owned by the caller.
            data_df = data_df.assign(created_by=user.username, updated_by=user.username)
        records = data_df.to_dict('records')
        reply_list = [cls().copy_from_dict(record, skip_none=True).before_save() for record in records]
        session = cls.get_aio_session()
        try:
            session.add_all(reply_list)
            await session.commit()
        except Exception:
            await session.rollback()
            raise
        finally:
            await session.close()
        echo_log(f"批量创建成功:创建 {len(reply_list)} 条答复办结记录。")
        return len(reply_list)

    @classmethod
    async def modify_batch(cls, data_df: pd.DataFrame, user: RbacUser = None):
        """Bulk-update records from ``data_df``, matched by primary key.

        Args:
            data_df: Rows to update; must contain the primary-key column. The
                caller's frame is not modified.
            user: Optional acting user; its ``username`` is written to
                ``updated_by`` of every row.

        Returns:
            Number of rows submitted for update. 0 is returned for an empty
            frame, and also (after logging an error) when the primary-key
            column is missing.
        """
        if data_df.empty:
            return 0
        # Check the same column name that the error message reports.
        if cls.id.key not in data_df.columns:
            echo_log(f"错误:modify_batch 要求输入数据必须包含 '{cls.id.key}'")
            return 0
        # ``assign`` returns a new frame, leaving the caller's DataFrame as is.
        data_df = data_df.assign(updated_at=datetime.datetime.now())
        if user:
            data_df = data_df.assign(updated_by=user.username)
        update_data = data_df.to_dict('records')
        updated_count = len(update_data)
        session = cls.get_aio_session()
        try:
            # bulk_update_mappings is a synchronous Session API, so it is run
            # through run_sync on the async session.
            await session.run_sync(
                lambda sync_session: sync_session.bulk_update_mappings(cls, update_data)
            )
            await session.commit()
        except Exception:
            await session.rollback()
            raise
        finally:
            await session.close()
        echo_log(f"批量修改成功:更新 {updated_count} 条答复办结记录。")
        return updated_count

    @classmethod
    async def save_batch(cls, data_df: pd.DataFrame, user: RbacUser = None):
        """Insert new rows and update existing ones, split by ``master_id``.

        The insert and the update each use their own session, so the two
        steps are committed separately.

        Returns:
            ``(created_count, updated_count)``.
        """
        _exists_df, _latest_df = await cls.exists_master_id(data_df)
        _created_count = await cls.create_batch(_latest_df, user)
        _updated_count = await cls.modify_batch(_exists_df, user)
        return _created_count, _updated_count