import logging import os.path import socket import redis import sqlalchemy.exc from tornado.ioloop import IOLoop, PeriodicCallback from base import conn_pool from paste.core import config from paste.core.logging import echo_log, get_logger, set_logger_config from paste.db.baseadapter import BaseAdapter from paste.service.daemonize import DaemonizeService from paste.util import udict from paste.web.application import ApplicationSwagger logger_config_name = 'logger.api' """ 配置文件中日志配置字段名称。 """ callbacks: list[PeriodicCallback] = [] """ 回调函数对象。 """ current_io_loop = None """ Tornado 事件循环对象。 """ pid_file = os.path.join(os.path.curdir, 'api_service.pid') """ PID 文件路径。 """ service_name = 'D3I API 服务' """ 服务名称。 """ def current_loop() -> IOLoop: """ 这里必须采用方法,在适当的时间点创建事件循环对象,否则会导致服务无法启动。 :return: 事件循环对象 """ global current_io_loop if current_io_loop is None: current_io_loop = IOLoop.current() return current_io_loop def daemon(): """ 驻守线程,处理诸如 Websocket 推送等事务(已弃用)。 """ # 无事发生 # 启动回调列表 for _cb in callbacks: _cb.start() echo_log(f"合计启动了 {len(callbacks)} 个驻守任务.") def start_apps(): """ 启动配置文件中配置的各种服务。 """ apps_config: list[dict] = config.get_config('tornado.api', []) apps: list[ApplicationSwagger] = [] for _app_cfg in apps_config: handlers_pkg = udict.get_by_path(_app_cfg, 'handlers_pkg') app = ApplicationSwagger(**_app_cfg) port = udict.get_by_path(_app_cfg, 'port') address = udict.get_by_path(_app_cfg, 'address') app.listen(port, address) apps.append(app) echo_log(f"模块 {handlers_pkg} 已加载于:http://127.0.0.1:{port}") # 启动驻守线程 # daemon() return apps def start_service(): set_logger_config(logger_config_name) echo_log(f"正在启动{service_name}...") # 启动服务时,重置加密密钥,意味着所有登录 Token 均失效 # echo_log(json_token.reset_secret_key()) try: # 检测 MySQL 服务是否正常 echo_log('检测数据库服务...') # 绑定连接池监听器 conn_pool.bind_listener() # 检测数据连接 BaseAdapter.ping() echo_log('数据库服务正常.') # 启动服务 start_apps() echo_log(f"{service_name}启动成功.") current_loop().start() except (redis.exceptions.TimeoutError, socket.timeout): echo_log('Redis 服务异常.', level=logging.ERROR, is_log_exc=True) echo_log(f"{service_name}启动失败.") except sqlalchemy.exc.OperationalError: echo_log('Database 服务异常.', level=logging.ERROR, is_log_exc=True) echo_log(f"{service_name}启动失败.") except KeyboardInterrupt: echo_log(msg='KeyboardInterrupt') stop_service() except Exception as e: echo_log(msg=e, level=logging.ERROR, is_log_exc=True) def stop_service(): # 停止回调列表 for _cb in callbacks: _cb.stop() current_loop().stop() echo_log(f"{service_name}已停止.") def start(): """ 以驻内存方式启动服务。 """ set_logger_config(logger_config_name) get_logger() ds = DaemonizeService(pid_file=pid_file, name=service_name) ds.set_start_callback(start_service) ds.set_term_callback(stop_service) ds.start() def stop(): """ 停止驻内存服务。 """ set_logger_config(logger_config_name) get_logger() ds = DaemonizeService(pid_file=pid_file, name=service_name) ds.set_start_callback(start_service) ds.set_term_callback(stop_service) ds.stop()