Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import time | |
| import logging | |
| import signal | |
| import sys | |
| from typing import Dict, Any | |
| # Add project root to path | |
| sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) | |
| from services.queue_manager import queue_manager | |
| from services.evaluator import get_evaluator | |
| logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') | |
| logger = logging.getLogger("worker") | |
class TaskWorker:
    """Background task worker for evaluating dialogues.

    The worker polls ``queue_manager`` for JSON-encoded tasks, runs each one
    through the evaluator returned by ``get_evaluator()``, and records the
    task's status and result under the Redis key ``task:<task_id>`` when the
    queue manager uses the Redis backend.
    """

    # Lifetime of a task record in Redis (24 hours).
    TASK_TTL_SECONDS = 86400
    # Pause between polls while the queue is empty.
    POLL_INTERVAL_SECONDS = 0.1
    # Back-off after an unexpected error in the polling loop.
    ERROR_BACKOFF_SECONDS = 1

    def __init__(self, queue_name: str = "evaluation_tasks"):
        """Create an idle worker bound to *queue_name*.

        The evaluator is not loaded here; ``start()`` does that.
        """
        self.queue_name = queue_name
        self.running = False
        self.evaluator = None

    def start(self) -> None:
        """Preload the evaluator, then poll the queue until stopped.

        Returns without entering the loop when the models cannot be loaded.
        The loop ends once ``self.running`` becomes false (see ``stop()``).
        """
        logger.info(f"Worker starting, listening on '{self.queue_name}'...")
        # Set before preloading so a stop() issued during the (possibly slow)
        # model load is honoured as soon as loading finishes.
        self.running = True

        try:
            logger.info("Preloading models...")
            self.evaluator = get_evaluator()
            # Force initialisation now instead of on the first task.
            self.evaluator._lazy_init()
            logger.info("Models loaded successfully.")
        except Exception as e:
            logger.exception(f"Failed to load models: {e}")
            # The worker never entered its loop, so it is not running.
            self.running = False
            return

        while self.running:
            try:
                task_json = queue_manager.dequeue(self.queue_name)
                if task_json:
                    self.process_task(task_json)
                else:
                    # Queue is empty: sleep briefly to avoid a busy loop.
                    time.sleep(self.POLL_INTERVAL_SECONDS)
            except KeyboardInterrupt:
                self.stop()
            except Exception as e:
                # Keep the worker alive; back off before polling again.
                logger.exception(f"Worker loop error: {e}")
                time.sleep(self.ERROR_BACKOFF_SECONDS)

    def process_task(self, task_json: str) -> None:
        """Process a single JSON-encoded task.

        The payload must be a JSON object with a ``task_id``; ``dialogue``
        (default ``[]``) and ``dialogue_id`` are passed to the evaluator.
        Failures are logged and recorded on the task; nothing is raised for
        ordinary ``Exception`` subclasses.
        """
        task_id = None
        try:
            task = json.loads(task_json)
            if not isinstance(task, dict):
                logger.error("Received malformed task payload (expected a JSON object)")
                return

            task_id = task.get("task_id")
            if not task_id:
                logger.error("Received task without task_id")
                return

            logger.info(f"Processing task {task_id}...")
            self._update_task_status(task_id, "processing")

            dialogue = task.get("dialogue", [])
            dialogue_id = task.get("dialogue_id")

            # Monotonic clock: the measured duration is immune to wall-clock
            # adjustments that happen while the evaluation runs.
            started = time.monotonic()
            result = self.evaluator.evaluate(dialogue, dialogue_id)
            duration = time.monotonic() - started

            if self._save_result(task_id, result.model_dump(), duration):
                logger.info(f"Task {task_id} completed in {duration:.2f}s")
            else:
                # Do not leave the record stuck in "processing" when the
                # result could not be stored.
                self._update_task_status(
                    task_id, "failed", error="Failed to persist evaluation result"
                )
        except Exception as e:
            logger.exception(f"Task processing failed: {e}")
            if task_id:
                # Some exceptions stringify to ""; fall back to the type name
                # so a failed task always carries an error description.
                self._update_task_status(
                    task_id, "failed", error=str(e) or type(e).__name__
                )

    def _update_task_status(self, task_id: str, status: str, error: str = None) -> None:
        """Merge a new status into the task's stored record.

        Existing fields of the record are kept; ``status`` and ``updated_at``
        are overwritten and ``error`` is set when a non-empty value is given.
        Only the Redis backend is supported: with any other backend this is a
        no-op. Storage errors are logged, not raised.
        """
        if not (queue_manager.backend == "redis" and queue_manager.client):
            return

        key = f"task:{task_id}"
        try:
            existing = queue_manager.client.get(key)
            data = json.loads(existing) if existing else {}
            data["status"] = status
            data["updated_at"] = time.time()
            if error:
                data["error"] = error
            queue_manager.client.setex(key, self.TASK_TTL_SECONDS, json.dumps(data))
        except Exception as e:
            logger.error(f"Failed to update status in Redis: {e}")

    def _save_result(self, task_id: str, result: Dict[str, Any], duration: float) -> bool:
        """Store the final result, replacing the task's existing record.

        Returns ``False`` only when an attempt to persist the result failed
        (serialisation or storage error, which is logged). Returns ``True``
        after a successful write, and also when the backend is not Redis and
        there is therefore nothing to write.
        """
        if not (queue_manager.backend == "redis" and queue_manager.client):
            return True

        key = f"task:{task_id}"
        try:
            data = {
                "status": "completed",
                "result": result,
                "duration": duration,
                "completed_at": time.time(),
            }
            queue_manager.client.setex(key, self.TASK_TTL_SECONDS, json.dumps(data))
        except Exception as e:
            logger.error(f"Failed to save result to Redis: {e}")
            return False
        return True

    def stop(self) -> None:
        """Ask the polling loop to exit after its current iteration."""
        self.running = False
        logger.info("Worker stopping...")
if __name__ == "__main__":
    worker = TaskWorker()

    def signal_handler(sig, frame):
        """Handle SIGINT/SIGTERM: stop gracefully first, force exit second.

        The first signal only clears the worker's ``running`` flag, so the
        task currently being processed can finish and record its result
        before the polling loop exits. Exiting from inside the handler would
        raise ``SystemExit`` in the middle of that task and leave its status
        stuck at "processing".

        If the worker is not running (the signal arrived before ``start()``,
        or a stop was already requested), exit immediately so a repeated
        signal can always terminate the process.
        """
        if not worker.running:
            sys.exit(0)
        worker.stop()

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    # Returns once the loop observes running == False (or model loading
    # fails); the process then exits normally.
    worker.start()