hr-eval-api-v2 / services /queue_manager.py
KarenYYH
Initial commit - HR Evaluation API v2
c8b1f17
import os
import json
import logging
from typing import Dict, List, Optional
import uuid
try:
import redis
except ImportError:
redis = None
from config import CACHE_CONFIG
from database.models import TaskQueue, SessionLocal
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class QueueManager:
"""
队列管理器
支持Redis队列,自动降级为文件队列(基于SQLite),最后降级为内存队列
"""
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super(QueueManager, cls).__new__(cls)
cls._instance._initialize()
return cls._instance
def _initialize(self):
self.backend = CACHE_CONFIG.get("backend", "memory")
self.config = CACHE_CONFIG.get("redis", {})
self.redis_available = False
if self.backend == "redis" and redis is not None:
try:
# 初始化Redis连接池
pool = redis.ConnectionPool(
host=self.config.get("host", "localhost"),
port=self.config.get("port", 6379),
password=self.config.get("password"),
db=self.config.get("db", 0),
max_connections=self.config.get("max_connections", 100),
socket_timeout=self.config.get("socket_timeout", 5)
)
self.client = redis.Redis(connection_pool=pool)
# 测试连接
self.client.ping()
self.redis_available = True
logger.info("Redis连接成功,已启用Redis队列模式")
except Exception as e:
logger.error(f"Redis连接失败: {e},将使用文件队列作为降级方案")
self.backend = "file"
self.redis_available = False
elif self.backend == "redis" and redis is None:
logger.warning("未检测到redis库,自动降级为文件队列")
self.backend = "file"
self.redis_available = False
if self.backend == "memory":
logger.info("使用内存队列模式")
self.memory_queue = []
elif self.backend == "file":
logger.info("使用文件队列模式 (SQLite)")
def _get_db(self):
return SessionLocal()
def _enqueue_file(self, queue_name: str, item: str) -> bool:
db = self._get_db()
try:
task = TaskQueue(
task_id=str(uuid.uuid4()),
queue_name=queue_name,
payload=item,
status="queued"
)
db.add(task)
db.commit()
return True
except Exception as e:
logger.error(f"文件入队失败: {e}")
db.rollback()
return False
finally:
db.close()
def _dequeue_file(self, queue_name: str) -> Optional[str]:
db = self._get_db()
try:
# 获取最早的一个queued任务
task = db.query(TaskQueue).filter(
TaskQueue.queue_name == queue_name,
TaskQueue.status == "queued"
).order_by(TaskQueue.created_at.asc()).first()
if task:
# 模拟出队(删除任务)
payload = task.payload
db.delete(task)
db.commit()
return payload
return None
except Exception as e:
logger.error(f"文件出队失败: {e}")
db.rollback()
return None
finally:
db.close()
def enqueue(self, queue_name: str, item: str) -> bool:
"""入队"""
# 优先尝试Redis
if self.redis_available:
try:
self.client.rpush(queue_name, item)
return True
except Exception as e:
logger.error(f"Redis入队异常: {e},尝试降级到文件队列")
self.redis_available = False # 标记Redis不可用
# 降级到文件
return self._enqueue_file(queue_name, item)
# 降级策略
if self.backend == "file" or (not self.redis_available and self.backend == "redis"):
return self._enqueue_file(queue_name, item)
else:
# 内存模式
try:
self.memory_queue.append({"queue": queue_name, "item": item})
return True
except Exception as e:
logger.error(f"内存入队失败: {e}")
return False
def dequeue(self, queue_name: str) -> Optional[str]:
"""出队"""
# 优先尝试Redis
if self.redis_available:
try:
# 阻塞式出队,超时时间1秒
result = self.client.blpop(queue_name, timeout=1)
if result:
return result[1].decode('utf-8')
return None
except Exception as e:
logger.error(f"Redis出队异常: {e},尝试降级到文件队列")
self.redis_available = False
return self._dequeue_file(queue_name)
# 降级策略
if self.backend == "file" or (not self.redis_available and self.backend == "redis"):
return self._dequeue_file(queue_name)
else:
# 内存模式
try:
for i, task in enumerate(self.memory_queue):
if task["queue"] == queue_name:
return self.memory_queue.pop(i)["item"]
return None
except Exception as e:
logger.error(f"内存出队失败: {e}")
return None
def get_queue_length(self, queue_name: str) -> int:
"""获取队列长度"""
try:
if self.redis_available:
return self.client.llen(queue_name)
elif self.backend == "file" or (not self.redis_available and self.backend == "redis"):
db = self._get_db()
try:
count = db.query(TaskQueue).filter(
TaskQueue.queue_name == queue_name,
TaskQueue.status == "queued"
).count()
return count
finally:
db.close()
else:
return len([t for t in self.memory_queue if t["queue"] == queue_name])
except Exception as e:
logger.error(f"获取队列长度失败: {e}")
return 0
# 全局实例
queue_manager = QueueManager()