import importlib import pickle from sqlalchemy import select, text, delete from paste.rbac.rbac_item import RbacItem from paste.rbac.rbac_models import RbacRuleModel from paste.rbac.rbac_permission import RbacPermission from paste.rbac.rbac_user import RbacUser class RbacRule(RbacRuleModel): """ 规则是授权项的附带验证条件。在验证授权项时,只能判断是否具有某个授权项,无法对具体数据执行进一步验证。比如 验证某一项数据是否允许某个用户执行某操作,此时就需要用到规则。 规则是在单独定义的验证方法,这个方法被持久化保存在数据库中,具体执行某个需要鉴权的操作时,若该权限配置了规 则,那么规则方法会一并参与到鉴权过程中去,以确定用户是否有对具体数据执行操作的权限。 必须允许多个规则应用于一个授权项,但事实上是一个授权项自身只能绑定一个规则,解决方案是将一系列需要规则鉴权 的操作作为规则权限的子授权项。这样,在对这个权限进行鉴权操作时,会自底向上逐个检查父权限是否有规则,若有规 则,那么会进入这个父权限的规则方法,执行,并校验其返回值。 因此要允许手动创建授权项,且允许手动为授权项绑定规则,然后将其他同样需要执行该规则的权限配置为该权限的子权 限。 由于在规则执行系统中,不知道未来将会编写和配置的规则,因此无法调用到对应的规则,只能将来编写好规则后,通过 持久化对象到数据库中,通过查库还原将来的规则对象后再执行 run 方法。 """ @classmethod async def create(cls, full_class_name: str): """ 添加规则。 :param full_class_name: 规则类 :return: 保存状态、当前规则 """ _rule_cls = cls.load_rule_class(full_class_name) assert _rule_cls, f"未找到名称为:{full_class_name} 的规则类." _rule_model = _rule_cls() _rule_model.data = _rule_model.dumps() if not _rule_model.name: _rule_model.name = _rule_cls.__name__ await _rule_model.async_save() return _rule_model @classmethod async def delete(cls, name: str): """ 删除规则。 :param name: 要删除的规则名称 :return: 是否删除成功、删除的行数 """ assert name not in ('', None), '必须提供规则名称.' _query = delete(cls).where(cls.name == name) _result = await cls.raw_execute(query=_query) _rowcount = _result.rowcount if isinstance(_result.rowcount, int) else 0 return _rowcount > 0, _rowcount @classmethod async def modify(cls, name: str, full_class_name: str): """ 编辑规则。 :param name: 名称 :param full_class_name: 规则类 :return: 保存状态、当前规则 """ _rule_cls = cls.load_rule_class(full_class_name) assert _rule_cls, f"未找到名称为:{full_class_name} 的规则类." _rule_model = _rule_cls() assert name not in ('', None), '必须提供规则名称.' _rule: cls = await cls(name=name).async_find_first() assert _rule, f"未找到名称为:{name} 的规则." _rule.data = _rule_model.dumps() _rule.name = _rule_model.name if not _rule.name: _rule.name = _rule_cls.__name__ await _rule.async_save() return _rule @classmethod async def find_by_item_names(cls, item_names: set[str]): """ 根据授权项名称(权限名称或角色名称)取得所有角色。 :param item_names: 授权项名称列表 :return: 规则列表 """ # 取出所有授权项中的规则,忽略没有规则的 _query = select(cls).join( RbacItem, RbacItem.rule_name == cls.name ).where( RbacItem.name.in_(item_names), text(f"ifnull({RbacItem.rule_name.key},'')!=''") ) _rule_list: list[cls] = await cls.query_all(_query) return _rule_list @classmethod def load_rule_class(cls, full_class_name: str): """ 通过规则类名称,加载规则类。若类所在的模块不存在,则报异常。 :param full_class_name: 完整规则名称,从顶层模块名称开始,直到类名称。 :return: 找到的规则类,找不到返回 None """ _full_paths = full_class_name.split('.') _cls_name = _full_paths[-1] _mod_name = '.'.join(_full_paths[:-1]) try: _module = importlib.import_module(_mod_name) # 迭代模块成员 for _n in dir(_module): _cls = getattr(_module, _n) if isinstance(_cls, type) and issubclass(_cls, RbacRule) and _cls.__name__ == _cls_name: return _cls except Exception: return None return None def run(self, rbac_user: RbacUser, rbac_permission: RbacPermission, *args, **kwargs) -> bool: """ 运行规则。当用户在执行具体操作时,该规则会自动被唤起执行。 :param rbac_user: 用户 :param rbac_permission: 权限对象 :return: 允许执行返回 True, 否则返回 False """ return True def dumps(self): """ 序列化为可持久化文本。 :return: 可持久化文本 """ return pickle.dumps(self)