Files
d3i-szct/models/govc_task.py
T
2026-06-02 17:46:38 +08:00

637 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import datetime
import random
from typing import Union
import pandas as pd
from sqlalchemy import select, delete
from tornado_swagger.model import register_swagger_model
from wtforms import StringField, TextAreaField, IntegerField, DateTimeField
from wtforms.validators import Length
import models
from models.common_model import CommonModel
from models.db_models import TD3iGovcTask
from paste.core.logging import echo_log
from paste.rbac.rbac_user import RbacUser
from paste.util.pagination import Pagination
from paste.web.form import ModelForm
class GovcTaskForm(ModelForm):
"""
市12345工单表单验证类(完全映射 TD3iGovcTask 字段)。
用于验证和处理市12345工单主表的创建/修改表单数据。
字段完全映射数据库表 t_d3i_govc_task 的字段结构。
"""
# 基础信息
id = IntegerField('记录ID')
evl_result = StringField('结果满意度', validators=[Length(max=64, message='结果满意度长度不能超过64字符')])
finish_result = TextAreaField('办结结果', validators=[Length(max=65535, message='办结结果长度不能超过65535字符')])
serial_num = StringField('工单编号', validators=[Length(max=64, message='工单编号长度不能超过64字符')])
t_status = StringField('任务单状态', validators=[Length(max=64, message='任务单状态长度不能超过64字符')])
accord_type = StringField('归口类型', validators=[Length(max=255, message='归口类型长度不能超过255字符')])
create_date = DateTimeField('交办时间')
back_time_bf = DateTimeField('拒绝时限')
sub_handle_ou_name = StringField('子处办单位',
validators=[Length(max=255, message='子处办单位长度不能超过255字符')])
sign_time_bf = IntegerField('签收时限时间戳')
is_leaf = StringField('是否叶子节点', validators=[Length(max=32, message='是否叶子节点长度不能超过32字符')])
row_guid = StringField('rowguid', validators=[Length(max=64, message='rowguid长度不能超过64字符')])
c_guid = StringField('查询详情使用guid', validators=[Length(max=64, message='查询详情使用guid长度不能超过64字符')])
finish_time = IntegerField('办结时间戳')
sign_time = IntegerField('签收时间戳')
is_secret = StringField('是否保密', validators=[Length(max=32, message='是否保密长度不能超过32字符')])
finish_time_bf = DateTimeField('办结时限')
link_number = StringField('联系号码', validators=[Length(max=64, message='联系号码长度不能超过64字符')])
pvi_guid = StringField('查询详情使用pviguid',
validators=[Length(max=64, message='查询详情使用pviguid长度不能超过64字符')])
rqst_type = StringField('诉求类型', validators=[Length(max=64, message='诉求类型长度不能超过64字符')])
rqst_content = TextAreaField('诉求内容', validators=[Length(max=65535, message='诉求内容长度不能超过65535字符')])
handle_ou_name = StringField('处办单位', validators=[Length(max=255, message='处办单位长度不能超过255字符')])
rqst_title = StringField('标题', validators=[Length(max=500, message='标题长度不能超过500字符')])
sign_person = StringField('签收人', validators=[Length(max=128, message='签收人长度不能超过128字符')])
rqst_person = StringField('诉求人', validators=[Length(max=128, message='诉求人长度不能超过128字符')])
rqs_channel = StringField('渠道来源', validators=[Length(max=64, message='渠道来源长度不能超过64字符')])
t_type = StringField('工单类型', validators=[Length(max=64, message='工单类型长度不能超过64字符')])
solve_situation = StringField('解决情况', validators=[Length(max=64, message='解决情况长度不能超过64字符')])
evl_style = StringField('态度满意度', validators=[Length(max=64, message='态度满意度长度不能超过64字符')])
send_opinion = TextAreaField('派送意见', validators=[Length(max=65535, message='派送意见长度不能超过65535字符')])
created_at = DateTimeField('创建时间')
created_by = StringField('创建者', validators=[Length(max=64, message='创建者长度不能超过64字符')])
updated_at = DateTimeField('更新时间')
updated_by = StringField('更新者', validators=[Length(max=64, message='更新者长度不能超过64字符')])
def process(self, formdata=None, obj=None, **kwargs):
"""
处理表单数据,在数据绑定前进行预处理。
主要功能:
- 遍历所有表单字段
- 对字符串类型的值去除两端空白字符
- 调用父类的process方法继续处理
"""
if formdata:
for name, values in formdata.items():
if isinstance(values, list) and values:
formdata[name] = [v.strip() if isinstance(v, str) else v for v in values]
elif isinstance(values, str):
formdata[name] = values.strip()
super().process(formdata, obj, **kwargs)
class GovcTaskBase(TD3iGovcTask, CommonModel):
"""
市12345工单基础类(完全映射 TD3iGovcTask 字段)。
继承自数据库模型 TD3iGovcTask 和通用模型 CommonModel。
封装所有与市12345工单相关的通用操作方法。
"""
FieldMapping = {
'id': 'id',
'evl_result': 'evlresult',
'finish_result': 'finishresult',
'serial_num': 'serialnum',
't_status': 'tstatus',
'accord_type': 'accordtype',
'create_date': 'createdate',
'back_time_bf': 'backtime_bf',
'sub_handle_ou_name': 'subhandleouname',
'sign_time_bf': 'signtime_bf',
'is_leaf': 'isLeaf',
'row_guid': 'rowguid',
'c_guid': 'cguid',
'finish_time': 'finishtime',
'sign_time': 'signtime',
'is_secret': 'issecret',
'finish_time_bf': 'finishtime_bf',
'link_number': 'linknumber',
'pvi_guid': 'pviguid',
'rqst_type': 'rqsttype',
'rqst_content': 'rqstcontent',
'handle_ou_name': 'handleouname',
'rqst_title': 'rqsttitle',
'sign_person': 'signperson',
'rqst_person': 'rqstperson',
'rqs_channel': 'rqschannel',
't_type': 'ttype',
'solve_situation': 'solvesituation',
'evl_style': 'evlstyle',
'send_opinion': 'sendopinion'
}
"""
工单数据映射
"""
@classmethod
async def is_exist(cls, serial_num: str):
"""
检查工单记录是否已存在(根据工单编号)。
:param serial_num: 工单编号
:return: 存在返回对象,不存在返回None
"""
_query = select(cls).where(cls.serial_num == serial_num)
_task: cls = await cls.query_first(_query)
return _task
@classmethod
async def search_base(cls, is_paging=True, **kwargs):
"""
按参数搜索工单数据的基础方法。
支持字段:
- serial_num, row_guid, c_guid, pvi_guid, t_status, t_type, is_leaf, is_secret
- 支持模糊匹配:rqst_content, finish_result, send_opinion, rqst_title
- 支持精确匹配:t_status, t_type, is_leaf, is_secret
:param is_paging: 是否分页
:param kwargs: 查询参数
:key int page_number: 页码(缺省随机1~100
:key int page_size: 每页数量(缺省20
:key dict sort_clause: 排序配置,如 {'serial_num': 'asc'}
:key str serial_num: 精确匹配工单编号
:key str row_guid: 精确匹配rowguid
:key str c_guid: 精确匹配查询详情使用guid
:key str pvi_guid: 精确匹配查询详情使用pviguid
:key str t_status: 精确匹配任务单状态
:key str t_type: 精确匹配工单类型
:key str is_leaf: 精确匹配是否叶子节点
:key str is_secret: 精确匹配是否保密
:key str rqst_content: 模糊匹配诉求内容
:key str finish_result: 模糊匹配办结结果
:key str send_opinion: 模糊匹配派送意见
:key str rqst_title: 模糊匹配标题
:return: (DataFrame, Pagination)
"""
page_number = kwargs.get('page_number', random.randint(1, 100))
page_size = kwargs.get('page_size', 20)
kwargs.update({'page_number': page_number, 'page_size': page_size})
# 模糊查询字段
_name_likes = {
cls.rqst_content.key: '%{}%',
cls.finish_result.key: '%{}%',
cls.send_opinion.key: '%{}%',
cls.rqst_title.key: '%{}%',
}
_query = select(cls).where(
*cls.search_wheres(likes=_name_likes, **kwargs)
).group_by(cls.id)
_paging = None
if is_paging:
_row_count = await cls.query_count(_query)
_paging = Pagination(_row_count).paging(page_number, page_size)
_data_query = _query.limit(page_size).offset(_paging.offset_size)
else:
_data_query = _query.where()
_sort_clause = cls.sort_clauses(kwargs.get('sort_clause', {}))
if _sort_clause:
_data_query = _data_query.order_by(*_sort_clause)
else:
_data_query = _data_query.order_by(cls.serial_num, cls.id)
_task_df = await cls.query_as_df(_data_query)
if not _task_df.empty:
_task_df.replace(models.EmptyInDF + models.EmptyDatetimeInDF, '', inplace=True)
_task_df[cls.id.key] = _task_df[cls.id.key].astype(str)
return _task_df, _paging
@classmethod
async def search(cls, **kwargs):
"""
按参数搜索工单数据,返回分页格式数据。
"""
_task_df, _paging = await cls.search_base(**kwargs)
return {
'total': _paging.row_count,
'rows': _task_df.to_dict('records'),
'pagination': {
'page_number': _paging.page_number,
'page_count': _paging.page_count,
'page_size': _paging.page_size,
},
}
@classmethod
async def exists_serial_num(cls, data_df: pd.DataFrame):
"""
查找 data_df 中在数据库中已存在和不存在的记录。根据 serial_num 字段判断。
:param data_df: 输入的数据框架,必须包含 serial_num 列
:return: (exists_df: pd.DataFrame, latest_df: pd.DataFrame)
- exists_df: 在数据库中存在的记录
- latest_df: 在数据库中不存在的记录
"""
if data_df.empty:
return pd.DataFrame(), pd.DataFrame()
# 获取待查询的 serial_num 列表(去重)
serial_nums = data_df[cls.serial_num.key].unique().tolist()
if not serial_nums:
return pd.DataFrame(), data_df.copy()
# 查询数据库中已存在的 serial_num
_query = select(cls.id, cls.serial_num).where(cls.serial_num.in_(serial_nums))
serial_nums_df = await cls.query_as_df(_query)
if serial_nums_df.empty:
return pd.DataFrame(), data_df.copy()
# 构建 serial_num -> id 的映射字典
serial_num_to_id_map = dict(zip(serial_nums_df[cls.serial_num.key], serial_nums_df[cls.id.key]))
# 根据 serial_num 是否在数据库中,划分数据
mask_exists = data_df[cls.serial_num.key].isin(serial_nums_df[cls.serial_num.key])
# 数据库已经有的记录
exists_df = data_df[mask_exists].copy()
# 自动补充从数据库查到的 id 字段
exists_df[cls.id.key] = exists_df[cls.serial_num.key].map(serial_num_to_id_map)
# 新的数据
latest_df = data_df[~mask_exists].copy()
return exists_df, latest_df
@register_swagger_model
class GovcTask(GovcTaskBase):
"""
市12345工单模型类(主业务类,完全继承 TD3iGovcTask 字段)。
---
description: 市12345工单主表接口
type: object
properties:
id:
description: 主键ID
type: integer
example: 1001
readOnly: true
evl_result:
description: 结果满意度
type: string
example: "满意"
maxLength: 64
finish_result:
description: 办结结果
type: string
example: "已完成诉求处理,用户反馈满意"
maxLength: 65535
serial_num:
description: 工单编号
type: string
example: "GOV20240501001"
maxLength: 64
t_status:
description: 任务单状态
type: string
example: "已办结"
maxLength: 64
accord_type:
description: 归口类型
type: string
example: "城市管理"
maxLength: 255
create_date:
description: 交办时间
type: string
format: date-time
example: "2024-01-15 10:30:00"
back_time_bf:
description: 拒绝时限
type: string
format: date-time
example: "2024-01-20 18:00:00"
sub_handle_ou_name:
description: 子处办单位
type: string
example: "XX街道办事处"
maxLength: 255
sign_time_bf:
description: 签收时限时间戳
type: integer
example: 1705324800000
is_leaf:
description: 是否叶子节点
type: string
example: "1"
maxLength: 32
row_guid:
description: rowguid
type: string
example: "8f9e7d6c-5b4a-3210-9876-abcdef123456"
maxLength: 64
c_guid:
description: 查询详情使用guid
type: string
example: "7e8d9c0b-1a2b-3c4d-5e6f-7890abcdef12"
maxLength: 64
finish_time:
description: 办结时间戳
type: integer
example: 1705843200000
sign_time:
description: 签收时间戳
type: integer
example: 1705411200000
is_secret:
description: 是否保密
type: string
example: "0"
maxLength: 32
finish_time_bf:
description: 办结时限
type: string
format: date-time
example: "2024-01-25 18:00:00"
link_number:
description: 联系号码
type: string
example: "13800138000"
maxLength: 64
pvi_guid:
description: 查询详情使用pviguid
type: string
example: "9d8c7b6a-5f4e-3d2c-1b0a-9876543210fe"
maxLength: 64
rqst_type:
description: 诉求类型
type: string
example: "投诉"
maxLength: 64
rqst_content:
description: 诉求内容
type: string
example: "XX小区垃圾堆积未及时清理,影响居民生活"
maxLength: 65535
handle_ou_name:
description: 处办单位
type: string
example: "XX区城市管理局"
maxLength: 255
rqst_title:
description: 标题
type: string
example: "XX小区垃圾清理问题"
maxLength: 500
sign_person:
description: 签收人
type: string
example: "张三"
maxLength: 128
rqst_person:
description: 诉求人
type: string
example: "李四"
maxLength: 128
rqs_channel:
description: 渠道来源
type: string
example: "12345热线"
maxLength: 64
t_type:
description: 工单类型
type: string
example: "民生类"
maxLength: 64
solve_situation:
description: 解决情况
type: string
example: "已解决"
maxLength: 64
evl_style:
description: 态度满意度
type: string
example: "满意"
maxLength: 64
send_opinion:
description: 派送意见
type: string
example: "请XX街道办事处尽快处理"
maxLength: 65535
created_at:
description: 创建时间,ISO格式的日期时间字符串
type: string
format: date-time
example: "2024-01-15 10:30:00"
readOnly: true
created_by:
description: 创建者用户名
type: string
example: "admin"
readOnly: true
updated_at:
description: 修改时间,ISO格式的日期时间字符串
type: string
format: date-time
example: "2024-01-16 14:25:00"
readOnly: true
updated_by:
description: 修改者用户名
type: string
example: "editor"
readOnly: true
"""
@classmethod
async def create(cls, user: RbacUser = None, **kwargs):
"""
创建新的工单记录。
业务流程:
1. 使用 GovcTaskForm 验证表单数据完整性
2. 检查是否已存在相同 serial_num 的记录(避免重复提交)
3. 创建新工单对象
4. 设置创建者和更新者为当前用户
5. 保存到数据库
6. 返回创建的对象
:param RbacUser user: 操作用户对象
:param kwargs: 工单参数字典
:return: 新建工单对象
:rtype: GovcTask
:raises AssertionError: 当记录已存在时抛出
:raises ValidationError: 当表单验证失败时抛出
"""
# 处理字符串字段去除空格
for _k, _v in kwargs.items():
if isinstance(_v, str):
kwargs[_k] = _v.strip()
_form = GovcTaskForm(formdata=kwargs)
_form.validate_form()
# 检查是否已存在相同 serial_num 的记录
_existing = await cls.is_exist(_form.serial_num.data)
assert _existing is None, "该工单编号已存在记录,不能重复提交。"
# 创建对象
_task = cls().copy_from_dict(_form.data, skip_none=True).before_save()
if user:
_task.created_by = user.username
_task.updated_by = user.username
await _task.async_save()
return _task
@classmethod
async def delete(cls, task_id: Union[str, int]):
"""
删除工单记录。
业务流程:
1. 根据ID查找记录
2. 验证存在性
3. 执行删除
:param task_id: 要删除的工单记录ID
:return: 删除的记录对象
:rtype: GovcTask
:raises AssertionError: 当记录不存在时抛出
"""
_task: cls = await cls.async_find_by_id(task_id)
assert _task, f"根据 ID {task_id} 未找到工单记录。"
_del_query = delete(cls).where(cls.id == _task.id)
_del_count = (await cls.raw_execute(_del_query)).rowcount
echo_log(f'已删除工单记录(工单编号:{_task.serial_num}ID{_task.id}.')
return _task
@classmethod
async def modify(cls, task_id: Union[str, int], user: RbacUser = None, **kwargs):
"""
修改已有工单记录。
业务流程:
1. 将 task_id 添加到参数中
2. 处理字符串字段去除首尾空格
3. 使用 GovcTaskForm 验证表单数据
4. 查询原记录
5. 验证存在性
6. 更新字段并设置更新者
7. 保存到数据库
8. 返回更新后的对象
:param task_id: 要修改的工单记录ID
:param RbacUser user: 操作用户对象
:param kwargs: 需要更新的字段
:return: 修改后的工单对象
:rtype: GovcTask
:raises AssertionError: 当记录不存在时抛出
:raises ValidationError: 当表单验证失败时抛出
"""
# 处理字符串字段去除空格
for _k, _v in kwargs.items():
if isinstance(_v, str):
kwargs[_k] = _v.strip()
# 表单验证
_form = GovcTaskForm(formdata=kwargs)
_form.validate_form()
# 查询原记录
_task: cls = await cls.async_find_by_id(task_id)
assert _task, f'查无此工单信息。'
# 更新字段
_task.copy_from_dict(_form.data, skip_none=True).before_save()
_task.updated_by = user.username
await _task.async_save()
return _task
@classmethod
async def create_batch(cls, data_df: pd.DataFrame, user: RbacUser = None):
"""
批量创建工单记录(传入数据应为全新记录)。
:param data_df: 包含工单数据的 DataFrame
:param user: 操作用户对象,用于设置 created_by / updated_by
:return: 成功创建的数量
:rtype: int
"""
if data_df.empty:
return 0
if user:
data_df['created_by'] = user.username
data_df['updated_by'] = user.username
records = data_df.to_dict('records')
tasks = [cls().copy_from_dict(record, skip_none=True).before_save() for record in records]
session = cls.get_aio_session()
try:
session.add_all(tasks)
await session.commit()
except Exception as e:
await session.rollback()
raise e
finally:
await session.close()
echo_log(f"批量创建成功:创建 {len(tasks)} 条工单记录。")
return len(tasks)
@classmethod
async def modify_batch(cls, data_df: pd.DataFrame, user: RbacUser = None):
"""
批量修改已有工单记录。
:param data_df: 包含工单数据的 DataFrame(必须包含 id 列)
:param user: 操作用户对象,用于设置 updated_by
:return: 成功更新的数量
:rtype: int
"""
if data_df.empty:
return 0
# 必须包含 id 列
if 'id' not in data_df.columns:
echo_log(f"错误:modify_batch 要求输入数据必须包含 '{cls.id.key}' 列(主键)")
return 0
# 手动添加更新时间戳
data_df['updated_at'] = datetime.datetime.now()
# 添加更新者信息
if user:
data_df['updated_by'] = user.username
# 转换为字典列表
update_data = data_df.to_dict('records')
# 使用 bulk_update_mappings
session = cls.get_aio_session()
try:
await session.run_sync(
lambda sync_session: sync_session.bulk_update_mappings(cls, update_data)
)
await session.commit()
updated_count = len(update_data)
except Exception as e:
await session.rollback()
raise e
finally:
await session.close()
echo_log(f"批量修改成功:更新 {updated_count} 条工单记录。")
return updated_count
@classmethod
async def save_batch(cls, data_df: pd.DataFrame, user: RbacUser = None):
"""
批量保存工单数据,自动处理新建和更新。
:param data_df: 要保存的数据框架
:param user: 用户
:return: 新建和更新的数量
"""
# 筛选数据状态
_exists_df, _latest_df = await GovcTask.exists_serial_num(data_df)
# 保存到数据库
_created_count = await GovcTask.create_batch(_latest_df, user)
_updated_count = await GovcTask.modify_batch(_exists_df, user)
return _created_count, _updated_count