Spaces:
Sleeping
Sleeping
import os
import json
import logging
from typing import Dict, List, Optional
import uuid

# redis is an optional dependency: when the package cannot be imported the
# name is bound to None, and QueueManager checks for that before using it.
try:
    import redis
except ImportError:
    redis = None

from config import CACHE_CONFIG
from database.models import TaskQueue, SessionLocal

# Configure logging (note: this sets up the root logger at import time)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class QueueManager:
    """Singleton FIFO queue manager with tiered storage backends.

    The backend is chosen from ``CACHE_CONFIG["backend"]``:

    * ``"redis"``  -- Redis lists (``RPUSH`` / ``BLPOP`` / ``LLEN``). If the
      ``redis`` package is missing or the initial ``PING`` fails, the manager
      switches to the file backend. A Redis error raised by ``enqueue`` or
      ``dequeue`` marks Redis unavailable and the call (and later calls) use
      the file backend instead.
    * ``"file"``   -- rows of the ``TaskQueue`` table accessed through
      ``SessionLocal`` (the log message describes this store as SQLite).
    * ``"memory"`` -- a plain in-process list. Any unrecognised backend value
      is also served from this list.

    Items are strings. ``enqueue`` reports success as a bool and ``dequeue``
    returns ``None`` when nothing is available or an error occurred; errors
    are logged rather than raised.
    """

    _instance = None

    def __new__(cls):
        """Return the shared instance, creating and initialising it once."""
        if cls._instance is None:
            instance = super().__new__(cls)
            instance._initialize()
            # Publish only after a successful initialisation so that a failure
            # does not leave a half-built object cached as the singleton.
            cls._instance = instance
        return cls._instance

    def _initialize(self):
        """Read ``CACHE_CONFIG`` and prepare the selected backend."""
        self.backend = CACHE_CONFIG.get("backend", "memory")
        self.config = CACHE_CONFIG.get("redis", {})
        self.redis_available = False
        self.client = None
        # Always created: the in-memory branch of enqueue/dequeue is also the
        # catch-all for unrecognised backend values and must not depend on
        # ``backend == "memory"`` having been configured.
        self.memory_queue = []

        if self.backend == "redis":
            if redis is None:
                logger.warning("未检测到redis库,自动降级为文件队列")
                self.backend = "file"
            else:
                self._connect_redis()

        if self.backend == "memory":
            logger.info("使用内存队列模式")
        elif self.backend == "file":
            logger.info("使用文件队列模式 (SQLite)")
        elif self.backend != "redis":
            logger.warning(
                "Unknown queue backend %r; falling back to the in-memory queue",
                self.backend,
            )

    def _connect_redis(self):
        """Create the Redis client and verify it; switch to "file" on failure."""
        try:
            # Initialise the Redis connection pool
            pool = redis.ConnectionPool(
                host=self.config.get("host", "localhost"),
                port=self.config.get("port", 6379),
                password=self.config.get("password"),
                db=self.config.get("db", 0),
                max_connections=self.config.get("max_connections", 100),
                socket_timeout=self.config.get("socket_timeout", 5),
            )
            self.client = redis.Redis(connection_pool=pool)
            # Test the connection
            self.client.ping()
            self.redis_available = True
            logger.info("Redis连接成功,已启用Redis队列模式")
        except Exception as e:
            logger.error(f"Redis连接失败: {e},将使用文件队列作为降级方案")
            self.backend = "file"
            self.redis_available = False

    def _use_file_backend(self) -> bool:
        """Return True when calls should go to the ``TaskQueue`` table.

        That is the case for the "file" backend, and for the "redis" backend
        once Redis has been marked unavailable.
        """
        return self.backend == "file" or (
            self.backend == "redis" and not self.redis_available
        )

    def _get_db(self):
        """Open a new database session; the caller is responsible for closing it."""
        return SessionLocal()

    def _enqueue_file(self, queue_name: str, item: str) -> bool:
        """Insert ``item`` as a "queued" ``TaskQueue`` row.

        Returns True on commit, False (after rollback and logging) on error.
        """
        db = self._get_db()
        try:
            task = TaskQueue(
                task_id=str(uuid.uuid4()),
                queue_name=queue_name,
                payload=item,
                status="queued",
            )
            db.add(task)
            db.commit()
            return True
        except Exception as e:
            logger.error(f"文件入队失败: {e}")
            db.rollback()
            return False
        finally:
            db.close()

    def _dequeue_file(self, queue_name: str) -> Optional[str]:
        """Remove and return the oldest "queued" payload of ``queue_name``.

        Returns None when the queue is empty or on error (rolled back and
        logged).
        NOTE(review): the select and the delete are separate statements, so
        two concurrent consumers could presumably read the same row -- confirm
        whether multiple consumers are expected.
        """
        db = self._get_db()
        try:
            # Fetch the earliest queued task
            task = (
                db.query(TaskQueue)
                .filter(
                    TaskQueue.queue_name == queue_name,
                    TaskQueue.status == "queued",
                )
                .order_by(TaskQueue.created_at.asc())
                .first()
            )
            if task:
                # Dequeue is modelled by deleting the row
                payload = task.payload
                db.delete(task)
                db.commit()
                return payload
            return None
        except Exception as e:
            logger.error(f"文件出队失败: {e}")
            db.rollback()
            return None
        finally:
            db.close()

    def enqueue(self, queue_name: str, item: str) -> bool:
        """Append ``item`` to ``queue_name``; return True on success."""
        # Try Redis first
        if self.redis_available:
            try:
                self.client.rpush(queue_name, item)
                return True
            except Exception as e:
                logger.error(f"Redis入队异常: {e},尝试降级到文件队列")
                self.redis_available = False  # mark Redis as unavailable
                # Degrade to the file queue
                return self._enqueue_file(queue_name, item)

        if self._use_file_backend():
            return self._enqueue_file(queue_name, item)

        # In-memory mode
        try:
            self.memory_queue.append({"queue": queue_name, "item": item})
            return True
        except Exception as e:
            logger.error(f"内存入队失败: {e}")
            return False

    def dequeue(self, queue_name: str) -> Optional[str]:
        """Pop the oldest item of ``queue_name``; return None if unavailable.

        In Redis mode this blocks for up to one second waiting for an item.
        """
        # Try Redis first
        if self.redis_available:
            try:
                # Blocking pop with a 1 second timeout
                result = self.client.blpop(queue_name, timeout=1)
                if not result:
                    return None
                payload = result[1]
                # The client built in this class yields bytes; accept str too
                # so a client that already decodes responses does not raise.
                if isinstance(payload, bytes):
                    return payload.decode("utf-8")
                return payload
            except Exception as e:
                logger.error(f"Redis出队异常: {e},尝试降级到文件队列")
                self.redis_available = False
                return self._dequeue_file(queue_name)

        if self._use_file_backend():
            return self._dequeue_file(queue_name)

        # In-memory mode: all queues share one list, so scan for the first
        # entry that belongs to the requested queue.
        try:
            for index, entry in enumerate(self.memory_queue):
                if entry["queue"] == queue_name:
                    return self.memory_queue.pop(index)["item"]
            return None
        except Exception as e:
            logger.error(f"内存出队失败: {e}")
            return None

    def get_queue_length(self, queue_name: str) -> int:
        """Return the number of pending items in ``queue_name`` (0 on error)."""
        try:
            if self.redis_available:
                return self.client.llen(queue_name)
            if self._use_file_backend():
                db = self._get_db()
                try:
                    return (
                        db.query(TaskQueue)
                        .filter(
                            TaskQueue.queue_name == queue_name,
                            TaskQueue.status == "queued",
                        )
                        .count()
                    )
                finally:
                    db.close()
            return sum(1 for entry in self.memory_queue if entry["queue"] == queue_name)
        except Exception as e:
            logger.error(f"获取队列长度失败: {e}")
            return 0
# Global instance shared by importers of this module. QueueManager.__new__
# hands back the same object on every call, so this is created at import time.
queue_manager = QueueManager()