Files
d3i-szct/models/govs_order_process.py
T
2026-06-02 17:46:38 +08:00

520 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# coding: utf-8
import datetime
import random
from typing import Union,Optional,Callable
import pandas as pd
from sqlalchemy import select, delete
from tornado_swagger.model import register_swagger_model
from wtforms import StringField, TextAreaField, IntegerField, DateTimeField
from wtforms.validators import Length
import models
from models.common_model import CommonModel
from models.db_models import TD3iGovsOrderProces
from paste.core.logging import echo_log
from paste.rbac.rbac_user import RbacUser
from paste.util.pagination import Pagination
from paste.web.form import ModelForm
class GovsOrderProcessForm(ModelForm):
    """Validation form for a provincial 12345 work-order processing record.

    Each attribute declares one form field; the string passed to the field
    constructor is its label, and ``Length`` validators cap the accepted
    string length.
    """

    # ---- Primary key and relations ----
    id = IntegerField('工单处理记录唯一ID')
    master_id = IntegerField('主工单ID')
    tenant_id = IntegerField('租户ID')

    # ---- Time points ----
    plan_sign_time = DateTimeField('计划签收时间')
    plan_finish_time = DateTimeField('计划完成时间')
    plan_back_time = DateTimeField('计划退回时间')
    deal_date = DateTimeField('实际处理时间')
    hand_over_time = StringField('交接时间', validators=[Length(max=20)])
    sign_over_time = StringField('签收超时时间', validators=[Length(max=20)])
    origin_plan_finish_time = DateTimeField('原始计划完成时间')
    origin_plan_sign_time = DateTimeField('原始计划签收时间')

    # ---- Work-order identifiers ----
    order_id = StringField('工单编号', validators=[Length(max=50)])
    order_no = StringField('工单流水号', validators=[Length(max=100)])
    process_instance_id = StringField('流程实例ID', validators=[Length(max=64)])

    # ---- Work-order status ----
    order_status = StringField('工单状态编码', validators=[Length(max=10)])
    is_over_time = IntegerField('是否超期')
    is_sign_over_time = IntegerField('是否签收超时')

    # ---- Processing action ----
    action_name = StringField('当前操作动作名称', validators=[Length(max=100)])
    deal_type = StringField('处理类型', validators=[Length(max=100)])
    task_id = StringField('当前任务ID', validators=[Length(max=64)])
    next_task_id = StringField('下一任务ID', validators=[Length(max=64)])
    next_action_name = StringField('下一处理动作名称', validators=[Length(max=100)])
    # NOTE(review): this label is identical to the one on ``next_action_name``
    # above; it looks like a copy-paste slip -- confirm the intended label.
    next_handle = StringField('下一处理动作名称', validators=[Length(max=50)])
    next_handle_name = StringField('下一处理动作详细名称', validators=[Length(max=100)])

    # ---- Current handlers / departments ----
    handler_user_ids = StringField('当前处理人ID列表', validators=[Length(max=500)])
    handler_user_names = StringField('当前处理人姓名列表', validators=[Length(max=500)])
    handler_org_ids = StringField('当前处理部门ID列表', validators=[Length(max=1000)])
    handler_org_names = StringField('当前处理部门名称列表', validators=[Length(max=500)])

    # ---- Next handlers / departments ----
    next_handler_user_ids = StringField('下一处理人ID列表', validators=[Length(max=500)])
    next_handler_user_names = StringField('下一处理人姓名列表', validators=[Length(max=500)])
    next_org_ids = StringField('下一处理部门ID列表', validators=[Length(max=500)])
    next_org_names = StringField('下一处理部门名称列表', validators=[Length(max=500)])

    # ---- Dispatch information ----
    dispatch_order_id = StringField('派发工单ID', validators=[Length(max=100)])
    to_master_id = IntegerField('目标主表ID')
    to_tenant_id = IntegerField('目标租户ID')
    to_area_code = StringField('目标区域代码', validators=[Length(max=20)])
    to_dept_id = IntegerField('目标部门ID')
    dispatch_value = StringField('派发值', validators=[Length(max=20)])
    has_dispatch_process = IntegerField('是否有派发流程')

    # ---- Contact information ----
    contact_name = StringField('联系人姓名', validators=[Length(max=100)])
    contact_time = DateTimeField('联系时间')
    contact_type = StringField('联系类型', validators=[Length(max=20)])

    # ---- Free-text content ----
    adv_content = TextAreaField('处理建议')
    remarks = TextAreaField('备注信息')
    formal_reply = TextAreaField('正式回复内容')
    reply_to_people = StringField('回复对象', validators=[Length(max=100)])
    return_reason = StringField('退回原因', validators=[Length(max=500)])

    # ---- Miscellaneous ----
    notice_org_id = IntegerField('通知组织ID')
    line_key = StringField('线路标识', validators=[Length(max=100)])
    current_task_status = StringField('当前任务状态', validators=[Length(max=50)])
    visit_type = StringField('访问类型', validators=[Length(max=50)])

    # ---- Nested data carried as JSON text ----
    attachment_dto_list = TextAreaField('附件列表JSON')
    child_order_processes = TextAreaField('子流程处理记录JSON')
    order_process_index_list = TextAreaField('工单流程索引列表JSON')

    # ---- Audit columns ----
    created_at = DateTimeField('创建时间')
    created_by = StringField('创建者', validators=[Length(max=64)])
    updated_at = DateTimeField('更新时间')
    updated_by = StringField('更新者', validators=[Length(max=64)])

    def process(self, formdata=None, obj=None, **kwargs):
        """Strip surrounding whitespace from submitted values, then delegate.

        String values, and string items inside non-empty list values, are
        stripped in place on ``formdata`` before the parent ``process`` runs.
        Values of any other type are passed through untouched.

        :param formdata: submitted data; only needs ``items()`` and item assignment here
        :param obj: forwarded unchanged to the parent implementation
        :param kwargs: forwarded unchanged to the parent implementation
        """
        if formdata:
            # Only existing keys are reassigned, so the mapping does not
            # change size while it is being iterated.
            for field_name, raw in formdata.items():
                if isinstance(raw, str):
                    formdata[field_name] = raw.strip()
                elif isinstance(raw, list) and raw:
                    formdata[field_name] = [
                        item.strip() if isinstance(item, str) else item
                        for item in raw
                    ]
        super().process(formdata, obj, **kwargs)
class GovsOrderProcessBase(TD3iGovsOrderProces, CommonModel):
    """Base business class for provincial 12345 work-order processing records.

    Provides the read-side helpers (lookups, paged search, existence checks and
    DataFrame enrichment) on top of the ORM model and ``CommonModel``.
    """

    # Maps model attribute names (snake_case) to camelCase keys.
    # NOTE(review): presumably the keys of the upstream payload -- confirm.
    FieldMapping = {
        # ==================== Primary key and relations ====================
        'id': 'id',
        'master_id': 'masterId',
        'tenant_id': 'tenantId',
        # ==================== Time points ====================
        'plan_sign_time': 'planSignTime',
        'plan_finish_time': 'planFinishTime',
        'plan_back_time': 'planBackTime',
        'deal_date': 'dealDate',
        'hand_over_time': 'handOverTime',
        'sign_over_time': 'signOverTime',
        'origin_plan_finish_time': 'originPlanFinishTime',
        'origin_plan_sign_time': 'originPlanSignTime',
        # ==================== Work-order identifiers ====================
        'order_id': 'orderId',
        'order_no': 'orderNo',
        'process_instance_id': 'processInstanceId',
        # ==================== Work-order status ====================
        'order_status': 'orderStatus',
        'is_over_time': 'isOverTime',
        'is_sign_over_time': 'isSignOverTime',
        # ==================== Processing action ====================
        'action_name': 'actionName',
        'deal_type': 'dealType',
        'task_id': 'taskId',
        'next_task_id': 'nextTaskId',
        'next_action_name': 'nextActionName',
        'next_handle': 'nextHandle',
        'next_handle_name': 'nextHandleName',
        # ==================== Current handlers / departments ====================
        'handler_user_ids': 'handlerUserIds',
        'handler_user_names': 'handlerUserNames',
        'handler_org_ids': 'handlerOrgIds',
        'handler_org_names': 'handlerOrgNames',
        # ==================== Next handlers / departments ====================
        'next_handler_user_ids': 'nextHandlerUserIds',
        'next_handler_user_names': 'nextHandlerUserNames',
        'next_org_ids': 'nextOrgIds',
        'next_org_names': 'nextOrgNames',
        # ==================== Dispatch information ====================
        'dispatch_order_id': 'dispatchOrderId',
        'to_master_id': 'toMasterId',
        'to_tenant_id': 'toTenantId',
        'to_area_code': 'toAreaCode',
        'to_dept_id': 'toDeptId',
        'dispatch_value': 'dispatchValue',
        'has_dispatch_process': 'hasDispatchProcess',
        # ==================== Contact information ====================
        'contact_name': 'contactName',
        'contact_time': 'contactTime',
        'contact_type': 'contactType',
        # ==================== Content fields ====================
        'adv_content': 'advContent',
        'remarks': 'remarks',
        'formal_reply': 'formalReply',
        'reply_to_people': 'replyToPeople',
        'return_reason': 'returnReason',
        # ==================== Other fields ====================
        'notice_org_id': 'noticeOrgId',
        'line_key': 'lineKey',
        'current_task_status': 'currentTaskStatus',
        'visit_type': 'visitType',
        # ==================== Nested JSON fields ====================
        'attachment_dto_list': 'attachmentDTOList',
        'child_order_processes': 'childOrderProcesses',
    }

    @classmethod
    async def exist_other(cls, id: Union[str, int], order_id: str = None, order_no: str = None):
        """Find another record (different ``id``) with the same order identifiers.

        :param id: primary key of the record to exclude
        :param order_id: work-order number to match, if given
        :param order_no: work-order serial number to match, if given
        :return: the first matching record, or ``None``; also ``None`` when
            neither identifier is supplied
        """
        # Without any identifier the filter would degrade to "any other row",
        # which is never a meaningful duplicate.
        if not (order_id or order_no):
            return None
        _query = select(cls).where(cls.id != id)
        if order_id:
            _query = _query.where(cls.order_id == order_id)
        if order_no:
            _query = _query.where(cls.order_no == order_no)
        return await cls.query_first(_query)

    @classmethod
    async def find_by_ids(cls, ids: list[Union[str, int]]):
        """Load all records whose primary key is in ``ids``.

        :param ids: primary keys to look up
        :return: list of matching records (empty list for empty ``ids``)
        """
        if not ids:
            return []
        _query = select(cls).where(cls.id.in_(ids))
        return (await cls.orm_execute_scalars(_query)).all()

    @classmethod
    async def is_exist(cls, order_id: str = None, order_no: str = None):
        """Check whether a record with the given order identifiers exists.

        :param order_id: work-order number to match, if given
        :param order_no: work-order serial number to match, if given
        :return: the first matching record, or ``None``; also ``None`` when
            neither identifier is supplied
        """
        # An unfiltered query would return an arbitrary row of the table and
        # make callers treat an unrelated record as "the existing one".
        if not (order_id or order_no):
            return None
        _query = select(cls)
        if order_id:
            _query = _query.where(cls.order_id == order_id)
        if order_no:
            _query = _query.where(cls.order_no == order_no)
        return await cls.query_first(_query)

    @classmethod
    async def search_base(cls, is_paging=True, **kwargs):
        """Base search routine shared by the public search helpers.

        :param is_paging: when true, count the rows and apply limit/offset
        :param kwargs: search parameters; ``page_number`` (default 1),
            ``page_size`` (default 20) and ``sort_clause`` are read here, the
            whole mapping is forwarded to ``cls.search_wheres``
        :return: ``(DataFrame, Pagination | None)``; the pagination object is
            ``None`` when ``is_paging`` is false
        """
        # Default to the first page so repeated identical requests return the
        # same rows.
        page_number = kwargs.get('page_number', 1)
        page_size = kwargs.get('page_size', 20)
        kwargs.update({'page_number': page_number, 'page_size': page_size})

        # Columns matched with a "contains" LIKE pattern.
        _name_likes = {
            cls.order_status.key: '%{}%',
            cls.action_name.key: '%{}%',
            cls.deal_type.key: '%{}%',
        }
        _query = select(cls).where(
            *cls.search_wheres(likes=_name_likes, **kwargs)
        ).group_by(cls.id)

        _paging = None
        _data_query = _query
        if is_paging:
            _row_count = await cls.query_count(_query)
            _paging = Pagination(_row_count).paging(page_number, page_size)
            _data_query = _query.limit(page_size).offset(_paging.offset_size)

        # Fall back to newest-first when the caller gives no ordering.
        _sort_clause = cls.sort_clauses(kwargs.get('sort_clause', {}))
        if _sort_clause:
            _data_query = _data_query.order_by(*_sort_clause)
        else:
            _data_query = _data_query.order_by(cls.created_at.desc())

        _df = await cls.query_as_df(_data_query)
        if not _df.empty:
            _df.replace(models.EmptyInDF + models.EmptyDatetimeInDF, '', inplace=True)
            # Primary keys are emitted as strings.
            _df[cls.id.key] = _df[cls.id.key].astype(str)
        return _df, _paging

    @classmethod
    async def search(cls, **kwargs):
        """Search records and return them in the paged response layout.

        :param kwargs: search parameters, forwarded to :meth:`search_base`
        :return: dict with ``total``, ``rows`` and ``pagination``
        """
        # The response needs the pagination object, so paging is always on
        # even if the caller passed ``is_paging=False``.
        _df, _paging = await cls.search_base(**{**kwargs, 'is_paging': True})
        return {
            'total': _paging.row_count,
            'rows': _df.to_dict('records'),
            'pagination': {
                'page_number': _paging.page_number,
                'page_count': _paging.page_count,
                'page_size': _paging.page_size,
            },
        }

    @classmethod
    async def exists_master_id(cls, data_df: pd.DataFrame):
        """Split ``data_df`` into rows already stored and rows that are new.

        A row counts as stored when its ``(id, master_id)`` pair is present in
        the database. Pairs are compared as strings so that integer and string
        ids in the input match equally.

        :param data_df: input frame; must contain the ``id`` and ``master_id`` columns
        :return: ``(exists_df, latest_df)``
            - exists_df: rows found in the database (``id`` column cast to str)
            - latest_df: rows not found in the database
        """
        if data_df.empty:
            return pd.DataFrame(), pd.DataFrame()

        id_key, master_key = cls.id.key, cls.master_id.key
        pairs = data_df[[id_key, master_key]].drop_duplicates().values.tolist()
        if not pairs:
            return pd.DataFrame(), data_df.copy()

        # The two IN filters over-select (cross product of ids and master
        # ids); exact pair matching happens in memory below.
        _query = select(cls.id, cls.master_id).where(
            (cls.id.in_([p[0] for p in pairs])) &
            (cls.master_id.in_([p[1] for p in pairs]))
        )
        exists_db_df = await cls.query_as_df(_query)
        if exists_db_df.empty:
            return pd.DataFrame(), data_df.copy()

        existing_keys = set(zip(
            exists_db_df[id_key].astype(str),
            exists_db_df[master_key].astype(str),
        ))
        row_keys = zip(data_df[id_key].astype(str), data_df[master_key].astype(str))
        # A positional boolean array avoids index alignment surprises when
        # data_df carries a non-unique index.
        mask_exists = pd.Series(
            [key in existing_keys for key in row_keys], dtype=bool
        ).to_numpy()

        exists_df = data_df[mask_exists].copy()
        exists_df[id_key] = exists_df[id_key].astype(str)
        latest_df = data_df[~mask_exists].copy()
        return exists_df, latest_df

    @classmethod
    async def find_by_order_id(cls, order_id: str):
        """Return all processing records for the given work-order number."""
        _query = select(cls).where(cls.order_id == order_id)
        return (await cls.orm_execute_scalars(_query)).all()

    @classmethod
    async def find_by_master_id(cls, master_id: Union[str, int]):
        """Return all processing records belonging to the given master order id."""
        _query = select(cls).where(cls.master_id == master_id)
        return (await cls.orm_execute_scalars(_query)).all()

    @classmethod
    async def find_latest_by_order_id(cls, order_id: str):
        """Return the record with the most recent ``deal_date`` for a work order."""
        _query = select(cls).where(cls.order_id == order_id).order_by(cls.deal_date.desc())
        return await cls.query_first(_query)

    @classmethod
    async def fill_process_info(cls, data_df: pd.DataFrame, index_field: str = 'id',
                                column_name: str = 'process_infos',
                                preprocessing: Optional[Callable] = None):
        """Attach the related processing records to every row of ``data_df``.

        ``data_df`` is modified in place: a new column ``column_name`` holds,
        per row, the list of processing records (as dicts) whose ``master_id``
        equals the row's ``index_field`` value.

        :param pandas.DataFrame data_df: frame to enrich
        :param str index_field: column of ``data_df`` holding the master id
        :param str column_name: name of the added column, ``process_infos`` by default
        :param preprocessing: optional callable applied to the loaded records;
            it must return the processed frame
        :return: the loaded (and preprocessed) processing-record frame; an
            empty frame when ``data_df`` is empty
        :rtype: pandas.DataFrame
        """
        if data_df.empty:
            return pd.DataFrame()
        _task_ids = data_df[index_field].unique().tolist()
        if not _task_ids:
            return pd.DataFrame()

        _query = select(cls).where(cls.master_id.in_(_task_ids))
        _info_df: pd.DataFrame = await cls.query_as_df(_query)
        if _info_df.empty:
            data_df[column_name] = [[] for _ in range(len(data_df))]
            return _info_df

        master_key = cls.master_id.key
        _info_df.replace(models.EmptyInDF + models.EmptyDatetimeInDF, '', inplace=True)
        # Normalise output types: ids are emitted as strings.
        _info_df[cls.id.key] = _info_df[cls.id.key].astype(str)
        _info_df[master_key] = _info_df[master_key].astype(str)
        # Index the records by master id.
        _info_df['index_id'] = _info_df[master_key]
        _info_df.set_index(['index_id'], inplace=True)
        if callable(preprocessing):
            _info_df = preprocessing(_info_df)

        # Group once instead of filtering the whole frame for every row.
        records_by_master = {
            str(key): group.to_dict('records')
            for key, group in _info_df.groupby(master_key, sort=False)
        }
        # Hand out a fresh list per row so rows never share a mutable value.
        data_df[column_name] = data_df[index_field].apply(
            lambda x: list(records_by_master.get(str(x), []))
        )
        return _info_df
@register_swagger_model
class GovsOrderProcess(GovsOrderProcessBase):
    """Business class for provincial 12345 work-order processing records.

    Adds the write-side operations (create, modify, delete and their batch
    variants) on top of :class:`GovsOrderProcessBase`.
    """

    @classmethod
    async def create(cls, user: RbacUser = None, **kwargs):
        """Create a processing record, or update the existing one.

        String values are stripped, validated through
        :class:`GovsOrderProcessForm`, and then either written to the record
        that already carries the same order identifiers or saved as a new one.

        :param user: acting user; when given, its ``username`` fills the audit columns
        :param kwargs: field values of the record
        :return: the created or updated record
        """
        # Build a cleaned copy rather than rewriting kwargs while iterating it.
        _payload = {
            _k: _v.strip() if isinstance(_v, str) else _v
            for _k, _v in kwargs.items()
        }
        _form = GovsOrderProcessForm(formdata=_payload)
        # NOTE(review): validate_form() presumably raises on invalid input -- confirm.
        _form.validate_form()

        _existing = await cls.is_exist(
            order_id=_form.order_id.data,
            order_no=_form.order_no.data
        )
        if _existing:
            # A record with the same identifiers exists: update it instead.
            return await cls.modify(_existing.id, user, **_payload)

        _task = cls().copy_from_dict(_form.data, skip_none=True)
        if user:
            _task.created_by = user.username
            _task.updated_by = user.username
        await _task.async_save()
        return _task

    @classmethod
    async def delete(cls, task_id: Union[str, int]):
        """Delete one processing record by primary key.

        :param task_id: primary key of the record
        :return: the record as loaded before deletion
        :raises AssertionError: when no record has this id
        """
        _task = await cls.async_find_by_id(task_id)
        # Raised explicitly so the check is not stripped under ``python -O``.
        if not _task:
            raise AssertionError(f"根据 ID {task_id} 未找到处理流程记录。")
        # Inside a method, ``delete`` resolves to the module-level SQLAlchemy
        # function, not to this classmethod.
        _del_query = delete(cls).where(cls.id == _task.id)
        await cls.raw_execute(_del_query)
        echo_log(f'已删除处理流程记录(工单号:{_task.order_no},ID:{_task.id}).')
        return _task

    @classmethod
    async def modify(cls, task_id: Union[str, int], user: RbacUser = None, **kwargs):
        """Update one processing record.

        :param task_id: primary key of the record to update
        :param user: acting user; when given, its ``username`` fills ``updated_by``
        :param kwargs: new field values
        :return: the updated record, or -- when another record already carries
            the same order identifiers -- that other record, left unchanged
        :raises AssertionError: when no record has this id
        """
        _payload = {
            _k: _v.strip() if isinstance(_v, str) else _v
            for _k, _v in kwargs.items()
        }
        _form = GovsOrderProcessForm(formdata=_payload)
        _form.validate_form()

        _other = await cls.exist_other(
            task_id,
            order_id=_form.order_id.data,
            order_no=_form.order_no.data
        )
        if _other:
            # Another record owns these identifiers: skip to avoid a duplicate.
            echo_log(f"处理流程记录已存在(工单号:{_form.order_no.data}),跳过创建。")
            return _other

        _task = await cls.async_find_by_id(task_id)
        if not _task:
            raise AssertionError('查无此处理流程信息。')
        _task.copy_from_dict(_form.data, skip_none=True).before_save()
        if user:
            _task.updated_by = user.username
        await _task.async_save()
        return _task

    @classmethod
    async def create_batch(cls, data_df: pd.DataFrame, user: RbacUser = None):
        """Insert all rows of ``data_df`` as new records in one transaction.

        :param data_df: rows to insert; not modified
        :param user: acting user; when given, its ``username`` fills the audit columns
        :return: number of records created (0 for an empty frame)
        """
        if data_df.empty:
            return 0
        if user:
            # assign() returns a new frame, leaving the caller's frame untouched.
            data_df = data_df.assign(created_by=user.username, updated_by=user.username)

        tasks = [
            cls().copy_from_dict(record, skip_none=True)
            for record in data_df.to_dict('records')
        ]
        session = cls.get_aio_session()
        try:
            session.add_all(tasks)
            await session.commit()
        except Exception:
            await session.rollback()
            raise
        finally:
            await session.close()
        echo_log(f"批量创建成功:创建 {len(tasks)} 条处理流程记录。")
        return len(tasks)

    @classmethod
    async def modify_batch(cls, data_df: pd.DataFrame, user: RbacUser = None):
        """Update existing records from the rows of ``data_df`` in one transaction.

        :param data_df: rows to write; must contain the primary-key column; not modified
        :param user: acting user; when given, its ``username`` fills ``updated_by``
        :return: number of rows submitted for update; 0 for an empty frame or
            when the primary-key column is missing (logged, not raised)
        """
        if data_df.empty:
            return 0
        if cls.id.key not in data_df.columns:
            echo_log(f"错误:modify_batch 要求输入数据必须包含 '{cls.id.key}'")
            return 0

        # assign() returns a new frame, leaving the caller's frame untouched.
        data_df = data_df.assign(updated_at=datetime.datetime.now())
        if user:
            data_df = data_df.assign(updated_by=user.username)

        update_data = data_df.to_dict('records')
        updated_count = len(update_data)
        session = cls.get_aio_session()
        try:
            # bulk_update_mappings is a synchronous Session API, hence run_sync.
            await session.run_sync(
                lambda sync_session: sync_session.bulk_update_mappings(cls, update_data)
            )
            await session.commit()
        except Exception:
            await session.rollback()
            raise
        finally:
            await session.close()
        echo_log(f"批量修改成功:更新 {updated_count} 条处理流程记录。")
        return updated_count

    @classmethod
    async def save_batch(cls, data_df: pd.DataFrame, user: RbacUser = None):
        """Save a batch, inserting new rows and updating stored ones.

        :param data_df: rows to save
        :param user: acting user, forwarded to the batch helpers
        :return: ``(created_count, updated_count)``
        """
        _exists_df, _latest_df = await cls.exists_master_id(data_df)
        _created_count = await cls.create_batch(_latest_df, user)
        _updated_count = await cls.modify_batch(_exists_df, user)
        return _created_count, _updated_count

    @classmethod
    async def delete_by_order_id(cls, order_id: str):
        """Delete every processing record of the given work-order number.

        :return: ``rowcount`` reported by the delete statement
        """
        _del_query = delete(cls).where(cls.order_id == order_id)
        result = await cls.raw_execute(_del_query)
        echo_log(f'已删除工单 {order_id} 的处理流程记录,共 {result.rowcount} 条.')
        return result.rowcount

    @classmethod
    async def delete_by_master_id(cls, master_id: Union[str, int]):
        """Delete every processing record of the given master order id.

        :return: ``rowcount`` reported by the delete statement
        """
        _del_query = delete(cls).where(cls.master_id == master_id)
        result = await cls.raw_execute(_del_query)
        echo_log(f'已删除主工单 {master_id} 的处理流程记录,共 {result.rowcount} 条.')
        return result.rowcount