import json
import logging
import time
import uuid
from typing import Dict, Optional, Any

import redis
from redis import ConnectionPool

logger = logging.getLogger("services")


def _calculate_score(priority: int) -> float:
    """
    Calculate the task score used for priority ordering.

    Higher scores pop first (ZPOPMAX), so priority 1 (the highest) gets the
    largest multiplier. Note that because the multiplier scales the current
    timestamp, newer tasks of the same priority score higher and pop first.
    """
    timestamp = time.time()
    score = (6 - priority) * timestamp
    return score
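
# Worked example (illustrative timestamps, not from the source): at
# t = 1_700_000_000.0 a priority-1 task scores (6 - 1) * 1.7e9 = 8.5e9,
# while a priority-5 task scores 1.7e9. The integer multiplier dwarfs the
# seconds-scale drift between enqueue times, so priority dominates the
# ZPOPMAX ordering in practice.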


class QueueManager:
    """Redis queue manager."""

    def __init__(self, redis_config: Dict[str, Any]):
        """
        Initialize the queue manager.

        Args:
            redis_config: Redis configuration dict.
        """
        # Extract the Redis connection parameters
        host = redis_config['host']
        port = redis_config['port']
        db = redis_config['db']
        password = redis_config.get('password')
        max_connections = redis_config.get('max_connections', 10)
        # Create the Redis connection pool
        self.pool = ConnectionPool(
            host=host,
            port=port,
            db=db,
            password=password,
            max_connections=max_connections,
            decode_responses=True
        )
        self.redis_client = redis.Redis(connection_pool=self.pool)
        self.process_queue_key = redis_config['queue_key']
        self.process_hash_queue_key = redis_config['queue_key'] + '_hash'
        self.upload_queue_key = redis_config['upload_queue_key']
        logger.info(
            f"QueueManager initialized with Connection Pool (Max={max_connections}): "
            f"Process Queue={self.process_queue_key}, Upload Queue={self.upload_queue_key}")
        try:
            self.redis_client.ping()
            logger.info("Redis connection successful.")
        except Exception as e:
            logger.error(f"Failed to connect to Redis: {e}")
            raise

    def add_task(self, task_data: Dict[str, Any], priority: int = 3) -> str:
        """
        Add a task to the process queue.

        Args:
            task_data: Task payload (hook_url, text, etc.).
            priority: Priority (1-5); 1 is the highest.

        Returns:
            str: The task ID.
        """
        task_id = str(uuid.uuid4())
        task = {
            'task_id': task_id,
            'priority': priority,
            'created_at': time.time(),
            'data': task_data
        }
        score = _calculate_score(priority)
        # Pipeline both writes into one network round trip (atomic via
        # MULTI/EXEC, and faster than two separate calls)
        pipe = self.redis_client.pipeline()
        # 1. ZSET: drives priority ordering and popping
        pipe.zadd(self.process_queue_key, {task_id: score})
        # 2. HASH: maps task_id -> full JSON (fast read/update/delete)
        pipe.hset(self.process_hash_queue_key, task_id, json.dumps(task))
        pipe.execute()
        logger.info(f"Task {task_id} added to process queue with priority {priority}, score {score}")
        return task_id

    def get_process_task(self) -> Optional[Dict[str, Any]]:
        """
        Pop the next task from the process queue (ZSET).

        Returns:
            Optional[Dict]: The task data, or None if the queue is empty.
        """
        # ZPOPMAX returns the highest-scored (highest-priority) member
        result = self.redis_client.zpopmax(self.process_queue_key, 1)
        if not result:
            return None
        task_id, _ = result[0]
        pipe = self.redis_client.pipeline()
        pipe.hget(self.process_hash_queue_key, task_id)
        pipe.hdel(self.process_hash_queue_key, task_id)
        task_json, _ = pipe.execute()
        if task_json is None:
            # The HASH entry is already gone (e.g. the task was cancelled
            # between ZPOPMAX and HGET), so there is nothing to return
            logger.warning(f"Task {task_id} was in ZSET but missing from hash; skipping.")
            return None
        task = json.loads(task_json)
        logger.info(f"Task {task.get('task_id', 'Unknown')} retrieved from process queue.")
        return task

    def push_upload_task(self, task_result: Dict[str, Any]):
        """
        Push a processing result onto the upload queue (List).

        Args:
            task_result: The task result (task_id, output_paths, hook_url, etc.).
        """
        self.redis_client.lpush(self.upload_queue_key, json.dumps(task_result))
        logger.info(f"Task {task_result['task_id']} pushed to upload queue.")

    def get_upload_task(self, timeout: int = 5) -> Optional[Dict[str, Any]]:
        """
        Pop a task from the upload queue (List), blocking while it is empty.

        Args:
            timeout: How long to block, in seconds.

        Returns:
            Optional[Dict]: The task data, or None on timeout.
        """
        # BRPOP blocks until an element arrives or the timeout expires
        result = self.redis_client.brpop(self.upload_queue_key, timeout)
        if not result:
            return None
        # BRPOP returns a (key, value) tuple
        task_json = result[1]
        task = json.loads(task_json)
        logger.debug(f"Task {task.get('task_id', 'Unknown')} retrieved from upload queue.")
        return task

    def get_process_queue_stats(self) -> Dict[str, Any]:
        """
        Return depth statistics for both queues.
        """
        queued_count = self.redis_client.zcard(self.process_queue_key)
        upload_count = self.redis_client.llen(self.upload_queue_key)
        return {
            'process_queued': queued_count,
            'upload_queued': upload_count,
            'timestamp': time.time()
        }

    def delete_process_task(self, task_id: str) -> bool:
        """
        Safely, quickly, and atomically delete a task by task_id
        (recommended pattern for production).
        """
        pipe = self.redis_client.pipeline()
        # 1. Check whether the task still exists in the HASH (O(1))
        pipe.hget(self.process_hash_queue_key, task_id)
        # 2. Remove it from the ZSET in the same round trip (cleans up even
        #    if the HASH entry is already gone)
        pipe.zrem(self.process_queue_key, task_id)
        json_str, zrem_count = pipe.execute()
        if json_str is not None:
            # The task was still in the HASH, i.e. not yet consumed, so it
            # genuinely needs deleting
            self.redis_client.hdel(self.process_hash_queue_key, task_id)
            logger.warning(f"Task {task_id} successfully removed from queue (cancelled).")
            return True
        if zrem_count > 0:
            # The task was probably already consumed but left a stale ZSET
            # entry (abnormal case); cleaning it up also counts as success
            logger.info(f"Task {task_id} only existed in ZSET (stale), cleaned up.")
            return True
        logger.info(f"Task {task_id} not found in queue.")
        return False
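

# Minimal usage sketch (assumed values: host/port/db and the queue key
# names below are placeholders, not part of the module's contract).
if __name__ == "__main__":
    config = {
        'host': 'localhost',
        'port': 6379,
        'db': 0,
        'queue_key': 'demo:process',        # backing ZSET (+ '_hash' HASH)
        'upload_queue_key': 'demo:upload',  # backing List
    }
    qm = QueueManager(config)

    # Enqueue a high-priority task, then pop it back off
    task_id = qm.add_task({'hook_url': 'http://example.com/hook', 'text': 'hi'}, priority=1)
    task = qm.get_process_task()
    assert task is not None and task['task_id'] == task_id

    # Hand a result to the upload queue, drain it, and report queue depths
    qm.push_upload_task({'task_id': task_id, 'output_paths': []})
    print(qm.get_upload_task(timeout=1))
    print(qm.get_process_queue_stats())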