Files
d3i-szct/paste-framework/paste/rbac/rbac_assignment.py
T

88 lines
3.3 KiB
Python

from typing import Union
from sqlalchemy import select, delete
from paste.rbac.rbac_models import RbacAssignmentModel
class RbacAssignment(RbacAssignmentModel):
"""
权限分配器,负责用户权限的分配。
允许为用户分配角色,也允许为用户直接分配权限。
"""
@classmethod
def new(cls, user_id: Union[str, int], item_name: str):
"""
新建角色分配对象。不检查用户是否存在,也不保存数据库。
:param user_id: 用户 ID
:param item_name: 授权项名称
:return: 新的分配对象
"""
return cls(user_id=user_id, item_name=item_name).before_save()
@classmethod
async def new_batch(cls, user_id: int, item_names: set[str]):
"""
为用户创建角色或权限,自动剔除已经分配的角色或授权项。注意:仅创建不保存。
:param user_id: 用户名
:param item_names: 授权项名称列表,这里允许是角色名称,也允许是权限名称
"""
from paste.rbac.rbac_user import RbacUser, Supervisors
# 确认用户存在,且不是超级用户
_mod_user: RbacUser = await RbacUser(**{RbacUser.id.key: user_id}).async_find_first()
assert _mod_user is not None, f"ID 为 {user_id} 的用户不存在."
assert _mod_user.username not in Supervisors, f"ID 为 {user_id} 的用户 {_mod_user.username} 禁止设置权限."
# 查库,过滤已经存在的权限分配
_query = select(cls.item_name).where(cls.user_id == user_id, cls.item_name.in_(item_names))
_rows = await cls.query_all(_query)
_exist_names: set[str] = set([_r[cls.item_name.key] for _r in _rows])
item_names = set([_name for _name in item_names if _name not in _exist_names])
# 创建模型列表
_new_assignments = [cls(**{cls.user_id.key: user_id, cls.item_name.key: name}) for name in item_names]
return _new_assignments
@classmethod
async def assign(cls, user_id: int, item_names: set[str]):
"""
为用户分配角色或权限,自动剔除已经分配的角色或授权项。
:param user_id: 用户名
:param item_names: 授权项名称列表,这里允许是角色名称,也允许是权限名称
"""
# 创建模型列表
_new_assignments = await cls.new_batch(user_id=user_id, item_names=item_names)
# 保存数据
_session = cls.get_aio_session()
try:
_session.add_all(_new_assignments)
await _session.commit()
except Exception as e:
await _session.rollback()
raise e
finally:
await _session.close()
@classmethod
async def delete(cls, user_id: Union[str, int], item_name: str):
"""
删除授权。
:param user_id: 用户 ID
:param item_name: 授权项
:return: 操作状态,游标返回对象
"""
assert user_id not in ('', None), '必须提供用户 ID.'
assert item_name not in ('', None), '必须提供权限或角色名称.'
_query = delete(cls).where(cls.user_id == user_id, cls.item_name == item_name)
_result = await cls.raw_execute(query=_query)
_rowcount = _result.rowcount if isinstance(_result.rowcount, int) else 0
return _rowcount > 0, _result