import ast import asyncio import hashlib import os import threading from typing import Tuple, List, Any, Optional, Dict from tornado.template import Loader, Template from tornado.web import UIModule class ParamAwareUIModuleDataWarehouse: """ 预处理数据仓库。 数据用唯一调用 ID 作为 Key 存储。 """ def __init__(self): self._store = {} self._lock = threading.Lock() def prepare(self, module_name: str, call_id: str, data: Any): """存储预处理数据""" with self._lock: self._store.setdefault(module_name, {})[call_id] = data def fetch(self, module_name: str, call_id: str) -> Any: """获取预处理数据""" with self._lock: return self._store.get(module_name, {}).get(call_id) warehouse = ParamAwareUIModuleDataWarehouse() """ 全局单例,参数感知预处理仓库。 """ class ParamAwareUIModule(UIModule): """ 参数感知 UIModule 父类。 1、子类应当实现 async_prepare 方法完成数据预处理,该方法在 Handler 执行过程中会根据模板文件的配置调用完成数据初始化,模板中配置的参数会传给该方法。 2、原有的 render 方法作为从数据仓库中获取数据,调用 render_with_data 方法完成渲染,已无需在子类中实现,模板中配置的参数也会传给该方法。 3、子类应当实现 render_with_data 方法完成渲染,预处理数据通过参数 prepared_data 传入。 """ @classmethod def generate_call_id(cls, module_name: str, kwargs: dict) -> str: """根据模块名和参数生成唯一调用ID""" param_str = ",".join(f"{k}={v}" for k, v in sorted(kwargs.items())) return hashlib.md5(f"{module_name}|{param_str}".encode()).hexdigest() async def async_prepare(self, **kwargs) -> Any: """子类实现异步数据加载,用静态方法避免参数缺失""" raise NotImplementedError def render(self, **kwargs): """自动关联预处理数据""" call_id = self.generate_call_id(self.__class__.__name__, kwargs) prepared_data = warehouse.fetch(self.__class__.__name__, call_id) return self.render_with_data(prepared_data, **kwargs) def render_with_data(self, prepared_data: Any, **kwargs): """子类实现具体渲染逻辑""" raise NotImplementedError class UIModuleCallAnalyzer(ast.NodeVisitor): """ 用于分析 Tornado 模板生成的 Python 代码,从中解析出对 UIModule 的名称和实际调用参数。 """ def __init__(self): self.calls = [] # 存储 (module_class_name, kwargs) def visit_Assign(self, node): """ 匹配 _tt_tmp = _tt_modules.XxxModule(...) 模式。 :param node: :return: """ if (isinstance(node.value, ast.Call) and isinstance(node.value.func, ast.Attribute) and isinstance(node.value.func.value, ast.Name) and node.value.func.value.id == '_tt_modules'): module_class = node.value.func.attr kwargs = self._extract_kwargs(node.value) self.calls.append((module_class, kwargs)) @classmethod def _extract_kwargs(cls, call_node: ast.Call) -> dict: """ 安全提取调用参数。 :param call_node: 调用节点 :return: 实际参数 """ kwargs = {} # 处理位置参数 (Tornado不会生成这种情况) for arg in call_node.args: if isinstance(arg, ast.Constant): kwargs.setdefault('_pos_args', []).append(arg.s) # 处理关键字参数 for kw in call_node.keywords: if isinstance(kw.value, (ast.Constant, ast.Constant, ast.Constant)): kwargs[kw.arg] = ast.literal_eval(ast.unparse(kw.value)) elif isinstance(kw.value, ast.Name) and kw.value.id in ('True', 'False', 'None'): kwargs[kw.arg] = ast.literal_eval(kw.value.id) return kwargs @classmethod def ui_module_calls(cls, template_code: str) -> List[Tuple[str, dict]]: """ 从模板生成的 Python 代码中提取 UIModule 的调用。 :param template_code: 模板生成的函数。 :return: """ try: tree = ast.parse(template_code) analyzer = cls() analyzer.visit(tree) return analyzer.calls except: return [] class ParamAwareTemplate(Template): """ 参数感知模板。 重写 _generate_python 方法,从 Tornado 模板编译生成的 Python 代码中分析出 UIModule 调用参数。 提供 prepare_ui_modules 方法在 Handler 中 load 完成后预处理数据,预处理得到的数据会保存在数据仓库中。 """ def __init__(self, *args, **kwargs): self.ui_module_calls = [] super().__init__(*args, **kwargs) def _generate_python(self, *args, **kwargs): code = super()._generate_python(*args, **kwargs) self.ui_module_calls = UIModuleCallAnalyzer.ui_module_calls(code) return code async def prepare_ui_modules(self, template: 'ParamAwareTemplate', ui_modules: dict[UIModule]): """执行模板中所有UIModule的异步预处理""" tasks = [] for module_name, kwargs in template.ui_module_calls: module_class = ui_modules.get(module_name) if not hasattr(module_class, 'async_prepare'): continue call_id = ParamAwareUIModule.generate_call_id(module_name, kwargs) task = asyncio.create_task( self._prepare_single(module_class, call_id, kwargs) ) tasks.append(task) await asyncio.gather(*tasks) async def _prepare_single(self, module_class, call_id, kwargs): """单个模块的预处理流程""" try: _ui_modulr: ParamAwareUIModule = module_class(handler=self.namespace.get('handler')) data = await _ui_modulr.async_prepare(**kwargs) warehouse.prepare(module_class.__name__, call_id, data) except Exception as e: warehouse.prepare(module_class.__name__, call_id, { "__error__": str(e) }) class ParamAwareLoader(Loader): """ 参数感知装载器,也是本代码文件中主要对外开放的类。 重写 _create_template 方法,用参数感知模板替换原有模板。 重写 load 明确返回参数感知模板。 """ def __init__(self, root_directory: str, **kwargs: Any) -> None: super().__init__(root_directory, **kwargs) self.templates = {} # type: Dict[str, ParamAwareTemplate] def _create_template(self, name: str) -> ParamAwareTemplate: path = os.path.join(self.root, name) with open(path, "rb") as f: template = ParamAwareTemplate(f.read(), name=name, loader=self) return template def load(self, name: str, parent_path: Optional[str] = None) -> ParamAwareTemplate: """Loads a template.""" name = self.resolve_path(name, parent_path=parent_path) with self.lock: if name not in self.templates: self.templates[name] = self._create_template(name) return self.templates[name] async def load_with_prepare(self, name: str) -> ParamAwareTemplate: """ 加载模板,并完成数据准备。 :param name: 模板名称 :return: 完成数据准备的模板 """ template = self.load(name) _modules = self.namespace.get('modules', None) if _modules and hasattr(_modules, 'ui_modules'): _ui_modules = _modules.ui_modules await template.prepare_ui_modules(template, _ui_modules) return template