File size: 6,341 Bytes
7c71fa7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
import json
import logging
import time
import uuid
from typing import Dict, Optional, Any

import redis
from redis import ConnectionPool

logger = logging.getLogger("services")


def _calculate_score(priority: int) -> float:
    """
    计算任务得分 (优先级)
    """
    timestamp = time.time()
    score = (6 - priority) * timestamp
    return score


class QueueManager:
    """Redis 队列管理器"""

    def __init__(self, redis_config: Dict[str, Any]):
        """
        初始化队列管理器

        Args:
            redis_config: Redis 配置字典
        """
        # 提取 Redis 连接参数
        host = redis_config['host']
        port = redis_config['port']
        db = redis_config['db']
        password = redis_config.get('password')

        max_connections = redis_config.get('max_connections', 10)

        # 1. 创建 Redis 连接池
        self.pool = ConnectionPool(
            host=host,
            port=port,
            db=db,
            password=password,
            max_connections=max_connections,
            decode_responses=True
        )

        self.redis_client = redis.Redis(connection_pool=self.pool)

        self.process_queue_key = redis_config['queue_key']

        self.process_hash_queue_key = redis_config['queue_key'] + '_hash'

        self.upload_queue_key = redis_config['upload_queue_key']

        logger.info(
            f"QueueManager initialized with Connection Pool (Max={max_connections}): "
            f"Process Queue={self.process_queue_key}, Upload Queue={self.upload_queue_key}")

        try:
            self.redis_client.ping()
            logger.info("Redis connection successful.")
        except Exception as e:
            logger.error(f"Failed to connect to Redis: {e}")
            raise

    def add_task(self, task_data: Dict[str, Any], priority: int = 3) -> str:
        """
        添加任务到处理队列

        Args:
            task_data: 任务数据 (包含 hook_url, text, etc.)
            priority: 优先级 (1-5),1 为最高优先级

        Returns:
            str: 任务 ID
        """
        task_id = str(uuid.uuid4())

        task = {
            'task_id': task_id,
            'priority': priority,
            'created_at': time.time(),
            'data': task_data
        }

        score = _calculate_score(priority)

        # 管道操作,一次网络请求完成两件事(原子性 + 性能高)
        pipe = self.redis_client.pipeline()

        # 1. ZSET:用于优先级排序和弹出
        pipe.zadd(self.process_queue_key, {task_id: score})

        # 2. HASH:专门存 uuid -> 完整 JSON(支持快速读取/修改/删除)
        pipe.hset(self.process_hash_queue_key, task_id, json.dumps(task))

        # 执行
        pipe.execute()

        logger.info(f"Task {task_id} added to process queue with priority {priority}, score {score}")

        return task_id

    def get_process_task(self) -> Optional[Dict[str, Any]]:
        """
        从处理队列 (ZSET) 中获取下一个任务

        Returns:
            Optional[Dict]: 任务数据,如果队列为空返回 None
        """
        # 使用 ZPOPMAX 获取得分最高的任务
        result = self.redis_client.zpopmax(self.process_queue_key, 1)

        if not result:
            return None

        task_id, _ = result[0]

        pipe = self.redis_client.pipeline()
        pipe.hget(self.process_hash_queue_key, task_id)
        pipe.hdel(self.process_hash_queue_key, task_id)

        task_json, _ = pipe.execute()

        task = json.loads(task_json)

        logger.info(f"Task {task.get('task_id', 'Unknown')} retrieved from process queue.")
        return task

    def push_upload_task(self, task_result: Dict[str, Any]):
        """
        将处理结果推送到上传队列 (List)

        Args:
            task_result: 任务处理结果 (包含 task_id, output_paths, hook_url等)
        """
        self.redis_client.lpush(self.upload_queue_key, json.dumps(task_result))
        logger.info(f"Task {task_result['task_id']} pushed to upload queue.")

    def get_upload_task(self, timeout: int = 5) -> Optional[Dict[str, Any]]:
        """
        从上传队列 (List) 中获取任务,阻塞等待

        Args:
            timeout: 阻塞等待时间 (秒)

        Returns:
            Optional[Dict]: 任务数据,如果超时返回 None
        """
        # 使用 BRPOP 阻塞弹出
        result = self.redis_client.brpop(self.upload_queue_key, timeout)

        if not result:
            return None

        # BRPOP 返回 (key, value)
        task_json = result[1]
        task = json.loads(task_json)

        logger.debug(f"Task {task.get('task_id', 'Unknown')} retrieved from upload queue.")
        return task

    def get_process_queue_stats(self) -> Dict[str, Any]:
        """
        获取处理队列统计信息
        """
        queued_count = self.redis_client.zcard(self.process_queue_key)
        upload_count = self.redis_client.llen(self.upload_queue_key)

        return {
            'process_queued': queued_count,
            'upload_queued': upload_count,
            'timestamp': time.time()
        }

    def delete_process_task(self, task_id: str) -> bool:
        """
        根据 task_id 安全、快速、原子地删除任务(推荐生产写法)
        """
        pipe = self.redis_client.pipeline()

        # 1. 先查 HASH 中是否存在(O(1))
        pipe.hget(self.process_hash_queue_key, task_id)

        # 2. 同时从 ZSET 删除(即使 HASH 已不存在也能删干净)
        pipe.zrem(self.process_queue_key, task_id)

        json_str, zrem_count = pipe.execute()

        if json_str is not None:
            # 任务存在于 HASH,说明之前没被消费,真正需要删除
            self.redis_client.hdel(self.process_hash_queue_key, task_id)
            logger.warning(f"Task {task_id} successfully removed from queue (cancelled).")
            return True

        if zrem_count > 0:
            # 任务可能已经被消费了,但 ZSET 里还有残留(异常情况),也算清理成功
            logger.info(f"Task {task_id} only existed in ZSET (stale), cleaned up.")
            return True

        logger.info(f"Task {task_id} not found in queue.")
        return False