Files
2026-06-02 16:30:48 +08:00

152 lines
4.0 KiB
Python

"""
演示 Redis Stream 消息队列服务。
"""
import asyncio
import logging
import os
import socket
import sys
from typing import Optional
import redis
from paste.core import aio_pool
from paste.core.logging import set_logger_config, echo_log, get_logger
from paste.db.redis import StreamActor
from paste.service.daemonize import DaemonizeService
logger_config_name = 'logger.default'
"""
配置文件中日志配置字段名称。
"""
current_event_loop = None
"""
事件循环对象。
"""
pid_file = os.path.join(os.path.curdir, 'stream_service.pid')
"""
PID 文件路径。
"""
service_name = 'Redis Stream 消息队列服务'
"""
服务名称。
"""
# 配置路径:从 config.json 中读取
stream_config_path = "redis.streams.demo"
# 创建 StreamActor 实例
stream_actor: Optional[StreamActor] = None
async def process_message(data: dict):
"""
业务回调:处理每条消息
"""
user_id = data.get("user_id", "unknown")
event = data.get("event", "")
stream_data = data.get("data", "")
timestamp = data.get("timestamp", "")
echo_log(f"消费消息: user_id={user_id}, event='{event}', stream_data='{stream_data}', time={timestamp}")
# 模拟处理:写入数据库、发送邮件、更新缓存...
# 示例:记录日志 + 模拟耗时
for i in range(10):
echo_log(f"后台任务开始执行-{i}...")
await asyncio.sleep(0.8)
echo_log(f"消息处理完成: {user_id}")
return True
def current_loop() -> asyncio.AbstractEventLoop:
"""
这里必须采用方法,在适当的时间点创建事件循环对象,否则会导致服务无法启动。
:return: 事件循环对象
"""
global current_event_loop
if current_event_loop is None:
current_event_loop = asyncio.new_event_loop()
return current_event_loop
def start_service():
"""
控制台服务方式启动。
"""
set_logger_config(logger_config_name)
echo_log(f"正在启动{service_name}...")
try:
# 检测 Redis 连接
echo_log('检测 Redis 服务...')
_runner = aio_pool.get_aio_runner()
_runner(StreamActor.ping())
echo_log('Redis 服务正常.')
# 创建 StreamActor 监听服务
global stream_actor
stream_actor = StreamActor.new_actor(stream_config_path)
echo_log(f"{service_name}已启动,正在监听{service_name}...")
_runner(stream_actor.run_forever(process_message, is_delete=True))
except (redis.exceptions.TimeoutError, socket.timeout):
echo_log('Redis 服务异常.', level=logging.ERROR, is_log_exc=True)
echo_log(f"{service_name}启动失败.")
except KeyboardInterrupt:
echo_log(msg='KeyboardInterrupt')
stop_service()
except Exception as e:
echo_log(msg=e, level=logging.ERROR, is_log_exc=True)
echo_log(f"{service_name}因未知异常启动失败.")
def stop_service():
"""
停止服务。
"""
echo_log(f"正在停止{service_name}...")
# 停止监听
stream_actor.subscribe_stop()
# 停止事件循环
current_loop().stop()
echo_log(f"{service_name}已停止.")
def start():
"""
驻内存服务方式启动。
"""
set_logger_config(logger_config_name)
get_logger()
ds = DaemonizeService(pid_file=pid_file, name=service_name)
ds.set_start_callback(start_service)
ds.set_term_callback(stop_service)
ds.start()
def stop():
"""
驻内存服务方式停止。
"""
set_logger_config(logger_config_name)
get_logger()
ds = DaemonizeService(pid_file=pid_file, name=service_name)
ds.set_start_callback(start_service)
ds.set_term_callback(stop_service)
ds.stop()
if __name__ == "__main__":
if len(sys.argv) > 1:
if sys.argv[1] == "start":
start_service()
elif sys.argv[1] == "stop":
stop_service()
else:
print("用法: python service/stream_service.py start")
else:
start_service()