Spaces:
Running
Running
| import logging | |
| from typing import Optional, Any, Dict | |
| import json | |
| import redis.asyncio as redis | |
| from app.core.config import get_settings | |
| logger = logging.getLogger(__name__) | |
| class QueueService: | |
| """ | |
| Queue service for managing asynchronous analysis tasks. | |
| Currently uses in-memory queue, can be extended to use Redis. | |
| """ | |
| def __init__(self): | |
| """Initialize the queue service.""" | |
| self.settings = get_settings() | |
| self.redis_client = None | |
| if self.settings.REDIS_ENABLED: | |
| self._initialize_redis() | |
| def _initialize_redis(self): | |
| try: | |
| self.redis_client = redis.from_url( | |
| self.settings.REDIS_URL, decode_responses=True | |
| ) | |
| logger.info( | |
| f"Redis queue service initialized successfully: {self.settings.REDIS_URL}" | |
| ) | |
| except Exception as e: | |
| logger.error(f"Failed to initialize Redis client: {e}") | |
| self.redis_client = None | |
| async def enqueue_analysis( | |
| self, | |
| file_url: str, | |
| model: str, | |
| task_id: str, | |
| ) -> bool: | |
| """ | |
| Enqueue an analysis task (future background worker implementation). | |
| """ | |
| task_data = { | |
| "task_id": task_id, | |
| "file_url": file_url, | |
| "model": model, | |
| } | |
| logger.info(f"Enqueuing analysis task: {task_id}") | |
| if self.settings.REDIS_ENABLED and self.redis_client: | |
| await self.redis_client.lpush( | |
| self.settings.REDIS_QUEUE_NAME, | |
| json.dumps(task_data) | |
| ) | |
| return True | |
| return False | |
| async def get_task_result(self, task_id: str) -> Optional[Dict[str, Any]]: | |
| """ | |
| Get analysis result for a task. | |
| """ | |
| logger.info(f"Retrieving result for task: {task_id}") | |
| if self.settings.REDIS_ENABLED and self.redis_client: | |
| result = await self.redis_client.get(f"result:{task_id}") | |
| return json.loads(result) if result else None | |
| return None | |
| # Singleton instance | |
| _queue_service: Optional[QueueService] = None | |
| def get_queue_service() -> QueueService: | |
| """Get or create the queue service singleton.""" | |
| global _queue_service | |
| if _queue_service is None: | |
| _queue_service = QueueService() | |
| return _queue_service | |