Files
d3i-szct/models/govs_create_delay.py
2026-06-02 17:46:38 +08:00

347 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# coding: utf-8
import datetime
import random
from typing import Union
import pandas as pd
from sqlalchemy import select, delete
from tornado_swagger.model import register_swagger_model
from wtforms import StringField, TextAreaField, IntegerField, DateTimeField
from wtforms.validators import Length
import models
from models.db_models import TD3iGovsApplicationForDelay
from models.common_model import CommonModel
from paste.core.logging import echo_log
from paste.rbac.rbac_user import RbacUser
from paste.util.pagination import Pagination
from paste.web.form import ModelForm
class GovsApplicationForDelayForm(ModelForm):
    """Validation form for a delay application.

    Declares one field per delay-application attribute; string fields carry a
    maximum-length validator. Incoming string values are whitespace-trimmed
    before the regular form processing runs (see ``process``).
    """

    # Primary key and relations
    id = IntegerField('主键')
    master_id = IntegerField('主表ID')
    gd_id = StringField('代签收唯一标志', validators=[Length(max=64)])

    # Deadlines and requested delay
    finally_time_after_approve = StringField('延时申请通过后时间', validators=[Length(max=64)])
    finally_time_before_approve = StringField('计划完成时间', validators=[Length(max=64)])
    request_delay = StringField('申请延时时长', validators=[Length(max=64)])
    is_nature_day = StringField('延时时长类型', validators=[Length(max=10)])

    # Notification of the requester, reason and remarks
    already_notify_order_user = StringField('是否已告知诉求人', validators=[Length(max=10)])
    request_reason = TextAreaField('延时原因')
    remarks = StringField('备注', validators=[Length(max=500)])
    contact_name = StringField('何人', validators=[Length(max=100)])
    contact_time = StringField('何时', validators=[Length(max=64)])
    contact_type = StringField('何方式(编码)', validators=[Length(max=64)])
    contact_type_name = StringField('何方式(名称)', validators=[Length(max=100)])
    reply_script = TextAreaField('答复脚本')
    file_id_str = TextAreaField('OA文件id')

    # Work-order / workflow identifiers and submission state
    order_no = StringField('工单号', validators=[Length(max=64)])
    process_instance_id = StringField('流程实例ID', validators=[Length(max=64)])
    request_delay_time = StringField('申请延时时长(字符串)', validators=[Length(max=64)])
    save_id = StringField('提交数据ID', validators=[Length(max=64)])
    order_id = StringField('工单ID', validators=[Length(max=64)])
    save_status = IntegerField('提交状态')
    oa_feedback_status = IntegerField('OA反馈状态')
    flow_token = StringField('流令牌', validators=[Length(max=256)])

    # Audit columns
    created_at = DateTimeField('创建时间')
    created_by = StringField('创建者', validators=[Length(max=64)])
    updated_at = DateTimeField('更新时间')
    updated_by = StringField('更新者', validators=[Length(max=64)])

    def process(self, formdata=None, obj=None, **kwargs):
        """Trim surrounding whitespace from string inputs, then process as usual.

        ``formdata`` is updated in place: a plain string value is replaced by
        its stripped form, and a non-empty list value is replaced by a list
        whose string items are stripped (non-string items are kept as they
        are). Other values are left untouched.

        :param formdata: mapping of field name to submitted value(s); may be None.
        :param obj: object forwarded unchanged to the parent ``process``.
        :param kwargs: extra keyword arguments forwarded to the parent ``process``.
        """
        if formdata:
            # Only existing keys are reassigned, so the mapping does not change
            # size while it is being iterated.
            for field_name, submitted in formdata.items():
                if isinstance(submitted, str):
                    formdata[field_name] = submitted.strip()
                elif isinstance(submitted, list) and submitted:
                    formdata[field_name] = [
                        item.strip() if isinstance(item, str) else item
                        for item in submitted
                    ]
        super().process(formdata, obj, **kwargs)
class GovsApplicationForDelayBase(TD3iGovsApplicationForDelay, CommonModel):
    """Query helpers shared by the delay-application business classes.

    Every method is an async classmethod that builds a SQLAlchemy ``select``
    on this model and runs it through inherited helpers (``query_first``,
    ``query_count``, ``query_as_df``, ``orm_execute_scalars``).
    """

    # Model attribute name -> camelCase key.
    # NOTE(review): presumably the camelCase names are the external payload
    # field names; confirm against the code that consumes this mapping.
    # NOTE(review): 'master_id' and 'gd_id' both map to 'gdId' — confirm that
    # the duplicate target is intentional.
    FieldMapping = {
        # Primary key and relations
        'id': 'id',
        'master_id': 'gdId',
        'order_id': 'orderId',
        'order_no': 'orderNo',
        'process_instance_id': 'processInstanceId',
        'gd_id': 'gdId',
        # Core delay fields
        'finally_time_after_approve': 'finallyTimeAfterApprove',
        'finally_time_before_approve': 'finallyTimeBeforeApprove',
        'request_delay': 'requestDelay',
        'is_nature_day': 'isNatureDay',
        'request_delay_time': 'requestDelayTime',
        # Contact / notification fields
        'already_notify_order_user': 'alreadyNotifyOrderUser',
        'contact_name': 'contactName',
        'contact_time': 'contactTime',
        'contact_type': 'contactType',
        'contact_type_name': 'contactTypeName',
        'reply_script': 'replyScript',
        # Reason and attachments
        'request_reason': 'requestReason',
        'remarks': 'remarks',
        'file_id_str': 'fileIdStr'
    }

    @classmethod
    def _identifier_filters(cls, master_id=None, order_id=None, order_no=None):
        """Return one equality filter per identifier that has a truthy value."""
        _candidates = (
            (cls.master_id, master_id),
            (cls.order_id, order_id),
            (cls.order_no, order_no),
        )
        return [_column == _value for _column, _value in _candidates if _value]

    @classmethod
    async def exist_other(cls, id: Union[str, int], master_id: Union[str, int] = None, order_id: str = None,
                          order_no: str = None):
        """Find another record (different ``id``) that carries the same identifiers.

        All supplied identifiers must match (they are combined with AND);
        falsy identifiers are ignored.

        :param id: primary key of the record to exclude from the search.
        :param master_id: master-table id to match, optional.
        :param order_id: work-order id to match, optional.
        :param order_no: work-order number to match, optional.
        :return: the first matching record, or None. None is also returned
            when no identifier is supplied: without any identifier every other
            row would match and an unrelated record would be reported.
        """
        _filters = cls._identifier_filters(master_id, order_id, order_no)
        if not _filters:
            return None
        _query = select(cls).where(cls.id != id, *_filters)
        return await cls.query_first(_query)

    @classmethod
    async def find_by_ids(cls, ids: list[Union[str, int]]):
        """Load all delay applications whose primary key is in ``ids``.

        :param ids: primary keys to look up.
        :return: list of matching records.
        """
        _query = select(cls).where(cls.id.in_(ids))
        _result = await cls.orm_execute_scalars(_query)
        return _result.all()

    @classmethod
    async def is_exist(cls, master_id: Union[str, int] = None, order_id: str = None, order_no: str = None):
        """Check whether a delay application with the given identifiers exists.

        All supplied identifiers must match (they are combined with AND);
        falsy identifiers are ignored.

        :param master_id: master-table id to match, optional.
        :param order_id: work-order id to match, optional.
        :param order_no: work-order number to match, optional.
        :return: the first matching record, or None. None is also returned
            when no identifier is supplied, so that an unfiltered query cannot
            report an arbitrary row as a duplicate.
        """
        _filters = cls._identifier_filters(master_id, order_id, order_no)
        if not _filters:
            return None
        _query = select(cls).where(*_filters)
        return await cls.query_first(_query)

    @classmethod
    async def search_base(cls, is_paging=True, **kwargs):
        """Search delay applications by keyword parameters.

        :param is_paging: when true, count the matches and apply limit/offset.
        :param kwargs: search parameters. ``page_number`` (default 1),
            ``page_size`` (default 20) and ``sort_clause`` are read here; the
            whole mapping is also forwarded to ``cls.search_wheres`` together
            with ``'%{}%'`` LIKE patterns for ``master_id`` and ``order_no``.
        :return: ``(df, paging)`` — the result rows as a DataFrame and the
            ``Pagination`` result, or ``None`` for ``paging`` when
            ``is_paging`` is false.
        """
        # Default to the first page; the page must be deterministic when the
        # caller does not specify one.
        page_number = kwargs.get('page_number', 1)
        page_size = kwargs.get('page_size', 20)
        kwargs.update({'page_number': page_number, 'page_size': page_size})

        _name_likes = {
            cls.master_id.key: '%{}%',
            cls.order_no.key: '%{}%'
        }
        _query = select(cls).where(
            *cls.search_wheres(likes=_name_likes, **kwargs)
        ).group_by(cls.id)

        _paging = None
        _data_query = _query
        if is_paging:
            _row_count = await cls.query_count(_query)
            _paging = Pagination(_row_count).paging(page_number, page_size)
            _data_query = _query.limit(page_size).offset(_paging.offset_size)

        # Caller-supplied ordering wins; otherwise newest records first.
        _sort_clause = cls.sort_clauses(kwargs.get('sort_clause', {}))
        if _sort_clause:
            _data_query = _data_query.order_by(*_sort_clause)
        else:
            _data_query = _data_query.order_by(cls.created_at.desc())

        _df = await cls.query_as_df(_data_query)
        if not _df.empty:
            # Blank out the project's "empty" markers and return ids as strings.
            _df.replace(models.EmptyInDF + models.EmptyDatetimeInDF, '', inplace=True)
            _df[cls.id.key] = _df[cls.id.key].astype(str)
        return _df, _paging

    @classmethod
    async def search(cls, **kwargs):
        """Search delay applications and return the result in paged form.

        :param kwargs: search parameters, see ``search_base``.
        :return: dict with ``total`` (row count), ``rows`` (list of record
            dicts) and ``pagination`` (``page_number``, ``page_count``,
            ``page_size``).
        """
        # The response is built from the pagination result, so paging is
        # always requested even if a caller forwards ``is_paging=False``.
        kwargs.pop('is_paging', None)
        _df, _paging = await cls.search_base(is_paging=True, **kwargs)
        return {
            'total': _paging.row_count,
            'rows': _df.to_dict('records'),
            'pagination': {
                'page_number': _paging.page_number,
                'page_count': _paging.page_count,
                'page_size': _paging.page_size,
            },
        }

    @classmethod
    async def exists_master_id(cls, data_df: pd.DataFrame):
        """Split ``data_df`` by whether each row's ``master_id`` is already stored.

        :param data_df: input rows; must contain the ``master_id`` column
            unless it is empty.
        :return: ``(exists_df, latest_df)``. ``exists_df`` holds the rows
            whose ``master_id`` was found, with the ``id`` column set to the
            stored record's id; ``latest_df`` holds the remaining rows. Both
            are copies (or new empty frames); ``data_df`` is not modified.
        """
        if data_df.empty:
            return pd.DataFrame(), pd.DataFrame()

        _master_key = cls.master_id.key
        _id_key = cls.id.key

        master_ids = data_df[_master_key].unique().tolist()
        if not master_ids:
            return pd.DataFrame(), data_df.copy()

        _query = select(cls.id, cls.master_id).where(cls.master_id.in_(master_ids))
        existing_df = await cls.query_as_df(_query)
        if existing_df.empty:
            return pd.DataFrame(), data_df.copy()

        # If several stored rows share a master_id, the last one returned wins.
        master_id_to_id_map = dict(zip(existing_df[_master_key], existing_df[_id_key]))
        mask_exists = data_df[_master_key].isin(existing_df[_master_key])

        exists_df = data_df[mask_exists].copy()
        exists_df[_id_key] = exists_df[_master_key].map(master_id_to_id_map)
        latest_df = data_df[~mask_exists].copy()
        return exists_df, latest_df

    @classmethod
    async def find_by_master_id(cls, master_id: Union[str, int]):
        """Return the first delay application with the given master-table id, or None."""
        _query = select(cls).where(cls.master_id == master_id)
        return await cls.query_first(_query)

    @classmethod
    async def find_by_order_id(cls, order_id: str):
        """Return the first delay application with the given work-order id, or None."""
        _query = select(cls).where(cls.order_id == order_id)
        return await cls.query_first(_query)

    @classmethod
    async def find_by_master_ids(cls, master_ids: list[Union[str, int]]):
        """Load all delay applications whose master-table id is in ``master_ids``.

        :param master_ids: master-table ids to look up.
        :return: list of matching records.
        """
        _query = select(cls).where(cls.master_id.in_(master_ids))
        _result = await cls.orm_execute_scalars(_query)
        return _result.all()
@register_swagger_model
class GovsApplicationForDelay(GovsApplicationForDelayBase):
    """Business operations (create / modify / delete, single and batch) for delay applications."""

    @staticmethod
    def _stripped(values: dict) -> dict:
        """Return a copy of ``values`` with every string value whitespace-trimmed."""
        return {_k: _v.strip() if isinstance(_v, str) else _v for _k, _v in values.items()}

    @classmethod
    async def create(cls, user: RbacUser = None, **kwargs):
        """Create a new delay application.

        :param user: acting user; when given, its ``username`` is stored in
            ``created_by`` and ``updated_by``.
        :param kwargs: field values, validated through
            ``GovsApplicationForDelayForm``.
        :return: the saved record.
        :raises AssertionError: if ``is_exist`` finds a record for the
            submitted ``master_id`` / ``order_id`` / ``order_no``.
        """
        _form = GovsApplicationForDelayForm(formdata=cls._stripped(kwargs))
        _form.validate_form()

        _existing = await cls.is_exist(
            master_id=_form.master_id.data,
            order_id=_form.order_id.data,
            order_no=_form.order_no.data
        )
        # Raised explicitly (not via ``assert``) so the duplicate check still
        # runs under ``python -O``; the exception type is unchanged.
        if _existing is not None:
            raise AssertionError("该任务已存在申请延期记录,不能重复创建。")

        _delay = cls().copy_from_dict(_form.data, skip_none=True).before_save()
        if user:
            _delay.created_by = user.username
            _delay.updated_by = user.username
        await _delay.async_save()
        return _delay

    @classmethod
    async def delete(cls, delay_id: Union[str, int]):
        """Delete a delay application by primary key.

        :param delay_id: primary key of the record to delete.
        :return: the record as it was loaded before deletion.
        :raises AssertionError: if no record with ``delay_id`` is found.
        """
        _delay = await cls.async_find_by_id(delay_id)
        if not _delay:
            raise AssertionError(f"根据 ID {delay_id} 未找到延时申请。")

        _del_query = delete(cls).where(cls.id == _delay.id)
        await cls.raw_execute(_del_query)
        # Log punctuation repaired (the original had an unclosed parenthesis
        # and no separators between the two ids).
        # NOTE(review): the label says "工单ID" but the value is master_id —
        # confirm which identifier is meant.
        echo_log(f'已删除延时申请(工单ID:{_delay.master_id},ID:{_delay.id}).')
        return _delay

    @classmethod
    async def modify(cls, delay_id: Union[str, int], user: RbacUser = None, **kwargs):
        """Update an existing delay application.

        :param delay_id: primary key of the record to update.
        :param user: acting user; when given, its ``username`` is stored in
            ``updated_by``.
        :param kwargs: new field values, validated through
            ``GovsApplicationForDelayForm``; ``None`` values are not copied.
        :return: the saved record.
        :raises AssertionError: if no record with ``delay_id`` is found.
        """
        _form = GovsApplicationForDelayForm(formdata=cls._stripped(kwargs))
        _form.validate_form()

        _delay = await cls.async_find_by_id(delay_id)
        if not _delay:
            raise AssertionError('查无此延时申请信息。')

        _delay.copy_from_dict(_form.data, skip_none=True).before_save()
        if user:
            _delay.updated_by = user.username
        await _delay.async_save()
        return _delay

    @classmethod
    async def create_batch(cls, data_df: pd.DataFrame, user: RbacUser = None):
        """Insert one delay application per row of ``data_df``.

        :param data_df: rows to insert; not modified by this method.
        :param user: acting user; when given, its ``username`` is written to
            ``created_by`` and ``updated_by`` of every new record.
        :return: number of records created (0 for an empty frame).
        :raises Exception: any error from the session is re-raised after the
            transaction has been rolled back.
        """
        if data_df.empty:
            return 0
        if user:
            # ``assign`` returns a new frame, leaving the caller's data intact.
            data_df = data_df.assign(created_by=user.username, updated_by=user.username)

        records = data_df.to_dict('records')
        delays = [cls().copy_from_dict(record, skip_none=True).before_save() for record in records]

        session = cls.get_aio_session()
        try:
            session.add_all(delays)
            await session.commit()
        except Exception:
            await session.rollback()
            raise
        finally:
            await session.close()

        echo_log(f"批量创建成功:创建 {len(delays)} 条延时申请。")
        return len(delays)

    @classmethod
    async def modify_batch(cls, data_df: pd.DataFrame, user: RbacUser = None):
        """Bulk-update delay applications from the rows of ``data_df``.

        Each row is passed to ``bulk_update_mappings`` as a mapping, so it
        must carry the primary key. ``updated_at`` is set to the current
        local time on every row.

        :param data_df: rows to update, including the primary-key column; not
            modified by this method.
        :param user: acting user; when given, its ``username`` is written to
            ``updated_by``.
        :return: number of rows submitted for update; 0 if the frame is empty
            or lacks the primary-key column (the latter is logged).
        :raises Exception: any error from the session is re-raised after the
            transaction has been rolled back.
        """
        if data_df.empty:
            return 0
        if cls.id.key not in data_df.columns:
            echo_log(f"错误:modify_batch 要求输入数据必须包含 '{cls.id.key}'")
            return 0

        # ``assign`` returns a new frame, leaving the caller's data intact.
        data_df = data_df.assign(updated_at=datetime.datetime.now())
        if user:
            data_df = data_df.assign(updated_by=user.username)

        update_data = data_df.to_dict('records')
        updated_count = len(update_data)

        session = cls.get_aio_session()
        try:
            await session.run_sync(
                lambda sync_session: sync_session.bulk_update_mappings(cls, update_data)
            )
            await session.commit()
        except Exception:
            await session.rollback()
            raise
        finally:
            await session.close()

        echo_log(f"批量修改成功:更新 {updated_count} 条延时申请。")
        return updated_count

    @classmethod
    async def save_batch(cls, data_df: pd.DataFrame, user: RbacUser = None):
        """Batch upsert: create rows with an unknown ``master_id``, update the others.

        :param data_df: rows to save.
        :param user: acting user, forwarded to ``create_batch`` and
            ``modify_batch``.
        :return: ``(created_count, updated_count)``.
        """
        _exists_df, _latest_df = await cls.exists_master_id(data_df)
        _created_count = await cls.create_batch(_latest_df, user)
        _updated_count = await cls.modify_batch(_exists_df, user)
        return _created_count, _updated_count