Files
d3i-szct/models/govs_push_status.py
2026-06-02 17:46:38 +08:00

402 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import random
from typing import Union, Optional, Callable
import pandas as pd
from sqlalchemy import select, delete
from tornado_swagger.model import register_swagger_model
from paste.core.logging import echo_log
from paste.util.pagination import Pagination
import models
from models.common_model import CommonModel
from models.db_models import TD3iGovsPushStatu
class GovsPushStatuBase(TD3iGovsPushStatu, CommonModel):
"""
推送状态基础类(完全映射 TD3iGovsPushStatu 字段)。
封装所有与推送OA状态相关的通用操作方法。
"""
# 无字段名映射需求,保持原样
FieldMapping = {}
@classmethod
async def exist_other(cls, id: Union[str, int], govs_task_id: Union[str, int]):
"""
检查是否存在除当前记录外的其他同任务ID的推送状态记录。
:param id: 当前记录ID
:param govs_task_id: 任务ID(唯一标志)
:return: 存在返回记录对象,不存在返回None
"""
_query = select(cls).where(cls.id != id, cls.master_id == govs_task_id)
_record: cls = await cls.query_first(_query)
return _record
@classmethod
async def find_by_ids(cls, ids: list[Union[str, int]]):
"""
根据ID列表批量查找推送状态数据。
"""
_query = select(cls).where(cls.id.in_(ids))
_record_list: list[cls] = (await cls.orm_execute_scalars(_query)).all()
return _record_list
@classmethod
async def is_exist(cls, govs_task_id: Union[str, int]):
"""
检查推送状态是否已经存在(根据任务ID)。
"""
_query = select(cls).where(cls.master_id == govs_task_id)
_record: cls = await cls.query_first(_query)
return _record
@classmethod
async def search_base(cls, is_paging=True, **kwargs):
"""
按参数搜索推送状态数据的基础方法。
支持字段:
- govs_task_id, push_order_status, push_order_detail_status, ...
- 不支持模糊匹配(均为整型状态码)
:param is_paging: 是否分页
:param kwargs: 查询参数
:key int page_number: 页码(缺省随机1~100
:key int page_size: 每页数量(缺省20
:key dict sort_clause: 排序配置,如 {'updated_at': 'desc'}
:key int govs_task_id: 精确匹配任务ID
:key int push_order_status: 精确匹配推送待办工单状态
:key int push_order_attachment_status: 精确匹配附件状态
:key int push_order_detail_status: 精确匹配扩展信息状态
:key int push_order_process_status: 精确匹配文件上传状态
:return: (DataFrame, Pagination)
"""
page_number = kwargs.get('page_number', random.randint(1, 100))
page_size = kwargs.get('page_size', 20)
kwargs.update({'page_number': page_number, 'page_size': page_size})
# 无模糊字段,仅精确匹配
_query = select(cls).where(
*cls.search_wheres(**kwargs)
)
_paging = None
if is_paging:
_row_count = await cls.query_count(_query)
_paging = Pagination(_row_count).paging(page_number, page_size)
_data_query = _query.limit(page_size).offset(_paging.offset_size)
else:
_data_query = _query.where()
_sort_clause = cls.sort_clauses(kwargs.get('sort_clause', {}))
if _sort_clause:
_data_query = _data_query.order_by(*_sort_clause)
else:
_data_query = _data_query.order_by(cls.updated_at.desc())
_record_df = await cls.query_as_df(_data_query)
if not _record_df.empty:
_record_df.replace(models.EmptyInDF + models.EmptyDatetimeInDF, '', inplace=True)
return _record_df, _paging
@classmethod
async def search(cls, **kwargs):
"""
按参数搜索推送状态数据,返回分页格式数据。
"""
_record_df, _paging = await cls.search_base(**kwargs)
return {
'total': _paging.row_count,
'rows': _record_df.to_dict('records'),
'pagination': {
'page_number': _paging.page_number,
'page_count': _paging.page_count,
'page_size': _paging.page_size,
},
}
@classmethod
async def exists_relation(cls, data_df: pd.DataFrame):
"""
查找 data_df 中在数据库中已存在和不存在的记录。根据 govs_task_id 判断。
:param data_df: 输入的数据框架,必须包含 govs_task_id 列
:return: (exists_df: pd.DataFrame, latest_df: pd.DataFrame)
- exists_df: 在数据库中存在的记录
- latest_df: 在数据库中不存在的记录
"""
if data_df.empty:
return pd.DataFrame(), pd.DataFrame()
# 获取待查询的 govs_task_id 组合
task_ids = data_df[cls.master_id.key].unique().tolist()
if not task_ids:
return pd.DataFrame(), data_df.copy()
# 查询数据库中已存在的记录
_query = select(cls.id, cls.master_id).where(cls.master_id.in_(task_ids))
exists_df = await cls.query_as_df(_query)
exists_df[cls.master_id.key] = exists_df[cls.master_id.key].astype(str)
if exists_df.empty:
return pd.DataFrame(), data_df.copy()
# 构建 govs_task_id -> id 的映射
key_to_id_map = dict(zip(exists_df[cls.master_id.key], exists_df[cls.id.key]))
# 根据 govs_task_id 是否在数据库中划分数据
mask_exists = data_df[cls.master_id.key].isin(exists_df[cls.master_id.key])
exists_df = data_df[mask_exists].copy()
exists_df[cls.id.key] = exists_df[cls.master_id.key].map(key_to_id_map)
latest_df = data_df[~mask_exists].copy()
return exists_df, latest_df
@register_swagger_model
class GovsPushStatus(GovsPushStatuBase):
"""
推送状态业务模型类(主业务类,完全继承 TD3iGovsPushStatu 字段)。
"""
@classmethod
async def create(cls, **kwargs):
"""
创建新的推送状态记录。
业务流程:
1. 使用 kwargs 直接构造对象(无需表单验证,因无前端交互)
2. 检查是否已存在相同任务ID的记录(避免重复)
3. 创建新记录对象
4. 设置创建者和更新者为 'D3I'
5. 保存到数据库
6. 返回创建的对象
:param kwargs: 推送状态参数字典
:return: 新建推送状态对象
:rtype: TD3iGovsPushStatu
:raises AssertionError: 当记录已存在时抛出
"""
# 处理字符串字段去除空格
for _k, _v in kwargs.items():
if isinstance(_v, str):
kwargs[_k] = _v.strip()
# 检查是否已存在同任务ID的记录
_record: cls = await cls.is_exist(kwargs.get('govs_task_id'))
assert _record is None, "相同任务ID的推送状态已存在,不能重复创建。"
# 创建记录对象
_record = cls().copy_from_dict(kwargs, skip_none=True).before_save()
# 强制设置创建者和更新者为 'D3I'
_record.created_by = 'D3I'
_record.updated_by = 'D3I'
await _record.async_save()
return _record
@classmethod
async def delete(cls, id: Union[str, int]):
"""
删除推送状态记录(软删除,不实际删除,仅用于逻辑隔离)。
注意:此系统建议保留历史记录,删除操作仅为标记。
业务流程:
1. 根据ID查找记录
2. 验证存在性
3. 执行物理删除(因无软删除字段,此处直接删除)
:param id: 要删除的记录ID
:return: 删除的记录ID
:rtype: int
:raises AssertionError: 当记录不存在时抛出
"""
_record: cls = await cls.async_find_by_id(id)
assert _record, f"根据 ID {id} 未找到推送状态记录。"
# 执行物理删除
_del_query = delete(cls).where(cls.id == id)
_del_count = (await cls.raw_execute(_del_query)).rowcount
echo_log(f'已删除推送状态记录(ID{id}.')
return _del_count
@classmethod
async def modify(cls, id: Union[str, int], **kwargs):
"""
修改已有推送状态信息。
注意:仅允许更新状态码字段,不允许修改 id、created_at、created_by 等系统字段。
业务流程:
1. 处理字符串字段去除空格
2. 查询原记录
3. 验证存在性
4. 更新允许字段
5. 设置 updated_by = 'D3I'
6. 保存到数据库
7. 返回更新后的对象
:param id: 要修改的记录ID
:param kwargs: 需要更新的字段(仅限状态字段)
:return: 修改后的推送状态对象
:rtype: GovsPushStatus
:raises AssertionError: 当记录不存在时抛出
"""
# 处理字符串字段去除空格
for _k, _v in kwargs.items():
if isinstance(_v, str):
kwargs[_k] = _v.strip()
# 查询原记录
_record: cls = await cls.async_find_by_id(id)
assert _record, f"根据 ID {id} 未找到推送状态记录。"
# 允许更新的字段(仅状态码)
allowed_fields = {
'push_order_status',
'push_order_detail_status',
'push_order_attachment_status',
'push_order_process_status'
}
# 过滤合法字段
update_data = {k: v for k, v in kwargs.items() if k in allowed_fields and v is not None}
if not update_data:
return _record
# 更新字段
_record.copy_from_dict(update_data, skip_none=True)
_record.updated_by = 'D3I'
await _record.async_save()
return _record
@classmethod
async def create_batch(cls, data_df: pd.DataFrame):
"""
批量创建新推送状态记录(传入数据应为全新记录,无需校验是否存在)。
:param data_df: 包含推送状态数据的 DataFrame,字段需与模型属性匹配(如 govs_task_id, push_order_status 等)
:return: 成功创建的记录数量
:rtype: int
"""
if data_df.empty:
return 0
# 一次性转为字典列表(C 层高效)
records = data_df.to_dict('records')
# 用列表推导式构造对象
records = [
cls().copy_from_dict(record, skip_none=True).before_save()
for record in records
]
# 批量插入
session = cls.get_aio_session()
try:
session.add_all(records)
await session.commit()
except Exception as e:
await session.rollback()
raise e
finally:
await session.close()
echo_log(f"批量创建成功:创建 {len(records)} 条推送状态记录。")
return len(records)
@classmethod
async def modify_batch(cls, data_df: pd.DataFrame):
"""
批量修改已有推送状态记录。
:param data_df: 包含推送状态数据的 DataFrame,必须包含 id 列
:return: 成功更新的记录数量
:rtype: int
"""
if data_df.empty:
return 0
# 必须包含 id 列
if 'id' not in data_df.columns:
echo_log(f"错误:modify_batch 要求输入数据必须包含 '{cls.id.key}' 列(主键)")
return 0
# 转换为字典列表
update_data = data_df.to_dict('records')
# 使用 bulk_update_mappings
session = cls.get_aio_session()
try:
await session.run_sync(
lambda sync_session: sync_session.bulk_update_mappings(cls, update_data)
)
await session.commit()
updated_count = len(update_data)
except Exception as e:
await session.rollback()
raise e
finally:
await session.close()
echo_log(f"批量修改成功:更新 {updated_count} 条推送状态记录。")
return updated_count
@classmethod
async def save_batch(cls, data_df: pd.DataFrame):
"""
批量保存数据,自动处理新建和更新。
:param data_df: 要保存的数据框架
:return 新建和更新的数量
"""
# 筛选数据状态
_exists_df, _latest_df = await GovsPushStatus.exists_relation(data_df)
# 保存到数据库
_created_count = await GovsPushStatus.create_batch(_latest_df)
_updated_count = await GovsPushStatus.modify_batch(_exists_df)
return _created_count, _updated_count
@classmethod
async def set_push_order_status(cls, govs_task_id: Union[str, int], status: int = 1):
govs_task: cls = await cls(master_id=govs_task_id).async_find_first()
if govs_task:
govs_task.push_order_status = status
else:
govs_task = cls(master_id=govs_task_id, push_order_status=status)
# 保存数据
await govs_task.async_save()
@classmethod
async def set_push_order_detail_status(cls, govs_task_id: Union[str, int], status: int = 1):
govs_task: cls = await cls(master_id=govs_task_id).async_find_first()
if govs_task:
govs_task.push_order_detail_status = status
else:
govs_task = cls(master_id=govs_task_id, push_order_detail_status=status)
# 保存数据
await govs_task.async_save()
@classmethod
async def set_push_order_attachment_status(cls, govs_task_id: Union[str, int], status: int = 1):
govs_task: cls = await cls(master_id=govs_task_id).async_find_first()
if govs_task:
govs_task.push_order_attachment_status = status
else:
govs_task = cls(master_id=govs_task_id, push_order_attachment_status=status)
# 保存数据
await govs_task.async_save()
@classmethod
async def set_push_order_process_status(cls, govs_task_id: Union[str, int], status: int = 1):
govs_task: cls = await cls(master_id=govs_task_id).async_find_first()
if govs_task:
govs_task.push_order_process_status = status
else:
govs_task = cls(master_id=govs_task_id, push_order_process_status=status)
# 保存数据
await govs_task.async_save()