Files
paste-framework/paste/rbac/rbac_permission.py
T
2026-06-02 16:26:10 +08:00

150 lines
5.9 KiB
Python

import importlib
from sqlalchemy import delete
from paste.core import config
from paste.rbac.rbac_item import RbacItem
from paste.web.application import Application
class RbacPermission(RbacItem):
"""
权限。
大多数情况下,权限都对应着一个具体的请求操作,且经由导入期自动导入。
权限的 name 属性为请求控制器 RequestHandler 的 route_pattern 属性值,
权限的 description 属性对应于 RequestHandler 类的文档注释的第一行。
权限可以分配给用户,也可以分配给角色或其他权限。
此外允许手动创建权限,主要是为规则创建一个权限载体,当其他的权限属于这个权限的子权限时,相当于同时拥有了这个权限的规则。
"""
@classmethod
async def create(cls, name: str, description: str = None, rule_name: str = None):
"""
创建权限。
:param name: 权限名称
:param rule_name: 规则名称
:param description: 权限描述
:return: 权限对象
"""
assert name not in ('', None), '必须提供权限名称.'
_permission = cls(name=name, description=description, type=cls.TYPE_PERMISSION)
_permission.rule_name = rule_name if rule_name else _permission.rule_name
await _permission.async_save()
return _permission
@classmethod
async def delete(cls, name: str):
"""
删除权限。
:param name: 授权项名称
:return: 操作状态,游标返回对象
"""
assert name not in ('', None), '必须提供权限名称.'
_query = delete(cls).where(cls.name == name)
_result = await cls.raw_execute(query=_query)
_rowcount = _result.rowcount if isinstance(_result.rowcount, int) else 0
return _rowcount > 0, _result
@classmethod
async def modify(cls, name: str, description: str = None, rule_name: str = None):
"""
编辑权限。
:param name: 名称
:param description: 描述
:param rule_name: 规则名称
:return: 权限对象
"""
assert name not in ('', None), '必须提供权限名称.'
_permission: cls = await cls(name=name).async_find_first()
assert _permission, f"未找到名称为:{name} 的权限."
_permission.description = description if description else _permission.description
_permission.rule_name = rule_name if rule_name else _permission.rule_name
await _permission.async_save()
return _permission
@classmethod
def identify_permission(cls):
"""
根据应用程序配置,从应用程序目录中识别所有的控制器,及其对应的路由。
读取配置文件中关于 tornado 部分的配置,扫描配置包中的所有 Handler 类。
识别需要授权的接口,即包含 auth_permission 装饰器的接口。
忽略无需授权的接口,如:部分 OpenAPI 或 FrontendAPI。
:return: 识别到的控制器列表,注意列表中是 tuple(route, handler_type)
"""
apps_config: dict = config.get_config('tornado')
_handlers: list[tuple[str, type]] = []
for _n, _app_cfgs in apps_config.items():
for app_cfg in _app_cfgs:
_handlers_pkg = app_cfg.get('handlers_pkg')
_modules_itr = Application.modules_iterator(package=_handlers_pkg)
for file_finder, handler_name, is_package in _modules_itr:
if is_package:
continue
_module = importlib.import_module(handler_name)
_hls_list = Application.fetch_handlers(module=_module)
for _hls in _hls_list:
_uri, _hdl = _hls
# 检查 post 或 get 是否被 auth_permission 装饰
for method_name in ['post', 'get']:
method = getattr(_hdl, method_name, None)
if method and callable(method):
if getattr(method, '__auth_permission__', False):
_handlers.append(_hls)
break
return _handlers
@classmethod
async def import_permissions(cls):
"""
导入所有的可分配权限到数据库,若已经在数据库存在,则更新。
"""
_handlers = cls.identify_permission()
# 根据所有的路由信息,查出已经有的权限数据
_routes: list[str] = [rc[0] for rc in _handlers]
_item_model_list: list[cls] = await cls(**{cls.name.key: _routes}).async_find()
# 利用路由 Key 建立索引
_item_model_dict: dict[str: cls] = {_item.name: _item for _item in _item_model_list}
_permissions: list[cls] = []
for _route, _cls in _handlers:
# 取得类描述
_desc = f"{_cls.__doc__}".strip().split('\n')[0].strip(),
# 利用路由取出模型
_perm_item: cls = _item_model_dict.get(_route, None)
if _perm_item is None:
# 未得到模型,创建
_perm_item = cls(**{cls.name.key: _route, cls.description.key: _desc})
else:
# 已得到模型,更新
_perm_item.description = _desc
_perm_item.close_session()
# 加入列表,批量保存
_permissions.append(_perm_item)
# 保存数据
_session = cls.get_aio_session()
try:
_session.add_all(_permissions)
await _session.commit()
except Exception as e:
await _session.rollback()
raise e
finally:
await _session.close()
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.type = self.TYPE_PERMISSION