import json
import logging
import time
import uuid
from typing import Dict, Optional, Any

import redis
from redis import ConnectionPool

logger = logging.getLogger("services")


def _calculate_score(priority: int) -> float:
    """Compute the sorted-set score for a task of the given priority.

    The score is ``(6 - priority) * time.time()``, so a numerically smaller
    priority gets a larger multiplier and therefore a larger score.

    NOTE(review): the current timestamp is a factor of the score, so for
    priorities below 6 a task enqueued later receives a larger score than an
    earlier task of the same priority. Because ``get_process_task`` pops the
    highest score (ZPOPMAX), same-priority tasks are dequeued newest-first.
    Confirm this ordering is intended before changing the formula; scores
    already stored in Redis would not be comparable with a new formula.

    Args:
        priority: Task priority; callers document the range as 1-5 with 1
            being the most urgent. The value is not validated here.

    Returns:
        float: The score to store alongside the task ID in the ZSET.
    """
    return (6 - priority) * time.time()


class QueueManager:
    """Redis-backed queue manager.

    Maintains a "process" queue (a ZSET of task IDs ordered by score plus a
    HASH mapping task ID -> JSON payload) and an "upload" queue (a LIST of
    JSON payloads).
    """

    def __init__(self, redis_config: Dict[str, Any]):
        """Create the connection pool and client, then verify connectivity.

        Args:
            redis_config: Redis configuration dictionary. Required keys:
                ``host``, ``port``, ``db``, ``queue_key``,
                ``upload_queue_key``. Optional keys: ``password`` and
                ``max_connections`` (defaults to 10).

        Raises:
            KeyError: If a required configuration key is missing.
            Exception: Whatever ``ping()`` raises when Redis is unreachable;
                the error is logged and re-raised unchanged.
        """
        max_connections = redis_config.get('max_connections', 10)

        # A shared connection pool; decode_responses=True makes the client
        # return str instead of bytes, which the json.loads calls rely on.
        self.pool = ConnectionPool(
            host=redis_config['host'],
            port=redis_config['port'],
            db=redis_config['db'],
            password=redis_config.get('password'),
            max_connections=max_connections,
            decode_responses=True
        )
        self.redis_client = redis.Redis(connection_pool=self.pool)

        # The HASH holding the task payloads lives next to the ZSET under
        # the same key with a "_hash" suffix.
        self.process_queue_key = redis_config['queue_key']
        self.process_hash_queue_key = redis_config['queue_key'] + '_hash'
        self.upload_queue_key = redis_config['upload_queue_key']

        logger.info(
            f"QueueManager initialized with Connection Pool (Max={max_connections}): "
            f"Process Queue={self.process_queue_key}, Upload Queue={self.upload_queue_key}")

        # Fail fast: surface connection problems at construction time.
        try:
            self.redis_client.ping()
            logger.info("Redis connection successful.")
        except Exception as e:
            logger.error(f"Failed to connect to Redis: {e}")
            raise

    def add_task(self, task_data: Dict[str, Any], priority: int = 3) -> str:
        """Add a task to the process queue.

        Args:
            task_data: Task payload (e.g. hook_url, text, ...). Must be
                JSON-serializable.
            priority: Priority in the range 1-5, where 1 is the highest.
                The value is not validated.

        Returns:
            str: The generated task ID (a UUID4 string).
        """
        task_id = str(uuid.uuid4())
        task = {
            'task_id': task_id,
            'priority': priority,
            'created_at': time.time(),
            'data': task_data
        }
        score = _calculate_score(priority)

        # Queue both writes in one pipeline so they are sent in a single
        # round trip (redis-py pipelines are transactional by default).
        pipe = self.redis_client.pipeline()
        # 1. ZSET: ordering by score and popping.
        pipe.zadd(self.process_queue_key, {task_id: score})
        # 2. HASH: task ID -> full JSON, for direct lookup and deletion.
        pipe.hset(self.process_hash_queue_key, task_id, json.dumps(task))
        pipe.execute()

        logger.info(f"Task {task_id} added to process queue with priority {priority}, score {score}")
        return task_id

    def get_process_task(self) -> Optional[Dict[str, Any]]:
        """Pop the highest-scored task from the process queue.

        The task ID is popped from the ZSET with ZPOPMAX, then its payload is
        read from and removed out of the HASH. If the payload is missing
        (for example a stale ZSET member, or the task was deleted between the
        pop and the lookup), the entry is skipped with a warning and the next
        queued task is tried instead of failing on ``json.loads(None)``.

        Returns:
            Optional[Dict]: The decoded task, or None when the queue is empty.
        """
        # Each iteration removes one member from the ZSET, so the loop ends
        # as soon as a payload is found or the queue is drained.
        while True:
            popped = self.redis_client.zpopmax(self.process_queue_key, 1)
            if not popped:
                return None

            task_id, _ = popped[0]

            # Read and delete the payload in a single pipeline.
            pipe = self.redis_client.pipeline()
            pipe.hget(self.process_hash_queue_key, task_id)
            pipe.hdel(self.process_hash_queue_key, task_id)
            task_json, _ = pipe.execute()

            if task_json is None:
                logger.warning(
                    f"Task {task_id} popped from process queue but has no payload; skipping.")
                continue

            task = json.loads(task_json)
            logger.info(f"Task {task.get('task_id', 'Unknown')} retrieved from process queue.")
            return task

    def push_upload_task(self, task_result: Dict[str, Any]):
        """Push a processing result onto the upload queue (a Redis LIST).

        Args:
            task_result: Processing result (expected to contain task_id,
                output_paths, hook_url, ...). Must be JSON-serializable.
        """
        self.redis_client.lpush(self.upload_queue_key, json.dumps(task_result))
        # Use .get() so a missing 'task_id' cannot raise after the push has
        # already succeeded (which would invite a duplicate push on retry).
        logger.info(f"Task {task_result.get('task_id', 'Unknown')} pushed to upload queue.")

    def get_upload_task(self, timeout: int = 5) -> Optional[Dict[str, Any]]:
        """Pop a task from the upload queue, blocking until one is available.

        Args:
            timeout: Maximum time to block, in seconds.

        Returns:
            Optional[Dict]: The decoded task, or None if the wait timed out.
        """
        # BRPOP blocks until an element is available or the timeout expires.
        result = self.redis_client.brpop(self.upload_queue_key, timeout)
        if not result:
            return None

        # BRPOP returns a (key, value) pair; only the value is needed.
        task = json.loads(result[1])
        logger.debug(f"Task {task.get('task_id', 'Unknown')} retrieved from upload queue.")
        return task

    def get_process_queue_stats(self) -> Dict[str, Any]:
        """Return the current sizes of the process and upload queues.

        Returns:
            Dict: ``process_queued`` (ZSET cardinality), ``upload_queued``
            (LIST length) and ``timestamp`` (``time.time()`` at call time).
        """
        queued_count = self.redis_client.zcard(self.process_queue_key)
        upload_count = self.redis_client.llen(self.upload_queue_key)
        return {
            'process_queued': queued_count,
            'upload_queued': upload_count,
            'timestamp': time.time()
        }

    def delete_process_task(self, task_id: str) -> bool:
        """Remove a task from the process queue by its ID.

        Both the HASH payload and the ZSET member are removed in a single
        pipeline, so the removal takes one round trip and no separate
        follow-up HDEL is needed.

        Args:
            task_id: ID of the task to remove.

        Returns:
            bool: True if the task was present in the HASH or the ZSET and
            has been removed; False if it was found in neither.
        """
        pipe = self.redis_client.pipeline()
        # HDEL reports how many fields it removed, which doubles as the
        # "did the payload exist" check.
        pipe.hdel(self.process_hash_queue_key, task_id)
        # Always remove from the ZSET as well, even if the HASH entry is gone.
        pipe.zrem(self.process_queue_key, task_id)
        hdel_count, zrem_count = pipe.execute()

        if hdel_count > 0:
            # The payload was still in the HASH, i.e. the task had not been
            # consumed yet: this is a real cancellation.
            logger.warning(f"Task {task_id} successfully removed from queue (cancelled).")
            return True

        if zrem_count > 0:
            # No payload but a leftover ZSET member (abnormal state); removing
            # it still counts as a successful cleanup.
            logger.info(f"Task {task_id} only existed in ZSET (stale), cleaned up.")
            return True

        logger.info(f"Task {task_id} not found in queue.")
        return False