# F5-TTS-pt-br/services/queue_manager.py
# Uploaded via huggingface_hub (commit 7c71fa7)
import json
import logging
import time
import uuid
from typing import Dict, Optional, Any
import redis
from redis import ConnectionPool
logger = logging.getLogger("services")
def _calculate_score(priority: int) -> float:
"""
计算任务得分 (优先级)
"""
timestamp = time.time()
score = (6 - priority) * timestamp
return score
class QueueManager:
    """Redis-backed queue manager.

    Three Redis structures are used:
      * process ZSET  (``queue_key``)           -- task_id scored by priority
      * process HASH  (``queue_key + '_hash'``) -- task_id -> full task JSON
      * upload LIST   (``upload_queue_key``)    -- serialized task results
    """

    def __init__(self, redis_config: Dict[str, Any]):
        """
        Initialize the queue manager and verify connectivity.

        Args:
            redis_config: Redis configuration dict. Required keys:
                'host', 'port', 'db', 'queue_key', 'upload_queue_key'.
                Optional: 'password', 'max_connections' (default 10).

        Raises:
            Exception: re-raised if the initial PING fails, so a broken
                Redis setup is detected at startup rather than on first use.
        """
        # Extract Redis connection parameters.
        host = redis_config['host']
        port = redis_config['port']
        db = redis_config['db']
        password = redis_config.get('password')
        max_connections = redis_config.get('max_connections', 10)

        # 1. Shared connection pool; decode_responses=True makes the
        #    client return str instead of bytes.
        self.pool = ConnectionPool(
            host=host,
            port=port,
            db=db,
            password=password,
            max_connections=max_connections,
            decode_responses=True
        )
        self.redis_client = redis.Redis(connection_pool=self.pool)

        self.process_queue_key = redis_config['queue_key']
        self.process_hash_queue_key = redis_config['queue_key'] + '_hash'
        self.upload_queue_key = redis_config['upload_queue_key']

        logger.info(
            f"QueueManager initialized with Connection Pool (Max={max_connections}): "
            f"Process Queue={self.process_queue_key}, Upload Queue={self.upload_queue_key}")

        # Fail fast if Redis is unreachable.
        try:
            self.redis_client.ping()
            logger.info("Redis connection successful.")
        except Exception as e:
            logger.error(f"Failed to connect to Redis: {e}")
            raise

    def add_task(self, task_data: Dict[str, Any], priority: int = 3) -> str:
        """
        Add a task to the process queue.

        Args:
            task_data: task payload (hook_url, text, etc.)
            priority: priority 1-5, where 1 is the highest.

        Returns:
            str: the generated task ID (UUID4).
        """
        task_id = str(uuid.uuid4())
        task = {
            'task_id': task_id,
            'priority': priority,
            'created_at': time.time(),
            'data': task_data
        }
        score = _calculate_score(priority)

        # One pipeline -> both writes in a single network round trip
        # (atomic + fast).
        pipe = self.redis_client.pipeline()
        # 1. ZSET: drives priority ordering and popping.
        pipe.zadd(self.process_queue_key, {task_id: score})
        # 2. HASH: task_id -> full JSON (fast read / update / delete).
        pipe.hset(self.process_hash_queue_key, task_id, json.dumps(task))
        pipe.execute()

        logger.info(f"Task {task_id} added to process queue with priority {priority}, score {score}")
        return task_id

    def get_process_task(self) -> Optional[Dict[str, Any]]:
        """
        Pop the next task from the process queue (ZSET).

        Returns:
            Optional[Dict]: the task payload, or None when the queue is
            empty or the popped id has no matching HASH record (e.g. the
            task was cancelled concurrently).
        """
        # ZPOPMAX returns the member with the highest score.
        result = self.redis_client.zpopmax(self.process_queue_key, 1)
        if not result:
            return None
        task_id, _ = result[0]

        pipe = self.redis_client.pipeline()
        pipe.hget(self.process_hash_queue_key, task_id)
        pipe.hdel(self.process_hash_queue_key, task_id)
        task_json, _ = pipe.execute()

        # The HASH entry can be missing if the task was deleted between
        # ZPOPMAX and HGET; previously this crashed in json.loads(None).
        if task_json is None:
            logger.warning(f"Task {task_id} popped from ZSET but missing from HASH; skipping.")
            return None

        task = json.loads(task_json)
        logger.info(f"Task {task.get('task_id', 'Unknown')} retrieved from process queue.")
        return task

    def push_upload_task(self, task_result: Dict[str, Any]):
        """
        Push a processing result onto the upload queue (List).

        Args:
            task_result: result payload (task_id, output_paths, hook_url, etc.)
        """
        self.redis_client.lpush(self.upload_queue_key, json.dumps(task_result))
        logger.info(f"Task {task_result['task_id']} pushed to upload queue.")

    def get_upload_task(self, timeout: int = 5) -> Optional[Dict[str, Any]]:
        """
        Blocking pop of the next task from the upload queue (List).

        Args:
            timeout: maximum number of seconds to block.

        Returns:
            Optional[Dict]: the task payload, or None on timeout.
        """
        # BRPOP blocks up to `timeout` seconds.
        result = self.redis_client.brpop(self.upload_queue_key, timeout)
        if not result:
            return None
        # BRPOP returns (key, value).
        task_json = result[1]
        task = json.loads(task_json)
        logger.debug(f"Task {task.get('task_id', 'Unknown')} retrieved from upload queue.")
        return task

    def get_process_queue_stats(self) -> Dict[str, Any]:
        """
        Return the sizes of both queues plus the sampling timestamp.
        """
        queued_count = self.redis_client.zcard(self.process_queue_key)
        upload_count = self.redis_client.llen(self.upload_queue_key)
        return {
            'process_queued': queued_count,
            'upload_queued': upload_count,
            'timestamp': time.time()
        }

    def delete_process_task(self, task_id: str) -> bool:
        """
        Atomically remove a task by id from both the ZSET and the HASH.

        All three commands run in one pipeline, removing the extra round
        trip (and its read-then-delete race window) the previous version
        had when it issued HDEL separately after the pipeline.

        Returns:
            bool: True if anything was removed, False if the task was
            not found in either structure.
        """
        pipe = self.redis_client.pipeline()
        # 1. Read the HASH entry (O(1)) to learn whether the task was
        #    still pending (i.e. not yet consumed).
        pipe.hget(self.process_hash_queue_key, task_id)
        # 2. Delete it unconditionally (no-op if already absent).
        pipe.hdel(self.process_hash_queue_key, task_id)
        # 3. Clear the ZSET member too, even if the HASH entry is gone.
        pipe.zrem(self.process_queue_key, task_id)
        json_str, _, zrem_count = pipe.execute()

        if json_str is not None:
            # Task was still pending; it has now been fully removed.
            logger.warning(f"Task {task_id} successfully removed from queue (cancelled).")
            return True
        if zrem_count > 0:
            # Stale ZSET-only entry (task likely already consumed) -- the
            # cleanup still counts as success.
            logger.info(f"Task {task_id} only existed in ZSET (stale), cleaned up.")
            return True
        logger.info(f"Task {task_id} not found in queue.")
        return False