Files
d3i-szct/models/govs_order_detail.py
2026-06-02 17:46:38 +08:00

601 lines
29 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# coding: utf-8
import datetime
import random
from typing import Union
import pandas as pd
from sqlalchemy import select, delete
from tornado_swagger.model import register_swagger_model
from wtforms import StringField, TextAreaField, IntegerField, DateTimeField
from wtforms.validators import Length
import models
from models.common_model import CommonModel
from models.db_models import TD3iGovsOrderDetail
from paste.core.logging import echo_log
from paste.rbac.rbac_user import RbacUser
from paste.util.pagination import Pagination
from paste.web.form import ModelForm
class GovsOrderDetailForm(ModelForm):
"""省12345工单详情表单验证类"""
id = IntegerField('详情记录唯一ID')
master_id = IntegerField('关联工单主表ID')
order_id = StringField('工单编号', validators=[Length(max=50)])
order_no = StringField('工单号', validators=[Length(max=50)])
master_id_origin = StringField('原始主表ID', validators=[Length(max=50)])
tenant_id = StringField('租户ID', validators=[Length(max=50)])
tenant_name = StringField('租户名称', validators=[Length(max=50)])
order_status = StringField('工单状态码', validators=[Length(max=10)])
order_status_for_view = StringField('工单状态显示值', validators=[Length(max=50)])
claim_status = StringField('签收状态', validators=[Length(max=10)])
over_due = StringField('是否超期', validators=[Length(max=10)])
is_supervise = StringField('是否督办', validators=[Length(max=10)])
first_order_status = StringField('一级状态编码', validators=[Length(max=10)])
secord_order_status = StringField('二级状态编码', validators=[Length(max=10)])
atomic_order_status = StringField('原子状态编码', validators=[Length(max=10)])
order_invalid_type = TextAreaField('工单作废原因')
order_finish_time = DateTimeField('工单完成时间')
case_content = TextAreaField('诉求内容')
case_goal = TextAreaField('诉求目的')
title = StringField('工单标题', validators=[Length(max=500)])
case_labels = TextAreaField('工单标签列表')
case_public = StringField('是否公开', validators=[Length(max=10)])
hotspot = StringField('是否热点事件', validators=[Length(max=10)])
case_is_urgent = StringField('紧急程度', validators=[Length(max=10)])
case_is_visit = StringField('是否回访', validators=[Length(max=10)])
info_protect = StringField('信息保护', validators=[Length(max=10)])
case_accord_type_one_name = StringField('诉求归口一级', validators=[Length(max=50)])
case_accord_type_two_name = StringField('诉求归口二级', validators=[Length(max=50)])
case_accord_type_three_name = StringField('诉求归口三级', validators=[Length(max=50)])
case_accord_type_four_name = StringField('诉求归口四级', validators=[Length(max=50)])
case_accord_type_five_name = StringField('诉求归口五级', validators=[Length(max=50)])
case_accord_code = StringField('事项编码', validators=[Length(max=50)])
first_level_affiliation = TextAreaField('一级归属单位')
second_level_affiliation = TextAreaField('二级归属单位')
third_level_affiliation = TextAreaField('三级归属单位')
fourth_level_affiliation = TextAreaField('四级归属单位')
fifth_level_affiliation = TextAreaField('五级归属单位')
sixth_level_affiliation = TextAreaField('六级归属单位')
seventh_level_affiliation = TextAreaField('七级归属单位')
appeal_dept = StringField('诉求部门', validators=[Length(max=100)])
order_source = StringField('诉求来源', validators=[Length(max=50)])
order_source_detail = StringField('诉求来源详情', validators=[Length(max=50)])
order_source_for_view = StringField('诉求来源显示值', validators=[Length(max=50)])
belong_platform = StringField('所属平台代码', validators=[Length(max=50)])
belong_platform_name = StringField('受理平台名称', validators=[Length(max=50)])
current_processing_platform = TextAreaField('当前处理平台')
service_object_type = StringField('服务对象类型', validators=[Length(max=50)])
order_type = StringField('表单类型', validators=[Length(max=50)])
form_type = StringField('表单类型代码', validators=[Length(max=50)])
area_code = StringField('区域代码', validators=[Length(max=10)])
area_code_city = StringField('市区域代码', validators=[Length(max=50)])
area_code_area = StringField('区区域代码', validators=[Length(max=50)])
area_code_street = StringField('街道区域代码', validators=[Length(max=50)])
address_detail = StringField('详细地址', validators=[Length(max=500)])
case_lnglat = StringField('地理坐标', validators=[Length(max=100)])
call_number = StringField('来电号码', validators=[Length(max=20)])
call_number_for_dh = StringField('来电号码(脱敏)', validators=[Length(max=20)])
raw_call_numer = StringField('原始来电号码', validators=[Length(max=20)])
contact_number = StringField('联系电话', validators=[Length(max=20)])
raw_contact_number = StringField('原始联系电话', validators=[Length(max=20)])
contact_number_for_dh = StringField('联系电话(脱敏)', validators=[Length(max=20)])
call_time = DateTimeField('来电时间')
order_sound_record_id = StringField('通话记录ID', validators=[Length(max=50)])
create_date = DateTimeField('创建日期')
update_date = DateTimeField('更新日期')
plan_finish_time = DateTimeField('计划完成时间')
plan_sign_time = DateTimeField('计划签收时间')
judgment_flag = StringField('判定标志', validators=[Length(max=10)])
is_coordination = StringField('是否协调', validators=[Length(max=10)])
coordination_time = DateTimeField('协调时间')
thrid_order_id = TextAreaField('第三方工单ID')
relate_order_ids = TextAreaField('关联工单ID列表')
relate_order_count = IntegerField('关联工单数量')
order_user_id = StringField('用户ID', validators=[Length(max=50)])
user_word = TextAreaField('用户反馈')
show_flag = StringField('显示标志', validators=[Length(max=10)])
origin_show = IntegerField('原始显示标志')
order_user = TextAreaField('诉求人信息(JSON对象)')
order_phone_dto = TextAreaField('电话号码信息(JSON对象)')
order_attachment_list = TextAreaField('附件列表(JSON数组)')
pre_process_list = TextAreaField('预处理流程列表(JSON数组)')
tripartite_call_records = TextAreaField('三方通话记录(JSON对象)')
tripartite_call_records_list = TextAreaField('三方通话记录列表(JSON数组)')
order_custom_form_fields = TextAreaField('自定义表单字段(JSON数组)')
knowledge_references = TextAreaField('知识参考(JSON对象)')
sound_recording_address_list = TextAreaField('录音文件路径列表(JSON数组)')
active_dept_ids = TextAreaField('当前处理部门ID列表')
attachment_ids = TextAreaField('附件ID列表')
attachment_list = TextAreaField('附件列表JSON')
contactor_list = TextAreaField('联系人列表(JSON数组)')
tsjb_entry_info = TextAreaField('投诉举报入口信息(JSON对象)')
order_erge_revoke_plug_dto_list = TextAreaField('撤销插件信息(JSON数组)')
order_environmental = TextAreaField('环境信息(JSON对象)')
order_demands_dto = TextAreaField('诉求DTOJSON对象)')
order_appeal_list = TextAreaField('申诉列表(JSON数组)')
torder_process_list = TextAreaField('流程列表(JSON数组)')
pre_process = TextAreaField('预处理信息(JSON对象)')
extension = TextAreaField('扩展字段')
remark = TextAreaField('备注')
file_exist = IntegerField('是否存在附件')
exist_quoto_info = TextAreaField('是否存在引用信息')
residue_date = TextAreaField('剩余天数')
whether_approval = StringField('是否审批', validators=[Length(max=10)])
over_time_warning_flag = StringField('超时预警标志', validators=[Length(max=10)])
create_no = StringField('创建编号', validators=[Length(max=20)])
return_visit_reason = TextAreaField('回访原因')
back_count = StringField('回退次数', validators=[Length(max=100)])
visit_adv_content = TextAreaField('走访建议内容')
is_dispatch_accurate = StringField('是否精准分派', validators=[Length(max=10)])
process_instance_id = StringField('流程实例ID', validators=[Length(max=100)])
knowledge_quote = TextAreaField('知识引用')
special_type = TextAreaField('特殊类型')
supervise_type = TextAreaField('监督类型')
leader_indicate = TextAreaField('领导批示')
case_solve = TextAreaField('处理结果')
result_satisfied = TextAreaField('结果满意度')
first_vist_satisfied = TextAreaField('首次走访满意度')
contact_timely = StringField('是否及时联系', validators=[Length(max=50)])
distribute_type = StringField('分派类型', validators=[Length(max=50)])
dept_type = TextAreaField('部门类型')
dept_name = TextAreaField('部门名称')
active_dept_name = StringField('当前处理部门名称', validators=[Length(max=50)])
org_id = StringField('组织ID', validators=[Length(max=50)])
org_name = TextAreaField('组织名称')
snapshot_time = DateTimeField('快照抓取时间')
def process(self, formdata=None, obj=None, **kwargs):
if formdata:
for name, values in formdata.items():
if isinstance(values, list) and values:
formdata[name] = [v.strip() if isinstance(v, str) else v for v in values]
elif isinstance(values, str):
formdata[name] = values.strip()
super().process(formdata, obj, **kwargs)
class GovsOrderDetailBase(TD3iGovsOrderDetail, CommonModel):
"""省12345工单详情业务基类"""
FieldMapping = {
# ==================== 主键与关联 ====================
'master_id': 'masterId', # 关联工单主表ID(从外部传入)
'order_id': 'orderId', # 工单编号
'order_no': 'orderNo', # 工单号
'tenant_id': 'tenantId', # 租户ID
# ==================== 工单状态 ====================
'order_status': 'orderStatus', # 工单状态码
'order_status_for_view': 'orderStatusForView', # 工单状态显示值
'first_order_status': 'firstOrderStatus', # 一级状态编码
'secord_order_status': 'secordOrderStatus', # 二级状态编码
'atomic_order_status': 'atomicOrderStatus', # 原子状态编码
'order_invalid_type': 'orderInvalidType', # 工单作废原因
'order_finish_time': 'orderFinishTime', # 工单完成时间
# ==================== 诉求内容 ====================
'case_content': 'caseContent', # 诉求内容
'case_goal': 'caseGoal', # 诉求目的
'title': 'title', # 工单标题
'case_labels': 'caseLabels', # 工单标签列表
'case_public': 'casePublic', # 是否公开
'hotspot': 'hotspot', # 是否热点事件
# ==================== 紧急与保护 ====================
'case_is_urgent': 'caseIsUrgent', # 紧急程度(一般/紧急/特急)
'case_is_visit': 'caseIsVisit', # 是否回访(是/否)
'info_protect': 'infoProtect', # 信息保护(是/否)
# ==================== 诉求归口分类 ====================
'case_accord_type_one_name': 'caseAccordTypeOneName', # 诉求归口一级
'case_accord_type_two_name': 'caseAccordTypeTwoName', # 诉求归口二级
'case_accord_type_three_name': 'caseAccordTypeThreeName', # 诉求归口三级
'case_accord_type_four_name': 'caseAccordTypeFourName', # 诉求归口四级
'case_accord_type_five_name': 'caseAccordTypeFiveName', # 诉求归口五级
'case_accord_code': 'caseAccordCode', # 事项编码
# ==================== 归属单位 ====================
'first_level_affiliation': 'firstLevelAffiliation', # 一级归属单位
'second_level_affiliation': 'secondLevelAffiliation', # 二级归属单位
'third_level_affiliation': 'thirdLevelAffiliation', # 三级归属单位
'fourth_level_affiliation': 'fourthLevelAffiliation', # 四级归属单位
'fifth_level_affiliation': 'fifthLevelAffiliation', # 五级归属单位
'sixth_level_affiliation': 'sixthLevelAffiliation', # 六级归属单位
'seventh_level_affiliation': 'seventhLevelAffiliation', # 七级归属单位
'appeal_dept': 'appealDept', # 诉求部门
# ==================== 来源与平台 ====================
'order_source': 'orderSource', # 诉求来源(电话/互联网)
'order_source_detail': 'orderSourceDetail', # 诉求来源详情(12345/随手拍)
'order_source_for_view': 'orderSourceForView', # 诉求来源显示值
'belong_platform': 'belongPlatform', # 所属平台代码
'belong_platform_name': 'belongPlatformName', # 受理平台名称
'current_processing_platform': 'currentProcessingPlatform', # 当前处理平台
# ==================== 服务对象与类型 ====================
'service_object_type': 'serviceObjectType', # 服务对象类型(投诉举报/咨询/建议等)
'order_type': 'orderType', # 表单类型(个人/企业/其他)
'form_type': 'formType', # 表单类型代码
# ==================== 区域信息 ====================
'area_code_city': 'areaCodeCity', # 市区域代码
'area_code_area': 'areaCodeArea', # 区区域代码
'area_code_street': 'areaCodeStreet', # 街道区域代码
'address_detail': 'addressDetail', # 详细地址
'case_lnglat': 'caseLnglat', # 地理坐标
# ==================== 联系方式 ====================
'call_number': 'callNumber', # 来电号码
'call_number_for_dh': 'callNumberForDH', # 来电号码(脱敏)
'raw_call_numer': 'rawCallNumer', # 原始来电号码
'contact_number': 'contactNumber', # 联系电话
'raw_contact_number': 'rawContactNumber', # 原始联系电话
'contact_number_for_dh': 'contactNumberForDH', # 联系电话(脱敏)
'call_time': 'callTime', # 来电时间
'order_sound_record_id': 'orderSoundRecordId', # 通话记录ID
# ==================== 时间节点 ====================
'create_date': 'createDate', # 创建日期
'update_date': 'updateDate', # 更新日期
'plan_finish_time': 'planFinishTime', # 计划完成时间
'plan_sign_time': 'planSignTime', # 计划签收时间
# ==================== 判定与协调 ====================
'judgment_flag': 'judgmentFlag', # 判定标志
'is_coordination': 'isCoordination', # 是否协调
'coordination_time': 'coordinationTime', # 协调时间
# ==================== 第三方关联 ====================
'thrid_order_id': 'thridOrderId', # 第三方工单ID
'relate_order_ids': 'relateOrderIds', # 关联工单ID列表
'relate_order_count': 'relateOrderCount', # 关联工单数量
# ==================== 用户相关 ====================
'order_user_id': 'orderUserId', # 用户ID(身份证号)
'user_word': 'userWord', # 用户反馈
'show_flag': 'showFlag', # 显示标志
'origin_show': 'originShow', # 原始显示标志
# ==================== JSON 对象字段(需序列化存储) ====================
'order_user': 'orderUser', # 诉求人信息(JSON对象)
'order_phone_dto': 'orderPhoneDTO', # 电话号码信息(JSON对象)
'order_attachment_list': 'orderAttachmentList', # 附件列表(JSON数组)
'pre_process_list': 'preProcessList', # 预处理流程列表(JSON数组)
'tripartite_call_records': 'tripartiteCallRecords', # 三方通话记录(JSON对象)
'tripartite_call_records_list': 'tripartiteCallRecordsList', # 三方通话记录列表(JSON数组)
'order_custom_form_fields': 'orderCustomFormFields', # 自定义表单字段(JSON数组)
'knowledge_references': 'knowledgeReferences', # 知识参考(JSON对象)
'sound_recording_address_list': 'soundRecordingAddressList', # 录音文件路径列表(JSON数组)
'active_dept_ids': 'activeDeptIds', # 当前处理部门ID列表
'attachment_ids': 'attachmentIds', # 附件ID列表
'attachment_list': 'attachmentList', # 附件列表JSON
'contactor_list': 'contactorList', # 联系人列表(JSON数组)
'tsjb_entry_info': 'tsjbEntryInfo', # 投诉举报入口信息(JSON对象)
'order_erge_revoke_plug_dto_list': 'orderErgeRevokePlugDTOList', # 撤销插件信息(JSON数组)
'order_environmental': 'orderEnvironmental', # 环境信息(JSON对象)
'order_demands_dto': 'orderDemandsDTO', # 诉求DTOJSON对象)
'order_appeal_list': 'orderAppealList', # 申诉列表(JSON数组)
'torder_process_list': 'torderProcessList', # 流程列表(JSON数组)
'pre_process': 'preProcess', # 预处理信息(JSON对象)
'extension': 'extension', # 扩展字段
'remark': 'remark', # 备注
# ==================== 其他字段 ====================
'file_exist': None, # 是否存在附件(根据 orderAttachmentList 计算)
'exist_quoto_info': 'existQuotoInfo', # 是否存在引用信息
'residue_date': 'residueDate', # 剩余天数
'whether_approval': 'whetherApproval', # 是否审批
'over_time_warning_flag': 'overTimeWarningFlag', # 超时预警标志
'create_no': 'createNo', # 创建编号
'return_visit_reason': 'returnVisitReason', # 回访原因
'back_count': 'backCount', # 回退次数
'visit_adv_content': 'visitAdvContent', # 走访建议内容
'is_dispatch_accurate': 'isDispatchAccurate', # 是否精准分派
'process_instance_id': 'processInstanceId', # 流程实例ID
'knowledge_quote': 'knowledgeQuote', # 知识引用
'special_type': 'specialType', # 特殊类型
'supervise_type': 'superviseType', # 监督类型
'leader_indicate': 'leaderIndicate', # 领导批示
'case_solve': 'caseSolve', # 处理结果
'result_satisfied': 'resultSatisfied', # 结果满意度
'first_vist_satisfied': 'firstVistSatisfied', # 首次走访满意度
'contact_timely': 'contactTimely', # 是否及时联系
'distribute_type': 'distributeType', # 分派类型
'dept_type': 'deptType', # 部门类型
'dept_name': 'deptName', # 部门名称
'active_dept_name': 'activeDeptName', # 当前处理部门名称
'org_id': 'orgId', # 组织ID
'org_name': 'orgName', # 组织名称
}
@classmethod
async def exist_other(cls, id: Union[str, int], order_id: str = None, order_no: str = None):
"""检查是否存在除当前详情外的其他同编号工单详情"""
_query = select(cls).where(cls.id != id)
if order_id:
_query = _query.where(cls.order_id == order_id)
if order_no:
_query = _query.where(cls.order_no == order_no)
_task: cls = await cls.query_first(_query)
return _task
@classmethod
async def find_by_ids(cls, ids: list[Union[str, int]]):
"""根据ID列表批量查找工单详情"""
_query = select(cls).where(cls.id.in_(ids))
_list: list[cls] = (await cls.orm_execute_scalars(_query)).all()
return _list
@classmethod
async def is_exist(cls, order_id: str = None, order_no: str = None):
"""检查工单详情是否已经存在"""
_query = select(cls)
if order_id:
_query = _query.where(cls.order_id == order_id)
if order_no:
_query = _query.where(cls.order_no == order_no)
_task: cls = await cls.query_first(_query)
return _task
@classmethod
async def search_base(cls, is_paging=True, **kwargs):
"""按参数搜索工单详情的基础方法"""
page_number = kwargs.get('page_number', random.randint(1, 100))
page_size = kwargs.get('page_size', 20)
kwargs.update({'page_number': page_number, 'page_size': page_size})
_name_likes = {
cls.order_status.key: '%{}%',
cls.order_source.key: '%{}%',
cls.case_accord_type_one_name.key: '%{}%',
cls.belong_platform_name.key: '%{}%',
}
_query = select(cls).where(
*cls.search_wheres(likes=_name_likes, **kwargs)
).group_by(cls.id)
_paging = None
if is_paging:
_row_count = await cls.query_count(_query)
_paging = Pagination(_row_count).paging(page_number, page_size)
_data_query = _query.limit(page_size).offset(_paging.offset_size)
else:
_data_query = _query.where()
_sort_clause = cls.sort_clauses(kwargs.get('sort_clause', {}))
if _sort_clause:
_data_query = _data_query.order_by(*_sort_clause)
else:
_data_query = _data_query.order_by(cls.create_date.desc())
_df = await cls.query_as_df(_data_query)
if not _df.empty:
_df.replace(models.EmptyInDF + models.EmptyDatetimeInDF, '', inplace=True)
_df[cls.id.key] = _df[cls.id.key].astype(str)
return _df, _paging
@classmethod
async def search(cls, **kwargs):
"""按参数搜索工单详情,返回分页格式数据"""
_df, _paging = await cls.search_base(**kwargs)
return {
'total': _paging.row_count,
'rows': _df.to_dict('records'),
'pagination': {
'page_number': _paging.page_number,
'page_count': _paging.page_count,
'page_size': _paging.page_size,
},
}
@classmethod
async def exists_order_id(cls, data_df: pd.DataFrame):
"""根据 order_id 判断数据是否存在"""
if data_df.empty:
return pd.DataFrame(), pd.DataFrame()
order_ids = data_df[cls.order_id.key].unique().tolist()
if not order_ids:
return pd.DataFrame(), data_df.copy()
_query = select(cls.id, cls.order_id).where(cls.order_id.in_(order_ids))
existing_df = await cls.query_as_df(_query)
if existing_df.empty:
return pd.DataFrame(), data_df.copy()
order_id_to_id_map = dict(zip(existing_df[cls.order_id.key], existing_df[cls.id.key]))
mask_exists = data_df[cls.order_id.key].isin(existing_df[cls.order_id.key])
exists_df = data_df[mask_exists].copy()
exists_df[cls.id.key] = exists_df[cls.order_id.key].map(order_id_to_id_map)
latest_df = data_df[~mask_exists].copy()
return exists_df, latest_df
@classmethod
async def find_by_master_id(cls, master_id: Union[str, int]):
"""根据主表ID查找工单详情"""
_query = select(cls).where(cls.master_id == master_id)
return await cls.query_first(_query)
@classmethod
async def find_by_order_id(cls, order_id: str):
"""根据工单编号查找详情"""
_query = select(cls).where(cls.order_id == order_id)
return await cls.query_first(_query)
@classmethod
async def find_latest_snapshot(cls, order_id: str):
"""查找最新的快照"""
_query = select(cls).where(cls.order_id == order_id).order_by(cls.snapshot_time.desc())
return await cls.query_first(_query)
@classmethod
async def find_by_master_ids(cls, master_ids: list[Union[str, int]]):
"""根据主表ID列表批量查找工单详情"""
_query = select(cls).where(cls.master_id.in_(master_ids))
_list: list[cls] = (await cls.orm_execute_scalars(_query)).all()
return _list
@register_swagger_model
class GovsOrderDetail(GovsOrderDetailBase):
"""省12345工单详情业务类"""
@classmethod
async def create(cls, user: RbacUser = None, **kwargs):
"""创建新工单详情"""
for _k, _v in kwargs.items():
if isinstance(_v, str):
kwargs[_k] = _v.strip()
_form = GovsOrderDetailForm(formdata=kwargs)
_form.validate_form()
_existing = await cls.is_exist(
order_id=_form.order_id.data,
order_no=_form.order_no.data
)
assert _existing is None, "工单编号或工单号已存在,不能重复创建。"
_detail = cls().copy_from_dict(_form.data, skip_none=True).before_save()
_detail.snapshot_time = datetime.datetime.now()
if user:
_detail.created_by = user.username
_detail.updated_by = user.username
await _detail.async_save()
return _detail
@classmethod
async def delete(cls, detail_id: Union[str, int]):
"""删除工单详情"""
_detail: cls = await cls.async_find_by_id(detail_id)
assert _detail, f"根据 ID {detail_id} 未找到工单详情。"
_del_query = delete(cls).where(cls.id == _detail.id)
await cls.raw_execute(_del_query)
echo_log(f'已删除工单详情(工单号:{_detail.order_no}ID{_detail.id}.')
return _detail
@classmethod
async def modify(cls, detail_id: Union[str, int], user: RbacUser = None, **kwargs):
"""修改工单详情信息"""
for _k, _v in kwargs.items():
if isinstance(_v, str):
kwargs[_k] = _v.strip()
_form = GovsOrderDetailForm(formdata=kwargs)
_form.validate_form()
_other = await cls.exist_other(
detail_id,
order_id=_form.order_id.data,
order_no=_form.order_no.data
)
assert _other is None, "工单编号或工单号已存在,不能重复修改。"
_detail: cls = await cls.async_find_by_id(detail_id)
assert _detail, f'查无此工单详情信息。'
_detail.copy_from_dict(_form.data, skip_none=True).before_save()
if user:
_detail.updated_by = user.username
await _detail.async_save()
return _detail
@classmethod
async def create_batch(cls, data_df: pd.DataFrame, user: RbacUser = None):
"""批量创建工单详情"""
if data_df.empty:
return 0
if user:
data_df['created_by'] = user.username
data_df['updated_by'] = user.username
records = data_df.to_dict('records')
details = [cls().copy_from_dict(record, skip_none=True).before_save() for record in records]
session = cls.get_aio_session()
try:
session.add_all(details)
await session.commit()
except Exception as e:
await session.rollback()
raise e
finally:
await session.close()
echo_log(f"批量创建成功:创建 {len(details)} 条工单详情。")
return len(details)
@classmethod
async def modify_batch(cls, data_df: pd.DataFrame, user: RbacUser = None):
"""批量修改工单详情"""
if data_df.empty:
return 0
if 'id' not in data_df.columns:
echo_log(f"错误:modify_batch 要求输入数据必须包含 '{cls.id.key}'")
return 0
data_df['updated_at'] = datetime.datetime.now()
if user:
data_df['updated_by'] = user.username
update_data = data_df.to_dict('records')
session = cls.get_aio_session()
try:
await session.run_sync(
lambda sync_session: sync_session.bulk_update_mappings(cls, update_data)
)
await session.commit()
updated_count = len(update_data)
except Exception as e:
await session.rollback()
raise e
finally:
await session.close()
echo_log(f"批量修改成功:更新 {updated_count} 条工单详情。")
return updated_count
@classmethod
async def save_batch(cls, data_df: pd.DataFrame, user: RbacUser = None):
"""批量保存数据,自动处理新建和更新"""
_exists_df, _latest_df = await cls.exists_order_id(data_df)
_created_count = await cls.create_batch(_latest_df, user)
_updated_count = await cls.modify_batch(_exists_df, user)
return _created_count, _updated_count
@classmethod
async def create_or_update(cls, user: RbacUser = None, **kwargs):
"""创建或更新工单详情(单条)"""
for _k, _v in kwargs.items():
if isinstance(_v, str):
kwargs[_k] = _v.strip()
_form = GovsOrderDetailForm(formdata=kwargs)
_form.validate_form()
_existing = await cls.find_by_order_id(_form.order_id.data)
if _existing:
_existing.copy_from_dict(_form.data, skip_none=True).before_save()
_existing.snapshot_time = datetime.datetime.now()
if user:
_existing.updated_by = user.username
await _existing.async_save()
return _existing
_detail = cls().copy_from_dict(_form.data, skip_none=True).before_save()
_detail.snapshot_time = datetime.datetime.now()
if user:
_detail.created_by = user.username
_detail.updated_by = user.username
await _detail.async_save()
return _detail