Spaces:
Sleeping
Sleeping
| """ | |
| Task queue worker for ABSA processing. | |
| Processes jobs from Redis queue asynchronously. | |
| """ | |
| import logging | |
| import uuid | |
| import threading | |
| import time | |
| from typing import Dict, Any, Optional | |
| from datetime import datetime, timezone | |
| from .redis_service import get_redis_service | |
| from .mongodb_service import get_mongodb_service | |
| logger = logging.getLogger(__name__) | |
class TaskQueue:
    """Task queue for asynchronous ABSA processing.

    Jobs are enqueued in Redis, processed by a single background worker
    thread, and their lifecycle events are logged to MongoDB.
    """

    def __init__(self, data_processor=None):
        """
        Initialize task queue.

        Args:
            data_processor: DataProcessor instance for ABSA processing.
                When None, processing is simulated (see _process_task).
        """
        self.redis_service = get_redis_service()
        self.mongodb_service = get_mongodb_service()
        self.data_processor = data_processor
        # Background worker state; the thread is created lazily by start_worker().
        self._worker_thread: Optional[threading.Thread] = None
        self._stop_worker = threading.Event()

    def generate_job_id(self) -> str:
        """Generate unique job ID of the form ``job_<12 hex chars>``."""
        return f"job_{uuid.uuid4().hex[:12]}"

    def submit_job(
        self,
        data: Dict[str, Any],
        device_id: str,
        user_id: Optional[str] = None
    ) -> str:
        """
        Submit ABSA job to queue.

        Args:
            data: Job data (e.g., CSV data, parameters)
            device_id: Device identifier
            user_id: Optional user identifier

        Returns:
            Job ID. The ID is returned even if enqueueing failed; in that
            case the job's status is set to FAILED so callers can detect it.
        """
        job_id = self.generate_job_id()

        # Log ANALYSIS_REQUEST event
        self.mongodb_service.log_event(
            event_type="ANALYSIS_REQUEST",
            device_id=device_id,
            user_id=user_id,
            metadata={"job_id": job_id}
        )

        # Enqueue task
        task_data = {
            "device_id": device_id,
            "user_id": user_id,
            "data": data
        }
        success = self.redis_service.enqueue_task(job_id, task_data)

        if success:
            # Log TASK_QUEUED event
            self.mongodb_service.log_event(
                event_type="TASK_QUEUED",
                device_id=device_id,
                user_id=user_id,
                metadata={"job_id": job_id}
            )
            logger.info("Job %s submitted successfully", job_id)
        else:
            # FIX: mark the job FAILED so get_job_status() does not return
            # None (indistinguishable from an unknown job ID) for an ID we
            # just handed back to the caller.
            self.redis_service.set_task_status(job_id, "FAILED")
            logger.error("Failed to submit job %s", job_id)

        return job_id

    def get_job_status(self, job_id: str) -> Optional[str]:
        """
        Get job status.

        Args:
            job_id: Job identifier

        Returns:
            Status string (PENDING, RUNNING, DONE, FAILED) or None
        """
        return self.redis_service.get_task_status(job_id)

    def get_job_result(self, job_id: str) -> Optional[Dict[str, Any]]:
        """
        Get job result if completed.

        Args:
            job_id: Job identifier

        Returns:
            Result data, or None when the job is not in DONE state.
        """
        status = self.get_job_status(job_id)
        if status == "DONE":
            return self.redis_service.get_task_result(job_id)
        return None

    def _process_task(self, task: Dict[str, Any]) -> bool:
        """
        Process a single task.

        Args:
            task: Task payload from queue; expects keys "job_id" and "data"
                (the dict built by submit_job).

        Returns:
            True if successful, False otherwise
        """
        job_id = task["job_id"]
        task_data = task["data"]
        device_id = task_data.get("device_id")
        user_id = task_data.get("user_id")

        logger.info("Processing task %s", job_id)

        try:
            # Update status to RUNNING
            self.redis_service.set_task_status(job_id, "RUNNING")

            if self.data_processor:
                # Process CSV data with the configured DataProcessor.
                csv_data = task_data["data"].get("csv_data")
                if csv_data:
                    result = self.data_processor.process_data(csv_data)
                else:
                    # NOTE(review): a missing-CSV job still ends up DONE with an
                    # error payload below; confirm this "soft failure" is intended.
                    result = {"status": "error", "message": "No CSV data provided"}
            else:
                # Simulate processing when no processor is configured.
                time.sleep(2)
                result = {
                    "status": "success",
                    "message": "Processing completed",
                    "processed_at": datetime.now(timezone.utc).isoformat()
                }

            # Store the result BEFORE flipping status to DONE so a consumer
            # that observes DONE always finds a result.
            self.redis_service.set_task_result(job_id, result)
            self.redis_service.set_task_status(job_id, "DONE")

            # Log TASK_COMPLETED event
            self.mongodb_service.log_event(
                event_type="TASK_COMPLETED",
                device_id=device_id,
                user_id=user_id,
                metadata={"job_id": job_id, "success": True}
            )

            logger.info("Task %s completed successfully", job_id)
            return True

        except Exception as e:
            logger.exception("Task %s failed: %s", job_id, e)

            # FIX: best-effort error reporting — a secondary Redis/Mongo
            # failure here must not mask the original error.
            try:
                self.redis_service.set_task_status(job_id, "FAILED")
                error_result = {
                    "status": "error",
                    "message": str(e),
                    "failed_at": datetime.now(timezone.utc).isoformat()
                }
                self.redis_service.set_task_result(job_id, error_result)
                # Log TASK_COMPLETED event with failure
                self.mongodb_service.log_event(
                    event_type="TASK_COMPLETED",
                    device_id=device_id,
                    user_id=user_id,
                    metadata={"job_id": job_id, "success": False, "error": str(e)}
                )
            except Exception:
                logger.exception("Failed to record failure for task %s", job_id)
            return False

    def _worker_loop(self):
        """Worker loop: pull tasks from the queue until stop is requested."""
        logger.info("Task worker started")

        while not self._stop_worker.is_set():
            try:
                # Blocking pop with a 1-second timeout so the stop event is
                # re-checked at least once per second.
                task = self.redis_service.dequeue_task(timeout=1)
                if task:
                    self._process_task(task)
            except Exception:
                logger.exception("Worker error")
                # Back off briefly so a persistent failure (e.g. Redis down)
                # does not spin this loop at 100% CPU.
                time.sleep(1)

        logger.info("Task worker stopped")

    def start_worker(self):
        """Start background worker thread (no-op if one is already running)."""
        if self._worker_thread and self._worker_thread.is_alive():
            logger.warning("Worker already running")
            return

        self._stop_worker.clear()
        # Daemon thread: does not block interpreter shutdown.
        self._worker_thread = threading.Thread(target=self._worker_loop, daemon=True)
        self._worker_thread.start()
        logger.info("Task worker thread started")

    def stop_worker(self):
        """Stop background worker thread, waiting up to 5 seconds."""
        if self._worker_thread and self._worker_thread.is_alive():
            logger.info("Stopping task worker...")
            self._stop_worker.set()
            self._worker_thread.join(timeout=5)
            # FIX: Thread.join(timeout=...) does not raise on timeout; report
            # honestly instead of claiming the worker stopped when it has not.
            if self._worker_thread.is_alive():
                logger.warning("Task worker did not stop within 5 seconds")
            else:
                logger.info("Task worker stopped")
# Global task queue singleton (created on first get_task_queue() call).
_task_queue: Optional[TaskQueue] = None


def get_task_queue(data_processor=None) -> TaskQueue:
    """Get task queue singleton.

    Args:
        data_processor: DataProcessor used for ABSA processing. Honored on
            the call that creates the singleton; if the singleton already
            exists without a processor, a non-None processor passed here is
            attached to it.

    Returns:
        The shared TaskQueue instance.
    """
    global _task_queue
    if _task_queue is None:
        _task_queue = TaskQueue(data_processor)
    elif data_processor is not None and _task_queue.data_processor is None:
        # FIX: previously a data_processor supplied after the singleton was
        # created was silently dropped, leaving the queue in simulation mode
        # forever. Attach it instead (never overwrite an existing processor).
        _task_queue.data_processor = data_processor
    return _task_queue