Files
d3i-szct/models/dcm_apply_delay.py
2026-06-02 17:46:38 +08:00

554 lines
20 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import datetime
import random
from typing import Union
import pandas as pd
from sqlalchemy import select, delete
from tornado_swagger.model import register_swagger_model
from wtforms import StringField, TextAreaField, IntegerField
from wtforms.validators import Length
import models
from models.common_model import CommonModel
from models.db_models import TD3iDcmApplyPostpone
from paste.core.logging import echo_log
from paste.rbac.rbac_user import RbacUser
from paste.util.pagination import Pagination
from paste.web.form import ModelForm
class DcmApplyPostponeForm(ModelForm):
"""
专业表单验证类(已完全根据 TD3iDcmApplyDelay 字段重构)。
用于验证和处理数字城管-申请延期接口的创建/修改表单数据。
字段完全映射数据库表 t_d3i_dcm_apply_delay 的字段结构。
"""
# 基础信息
id = IntegerField('主键ID')
dcm_task_id = IntegerField('任务ID')
task_number = StringField('任务号', validators=[Length(max=64, message='任务号长度不能超过64字符')])
apply_act_id = StringField('工单流程ID', validators=[Length(max=64, message='工单流程ID长度不能超过64字符')])
reply_part_id = StringField('回复环节ID', validators=[Length(max=64, message='回复环节ID长度不能超过64字符')])
ard_level = StringField('固定值(等级)', validators=[Length(max=32, message='固定值长度不能超过32字符')])
ard_type_id = StringField('延期类型', validators=[Length(max=32, message='延期类型长度不能超过32字符')])
apply_memo = TextAreaField('申请意见', validators=[Length(max=65535, message='申请意见长度不能超过65535字符')])
time_num = StringField('延期时长', validators=[Length(max=64, message='延期时长长度不能超过64字符')])
apply_type = StringField('申请类型', validators=[Length(max=64, message='申请类型长度不能超过64字符')])
delay_multiple = IntegerField('延期倍数')
postpone_date = StringField('延期日期', validators=[Length(max=64, message='延期日期长度不能超过64字符')])
time_unit = StringField('时间单位', validators=[Length(max=64, message='时间单位长度不能超过64字符')])
attachments = TextAreaField('附件', validators=[Length(max=65535, message='附件长度不能超过65535字符')])
status = IntegerField('提交状态')
def process(self, formdata=None, obj=None, **kwargs):
"""
处理表单数据,在数据绑定前进行预处理。
主要功能:
- 遍历所有表单字段
- 对字符串类型的值去除两端空白字符
- 调用父类的process方法继续处理
"""
if formdata:
for name, values in formdata.items():
if isinstance(values, list) and values:
formdata[name] = [v.strip() if isinstance(v, str) else v for v in values]
elif isinstance(values, str):
formdata[name] = values.strip()
super().process(formdata, obj, **kwargs)
class DcmApplyPostponeBase(TD3iDcmApplyPostpone, CommonModel):
"""
专业基础类(已完全映射 TD3iDcmApplyDelay 字段)。
继承自数据库模型 TD3iDcmApplyDelay 和通用模型 CommonModel。
封装所有与申请延期相关的通用操作方法。
"""
FieldMapping = {
'id': 'id',
'dcm_task_id': 'dcm_task_id',
'task_number': 'task_number',
'apply_act_id': 'apply_act_id',
'reply_part_id': 'reply_part_id',
'ard_level': 'ard_level',
'ard_type_id': 'ard_type_id',
'apply_memo': 'apply_memo',
'timeNum': 'timeNum',
'postponeDate': 'postponeDate',
'time_unit': 'time_unit',
'attachments': 'attachments',
'status': 'status',
'created_at': 'created_at',
'created_by': 'created_by',
'updated_at': 'updated_at',
'updated_by': 'updated_by',
}
@classmethod
async def exist_other(cls, id: Union[str, int], task_number: str):
"""
检查是否存在除当前记录外的其他同任务号的延期申请。
:param id: 当前记录ID
:param task_number: 任务号
:return: 存在返回记录对象,不存在返回None
"""
_query = select(cls).where(cls.id != id, cls.task_number == task_number)
_record: cls = await cls.query_first(_query)
return _record
@classmethod
async def find_by_ids(cls, ids: list[Union[str, int]]):
"""
根据ID列表批量查找延期申请数据。
"""
_query = select(cls).where(cls.id.in_(ids))
_record_list: list[cls] = (await cls.orm_execute_scalars(_query)).all()
return _record_list
@classmethod
async def is_exist(cls, task_number: str):
"""
检查延期申请是否已经存在(根据任务号)。
:param task_number: 任务号
:return: 存在返回记录对象,不存在返回None
"""
_query = select(cls).where(cls.task_number == task_number)
_record: cls = await cls.query_first(_query)
return _record
@classmethod
async def search_base(cls, is_paging=True, **kwargs):
"""
按参数搜索延期申请数据的基础方法。
支持字段:
- task_number, ard_type_id, status, created_by 等
- 支持模糊匹配:apply_memo, attachments
- 支持精确匹配:task_number, ard_type_id, status, dcm_task_id
:param is_paging: 是否分页
:param kwargs: 查询参数
:key int page_number: 页码(缺省随机1~100
:key int page_size: 每页数量(缺省20
:key dict sort_clause: 排序配置,如 {'task_number': 'asc'}
:key str task_number: 精确匹配任务号
:key str ard_type_id: 精确匹配延期类型
:key int status: 精确匹配提交状态
:key str apply_memo: 模糊匹配申请意见
:key str attachments: 模糊匹配附件内容
:key str created_by: 精确匹配创建者
:return: (DataFrame, Pagination)
"""
page_number = kwargs.get('page_number', random.randint(1, 100))
page_size = kwargs.get('page_size', 20)
kwargs.update({'page_number': page_number, 'page_size': page_size})
# 模糊查询字段
_name_likes = {
cls.apply_memo.key: '%{}%',
cls.attachments.key: '%{}%',
}
_query = select(cls).where(
*cls.search_wheres(likes=_name_likes, **kwargs)
).group_by(cls.id)
_paging = None
if is_paging:
_row_count = await cls.query_count(_query)
_paging = Pagination(_row_count).paging(page_number, page_size)
_data_query = _query.limit(page_size).offset(_paging.offset_size)
else:
_data_query = _query.where()
_sort_clause = cls.sort_clauses(kwargs.get('sort_clause', {}))
if _sort_clause:
_data_query = _data_query.order_by(*_sort_clause)
else:
_data_query = _data_query.order_by(cls.task_number)
_record_df = await cls.query_as_df(_data_query)
if not _record_df.empty:
_record_df.replace(models.EmptyInDF + models.EmptyDatetimeInDF, '', inplace=True)
_record_df[cls.id.key] = _record_df[cls.id.key].astype(str)
return _record_df, _paging
@classmethod
async def search(cls, **kwargs):
"""
按参数搜索延期申请数据,返回分页格式数据。
"""
_record_df, _paging = await cls.search_base(**kwargs)
return {
'total': _paging.row_count,
'rows': _record_df.to_dict('records'),
'pagination': {
'page_number': _paging.page_number,
'page_count': _paging.page_count,
'page_size': _paging.page_size,
},
}
@classmethod
async def exists_task_number(cls, data_df: pd.DataFrame):
"""
查找 data_df 中在数据库中已存在和不存在的记录。根据 task_number 字段判断。
:param data_df: 输入的数据框架,必须包含 task_number 列
:return: (exists_df: pd.DataFrame, latest_df: pd.DataFrame)
- exists_df: 在数据库中存在的记录
- latest_df: 在数据库中不存在的记录
"""
if data_df.empty:
return pd.DataFrame(), pd.DataFrame()
# 获取待查询的 task_number 列表(去重)
task_numbers = data_df[cls.task_number.key].unique().tolist()
if not task_numbers:
return pd.DataFrame(), data_df.copy()
# 查询数据库中已存在的 task_number
_query = select(cls.id, cls.task_number).where(cls.task_number.in_(task_numbers))
task_numbers_df = await cls.query_as_df(_query)
if task_numbers_df.empty:
return pd.DataFrame(), data_df.copy()
# 构建 task_number -> id 的映射字典
task_number_to_id_map = dict(zip(task_numbers_df[cls.task_number.key], task_numbers_df[cls.id.key]))
# 根据 task_number 是否在数据库中,划分数据
mask_exists = data_df[cls.task_number.key].isin(task_numbers_df[cls.task_number.key])
# 数据库已经有的记录
exists_df = data_df[mask_exists].copy()
# 自动补充从数据库查到的 id 字段
exists_df[cls.id.key] = exists_df[cls.task_number.key].map(task_number_to_id_map)
# 新的数据
latest_df = data_df[~mask_exists].copy()
return exists_df, latest_df
@register_swagger_model
class DcmApplyPostpone(DcmApplyPostponeBase):
"""
专业模型类(主业务类,完全继承 TD3iDcmApplyDelay 字段)。
---
description: 数字城管-申请延期接口
type: object
properties:
id:
description: 主键ID
type: integer
example: 1001
readOnly: true
dcm_task_id:
description: 关联的任务ID
type: integer
example: 2001
task_number:
description: 任务号
type: string
example: "TASK20240501001"
maxLength: 64
apply_act_id:
description: 工单流程ID
type: string
example: "ACT20240501001"
maxLength: 64
reply_part_id:
description: 回复环节ID
type: string
example: "PART_REPLY_001"
maxLength: 64
ard_level:
description: 固定值(等级)
type: string
example: "LEVEL_1"
maxLength: 32
ard_type_id:
description: 延期类型(固定值)
type: string
example: "DELAY_TYPE_1"
maxLength: 32
apply_memo:
description: 申请意见
type: string
example: "因天气原因,申请延期处理。"
maxLength: 65535
timeNum:
description: 延期时长
type: string
example: "3"
maxLength: 64
postponeDate:
description: 延期日期
type: string
example: "2024-06-01"
maxLength: 64
time_unit:
description: 时间单位
type: string
example: ""
maxLength: 64
attachments:
description: 附件(JSON格式或逗号分隔)
type: string
example: "file1.jpg,file2.pdf"
maxLength: 65535
status:
description: 提交状态(0草稿,1已提交,2已审批)
type: integer
example: 1
created_at:
description: 创建时间,ISO格式的日期时间字符串
type: string
format: date-time
example: "2024-01-15 10:30:00"
readOnly: true
created_by:
description: 创建者用户名
type: string
example: "D3I"
readOnly: true
updated_at:
description: 修改时间,ISO格式的日期时间字符串
type: string
format: date-time
example: "2024-01-16 14:25:00"
readOnly: true
updated_by:
description: 修改者用户名
type: string
example: "D3I"
readOnly: true
"""
@classmethod
async def create(cls, user: RbacUser = None, **kwargs):
"""
创建新的延期申请。
业务流程:
1. 使用 DcmApplyDelayForm 验证表单数据完整性
2. 检查任务号是否已存在延期申请
3. 创建新延期申请对象
4. 设置创建者和更新者为当前用户(默认为 D3I)
5. 保存到数据库
6. 返回创建的对象
:param RbacUser user: 操作用户对象(可选)
:param kwargs: 延期申请参数字典
:return: 新建延期申请对象
:rtype: DcmApplyPostpone
:raises AssertionError: 当任务号已存在时抛出
:raises ValidationError: 当表单验证失败时抛出
"""
# 处理字符串字段去除空格
for _k, _v in kwargs.items():
if isinstance(_v, str):
kwargs[_k] = _v.strip()
_form = DcmApplyPostponeForm(formdata=kwargs)
_form.validate_form()
# 检查是否已存在相同任务号的延期申请
_exist = await cls.is_exist(_form.task_number.data)
assert _exist is None, "该任务已存在延期申请,不能重复提交。"
# 创建延期申请对象
_record = cls().copy_from_dict(_form.data, skip_none=True).before_save()
if user:
_record.created_by = user.username
_record.updated_by = user.username
else:
# 默认为 D3I
if not _record.created_by:
_record.created_by = "D3I"
if not _record.updated_by:
_record.updated_by = "D3I"
await _record.async_save()
return _record
@classmethod
async def delete(cls, id: Union[str, int]):
"""
删除延期申请。
业务流程:
1. 根据ID查找延期申请
2. 验证存在性
3. 执行删除操作
:param id: 要删除的延期申请ID
:return: 删除的对象
:rtype: DcmApplyPostpone
:raises AssertionError: 当记录不存在时抛出
"""
_record: cls = await cls.async_find_by_id(id)
assert _record, f"根据 ID {id} 未找到延期申请记录。"
# 执行删除
_del_query = delete(cls).where(cls.id == _record.id)
_del_count = (await cls.raw_execute(_del_query)).rowcount
echo_log(f'已删除延期申请(任务号:{_record.task_number}ID{_record.id}.')
return _record
@classmethod
async def modify(cls, id: Union[str, int], user: RbacUser = None, **kwargs):
"""
修改已有延期申请信息。
业务流程:
1. 将 id 添加到参数中
2. 处理字符串字段去除首尾空格
3. 使用 DcmApplyDelayForm 验证表单数据
4. 检查是否有其他记录使用了相同的 task_number(排除自身)
5. 查询原记录对象
6. 验证存在性
7. 更新字段并设置更新者
8. 保存到数据库
9. 返回更新后的对象
:param id: 要修改的延期申请ID
:param RbacUser user: 操作用户对象
:param kwargs: 需要更新的字段
:return: 修改后的延期申请对象
:rtype: DcmApplyPostpone
:raises AssertionError: 当记录不存在或任务号重复时抛出
:raises ValidationError: 当表单验证失败时抛出
"""
# 处理字符串字段去除空格
for _k, _v in kwargs.items():
if isinstance(_v, str):
kwargs[_k] = _v.strip()
# 表单验证
_form = DcmApplyPostponeForm(formdata=kwargs)
_form.validate_form()
# 检查是否与其他记录重复(排除自身)
_other = await cls.exist_other(id, _form.task_number.data)
assert _other is None, "该任务号已存在其他延期申请,不能重复修改。"
# 查询原记录
_record: cls = await cls.async_find_by_id(id)
assert _record, f'查无此延期申请记录。'
# 更新字段
_record.copy_from_dict(_form.data, skip_none=True).before_save()
_record.updated_by = user.username if user else "D3I"
await _record.async_save()
return _record
@classmethod
async def create_batch(cls, data_df: pd.DataFrame, user: RbacUser = None):
"""
批量创建新的延期申请(传入数据应为全新记录,无需校验是否存在)。
:param data_df: 包含延期申请数据的 DataFrame,字段需与模型属性匹配(如 task_number, apply_act_id 等)
:param user: 操作用户对象,用于设置 created_by / updated_by
:return: 成功创建的数量
:rtype: int
"""
if data_df.empty:
return 0
# 向量化设置用户字段(无循环)
if user:
data_df['created_by'] = user.username
data_df['updated_by'] = user.username
else:
# 默认 D3I
data_df['created_by'] = "D3I"
data_df['updated_by'] = "D3I"
# 一次性转为字典列表(C 层高效)
records = data_df.to_dict('records')
# 用列表推导式构造对象
records = [cls().copy_from_dict(record, skip_none=True).before_save() for record in records]
# 批量插入
session = cls.get_aio_session()
try:
session.add_all(records)
await session.commit()
except Exception as e:
await session.rollback()
raise e
finally:
await session.close()
echo_log(f"批量创建成功:创建 {len(records)} 条延期申请。")
return len(records)
@classmethod
async def modify_batch(cls, data_df: pd.DataFrame, user: RbacUser = None):
"""
批量修改已有延期申请。
:param data_df: 包含延期申请数据的 DataFrame,必须包含 id 列
:param user: 操作用户对象,用于设置 updated_by
:return: 成功更新的数量
:rtype: int
"""
if data_df.empty:
return 0
# 必须包含 id 列
if 'id' not in data_df.columns:
echo_log(f"错误:modify_batch 要求输入数据必须包含 '{cls.id.key}' 列(主键)")
return 0
# 手动添加更新时间戳
data_df['updated_at'] = datetime.datetime.now()
# 添加更新者信息
if user:
data_df['updated_by'] = user.username
else:
data_df['updated_by'] = "D3I"
# 转换为字典列表
update_data = data_df.to_dict('records')
# 使用 bulk_update_mappings
session = cls.get_aio_session()
try:
await session.run_sync(
lambda sync_session: sync_session.bulk_update_mappings(cls, update_data)
)
await session.commit()
updated_count = len(update_data)
except Exception as e:
await session.rollback()
raise e
finally:
await session.close()
echo_log(f"批量修改成功:更新 {updated_count} 条延期申请。")
return updated_count
@classmethod
async def save_batch(cls, data_df: pd.DataFrame, user: RbacUser = None):
"""
批量保存数据,自动处理新建和更新。
:param data_df: 要保存的数据框架
:param user: 用户
:return: 新建和更新的数量
"""
# 筛选数据状态
_exists_df, _latest_df = await DcmApplyPostpone.exists_task_number(data_df)
# 保存到数据库
_created_count = await DcmApplyPostpone.create_batch(_latest_df, user)
_updated_count = await DcmApplyPostpone.modify_batch(_exists_df, user)
return _created_count, _updated_count