|
|
import json |
|
|
import logging |
|
|
import time |
|
|
import uuid |
|
|
from typing import Dict, Optional, Any |
|
|
|
|
|
import redis |
|
|
from redis import ConnectionPool |
|
|
|
|
|
logger = logging.getLogger("services") |
|
|
|
|
|
|
|
|
def _calculate_score(priority: int) -> float: |
|
|
""" |
|
|
计算任务得分 (优先级) |
|
|
""" |
|
|
timestamp = time.time() |
|
|
score = (6 - priority) * timestamp |
|
|
return score |
|
|
|
|
|
|
|
|
class QueueManager:
    """Redis-backed queue manager.

    Maintains two queues:
      * process queue: a ZSET of task ids scored by priority/time
        (``queue_key``), paired with a HASH mapping task id to the
        JSON-serialized task payload (``queue_key + '_hash'``);
      * upload queue: a LIST of JSON-serialized task results
        (``upload_queue_key``).
    """

    def __init__(self, redis_config: Dict[str, Any]):
        """Initialize the queue manager and verify connectivity.

        Args:
            redis_config: Redis configuration dict. Required keys:
                'host', 'port', 'db', 'queue_key', 'upload_queue_key'.
                Optional keys: 'password' (default None),
                'max_connections' (default 10).

        Raises:
            KeyError: if a required configuration key is missing.
            Exception: re-raised if the initial Redis PING fails.
        """
        host = redis_config['host']
        port = redis_config['port']
        db = redis_config['db']
        password = redis_config.get('password')

        max_connections = redis_config.get('max_connections', 10)

        # Shared connection pool; decode_responses=True means all replies
        # (including zpopmax members and hget values) arrive as str.
        self.pool = ConnectionPool(
            host=host,
            port=port,
            db=db,
            password=password,
            max_connections=max_connections,
            decode_responses=True
        )

        self.redis_client = redis.Redis(connection_pool=self.pool)

        # ZSET holding task ids ordered by score (see _calculate_score).
        self.process_queue_key = redis_config['queue_key']

        # Companion HASH mapping task id -> JSON task payload.
        self.process_hash_queue_key = redis_config['queue_key'] + '_hash'

        # LIST used as the upload (result) queue.
        self.upload_queue_key = redis_config['upload_queue_key']

        logger.info(
            f"QueueManager initialized with Connection Pool (Max={max_connections}): "
            f"Process Queue={self.process_queue_key}, Upload Queue={self.upload_queue_key}")

        # Fail fast on startup if Redis is unreachable.
        try:
            self.redis_client.ping()
            logger.info("Redis connection successful.")
        except Exception as e:
            logger.error(f"Failed to connect to Redis: {e}")
            raise

    def add_task(self, task_data: Dict[str, Any], priority: int = 3) -> str:
        """Add a task to the process queue.

        Args:
            task_data: Task payload (e.g. hook_url, text, etc.).
            priority: Priority 1-5, where 1 is the highest.

        Returns:
            str: the generated task id.
        """
        task_id = str(uuid.uuid4())

        task = {
            'task_id': task_id,
            'priority': priority,
            'created_at': time.time(),
            'data': task_data
        }

        score = _calculate_score(priority)

        # Write the ZSET member and the HASH payload atomically so a
        # consumer never pops an id whose payload was not yet stored.
        pipe = self.redis_client.pipeline()

        pipe.zadd(self.process_queue_key, {task_id: score})

        pipe.hset(self.process_hash_queue_key, task_id, json.dumps(task))

        pipe.execute()

        logger.info(f"Task {task_id} added to process queue with priority {priority}, score {score}")

        return task_id

    def get_process_task(self) -> Optional[Dict[str, Any]]:
        """Pop the next task from the process queue (ZSET).

        Returns:
            Optional[Dict]: the task payload, or None if the queue is
            empty or the popped id has no payload (e.g. the task was
            cancelled concurrently via delete_process_task).
        """
        # Highest score wins: higher priority (lower number) yields a
        # larger weight in _calculate_score.
        result = self.redis_client.zpopmax(self.process_queue_key, 1)

        if not result:
            return None

        task_id, _ = result[0]

        # Fetch and delete the payload in one MULTI/EXEC round trip.
        pipe = self.redis_client.pipeline()
        pipe.hget(self.process_hash_queue_key, task_id)
        pipe.hdel(self.process_hash_queue_key, task_id)

        task_json, _ = pipe.execute()

        # BUGFIX: the hash entry may be missing (task cancelled between
        # zpopmax and hget); json.loads(None) would raise TypeError.
        if task_json is None:
            logger.warning(f"Task {task_id} popped from ZSET but payload missing; skipping.")
            return None

        task = json.loads(task_json)

        logger.info(f"Task {task.get('task_id', 'Unknown')} retrieved from process queue.")
        return task

    def push_upload_task(self, task_result: Dict[str, Any]):
        """Push a processing result onto the upload queue (LIST).

        Args:
            task_result: Task result payload (task_id, output_paths,
                hook_url, etc.).
        """
        self.redis_client.lpush(self.upload_queue_key, json.dumps(task_result))
        # Use .get so a result without 'task_id' cannot fail the log line
        # after the push already succeeded (consistent with other methods).
        logger.info(f"Task {task_result.get('task_id', 'Unknown')} pushed to upload queue.")

    def get_upload_task(self, timeout: int = 5) -> Optional[Dict[str, Any]]:
        """Blocking-pop a task from the upload queue (LIST).

        Args:
            timeout: Blocking wait time in seconds.

        Returns:
            Optional[Dict]: the task payload, or None on timeout.
        """
        result = self.redis_client.brpop(self.upload_queue_key, timeout)

        if not result:
            return None

        # brpop returns (queue_key, value); we only need the value.
        task_json = result[1]
        task = json.loads(task_json)

        logger.debug(f"Task {task.get('task_id', 'Unknown')} retrieved from upload queue.")
        return task

    def get_process_queue_stats(self) -> Dict[str, Any]:
        """Return queue depth statistics for both queues."""
        queued_count = self.redis_client.zcard(self.process_queue_key)
        upload_count = self.redis_client.llen(self.upload_queue_key)

        return {
            'process_queued': queued_count,
            'upload_queued': upload_count,
            'timestamp': time.time()
        }

    def delete_process_task(self, task_id: str) -> bool:
        """Atomically remove a queued task by id (cancellation).

        Returns:
            bool: True if the task existed in the ZSET or HASH and was
            removed; False if it was not found.
        """
        pipe = self.redis_client.pipeline()

        # Read the payload and remove the ZSET member in one MULTI/EXEC.
        pipe.hget(self.process_hash_queue_key, task_id)

        pipe.zrem(self.process_queue_key, task_id)

        json_str, zrem_count = pipe.execute()

        if json_str is not None:
            # Payload existed: also drop the hash entry.
            self.redis_client.hdel(self.process_hash_queue_key, task_id)
            logger.warning(f"Task {task_id} successfully removed from queue (cancelled).")
            return True

        if zrem_count > 0:
            # Id was only in the ZSET (stale entry); zrem already cleaned it.
            logger.info(f"Task {task_id} only existed in ZSET (stale), cleaned up.")
            return True

        logger.info(f"Task {task_id} not found in queue.")
        return False