import os
import json
import logging
from typing import Dict, List, Optional
import uuid

try:
    import redis
except ImportError:
    # redis is an optional dependency; without it the manager degrades
    # to the file (SQLite) backend.
    redis = None

from config import CACHE_CONFIG
from database.models import TaskQueue, SessionLocal

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class QueueManager:
    """Process-wide queue manager (singleton).

    The backend is chosen from ``CACHE_CONFIG["backend"]``:

    * ``"redis"``  - Redis lists; if the ``redis`` package is missing or the
      connection test fails, the manager degrades to the file backend.
    * ``"file"``   - rows of ``TaskQueue`` accessed through ``SessionLocal``.
    * ``"memory"`` - an in-process list (also used for unrecognised values).

    If a Redis call fails at runtime, Redis is marked unavailable and the
    file backend is used from then on.
    """

    _instance = None

    def __new__(cls):
        # Create and initialise the single shared instance on first use.
        if cls._instance is None:
            cls._instance = super(QueueManager, cls).__new__(cls)
            cls._instance._initialize()
        return cls._instance

    def _initialize(self):
        """Read the configuration and set up the selected backend."""
        self.backend = CACHE_CONFIG.get("backend", "memory")
        self.config = CACHE_CONFIG.get("redis", {}) or {}
        self.redis_available = False
        self.client = None
        # Always defined so the memory code path can never hit an
        # AttributeError, whatever backend value was configured.
        self.memory_queue = []

        if self.backend == "redis":
            if redis is None:
                logger.warning("未检测到redis库,自动降级为文件队列")
                self.backend = "file"
            else:
                try:
                    # Initialise the Redis connection pool
                    pool = redis.ConnectionPool(
                        host=self.config.get("host", "localhost"),
                        port=self.config.get("port", 6379),
                        password=self.config.get("password"),
                        db=self.config.get("db", 0),
                        max_connections=self.config.get("max_connections", 100),
                        socket_timeout=self.config.get("socket_timeout", 5),
                    )
                    self.client = redis.Redis(connection_pool=pool)
                    # Test the connection
                    self.client.ping()
                    self.redis_available = True
                    logger.info("Redis连接成功,已启用Redis队列模式")
                except Exception as e:
                    logger.error(f"Redis连接失败: {e},将使用文件队列作为降级方案")
                    self.backend = "file"
                    self.redis_available = False
        elif self.backend not in ("file", "memory"):
            # An unrecognised backend previously left the manager unusable;
            # fall back to the in-memory queue instead.
            logger.warning(f"未知的队列后端配置: {self.backend!r},自动降级为内存队列")
            self.backend = "memory"

        if self.backend == "memory":
            logger.info("使用内存队列模式")
        elif self.backend == "file":
            logger.info("使用文件队列模式 (SQLite)")

    def _get_db(self):
        """Return a new database session from ``SessionLocal``."""
        return SessionLocal()

    def _use_file_backend(self) -> bool:
        """Return True when operations should go to the file backend.

        That is the case when the backend is ``"file"``, or when it is
        ``"redis"`` but Redis has been marked unavailable.
        """
        return self.backend == "file" or (
            not self.redis_available and self.backend == "redis"
        )

    def _enqueue_file(self, queue_name: str, item: str) -> bool:
        """Insert ``item`` as a ``queued`` TaskQueue row.

        Returns True on commit, False (after rollback) on any error.
        """
        db = self._get_db()
        try:
            task = TaskQueue(
                task_id=str(uuid.uuid4()),
                queue_name=queue_name,
                payload=item,
                status="queued",
            )
            db.add(task)
            db.commit()
            return True
        except Exception as e:
            logger.error(f"文件入队失败: {e}")
            db.rollback()
            return False
        finally:
            db.close()

    def _dequeue_file(self, queue_name: str) -> Optional[str]:
        """Remove and return the payload of the oldest ``queued`` row.

        Returns None when the queue has no queued row or on any error.
        """
        db = self._get_db()
        try:
            # Fetch the earliest queued task
            task = (
                db.query(TaskQueue)
                .filter(
                    TaskQueue.queue_name == queue_name,
                    TaskQueue.status == "queued",
                )
                .order_by(TaskQueue.created_at.asc())
                .first()
            )
            if task:
                # Dequeue is modelled by deleting the row
                payload = task.payload
                db.delete(task)
                db.commit()
                return payload
            return None
        except Exception as e:
            logger.error(f"文件出队失败: {e}")
            db.rollback()
            return None
        finally:
            db.close()

    def enqueue(self, queue_name: str, item: str) -> bool:
        """Append ``item`` to ``queue_name``; return True on success."""
        # Try Redis first
        if self.redis_available:
            try:
                self.client.rpush(queue_name, item)
                return True
            except Exception as e:
                logger.error(f"Redis入队异常: {e},尝试降级到文件队列")
                self.redis_available = False  # mark Redis as unavailable
                # Degrade to the file backend
                return self._enqueue_file(queue_name, item)

        # Fallback strategy
        if self._use_file_backend():
            return self._enqueue_file(queue_name, item)

        # Memory mode
        try:
            self.memory_queue.append({"queue": queue_name, "item": item})
            return True
        except Exception as e:
            logger.error(f"内存入队失败: {e}")
            return False

    def dequeue(self, queue_name: str) -> Optional[str]:
        """Pop the oldest item of ``queue_name``; return None if empty."""
        # Try Redis first
        if self.redis_available:
            try:
                # Blocking pop with a 1 second timeout
                result = self.client.blpop(queue_name, timeout=1)
                if result:
                    payload = result[1]
                    # The client returns bytes unless it was configured to
                    # decode responses; accept both.
                    if isinstance(payload, bytes):
                        return payload.decode('utf-8')
                    return payload
                return None
            except Exception as e:
                logger.error(f"Redis出队异常: {e},尝试降级到文件队列")
                self.redis_available = False
                return self._dequeue_file(queue_name)

        # Fallback strategy
        if self._use_file_backend():
            return self._dequeue_file(queue_name)

        # Memory mode: remove the first entry belonging to this queue
        try:
            for i, task in enumerate(self.memory_queue):
                if task["queue"] == queue_name:
                    return self.memory_queue.pop(i)["item"]
            return None
        except Exception as e:
            logger.error(f"内存出队失败: {e}")
            return None

    def get_queue_length(self, queue_name: str) -> int:
        """Return the number of pending items in ``queue_name``.

        Returns 0 if the length cannot be determined.
        """
        try:
            if self.redis_available:
                return self.client.llen(queue_name)
            if self._use_file_backend():
                db = self._get_db()
                try:
                    return (
                        db.query(TaskQueue)
                        .filter(
                            TaskQueue.queue_name == queue_name,
                            TaskQueue.status == "queued",
                        )
                        .count()
                    )
                finally:
                    db.close()
            return sum(1 for t in self.memory_queue if t["queue"] == queue_name)
        except Exception as e:
            logger.error(f"获取队列长度失败: {e}")
            return 0


# Global instance
queue_manager = QueueManager()