File size: 6,803 Bytes
c8b1f17
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
import os
import json
import logging
from typing import Dict, List, Optional
import uuid
try:
    import redis
except ImportError:
    redis = None

from config import CACHE_CONFIG
from database.models import TaskQueue, SessionLocal

# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class QueueManager:
    """
    队列管理器
    支持Redis队列,自动降级为文件队列(基于SQLite),最后降级为内存队列
    """
    
    _instance = None
    
    def __new__(cls):
        if cls._instance is None:
            cls._instance = super(QueueManager, cls).__new__(cls)
            cls._instance._initialize()
        return cls._instance
    
    def _initialize(self):
        self.backend = CACHE_CONFIG.get("backend", "memory")
        self.config = CACHE_CONFIG.get("redis", {})
        self.redis_available = False
        
        if self.backend == "redis" and redis is not None:
            try:
                # 初始化Redis连接池
                pool = redis.ConnectionPool(
                    host=self.config.get("host", "localhost"),
                    port=self.config.get("port", 6379),
                    password=self.config.get("password"),
                    db=self.config.get("db", 0),
                    max_connections=self.config.get("max_connections", 100),
                    socket_timeout=self.config.get("socket_timeout", 5)
                )
                self.client = redis.Redis(connection_pool=pool)
                # 测试连接
                self.client.ping()
                self.redis_available = True
                logger.info("Redis连接成功,已启用Redis队列模式")
            except Exception as e:
                logger.error(f"Redis连接失败: {e},将使用文件队列作为降级方案")
                self.backend = "file"
                self.redis_available = False
        elif self.backend == "redis" and redis is None:
             logger.warning("未检测到redis库,自动降级为文件队列")
             self.backend = "file"
             self.redis_available = False
        
        if self.backend == "memory":
             logger.info("使用内存队列模式")
             self.memory_queue = []
        elif self.backend == "file":
             logger.info("使用文件队列模式 (SQLite)")

    def _get_db(self):
        return SessionLocal()

    def _enqueue_file(self, queue_name: str, item: str) -> bool:
        db = self._get_db()
        try:
            task = TaskQueue(
                task_id=str(uuid.uuid4()),
                queue_name=queue_name,
                payload=item,
                status="queued"
            )
            db.add(task)
            db.commit()
            return True
        except Exception as e:
            logger.error(f"文件入队失败: {e}")
            db.rollback()
            return False
        finally:
            db.close()

    def _dequeue_file(self, queue_name: str) -> Optional[str]:
        db = self._get_db()
        try:
            # 获取最早的一个queued任务
            task = db.query(TaskQueue).filter(
                TaskQueue.queue_name == queue_name,
                TaskQueue.status == "queued"
            ).order_by(TaskQueue.created_at.asc()).first()

            if task:
                # 模拟出队(删除任务)
                payload = task.payload
                db.delete(task)
                db.commit()
                return payload
            return None
        except Exception as e:
            logger.error(f"文件出队失败: {e}")
            db.rollback()
            return None
        finally:
            db.close()

    def enqueue(self, queue_name: str, item: str) -> bool:
        """入队"""
        # 优先尝试Redis
        if self.redis_available:
            try:
                self.client.rpush(queue_name, item)
                return True
            except Exception as e:
                logger.error(f"Redis入队异常: {e},尝试降级到文件队列")
                self.redis_available = False # 标记Redis不可用
                # 降级到文件
                return self._enqueue_file(queue_name, item)
        
        # 降级策略
        if self.backend == "file" or (not self.redis_available and self.backend == "redis"):
            return self._enqueue_file(queue_name, item)
        else:
            # 内存模式
            try:
                self.memory_queue.append({"queue": queue_name, "item": item})
                return True
            except Exception as e:
                logger.error(f"内存入队失败: {e}")
                return False
            
    def dequeue(self, queue_name: str) -> Optional[str]:
        """出队"""
        # 优先尝试Redis
        if self.redis_available:
            try:
                # 阻塞式出队,超时时间1秒
                result = self.client.blpop(queue_name, timeout=1)
                if result:
                    return result[1].decode('utf-8')
                return None
            except Exception as e:
                logger.error(f"Redis出队异常: {e},尝试降级到文件队列")
                self.redis_available = False
                return self._dequeue_file(queue_name)
        
        # 降级策略
        if self.backend == "file" or (not self.redis_available and self.backend == "redis"):
             return self._dequeue_file(queue_name)
        else:
            # 内存模式
            try:
                for i, task in enumerate(self.memory_queue):
                    if task["queue"] == queue_name:
                        return self.memory_queue.pop(i)["item"]
                return None
            except Exception as e:
                logger.error(f"内存出队失败: {e}")
                return None
            
    def get_queue_length(self, queue_name: str) -> int:
        """获取队列长度"""
        try:
            if self.redis_available:
                return self.client.llen(queue_name)
            elif self.backend == "file" or (not self.redis_available and self.backend == "redis"):
                 db = self._get_db()
                 try:
                     count = db.query(TaskQueue).filter(
                         TaskQueue.queue_name == queue_name,
                         TaskQueue.status == "queued"
                     ).count()
                     return count
                 finally:
                     db.close()
            else:
                return len([t for t in self.memory_queue if t["queue"] == queue_name])
        except Exception as e:
            logger.error(f"获取队列长度失败: {e}")
            return 0

# 全局实例
queue_manager = QueueManager()