Gradii's picture
extracted backend folder
eb43ce0
Raw
History Blame Contribute Delete
2.45 kB
import logging
from typing import Optional, Any, Dict
import json
import redis.asyncio as redis
from app.core.config import get_settings
logger = logging.getLogger(__name__)
class QueueService:
    """
    Redis-backed queue service for asynchronous analysis tasks.

    When ``settings.REDIS_ENABLED`` is true, tasks are pushed onto the Redis
    list named by ``settings.REDIS_QUEUE_NAME`` and results are read from
    ``result:<task_id>`` keys. There is no fallback queue: when Redis is
    disabled, or the client could not be created, ``enqueue_analysis``
    returns ``False`` and ``get_task_result`` returns ``None``.
    """

    def __init__(self):
        """Load the settings and create the Redis client if Redis is enabled."""
        self.settings = get_settings()
        # Stays None when Redis is disabled or client creation fails.
        self.redis_client = None
        if self.settings.REDIS_ENABLED:
            self._initialize_redis()

    @staticmethod
    def _redact_url(url: str) -> str:
        """
        Return ``url`` with any userinfo password replaced by ``***``.

        Connection URLs may embed credentials
        (``redis://user:password@host:6379/0``); this makes them safe to log.
        Only the password in the userinfo part is masked; URLs without one
        are returned unchanged.
        """
        # Local import keeps this helper self-contained within the class.
        from urllib.parse import urlsplit, urlunsplit

        try:
            parts = urlsplit(url)
        except ValueError:
            # Never echo a URL we could not inspect; it may hold a secret.
            return "<unparseable URL>"
        if parts.password is None:
            return url
        # Everything after the last "@" is host[:port]; rebuild the userinfo
        # with the password masked.
        host = parts.netloc.rpartition("@")[2]
        user = parts.username or ""
        return urlunsplit(parts._replace(netloc=f"{user}:***@{host}"))

    def _initialize_redis(self):
        """
        Create the async Redis client from ``settings.REDIS_URL``.

        On failure the error is logged and ``redis_client`` is left as
        ``None`` so the service degrades to its no-op behavior instead of
        raising from the constructor.
        """
        try:
            self.redis_client = redis.from_url(
                self.settings.REDIS_URL, decode_responses=True
            )
        except Exception as e:
            # Deliberately broad: initialization is best-effort.
            logger.error("Failed to initialize Redis client: %s", e)
            self.redis_client = None
        else:
            # NOTE(review): from_url appears to only build the client; the
            # first network round-trip happens on the first command, so this
            # message does not prove the server is reachable - confirm.
            logger.info(
                "Redis queue service initialized successfully: %s",
                self._redact_url(self.settings.REDIS_URL),
            )

    async def enqueue_analysis(
        self,
        file_url: str,
        model: str,
        task_id: str,
    ) -> bool:
        """
        Enqueue an analysis task (future background worker implementation).

        The task is serialized as a JSON object with the keys ``task_id``,
        ``file_url`` and ``model`` and pushed with ``LPUSH`` onto the list
        named by ``settings.REDIS_QUEUE_NAME``.

        Returns:
            ``True`` if the task was pushed to Redis, ``False`` if Redis is
            disabled or no client is available (the task is not stored).

        Raises:
            Any exception raised by the Redis client's ``lpush`` call
            propagates to the caller.
        """
        task_data = {
            "task_id": task_id,
            "file_url": file_url,
            "model": model,
        }
        logger.info("Enqueuing analysis task: %s", task_id)
        if not (self.settings.REDIS_ENABLED and self.redis_client):
            # Make the dropped task visible instead of failing silently.
            logger.warning(
                "Redis queue unavailable; analysis task %s was not enqueued",
                task_id,
            )
            return False
        await self.redis_client.lpush(
            self.settings.REDIS_QUEUE_NAME,
            json.dumps(task_data),
        )
        return True

    async def get_task_result(self, task_id: str) -> Optional[Dict[str, Any]]:
        """
        Get analysis result for a task.

        Reads the Redis key ``result:<task_id>`` and decodes its value as
        JSON.

        Returns:
            The decoded result, or ``None`` when Redis is disabled, no client
            is available, or the key holds no (or an empty) value.

        Raises:
            json.JSONDecodeError: if the stored value is not valid JSON.
            Any exception raised by the Redis client's ``get`` call
            propagates to the caller.
        """
        logger.info("Retrieving result for task: %s", task_id)
        if not (self.settings.REDIS_ENABLED and self.redis_client):
            return None
        result = await self.redis_client.get(f"result:{task_id}")
        return json.loads(result) if result else None
# Module-wide QueueService instance; remains None until first requested.
_queue_service: Optional[QueueService] = None


def get_queue_service() -> QueueService:
    """Return the shared QueueService, constructing it on the first call."""
    global _queue_service
    service = _queue_service
    if service is None:
        # First request: build the instance and remember it for later calls.
        service = QueueService()
        _queue_service = service
    return service