Files
d3i-szct/models/govs_save_sign.py
2026-06-02 17:46:38 +08:00

322 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# coding: utf-8
import datetime
import random
from typing import Union
import pandas as pd
from sqlalchemy import select, delete
from tornado_swagger.model import register_swagger_model
from wtforms import StringField, IntegerField, DateTimeField
from wtforms.validators import Length
import models
from models.db_models import TD3iGovsSaveSign
from models.common_model import CommonModel
from paste.core.logging import echo_log
from paste.rbac.rbac_user import RbacUser
from paste.util.pagination import Pagination
from paste.web.form import ModelForm
class GovsSaveSignForm(ModelForm):
    """Validation form for work-order sign-off records.

    Declares one field per accepted input key; string fields carry a maximum
    length validator.
    """

    id = IntegerField('主键')
    # NOTE(review): the original declared ``master_id`` twice -- first as
    # ``IntegerField('主表ID')`` and then as the StringField below. The second
    # assignment rebinds the class attribute, so only the StringField was ever
    # in effect. The dead declaration has been dropped; confirm whether one of
    # the two was meant to be a different field (e.g. ``gd_id``).
    master_id = StringField('代签收唯一标志', validators=[Length(max=64)])
    order_id = StringField('工单ID', validators=[Length(max=64)])
    order_no = StringField('工单号', validators=[Length(max=64)])
    order_process_id = StringField('工单流程ID', validators=[Length(max=64)])
    task_id = StringField('任务ID', validators=[Length(max=64)])
    flag = StringField('签收标识', validators=[Length(max=64)])
    save_status = IntegerField('提交状态')
    oa_feedback_status = IntegerField('OA反馈状态')
    flow_token = StringField('流令牌', validators=[Length(max=256)])
    created_at = DateTimeField('创建时间')
    created_by = StringField('创建者', validators=[Length(max=64)])
    updated_at = DateTimeField('更新时间')
    updated_by = StringField('更新者', validators=[Length(max=64)])

    def process(self, formdata=None, obj=None, **kwargs):
        """Strip surrounding whitespace from string inputs, then bind the form.

        ``formdata`` is normalised in place: a plain string value is stripped,
        and a non-empty list has each of its string elements stripped. Values
        of any other type are left untouched.

        Args:
            formdata: Mapping of field name to raw value (or list of values).
            obj: Passed through unchanged to the parent ``process``.
            **kwargs: Passed through unchanged to the parent ``process``.
        """
        if formdata:
            # Iterate over a snapshot so reassigning keys cannot disturb the
            # iteration, whatever mapping type the caller supplied.
            for name, values in list(formdata.items()):
                if isinstance(values, list) and values:
                    formdata[name] = [
                        v.strip() if isinstance(v, str) else v for v in values
                    ]
                elif isinstance(values, str):
                    formdata[name] = values.strip()
        super().process(formdata, obj, **kwargs)
class GovsSaveSignBase(TD3iGovsSaveSign, CommonModel):
    """Query helpers for work-order sign-off records."""

    # Maps model attribute names to external (camelCase) key names.
    FieldMapping = {
        # Primary key and related IDs
        'id': 'id',
        'master_id': 'gdId',
        'gd_id': 'gdId',
        'order_id': 'orderId',
        'order_no': 'orderNo',
        'order_process_id': 'orderProcessId',
        'task_id': 'taskId',
        # Business flag
        'flag': 'flag',
        # Token
        'flow_token': 'flowToken',
    }

    @classmethod
    def _with_identity_filters(cls, query, master_id=None, order_id=None, order_no=None):
        """Add an equality filter to ``query`` for each truthy identifier.

        Falsy identifiers (``None``, ``''``, ``0``) are ignored, so they do not
        narrow the query.
        """
        if master_id:
            query = query.where(cls.master_id == master_id)
        if order_id:
            query = query.where(cls.order_id == order_id)
        if order_no:
            query = query.where(cls.order_no == order_no)
        return query

    @classmethod
    async def exist_other(cls, id: Union[str, int], master_id: Union[int, str] = None, order_id: str = None,
                          order_no: str = None):
        """Return the first record, other than ``id``, matching the identifiers.

        Args:
            id: Primary key of the record to exclude from the lookup.
            master_id: Optional filter; ignored when falsy.
            order_id: Optional filter; ignored when falsy.
            order_no: Optional filter; ignored when falsy.

        Returns:
            Whatever ``cls.query_first`` yields for the combined query.
        """
        _query = cls._with_identity_filters(
            select(cls).where(cls.id != id),
            master_id=master_id,
            order_id=order_id,
            order_no=order_no,
        )
        return await cls.query_first(_query)

    @classmethod
    async def find_by_ids(cls, ids: list[Union[str, int]]):
        """Return all records whose primary key is in ``ids``.

        An empty or ``None`` ``ids`` yields an empty list without querying.
        """
        if not ids:
            return []
        _query = select(cls).where(cls.id.in_(ids))
        return (await cls.orm_execute_scalars(_query)).all()

    @classmethod
    async def is_exist(cls, master_id: Union[int, str] = None, order_id: str = None, order_no: str = None):
        """Return the first record matching all supplied identifiers.

        NOTE(review): when every identifier is falsy the query is unfiltered,
        so the first row of the table (if any) is returned. Callers that pass
        only empty values will therefore see a "match"; confirm this is
        intended.

        Returns:
            Whatever ``cls.query_first`` yields for the combined query.
        """
        _query = cls._with_identity_filters(
            select(cls),
            master_id=master_id,
            order_id=order_id,
            order_no=order_no,
        )
        return await cls.query_first(_query)

    @classmethod
    async def search_base(cls, is_paging=True, **kwargs):
        """Run the base search, paged or unpaged.

        Args:
            is_paging: When true, count the matches and apply limit/offset.
            **kwargs: Search criteria forwarded to ``cls.search_wheres``.
                ``page_number`` (default 1), ``page_size`` (default 20) and
                ``sort_clause`` are also read from here.

        Returns:
            A ``(DataFrame, paging)`` tuple; ``paging`` is ``None`` when
            ``is_paging`` is false.
        """
        # Default to the first page. The original defaulted to
        # ``random.randint(1, 100)``, which returned an arbitrary page whenever
        # the caller omitted ``page_number``.
        page_number = kwargs.get('page_number', 1)
        page_size = kwargs.get('page_size', 20)
        kwargs.update({'page_number': page_number, 'page_size': page_size})

        # Columns matched with a LIKE '%value%' pattern.
        # NOTE(review): the original listed ``master_id`` twice (a duplicate
        # dict key); a different column may have been intended for the
        # repeated entry.
        _name_likes = {
            cls.master_id.key: '%{}%',
            cls.order_no.key: '%{}%',
        }
        _query = select(cls).where(
            *cls.search_wheres(likes=_name_likes, **kwargs)
        ).group_by(cls.id)

        _paging = None
        if is_paging:
            _row_count = await cls.query_count(_query)
            _paging = Pagination(_row_count).paging(page_number, page_size)
            _data_query = _query.limit(page_size).offset(_paging.offset_size)
        else:
            _data_query = _query

        # Caller-supplied ordering wins; otherwise newest first.
        _sort_clause = cls.sort_clauses(kwargs.get('sort_clause', {}))
        if _sort_clause:
            _data_query = _data_query.order_by(*_sort_clause)
        else:
            _data_query = _data_query.order_by(cls.created_at.desc())

        _df = await cls.query_as_df(_data_query)
        if not _df.empty:
            _df.replace(models.EmptyInDF + models.EmptyDatetimeInDF, '', inplace=True)
            # IDs are returned as strings.
            _df[cls.id.key] = _df[cls.id.key].astype(str)
        return _df, _paging

    @classmethod
    async def search(cls, **kwargs):
        """Run a paged search and return the standard paging structure.

        NOTE(review): ``kwargs`` is forwarded verbatim to ``search_base``; if
        it contains ``is_paging=False`` the paging object is ``None`` and the
        attribute reads below fail.

        Returns:
            A dict with ``total``, ``rows`` (list of record dicts) and
            ``pagination`` (page number, page count, page size).
        """
        _df, _paging = await cls.search_base(**kwargs)
        return {
            'total': _paging.row_count,
            'rows': _df.to_dict('records'),
            'pagination': {
                'page_number': _paging.page_number,
                'page_count': _paging.page_count,
                'page_size': _paging.page_size,
            },
        }

    @classmethod
    async def exists_master_id(cls, data_df: pd.DataFrame):
        """Split ``data_df`` into rows that already exist and rows that are new.

        Existence is decided by looking up the ``master_id`` column values in
        the table. Rows whose ``master_id`` is found get their ``id`` column
        set to the stored primary key.

        Args:
            data_df: Input rows; must contain the ``master_id`` column.

        Returns:
            ``(exists_df, latest_df)`` -- both are copies, never views of the
            input. Rows with a missing ``master_id`` land in ``latest_df``.
        """
        if data_df.empty:
            return pd.DataFrame(), pd.DataFrame()

        _key = cls.master_id.key
        # Drop missing values so NaN is never sent as an IN-list parameter.
        master_ids = data_df[_key].dropna().unique().tolist()
        if not master_ids:
            return pd.DataFrame(), data_df.copy()

        _query = select(cls.id, cls.master_id).where(cls.master_id.in_(master_ids))
        existing_df = await cls.query_as_df(_query)
        if existing_df.empty:
            return pd.DataFrame(), data_df.copy()

        # NOTE(review): matching relies on the DataFrame values and the values
        # read back from the database comparing equal (same dtype) -- confirm.
        master_id_to_id_map = dict(zip(existing_df[_key], existing_df[cls.id.key]))
        mask_exists = data_df[_key].isin(existing_df[_key])

        exists_df = data_df[mask_exists].copy()
        exists_df[cls.id.key] = exists_df[_key].map(master_id_to_id_map)
        latest_df = data_df[~mask_exists].copy()
        return exists_df, latest_df

    @classmethod
    async def find_by_master_id(cls, master_id: Union[str, int]):
        """Return the first record whose ``master_id`` equals the argument."""
        _query = select(cls).where(cls.master_id == master_id)
        return await cls.query_first(_query)

    @classmethod
    async def find_by_order_id(cls, order_id: str):
        """Return the first record whose ``order_id`` equals the argument."""
        _query = select(cls).where(cls.order_id == order_id)
        return await cls.query_first(_query)

    @classmethod
    async def find_by_master_ids(cls, master_ids: list[Union[str, int]]):
        """Return all records whose ``master_id`` is in ``master_ids``.

        An empty or ``None`` ``master_ids`` yields an empty list without
        querying.
        """
        if not master_ids:
            return []
        _query = select(cls).where(cls.master_id.in_(master_ids))
        return (await cls.orm_execute_scalars(_query)).all()
@register_swagger_model
class GovsSaveSign(GovsSaveSignBase):
    """Create / update / delete operations for work-order sign-off records."""

    @classmethod
    async def create(cls, user: RbacUser = None, **kwargs):
        """Validate the input and insert a new sign-off record.

        Args:
            user: Optional acting user; when given, its ``username`` is stored
                in ``created_by`` and ``updated_by``.
            **kwargs: Raw field values; string values are stripped first.

        Returns:
            The saved record.

        Raises:
            AssertionError: If ``is_exist`` finds a record for the submitted
                ``master_id`` / ``order_id`` / ``order_no``.
        """
        kwargs = {
            _k: _v.strip() if isinstance(_v, str) else _v
            for _k, _v in kwargs.items()
        }
        _form = GovsSaveSignForm(formdata=kwargs)
        _form.validate_form()

        _existing = await cls.is_exist(
            master_id=_form.master_id.data,
            order_id=_form.order_id.data,
            order_no=_form.order_no.data,
        )
        # Raised explicitly (not via ``assert``) so the check survives
        # ``python -O``; the exception type is unchanged for callers.
        if _existing is not None:
            raise AssertionError("该任务已存在确认签收记录,无法重复创建。")

        _sign_info = cls().copy_from_dict(_form.data, skip_none=True).before_save()
        if user:
            _sign_info.created_by = user.username
            _sign_info.updated_by = user.username
        await _sign_info.async_save()
        return _sign_info

    @classmethod
    async def delete(cls, sign_id: Union[str, int]):
        """Delete the sign-off record with primary key ``sign_id``.

        Returns:
            The record as loaded before deletion.

        Raises:
            AssertionError: If no record with that ID is found.
        """
        _sign_info = await cls.async_find_by_id(sign_id)
        if not _sign_info:
            raise AssertionError(f"根据 ID {sign_id} 未找到工单签收记录。")

        _del_query = delete(cls).where(cls.id == _sign_info.id)
        await cls.raw_execute(_del_query)
        # The original message had an unclosed parenthesis and no separators
        # between the two values, which made the logged IDs run together.
        echo_log(f'已删除工单签收记录(工单ID:{_sign_info.master_id},ID:{_sign_info.id})。')
        return _sign_info

    @classmethod
    async def modify(cls, sign_id: Union[str, int], user: RbacUser = None, **kwargs):
        """Validate the input and update an existing sign-off record.

        Args:
            sign_id: Primary key of the record to update.
            user: Optional acting user; when given, its ``username`` is stored
                in ``updated_by``.
            **kwargs: Raw field values; string values are stripped first.
                ``None`` form values are skipped when copying onto the record.

        Returns:
            The saved record.

        Raises:
            AssertionError: If no record with that ID is found.
        """
        kwargs = {
            _k: _v.strip() if isinstance(_v, str) else _v
            for _k, _v in kwargs.items()
        }
        _form = GovsSaveSignForm(formdata=kwargs)
        _form.validate_form()

        _sign_info = await cls.async_find_by_id(sign_id)
        if not _sign_info:
            raise AssertionError('查无此工单签收记录。')

        _sign_info.copy_from_dict(_form.data, skip_none=True).before_save()
        if user:
            _sign_info.updated_by = user.username
        await _sign_info.async_save()
        return _sign_info

    @classmethod
    async def create_batch(cls, data_df: pd.DataFrame, user: RbacUser = None):
        """Insert one record per row of ``data_df`` in a single transaction.

        The caller's DataFrame is not modified.

        Args:
            data_df: Rows to insert.
            user: Optional acting user recorded as creator and updater.

        Returns:
            Number of records created (0 for an empty frame).

        Raises:
            Exception: Any error from the session is re-raised after rollback.
        """
        if data_df.empty:
            return 0
        if user:
            # ``assign`` returns a new frame, leaving the caller's untouched.
            data_df = data_df.assign(created_by=user.username, updated_by=user.username)

        sign_list = [
            cls().copy_from_dict(record, skip_none=True).before_save()
            for record in data_df.to_dict('records')
        ]

        session = cls.get_aio_session()
        try:
            session.add_all(sign_list)
            await session.commit()
        except Exception:
            await session.rollback()
            raise
        finally:
            await session.close()

        echo_log(f"批量创建成功:创建 {len(sign_list)} 条工单签收记录。")
        return len(sign_list)

    @classmethod
    async def modify_batch(cls, data_df: pd.DataFrame, user: RbacUser = None):
        """Bulk-update records from ``data_df`` in a single transaction.

        Every row must carry the primary-key column. ``updated_at`` is set to
        the current local time for all rows. The caller's DataFrame is not
        modified.

        Args:
            data_df: Rows to update, keyed by the primary-key column.
            user: Optional acting user recorded as updater.

        Returns:
            Number of rows submitted for update; 0 when the frame is empty or
            lacks the primary-key column (the latter is logged).

        Raises:
            Exception: Any error from the session is re-raised after rollback.
        """
        if data_df.empty:
            return 0
        if cls.id.key not in data_df.columns:
            echo_log(f"错误:modify_batch 要求输入数据必须包含 '{cls.id.key}'")
            return 0

        data_df = data_df.assign(updated_at=datetime.datetime.now())
        if user:
            data_df = data_df.assign(updated_by=user.username)

        update_data = data_df.to_dict('records')
        updated_count = len(update_data)

        session = cls.get_aio_session()
        try:
            await session.run_sync(
                lambda sync_session: sync_session.bulk_update_mappings(cls, update_data)
            )
            await session.commit()
        except Exception:
            await session.rollback()
            raise
        finally:
            await session.close()

        echo_log(f"批量修改成功:更新 {updated_count} 条工单签收记录。")
        return updated_count

    @classmethod
    async def save_batch(cls, data_df: pd.DataFrame, user: RbacUser = None):
        """Batch upsert: insert new rows and update existing ones.

        Rows are split by ``exists_master_id``; new rows go to
        ``create_batch`` and existing rows to ``modify_batch``.

        Returns:
            ``(created_count, updated_count)``.
        """
        _exists_df, _latest_df = await cls.exists_master_id(data_df)
        _created_count = await cls.create_batch(_latest_df, user)
        _updated_count = await cls.modify_batch(_exists_df, user)
        return _created_count, _updated_count