Files
2026-06-02 16:26:10 +08:00

213 lines
7.7 KiB
Python

import ast
import asyncio
import hashlib
import os
import threading
from typing import Tuple, List, Any, Optional, Dict
from tornado.template import Loader, Template
from tornado.web import UIModule
class ParamAwareUIModuleDataWarehouse:
"""
预处理数据仓库。
数据用唯一调用 ID 作为 Key 存储。
"""
def __init__(self):
self._store = {}
self._lock = threading.Lock()
def prepare(self, module_name: str, call_id: str, data: Any):
"""存储预处理数据"""
with self._lock:
self._store.setdefault(module_name, {})[call_id] = data
def fetch(self, module_name: str, call_id: str) -> Any:
"""获取预处理数据"""
with self._lock:
return self._store.get(module_name, {}).get(call_id)
warehouse = ParamAwareUIModuleDataWarehouse()
"""
全局单例,参数感知预处理仓库。
"""
class ParamAwareUIModule(UIModule):
"""
参数感知 UIModule 父类。
1、子类应当实现 async_prepare 方法完成数据预处理,该方法在 Handler 执行过程中会根据模板文件的配置调用完成数据初始化,模板中配置的参数会传给该方法。
2、原有的 render 方法作为从数据仓库中获取数据,调用 render_with_data 方法完成渲染,已无需在子类中实现,模板中配置的参数也会传给该方法。
3、子类应当实现 render_with_data 方法完成渲染,预处理数据通过参数 prepared_data 传入。
"""
@classmethod
def generate_call_id(cls, module_name: str, kwargs: dict) -> str:
"""根据模块名和参数生成唯一调用ID"""
param_str = ",".join(f"{k}={v}" for k, v in sorted(kwargs.items()))
return hashlib.md5(f"{module_name}|{param_str}".encode()).hexdigest()
async def async_prepare(self, **kwargs) -> Any:
"""子类实现异步数据加载,用静态方法避免参数缺失"""
raise NotImplementedError
def render(self, **kwargs):
"""自动关联预处理数据"""
call_id = self.generate_call_id(self.__class__.__name__, kwargs)
prepared_data = warehouse.fetch(self.__class__.__name__, call_id)
return self.render_with_data(prepared_data, **kwargs)
def render_with_data(self, prepared_data: Any, **kwargs):
"""子类实现具体渲染逻辑"""
raise NotImplementedError
class UIModuleCallAnalyzer(ast.NodeVisitor):
"""
用于分析 Tornado 模板生成的 Python 代码,从中解析出对 UIModule 的名称和实际调用参数。
"""
def __init__(self):
self.calls = [] # 存储 (module_class_name, kwargs)
def visit_Assign(self, node):
"""
匹配 _tt_tmp = _tt_modules.XxxModule(...) 模式。
:param node:
:return:
"""
if (isinstance(node.value, ast.Call) and isinstance(node.value.func, ast.Attribute)
and isinstance(node.value.func.value, ast.Name) and node.value.func.value.id == '_tt_modules'):
module_class = node.value.func.attr
kwargs = self._extract_kwargs(node.value)
self.calls.append((module_class, kwargs))
@classmethod
def _extract_kwargs(cls, call_node: ast.Call) -> dict:
"""
安全提取调用参数。
:param call_node: 调用节点
:return: 实际参数
"""
kwargs = {}
# 处理位置参数 (Tornado不会生成这种情况)
for arg in call_node.args:
if isinstance(arg, ast.Constant):
kwargs.setdefault('_pos_args', []).append(arg.s)
# 处理关键字参数
for kw in call_node.keywords:
if isinstance(kw.value, (ast.Constant, ast.Constant, ast.Constant)):
kwargs[kw.arg] = ast.literal_eval(ast.unparse(kw.value))
elif isinstance(kw.value, ast.Name) and kw.value.id in ('True', 'False', 'None'):
kwargs[kw.arg] = ast.literal_eval(kw.value.id)
return kwargs
@classmethod
def ui_module_calls(cls, template_code: str) -> List[Tuple[str, dict]]:
"""
从模板生成的 Python 代码中提取 UIModule 的调用。
:param template_code: 模板生成的函数。
:return:
"""
try:
tree = ast.parse(template_code)
analyzer = cls()
analyzer.visit(tree)
return analyzer.calls
except:
return []
class ParamAwareTemplate(Template):
"""
参数感知模板。
重写 _generate_python 方法,从 Tornado 模板编译生成的 Python 代码中分析出 UIModule 调用参数。
提供 prepare_ui_modules 方法在 Handler 中 load 完成后预处理数据,预处理得到的数据会保存在数据仓库中。
"""
def __init__(self, *args, **kwargs):
self.ui_module_calls = []
super().__init__(*args, **kwargs)
def _generate_python(self, *args, **kwargs):
code = super()._generate_python(*args, **kwargs)
self.ui_module_calls = UIModuleCallAnalyzer.ui_module_calls(code)
return code
async def prepare_ui_modules(self, template: 'ParamAwareTemplate', ui_modules: dict[UIModule]):
"""执行模板中所有UIModule的异步预处理"""
tasks = []
for module_name, kwargs in template.ui_module_calls:
module_class = ui_modules.get(module_name)
if not hasattr(module_class, 'async_prepare'):
continue
call_id = ParamAwareUIModule.generate_call_id(module_name, kwargs)
task = asyncio.create_task(
self._prepare_single(module_class, call_id, kwargs)
)
tasks.append(task)
await asyncio.gather(*tasks)
async def _prepare_single(self, module_class, call_id, kwargs):
"""单个模块的预处理流程"""
try:
_ui_modulr: ParamAwareUIModule = module_class(handler=self.namespace.get('handler'))
data = await _ui_modulr.async_prepare(**kwargs)
warehouse.prepare(module_class.__name__, call_id, data)
except Exception as e:
warehouse.prepare(module_class.__name__, call_id, {
"__error__": str(e)
})
class ParamAwareLoader(Loader):
"""
参数感知装载器,也是本代码文件中主要对外开放的类。
重写 _create_template 方法,用参数感知模板替换原有模板。
重写 load 明确返回参数感知模板。
"""
def __init__(self, root_directory: str, **kwargs: Any) -> None:
super().__init__(root_directory, **kwargs)
self.templates = {} # type: Dict[str, ParamAwareTemplate]
def _create_template(self, name: str) -> ParamAwareTemplate:
path = os.path.join(self.root, name)
with open(path, "rb") as f:
template = ParamAwareTemplate(f.read(), name=name, loader=self)
return template
def load(self, name: str, parent_path: Optional[str] = None) -> ParamAwareTemplate:
"""Loads a template."""
name = self.resolve_path(name, parent_path=parent_path)
with self.lock:
if name not in self.templates:
self.templates[name] = self._create_template(name)
return self.templates[name]
async def load_with_prepare(self, name: str) -> ParamAwareTemplate:
"""
加载模板,并完成数据准备。
:param name: 模板名称
:return: 完成数据准备的模板
"""
template = self.load(name)
_modules = self.namespace.get('modules', None)
if _modules and hasattr(_modules, 'ui_modules'):
_ui_modules = _modules.ui_modules
await template.prepare_ui_modules(template, _ui_modules)
return template