Files
d3i-szct/models/dcm_apply_rollback.py
2026-06-02 17:46:38 +08:00

321 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import datetime
from typing import Union
import pandas as pd
from sqlalchemy import select, delete
from tornado_swagger.model import register_swagger_model
from wtforms import StringField, TextAreaField, IntegerField
from wtforms.validators import Length
from models.common_model import CommonModel
from models.db_models import TD3iDcmApplyRollback
from paste.core.logging import echo_log
from paste.rbac.rbac_user import RbacUser
from paste.web.form import ModelForm
class DcmApplyRollbackForm(ModelForm):
"""
申请回退任务表单验证类(完全映射 TD3iDcmApplyRollback 字段)。
用于验证和处理数字城管-申请回退操作的创建/修改表单数据。
字段完全映射数据库表 t_d3i_dcm_apply_rollback 的字段结构。
"""
# 基础信息
id = IntegerField('记录ID')
task_number = StringField('任务号', validators=[Length(max=64, message='任务号长度不能超过64字符')])
act_id = StringField('工单ID', validators=[Length(max=64, message='工单ID长度不能超过64字符')])
reply_part_id = IntegerField('回复部门ID')
ard_level = IntegerField('回退流向')
ard_type_id = IntegerField('延期类型ID')
opinion = TextAreaField('申请说明')
apply_type = StringField('申请类型(拒签、处置阶段照片未公开)',
validators=[Length(max=64, message='申请类型长度不能超过64字符')])
trans_info = StringField('流转信息', validators=[Length(max=255, message='流转信息长度不能超过255字符')])
attachments = TextAreaField('附件', validators=[Length(max=65535, message='附件长度不能超过65535字符')])
status = IntegerField('提交状态')
def process(self, formdata=None, obj=None, **kwargs):
"""
处理表单数据,在数据绑定前进行预处理。
主要功能:
- 遍历所有表单字段
- 对字符串类型的值去除两端空白字符
- 调用父类的process方法继续处理
"""
if formdata:
for name, values in formdata.items():
if isinstance(values, list) and values:
formdata[name] = [v.strip() if isinstance(v, str) else v for v in values]
elif isinstance(values, str):
formdata[name] = values.strip()
super().process(formdata, obj, **kwargs)
class DcmApplyRollbackBase(TD3iDcmApplyRollback, CommonModel):
"""
申请回退任务基础类(完全映射 TD3iDcmApplyRollback 字段)。
继承自数据库模型 TD3iDcmApplyRollback 和通用模型 CommonModel。
封装所有与回退操作相关的通用操作方法。
"""
@classmethod
async def is_exist(cls, act_id: str):
"""
检查申请回退记录是否已存在(根据任务ID)。
:param act_id: 任务ID
:return: 存在返回对象,不存在返回None
"""
_query = select(cls).where(cls.act_id == act_id)
_rollback: cls = await cls.query_first(_query)
return _rollback
@classmethod
async def exists_act_id(cls, data_df: pd.DataFrame):
"""
查找 data_df 中在数据库中已存在和不存在的记录。根据 act_id 字段判断。
:param data_df: 输入的数据框架,必须包含 act_id 列
:return: (exists_df: pd.DataFrame, latest_df: pd.DataFrame)
- exists_df: 在数据库中存在的记录
- latest_df: 在数据库中不存在的记录
"""
if data_df.empty:
return pd.DataFrame(), pd.DataFrame()
# 获取待查询的 act_id 列表(去重)
act_ids = data_df[cls.act_id.key].unique().tolist()
if not act_ids:
return pd.DataFrame(), data_df.copy()
# 查询数据库中已存在的 act_id
_query = select(cls.id, cls.act_id).where(cls.act_id.in_(act_ids))
act_ids_df = await cls.query_as_df(_query)
if act_ids_df.empty:
return pd.DataFrame(), data_df.copy()
# 构建 act_id -> id 的映射字典
act_id_to_id_map = dict(zip(act_ids_df[cls.act_id.key], act_ids_df[cls.id.key]))
# 根据 act_id 是否在数据库中,划分数据
mask_exists = data_df[cls.act_id.key].isin(act_ids_df[cls.act_id.key])
# 数据库已经有的记录
exists_df = data_df[mask_exists].copy()
# 自动补充从数据库查到的 id 字段
exists_df[cls.id.key] = exists_df[cls.act_id.key].map(act_id_to_id_map)
# 新的数据
latest_df = data_df[~mask_exists].copy()
return exists_df, latest_df
@register_swagger_model
class DcmApplyRollback(DcmApplyRollbackBase):
"""
申请回退任务模型类(主业务类,完全继承 TD3iDcmApplyRollback 字段)。
---
description: 数字城管-申请回退接口
"""
@classmethod
async def create(cls, user: RbacUser = None, **kwargs):
"""
创建新的申请回退记录。
业务流程:
1. 使用 DcmApplyRollbackForm 验证表单数据完整性
2. 检查是否已存在相同 act_id 的记录(避免重复提交)
3. 创建新申请回退对象
4. 设置创建者和更新者为当前用户
5. 保存到数据库
6. 返回创建的对象
:param RbacUser user: 操作用户对象
:param kwargs: 回退参数字典
:return: 新建申请回退对象
:rtype: DcmApplyRollback
:raises AssertionError: 当记录已存在时抛出
:raises ValidationError: 当表单验证失败时抛出
"""
# 处理字符串字段去除空格
for _k, _v in kwargs.items():
if isinstance(_v, str):
kwargs[_k] = _v.strip()
_form = DcmApplyRollbackForm(formdata=kwargs)
_form.validate_form()
# 检查是否已存在相同 act_id 的记录
_existing = await cls.is_exist(_form.act_id.data)
assert _existing is None, "该任务已存在申请回退记录,不能重复提交。"
# 创建对象
_rollback = cls().copy_from_dict(_form.data, skip_none=True).before_save()
if user:
_rollback.created_by = user.username
_rollback.updated_by = user.username
await _rollback.async_save()
return _rollback
@classmethod
async def delete(cls, rollback_id: Union[str, int]):
"""
删除申请回退记录。
业务流程:
1. 根据ID查找记录
2. 验证存在性
3. 执行删除
:param rollback_id: 要删除的申请回退记录ID
:return: 删除的记录对象
:rtype: DcmApplyRollback
:raises AssertionError: 当记录不存在时抛出
"""
_rollback: cls = await cls.async_find_by_id(rollback_id)
assert _rollback, f"根据 ID {rollback_id} 未找到申请回退记录。"
_del_query = delete(cls).where(cls.id == _rollback.id)
_del_count = (await cls.raw_execute(_del_query)).rowcount
echo_log(f'已删除回退记录(任务号:{_rollback.task_number}ID{_rollback.id}.')
return _rollback
@classmethod
async def modify(cls, rollback_id: Union[str, int], user: RbacUser = None, **kwargs):
"""
修改已有申请回退记录。
业务流程:
1. 将 rollback_id 添加到参数中
2. 处理字符串字段去除首尾空格
3. 使用 DcmApplyRollbackForm 验证表单数据
4. 查询原记录
5. 验证存在性
6. 更新字段并设置更新者
7. 保存到数据库
8. 返回更新后的对象
:param rollback_id: 要修改的申请回退记录ID
:param RbacUser user: 操作用户对象
:param kwargs: 需要更新的字段
:return: 修改后的申请回退对象
:rtype: DcmApplyRollback
:raises AssertionError: 当记录不存在时抛出
:raises ValidationError: 当表单验证失败时抛出
"""
# 处理字符串字段去除空格
for _k, _v in kwargs.items():
if isinstance(_v, str):
kwargs[_k] = _v.strip()
# 表单验证
_form = DcmApplyRollbackForm(formdata=kwargs)
_form.validate_form()
# 查询原记录
_rollback: cls = await cls.async_find_by_id(rollback_id)
assert _rollback, f'查无此申请回退信息。'
# 更新字段
_rollback.copy_from_dict(_form.data, skip_none=True).before_save()
_rollback.updated_by = user.username
await _rollback.async_save()
return _rollback
@classmethod
async def create_batch(cls, data_df: pd.DataFrame, user: RbacUser = None):
"""
批量创建申请回退记录(传入数据应为全新记录)。
:param data_df: 包含回退数据的 DataFrame
:param user: 操作用户对象,用于设置 created_by / updated_by
:return: 成功创建的数量
:rtype: int
"""
if data_df.empty:
return 0
if user:
data_df['created_by'] = user.username
data_df['updated_by'] = user.username
records = data_df.to_dict('records')
rollbacks = [cls().copy_from_dict(record, skip_none=True).before_save() for record in records]
session = cls.get_aio_session()
try:
session.add_all(rollbacks)
await session.commit()
except Exception as e:
await session.rollback()
raise e
finally:
await session.close()
echo_log(f"批量创建成功:创建 {len(rollbacks)} 条申请回退记录。")
return len(rollbacks)
@classmethod
async def modify_batch(cls, data_df: pd.DataFrame, user: RbacUser = None):
"""
批量修改已有申请回退记录。
:param data_df: 包含申请回退数据的 DataFrame(必须包含 id 列)
:param user: 操作用户对象,用于设置 updated_by
:return: 成功更新的数量
:rtype: int
"""
if data_df.empty:
return 0
# 必须包含 id 列
if 'id' not in data_df.columns:
echo_log(f"错误:modify_batch 要求输入数据必须包含 '{cls.id.key}' 列(主键)")
return 0
# 手动添加更新时间戳
data_df['updated_at'] = datetime.datetime.now()
# 添加更新者信息
if user:
data_df['updated_by'] = user.username
# 转换为字典列表
update_data = data_df.to_dict('records')
# 使用 bulk_update_mappings
session = cls.get_aio_session()
try:
await session.run_sync(
lambda sync_session: sync_session.bulk_update_mappings(cls, update_data)
)
await session.commit()
updated_count = len(update_data)
except Exception as e:
await session.rollback()
raise e
finally:
await session.close()
echo_log(f"批量修改成功:更新 {updated_count} 条申请回退记录。")
return updated_count
@classmethod
async def save_batch(cls, data_df: pd.DataFrame, user: RbacUser = None):
"""
批量保存申请回退数据,自动处理新建和更新。
:param data_df: 要保存的数据框架
:param user: 用户
:return: 新建和更新的数量
"""
# 筛选数据状态
_exists_df, _latest_df = await DcmApplyRollback.exists_act_id(data_df)
# 保存到数据库
_created_count = await DcmApplyRollback.create_batch(_latest_df, user)
_updated_count = await DcmApplyRollback.modify_batch(_exists_df, user)
return _created_count, _updated_count