148 lines
5.5 KiB
Python
148 lines
5.5 KiB
Python
import importlib
|
||
import pickle
|
||
|
||
from sqlalchemy import select, text, delete
|
||
|
||
from paste.rbac.rbac_item import RbacItem
|
||
from paste.rbac.rbac_models import RbacRuleModel
|
||
from paste.rbac.rbac_permission import RbacPermission
|
||
from paste.rbac.rbac_user import RbacUser
|
||
|
||
|
||
class RbacRule(RbacRuleModel):
|
||
"""
|
||
规则是授权项的附带验证条件。在验证授权项时,只能判断是否具有某个授权项,无法对具体数据执行进一步验证。比如
|
||
验证某一项数据是否允许某个用户执行某操作,此时就需要用到规则。
|
||
|
||
规则是在单独定义的验证方法,这个方法被持久化保存在数据库中,具体执行某个需要鉴权的操作时,若该权限配置了规
|
||
则,那么规则方法会一并参与到鉴权过程中去,以确定用户是否有对具体数据执行操作的权限。
|
||
|
||
必须允许多个规则应用于一个授权项,但事实上是一个授权项自身只能绑定一个规则,解决方案是将一系列需要规则鉴权
|
||
的操作作为规则权限的子授权项。这样,在对这个权限进行鉴权操作时,会自底向上逐个检查父权限是否有规则,若有规
|
||
则,那么会进入这个父权限的规则方法,执行,并校验其返回值。
|
||
|
||
因此要允许手动创建授权项,且允许手动为授权项绑定规则,然后将其他同样需要执行该规则的权限配置为该权限的子权
|
||
限。
|
||
|
||
由于在规则执行系统中,不知道未来将会编写和配置的规则,因此无法调用到对应的规则,只能将来编写好规则后,通过
|
||
持久化对象到数据库中,通过查库还原将来的规则对象后再执行 run 方法。
|
||
"""
|
||
|
||
@classmethod
|
||
async def create(cls, full_class_name: str):
|
||
"""
|
||
添加规则。
|
||
|
||
:param full_class_name: 规则类
|
||
:return: 保存状态、当前规则
|
||
"""
|
||
_rule_cls = cls.load_rule_class(full_class_name)
|
||
assert _rule_cls, f"未找到名称为:{full_class_name} 的规则类."
|
||
|
||
_rule_model = _rule_cls()
|
||
_rule_model.data = _rule_model.dumps()
|
||
if not _rule_model.name:
|
||
_rule_model.name = _rule_cls.__name__
|
||
|
||
await _rule_model.async_save()
|
||
return _rule_model
|
||
|
||
@classmethod
|
||
async def delete(cls, name: str):
|
||
"""
|
||
删除规则。
|
||
|
||
:param name: 要删除的规则名称
|
||
:return: 是否删除成功、删除的行数
|
||
"""
|
||
assert name not in ('', None), '必须提供规则名称.'
|
||
_query = delete(cls).where(cls.name == name)
|
||
_result = await cls.raw_execute(query=_query)
|
||
_rowcount = _result.rowcount if isinstance(_result.rowcount, int) else 0
|
||
return _rowcount > 0, _rowcount
|
||
|
||
@classmethod
|
||
async def modify(cls, name: str, full_class_name: str):
|
||
"""
|
||
编辑规则。
|
||
|
||
:param name: 名称
|
||
:param full_class_name: 规则类
|
||
:return: 保存状态、当前规则
|
||
"""
|
||
_rule_cls = cls.load_rule_class(full_class_name)
|
||
assert _rule_cls, f"未找到名称为:{full_class_name} 的规则类."
|
||
_rule_model = _rule_cls()
|
||
|
||
assert name not in ('', None), '必须提供规则名称.'
|
||
_rule: cls = await cls(name=name).async_find_first()
|
||
assert _rule, f"未找到名称为:{name} 的规则."
|
||
|
||
_rule.data = _rule_model.dumps()
|
||
_rule.name = _rule_model.name
|
||
if not _rule.name:
|
||
_rule.name = _rule_cls.__name__
|
||
|
||
await _rule.async_save()
|
||
return _rule
|
||
|
||
@classmethod
|
||
async def find_by_item_names(cls, item_names: set[str]):
|
||
"""
|
||
根据授权项名称(权限名称或角色名称)取得所有角色。
|
||
|
||
:param item_names: 授权项名称列表
|
||
:return: 规则列表
|
||
"""
|
||
# 取出所有授权项中的规则,忽略没有规则的
|
||
_query = select(cls).join(
|
||
RbacItem, RbacItem.rule_name == cls.name
|
||
).where(
|
||
RbacItem.name.in_(item_names),
|
||
text(f"ifnull({RbacItem.rule_name.key},'')!=''")
|
||
)
|
||
_rule_list: list[cls] = await cls.query_all(_query)
|
||
return _rule_list
|
||
|
||
@classmethod
|
||
def load_rule_class(cls, full_class_name: str):
|
||
"""
|
||
通过规则类名称,加载规则类。若类所在的模块不存在,则报异常。
|
||
|
||
:param full_class_name: 完整规则名称,从顶层模块名称开始,直到类名称。
|
||
:return: 找到的规则类,找不到返回 None
|
||
"""
|
||
_full_paths = full_class_name.split('.')
|
||
_cls_name = _full_paths[-1]
|
||
_mod_name = '.'.join(_full_paths[:-1])
|
||
|
||
try:
|
||
_module = importlib.import_module(_mod_name)
|
||
# 迭代模块成员
|
||
for _n in dir(_module):
|
||
_cls = getattr(_module, _n)
|
||
if isinstance(_cls, type) and issubclass(_cls, RbacRule) and _cls.__name__ == _cls_name:
|
||
return _cls
|
||
except Exception:
|
||
return None
|
||
|
||
return None
|
||
|
||
def run(self, rbac_user: RbacUser, rbac_permission: RbacPermission, *args, **kwargs) -> bool:
|
||
"""
|
||
运行规则。当用户在执行具体操作时,该规则会自动被唤起执行。
|
||
|
||
:param rbac_user: 用户
|
||
:param rbac_permission: 权限对象
|
||
:return: 允许执行返回 True, 否则返回 False
|
||
"""
|
||
return True
|
||
|
||
def dumps(self):
|
||
"""
|
||
序列化为可持久化文本。
|
||
|
||
:return: 可持久化文本
|
||
"""
|
||
return pickle.dumps(self)
|