Files
d3i-szct/models/govs_order_user.py
2026-06-02 17:46:38 +08:00

329 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# coding: utf-8
import datetime
from typing import Union
import pandas as pd
from sqlalchemy import select, delete
from tornado_swagger.model import register_swagger_model
from wtforms import StringField, IntegerField
from wtforms.validators import Length
from models.common_model import CommonModel
from models.db_models import TD3iGovsOrderUser
from paste.core.logging import echo_log
from paste.rbac.rbac_user import RbacUser
from paste.web.form import ModelForm
class GovsOrderUserForm(ModelForm):
"""服务对象信息表单验证类"""
id = IntegerField('服务对象唯一ID')
master_id = IntegerField('关联工单主表ID')
order_id = StringField('工单编号', validators=[Length(max=50)])
order_no = StringField('工单号', validators=[Length(max=50)])
customer_id = StringField('服务对象ID', validators=[Length(max=50)])
customer_name = StringField('姓名', validators=[Length(max=50)])
customer_sex = StringField('性别', validators=[Length(max=10)])
customer_connect_phone = StringField('联系电话', validators=[Length(max=20)])
customer_credentials_type = StringField('证件类型', validators=[Length(max=20)])
customer_credentials_no = StringField('证件号码', validators=[Length(max=50)])
customer_address = StringField('联系地址', validators=[Length(max=200)])
customer_email = StringField('电子邮箱', validators=[Length(max=100)])
customer_age = IntegerField('年龄')
customer_occupation = StringField('职业', validators=[Length(max=50)])
def process(self, formdata=None, obj=None, **kwargs):
if formdata:
for name, values in formdata.items():
if isinstance(values, list) and values:
formdata[name] = [v.strip() if isinstance(v, str) else v for v in values]
elif isinstance(values, str):
formdata[name] = values.strip()
super().process(formdata, obj, **kwargs)
class GovsOrderUserBase(TD3iGovsOrderUser, CommonModel):
"""服务对象信息业务基类"""
FieldMapping = {
# ==================== 主键与关联 ====================
'id': 'id', # 服务对象唯一ID
'master_id': 'masterId', # 关联工单主表ID(需从外部传入)
'order_id': 'orderId', # 工单编号
'tenant_id': 'tenantId', # 租户ID
'area_code': 'areaCode', # 区域代码
# ==================== 基本信息 ====================
'customer_name': 'customerName', # 姓名
'raw_customer_name': 'rawCustomerName', # 原始姓名
'customer_sex': 'customerSex', # 性别(男/女/未知)
'customer_type': 'customerType', # 客户类型(个人/企业)
'customer_age_range': 'customerAgeRange', # 年龄段
# ==================== 联系方式 ====================
'customer_connect_phone': 'customerConnectPhone', # 联系电话
'raw_customer_connect_phone': 'rawCustomerConnectPhone', # 原始联系电话
'customer_incoming_phone': 'customerIncomingPhone', # 来电号码
'raw_customer_incoming_phone': 'rawCustomerIncomingPhone', # 原始来电号码
'customer_phone_backup': 'customerPhoneBackup', # 备用电话
'raw_customer_phone_backup': 'rawCustomerPhoneBackup', # 原始备用电话
'customer_phone_backup_for_dh': 'customerPhoneBackupForDH', # 备用电话(脱敏)
'customer_internet_nickname': 'customerInternetNickname', # 网名
'customer_email': 'customerEmail', # 电子邮箱
# ==================== 证件信息 ====================
'customer_credentials_type': 'customerCredentialsType', # 证件类型(如:身份证、护照)
'customer_credentials_no': 'customerCredentialsNo', # 证件号码
'raw_customer_credentials_no': 'rawCustomerCredentialsNo', # 原始证件号码
# ==================== 企业信息 ====================
'enterprise_type': 'enterpriseType', # 企业类型
'enterprise_name': 'enterpriseName', # 企业名称
'enterprise_register_address': 'enterpriseRegisterAddress', # 企业注册地址
'enterprise_address': 'enterpriseAddress', # 企业地址
'enterprise_credit_code': 'enterpriseCreditCode', # 企业信用代码
# ==================== 系统字段 ====================
'delete_flag': 'deleteFlag', # 删除标志(0-未删除,1-已删除)
# ==================== 系统信息 ====================
'created_at': 'createDate',
'created_by': 'createBy',
'updated_at': 'updateDate',
'updated_by': 'updateBy',
}
@classmethod
async def find_by_order_id(cls, order_id: str):
"""根据工单编号查找服务对象"""
_query = select(cls).where(cls.order_id == order_id)
return await cls.query_first(_query)
@classmethod
async def find_by_master_id(cls, master_id: Union[str, int]):
"""根据主工单ID查找服务对象"""
_query = select(cls).where(cls.master_id == master_id)
return await cls.query_first(_query)
@classmethod
async def find_by_customer_id(cls, customer_id: str):
"""根据服务对象ID查找"""
_query = select(cls).where(cls.customer_id == customer_id)
return (await cls.orm_execute_scalars(_query)).all()
@classmethod
async def find_by_phone(cls, phone: str):
"""根据联系电话查找"""
_query = select(cls).where(cls.customer_connect_phone == phone)
return (await cls.orm_execute_scalars(_query)).all()
@classmethod
async def exists_order_id(cls, data_df: pd.DataFrame):
"""根据 order_id 判断数据是否存在
:param data_df: 输入的数据框架,必须包含 id 和 order_id 列
:return: (exists_df: pd.DataFrame, latest_df: pd.DataFrame)
- exists_df: 在数据库中存在的记录(附带数据库中的 id)
- latest_df: 在数据库中不存在的记录
"""
if data_df.empty:
return pd.DataFrame(), pd.DataFrame()
# 获取待查询的 (id, order_id) 组合
pairs = data_df[[cls.id.key, cls.order_id.key]].drop_duplicates().values.tolist()
if not pairs:
return pd.DataFrame(), data_df.copy()
# 查询数据库中已存在的记录(使用 IN 批量查询)
_query = select(cls.id, cls.order_id).where(
(cls.id.in_([p[0] for p in pairs])) &
(cls.order_id.in_([p[1] for p in pairs]))
)
exists_db_df = await cls.query_as_df(_query)
if exists_db_df.empty:
return pd.DataFrame(), data_df.copy()
exists_db_df[cls.id.key] = exists_db_df[cls.id.key].astype(str)
exists_db_df[cls.order_id.key] = exists_db_df[cls.order_id.key].astype(str)
# 构建 (id, order_id) -> id 的映射(用于快速查找)
key_to_id_map = dict(zip(
zip(exists_db_df[cls.id.key], exists_db_df[cls.order_id.key]),
exists_db_df[cls.id.key]
))
# 标记 data_df 中哪些行在数据库中存在
mask_exists = data_df.apply(
lambda row: (row[cls.id.key], row[cls.order_id.key]) in key_to_id_map,
axis=1
)
# 提取存在的记录,并补充数据库中的 id(虽然输入中已有 id,但为一致性保留)
exists_df = data_df[mask_exists].copy()
exists_df[cls.id.key] = exists_df.apply(
lambda row: key_to_id_map[(row[cls.id.key], row[cls.order_id.key])],
axis=1
)
# 提取不存在的记录
latest_df = data_df[~mask_exists].copy()
return exists_df, latest_df
@register_swagger_model
class GovsOrderUser(GovsOrderUserBase):
"""服务对象信息业务类"""
@classmethod
async def create(cls, user: RbacUser = None, **kwargs):
"""创建服务对象信息"""
for _k, _v in kwargs.items():
if isinstance(_v, str):
kwargs[_k] = _v.strip()
_form = GovsOrderUserForm(formdata=kwargs)
_form.validate_form()
# 检查是否已存在(根据 order_id)
_existing = await cls.find_by_order_id(_form.order_id.data)
if _existing:
# 更新已有记录
_existing.copy_from_dict(_form.data, skip_none=True)
if user:
_existing.updated_by = user.username
await _existing.async_save()
return _existing
_obj = cls().copy_from_dict(_form.data, skip_none=True)
if user:
_obj.created_by = user.username
_obj.updated_by = user.username
await _obj.async_save()
return _obj
@classmethod
async def delete(cls, obj_id: Union[str, int]):
"""删除服务对象信息"""
_obj: cls = await cls.async_find_by_id(obj_id)
assert _obj, f"根据 ID {obj_id} 未找到服务对象信息。"
_del_query = delete(cls).where(cls.id == _obj.id)
await cls.raw_execute(_del_query)
echo_log(f'已删除服务对象信息(order_id{_obj.order_id}ID{_obj.id}.')
return _obj
@classmethod
async def modify(cls, obj_id: Union[str, int], user: RbacUser = None, **kwargs):
"""修改服务对象信息"""
for _k, _v in kwargs.items():
if isinstance(_v, str):
kwargs[_k] = _v.strip()
_form = GovsOrderUserForm(formdata=kwargs)
_form.validate_form()
_obj: cls = await cls.async_find_by_id(obj_id)
assert _obj, f'查无此服务对象信息。'
_obj.copy_from_dict(_form.data, skip_none=True)
if user:
_obj.updated_by = user.username
await _obj.async_save()
return _obj
@classmethod
async def create_batch(cls, data_df: pd.DataFrame, user: RbacUser = None):
"""批量创建服务对象信息"""
if data_df.empty:
return 0
if user:
data_df['created_by'] = user.username
data_df['updated_by'] = user.username
records = data_df.to_dict('records')
objs = [cls().copy_from_dict(record, skip_none=True) for record in records]
session = cls.get_aio_session()
try:
session.add_all(objs)
await session.commit()
except Exception as e:
await session.rollback()
raise e
finally:
await session.close()
echo_log(f"批量创建成功:创建 {len(objs)} 条服务对象信息。")
return len(objs)
@classmethod
async def modify_batch(cls, data_df: pd.DataFrame, user: RbacUser = None):
"""批量修改服务对象信息"""
if data_df.empty:
return 0
if 'id' not in data_df.columns:
echo_log(f"错误:modify_batch 要求输入数据必须包含 '{cls.id.key}'")
return 0
data_df['updated_at'] = datetime.datetime.now()
if user:
data_df['updated_by'] = user.username
update_data = data_df.to_dict('records')
session = cls.get_aio_session()
try:
await session.run_sync(
lambda sync_session: sync_session.bulk_update_mappings(cls, update_data)
)
await session.commit()
updated_count = len(update_data)
except Exception as e:
await session.rollback()
raise e
finally:
await session.close()
echo_log(f"批量修改成功:更新 {updated_count} 条服务对象信息。")
return updated_count
@classmethod
async def save_batch(cls, data_df: pd.DataFrame, user: RbacUser = None):
"""批量保存数据,自动处理新建和更新"""
_exists_df, _latest_df = await cls.exists_order_id(data_df)
_created_count = await cls.create_batch(_latest_df, user)
_updated_count = await cls.modify_batch(_exists_df, user)
return _created_count, _updated_count
@classmethod
async def create_or_update_by_order_id(cls, user: RbacUser = None, **kwargs):
"""根据 order_id 创建或更新服务对象信息"""
for _k, _v in kwargs.items():
if isinstance(_v, str):
kwargs[_k] = _v.strip()
_form = GovsOrderUserForm(formdata=kwargs)
_form.validate_form()
_existing = await cls.find_by_order_id(_form.order_id.data)
if _existing:
_existing.copy_from_dict(_form.data, skip_none=True)
if user:
_existing.updated_by = user.username
await _existing.async_save()
return _existing
_obj = cls().copy_from_dict(_form.data, skip_none=True)
if user:
_obj.created_by = user.username
_obj.updated_by = user.username
await _obj.async_save()
return _obj
@classmethod
async def delete_by_order_id(cls, order_id: str):
"""根据工单编号删除服务对象信息"""
_obj = await cls.find_by_order_id(order_id)
if _obj:
await cls.delete(_obj.id)
return _obj