import pickle from typing import Optional, Awaitable from sqlalchemy import select from paste.rbac.rbac_assignment import RbacAssignment from paste.rbac.rbac_item import RbacItem from paste.rbac.rbac_models import RbacUserModel from paste.rbac.rbac_permission import RbacPermission from paste.rbac.rbac_role import RbacRole from paste.security import shash Supervisors = ('administrator', 'root', 'supervisor') """ 超级管理员名称,这些名称不能用于一般用户。 """ class RbacUser(RbacUserModel): """ RBAC 用户类。 """ STATUS_DEFAULT = 0b00000000000000000000000000000 """ 用户默认状态:0。 """ STATUS_ENABLED = 0b00000000000000000000000000001 """ 用户激活状态:1。 """ STATUS_DISABLED = 0b00000000000000000000000000010 """ 用户禁用状态:2。 """ STATUS_DELETED = 0b00000000000000000000000000100 """ 用户删除状态:4。 """ STATUS_LIST = [STATUS_DEFAULT, STATUS_ENABLED, STATUS_DISABLED, STATUS_DELETED] """ 允许的所有用户状态。 """ STATUS_DESCRIPTION = { STATUS_DEFAULT: '默认', STATUS_ENABLED: '激活', STATUS_DISABLED: '已禁用', STATUS_DELETED: '已删除', } """ 用户状态描述。 """ TYPE_USER = 'user' """ 用户类型:用户。 """ TYPE_ADMINISTRATOR = 'admin' """ 用户类型:管理员。 """ @classmethod async def import_supervisors(cls): """ 导入超级用户。 :return: 初始化状态 """ # 查出已经有的超级用户 _user_model_list: list[cls] = await cls(**{cls.username.key: Supervisors}).async_find() _user_model_dict: dict[str: cls] = {_user.username: _user for _user in _user_model_list} init_status: bool = True for _username in Supervisors: _user: cls = _user_model_dict.get(_username, None) if _user is None: # 用户不存在,创建超级用户 _save_status, _ = await cls.create(username=_username, password=_username) return init_status @classmethod async def create(cls, username: str, password: str): """ 创建用户。 :param username: 用户名 :param password: 密码 :return: 保存状态 """ assert username is not None and password is not None, '必须提供用户名和密码.' _usr_model = cls() _usr_model.before_save() _usr_model.username = username _usr_model.password_hash = shash.generate_password_hash(pwd=password) _status = await _usr_model.async_save() return _status, _usr_model @classmethod async def find_by_username(cls, username: str): """ 根据用户名查找用户。 :param username: 用户名 :return: 用户对象 """ query = select(cls).where(cls.username == username) model = await cls.query_first(query) return model @classmethod def status_description(cls, status): """ 取得状态描述。 :param status: :return: """ if status in cls.STATUS_DESCRIPTION: return cls.STATUS_DESCRIPTION[status] else: return '状态未知' async def assign(self, item_names: set[str]): """ 为用户分配权限。这里调用了权限分配器的分配方法,自动剔除已经分配过的角色或权限。 :param item_names: 授权项名称列表 """ await RbacAssignment.assign(user_id=self.id, item_names=item_names) async def can(self, permission_name: str, **kwargs) -> bool: """ 验证用户是否具有 permission_name 权限。 该方法主要是调用 :class:`RbacRule` 的 run() 方法,执行规则检验。 在执行规则验证的时候是自底向上,查询父 RbacItem 中的规则,并调用其 run 方法。调用时不分先后,随机执行。 只要有一个规则返回 False,则后续规则方法不再继续执行。 :param permission_name: 权限名称 :param kwargs: 可选参数,传递给规则的 run 方法的参数 :return: 验证状态 """ from paste.rbac.rbac_rule import RbacRule # 如果用户没有初始化,直接禁止 if self.username is None: return False # 如果是超级用户,直接返回 True if self.is_supervisors(): return True # 取得所有授权项,然后取得所有对应规则 _item_names = await self.get_all_permissions() _rule_list: list[RbacRule] = await RbacRule.find_by_item_names(_item_names) # 取出授权项 _permission: RbacPermission = await RbacPermission.find_by_name(permission_name) # 遍历执行 run 方法 for _rule in _rule_list: # 还原持久化数据到对象 _rule: RbacRule = pickle.loads(_rule.data) # 执行 run 方法,若是协程,则继续等待协程完成 _result = _rule.run(rbac_user=self, rbac_permission=_permission, **kwargs) if isinstance(_result, Awaitable): _result = await _result if not _result: return False return True async def get_all_roles(self): """ 取得所有角色名称。 :return: 角色名称列表 """ _roles = await self.roles() _role_names = [_r.name for _r in _roles] return _role_names async def get_all_permissions(self): """ 取得所有授权项的名称,包含角色和权限。 :return: 授权项名称列表 """ _dir_perms = await self.get_direct_permissions() _inh_perms = await self.get_inherit_permissions(direct_perms=_dir_perms) _dir_perms = _dir_perms.union(_inh_perms) return _dir_perms async def get_direct_permissions(self): """ 通过权限分配器查询用户直接拥有的角色和权限。 :return: 授权项名称列表 """ _query = select(RbacAssignment.item_name, RbacAssignment.user_id).where(RbacAssignment.user_id == self.id) _item_names = await RbacAssignment.query_all(_query) return set(_item_names) async def get_inherit_permissions(self, direct_perms: Optional[set[str]] = None): """ 通过直接拥有的权限查询继承而来的角色和权限。 继承而来的权限包括用户所有角色的权限及其子权限。 :param direct_perms: 用户直接拥有的权限名称列表 :return: 授权项名称列表 """ # 若不提供用户直接拥有的权限,则查询 if direct_perms is None: direct_perms = await self.get_direct_permissions() _perm_names = await RbacItem.all_children(item_names=direct_perms) return _perm_names async def has_permission(self, permission_name: str): """ 验证用户是否有执行某个路由的权限。 :param permission_name: 路由模式,一般为 route_pattern :return: 是否有权限 """ _permissions = await self.get_all_permissions() return permission_name in _permissions async def roles(self): """ 通过中间关系直接查询用户的所有角色。 :return: 角色列表 """ _query = select(RbacRole).join( RbacAssignment, RbacAssignment.item_name == RbacRole.name ).where( RbacRole.type == RbacRole.TYPE_ROLE, RbacAssignment.user_id == self.id ) _roles: list[RbacRole] = await RbacRole.query_all(_query) return _roles def is_supervisors(self): """ 检查是否是超级用户。 :return: 是超级用户放回 True,否则返回 False """ return self.username in Supervisors