Files
2026-06-02 17:46:38 +08:00

222 lines
7.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import datetime
from typing import Union
from sqlalchemy import select
from tornado_swagger.model import register_swagger_model
from wtforms import StringField, IntegerField
from wtforms.validators import Length
from models.common_model import CommonModel
from models.db_models import TToken
from paste.core.logging import echo_log
from paste.rbac.rbac_user import RbacUser
from paste.web.form import ModelForm
class TTokenForm(ModelForm):
"""
Token 表单验证类(完全映射 TToken 字段)。
用于验证和处理认证令牌的创建/修改表单数据。
字段完全映射数据库表 t_token 的字段结构。
"""
# 主键
id = IntegerField('主键ID')
# 基础信息
platform = StringField('平台', validators=[Length(max=20, message='平台长度不能超过20字符')])
token = StringField('令牌', validators=[Length(max=500, message='令牌长度不能超过500字符')])
deleted = IntegerField('是否删除(0未删,1已删)')
# 创建与更新信息
creator = StringField('创建者', validators=[Length(max=64, message='创建者长度不能超过64字符')])
updater = StringField('更新者', validators=[Length(max=64, message='更新者长度不能超过64字符')])
def process(self, formdata=None, obj=None, **kwargs):
"""
处理表单数据,在数据绑定前进行预处理。
主要功能:
- 遍历所有表单字段
- 对字符串类型的值去除两端空白字符
- 调用父类的process方法继续处理
"""
if formdata:
for name, values in formdata.items():
if isinstance(values, list) and values:
formdata[name] = [v.strip() if isinstance(v, str) else v for v in values]
elif isinstance(values, str):
formdata[name] = values.strip()
super().process(formdata, obj, **kwargs)
class TokenBase(TToken, CommonModel):
"""
Token 基础类(完全映射 TToken 字段)。
继承自数据库模型 TToken 和通用模型 CommonModel。
封装所有与认证令牌相关的通用操作方法。
"""
@classmethod
async def find_by_platform(cls, platform: str):
"""
根据平台查找 Token 记录。
:param platform: 平台
:return: Token 对象或 None
"""
_query = select(cls).where(cls.platform == platform, cls.deleted == 0)
_token: cls = await cls.query_first(_query)
assert _token, f'未找到可用 Token,平台:{platform}.'
return _token
@register_swagger_model
class TokenModel(TokenBase):
"""
Token 业务模型类(主业务类,完全继承 TToken 字段)。
---
description: 认证 Token
type: object
properties:
id:
description: 主键
type: integer
example: 1001
readOnly: true
platform:
description: 平台
type: string
example: "web"
maxLength: 20
token:
description: 令牌
type: string
example: "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
maxLength: 500
deleted:
description: 是否删除(0未删,1已删)
type: integer
example: 0
creator:
description: 创建者
type: string
example: "admin"
maxLength: 64
create_time:
description: 创建时间,ISO格式的日期时间字符串
type: string
format: date-time
example: "2024-01-15 10:30:00"
readOnly: true
updater:
description: 更新者
type: string
example: "admin"
maxLength: 64
update_time:
description: 更新时间,ISO格式的日期时间字符串
type: string
format: date-time
example: "2024-01-16 14:25:00"
readOnly: true
"""
@classmethod
async def create(cls, user: RbacUser=None, **kwargs):
"""
创建新的 Token。
业务流程:
1. 使用 TTokenForm 验证表单数据完整性
2. 创建新 Token 对象
3. 设置创建者为当前用户
4. 保存到数据库
5. 返回创建的 Token 对象
:param RbacUser user: 操作用户对象
:param kwargs: Token 参数字典
:return: 新建 Token 对象
:rtype: TokenModel
:raises ValidationError: 当表单验证失败时抛出
"""
_token_form = TTokenForm(formdata=kwargs)
_token_form.validate_form()
# 创建 Token 对象
_token = cls().copy_from_dict(_token_form.data, skip_none=True).before_save()
if user:
_token.creator = user.username
_token.updater = user.username
else:
_token.creator = 'D3I'
_token.updater = 'D3I'
_token.deleted = 0
_token.create_time = datetime.datetime.now()
_token.update_time = datetime.datetime.now()
await _token.async_save()
return _token
@classmethod
async def delete(cls, token_id: Union[str, int]):
"""
逻辑删除 Token(设置 deleted=1)。
业务流程:
1. 根据ID查找 Token
2. 验证存在性
3. 设置 deleted=1
4. 保存更新
:param token_id: 要删除的 Token ID
:return: 更新后的 Token 对象
:rtype: TokenModel
:raises AssertionError: 当 Token 不存在时抛出
"""
_token: cls = await cls.async_find_by_id(token_id)
assert _token, f"根据 ID {token_id} 未找到 Token。"
_token.deleted = 1
_token.updater = "system"
await _token.async_save()
echo_log(f'已逻辑删除 TokenID{_token.id})。')
return _token
@classmethod
async def refresh(cls, platform: str, token: str, user: RbacUser=None):
"""
刷新 Token 信息(更新 token、updater、update_time)。
业务流程:
1. 使用 TTokenForm 验证更新字段
2. 查询原 Token 对象
3. 更新字段并设置更新者,更新后,删除状态自动变为可用
4. 保存到数据库
:param platform: 要刷新平台
:param token: 需要更新的 token
:param RbacUser user: 操作用户对象
:return: 更新后的 Token 对象
:rtype: TokenModel
:raises AssertionError: 当 Token 不存在时抛出
:raises ValidationError: 当表单验证失败时抛出
"""
# 查询原 Token
_token: cls = await cls(platform=platform).async_find_first()
# 没找到,创建
if not _token:
_token = await cls.create(platform=platform, token=token, user=user)
return _token
# 找到,更新
_token.token = token
_token.deleted = 0
_token.update_time = datetime.datetime.now()
_token.updater = user.username if user else _token.updater
await _token.async_save()
return _token