Spaces:
Sleeping
Sleeping
File size: 6,803 Bytes
c8b1f17 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 |
import os
import json
import logging
from typing import Dict, List, Optional
import uuid
try:
import redis
except ImportError:
redis = None
from config import CACHE_CONFIG
from database.models import TaskQueue, SessionLocal
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class QueueManager:
"""
队列管理器
支持Redis队列,自动降级为文件队列(基于SQLite),最后降级为内存队列
"""
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super(QueueManager, cls).__new__(cls)
cls._instance._initialize()
return cls._instance
def _initialize(self):
self.backend = CACHE_CONFIG.get("backend", "memory")
self.config = CACHE_CONFIG.get("redis", {})
self.redis_available = False
if self.backend == "redis" and redis is not None:
try:
# 初始化Redis连接池
pool = redis.ConnectionPool(
host=self.config.get("host", "localhost"),
port=self.config.get("port", 6379),
password=self.config.get("password"),
db=self.config.get("db", 0),
max_connections=self.config.get("max_connections", 100),
socket_timeout=self.config.get("socket_timeout", 5)
)
self.client = redis.Redis(connection_pool=pool)
# 测试连接
self.client.ping()
self.redis_available = True
logger.info("Redis连接成功,已启用Redis队列模式")
except Exception as e:
logger.error(f"Redis连接失败: {e},将使用文件队列作为降级方案")
self.backend = "file"
self.redis_available = False
elif self.backend == "redis" and redis is None:
logger.warning("未检测到redis库,自动降级为文件队列")
self.backend = "file"
self.redis_available = False
if self.backend == "memory":
logger.info("使用内存队列模式")
self.memory_queue = []
elif self.backend == "file":
logger.info("使用文件队列模式 (SQLite)")
def _get_db(self):
return SessionLocal()
def _enqueue_file(self, queue_name: str, item: str) -> bool:
db = self._get_db()
try:
task = TaskQueue(
task_id=str(uuid.uuid4()),
queue_name=queue_name,
payload=item,
status="queued"
)
db.add(task)
db.commit()
return True
except Exception as e:
logger.error(f"文件入队失败: {e}")
db.rollback()
return False
finally:
db.close()
def _dequeue_file(self, queue_name: str) -> Optional[str]:
db = self._get_db()
try:
# 获取最早的一个queued任务
task = db.query(TaskQueue).filter(
TaskQueue.queue_name == queue_name,
TaskQueue.status == "queued"
).order_by(TaskQueue.created_at.asc()).first()
if task:
# 模拟出队(删除任务)
payload = task.payload
db.delete(task)
db.commit()
return payload
return None
except Exception as e:
logger.error(f"文件出队失败: {e}")
db.rollback()
return None
finally:
db.close()
def enqueue(self, queue_name: str, item: str) -> bool:
"""入队"""
# 优先尝试Redis
if self.redis_available:
try:
self.client.rpush(queue_name, item)
return True
except Exception as e:
logger.error(f"Redis入队异常: {e},尝试降级到文件队列")
self.redis_available = False # 标记Redis不可用
# 降级到文件
return self._enqueue_file(queue_name, item)
# 降级策略
if self.backend == "file" or (not self.redis_available and self.backend == "redis"):
return self._enqueue_file(queue_name, item)
else:
# 内存模式
try:
self.memory_queue.append({"queue": queue_name, "item": item})
return True
except Exception as e:
logger.error(f"内存入队失败: {e}")
return False
def dequeue(self, queue_name: str) -> Optional[str]:
"""出队"""
# 优先尝试Redis
if self.redis_available:
try:
# 阻塞式出队,超时时间1秒
result = self.client.blpop(queue_name, timeout=1)
if result:
return result[1].decode('utf-8')
return None
except Exception as e:
logger.error(f"Redis出队异常: {e},尝试降级到文件队列")
self.redis_available = False
return self._dequeue_file(queue_name)
# 降级策略
if self.backend == "file" or (not self.redis_available and self.backend == "redis"):
return self._dequeue_file(queue_name)
else:
# 内存模式
try:
for i, task in enumerate(self.memory_queue):
if task["queue"] == queue_name:
return self.memory_queue.pop(i)["item"]
return None
except Exception as e:
logger.error(f"内存出队失败: {e}")
return None
def get_queue_length(self, queue_name: str) -> int:
"""获取队列长度"""
try:
if self.redis_available:
return self.client.llen(queue_name)
elif self.backend == "file" or (not self.redis_available and self.backend == "redis"):
db = self._get_db()
try:
count = db.query(TaskQueue).filter(
TaskQueue.queue_name == queue_name,
TaskQueue.status == "queued"
).count()
return count
finally:
db.close()
else:
return len([t for t in self.memory_queue if t["queue"] == queue_name])
except Exception as e:
logger.error(f"获取队列长度失败: {e}")
return 0
# 全局实例
queue_manager = QueueManager()
|