import os
import json
import time
import logging
import signal
import sys
from typing import Any, Dict, Optional

# Add project root to path so `services.*` resolves when run as a script.
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from services.queue_manager import queue_manager
from services.evaluator import get_evaluator

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger("worker")


class TaskWorker:
    """Background task worker for evaluating dialogues.

    Polls a queue for JSON-encoded evaluation tasks, runs the evaluator on
    each dialogue, and persists status/results (Redis backend only).
    """

    def __init__(self, queue_name: str = "evaluation_tasks"):
        self.queue_name = queue_name   # queue to poll for tasks
        self.running = False           # loop flag; cleared by stop()
        self.evaluator = None          # lazily set in start()

    def start(self):
        """Start the worker loop: preload models, then poll until stopped."""
        logger.info(f"Worker starting, listening on '{self.queue_name}'...")
        self.running = True

        # Preload models up front so the first task doesn't pay the init cost;
        # a worker that can't load models is useless, so bail out on failure.
        try:
            logger.info("Preloading models...")
            self.evaluator = get_evaluator()
            # Force init
            self.evaluator._lazy_init()
            logger.info("Models loaded successfully.")
        except Exception as e:
            logger.exception(f"Failed to load models: {e}")
            return

        while self.running:
            try:
                # Dequeue task
                task_json = queue_manager.dequeue(self.queue_name)
                if task_json:
                    self.process_task(task_json)
                else:
                    # Sleep briefly if queue is empty to avoid busy-spinning.
                    time.sleep(0.1)
            except KeyboardInterrupt:
                self.stop()
            except Exception as e:
                # Keep the worker alive on unexpected errors; back off briefly.
                logger.exception(f"Worker loop error: {e}")
                time.sleep(1)

    def process_task(self, task_json: str):
        """Process a single queued task.

        Parses the task JSON, runs the evaluator on its dialogue, and saves
        the result. On any failure the task is marked "failed" with the
        error message (when a task_id is known).
        """
        task_id: Optional[str] = None
        try:
            task = json.loads(task_json)
            task_id = task.get("task_id")
            if not task_id:
                logger.error("Received task without task_id")
                return

            logger.info(f"Processing task {task_id}...")

            # Update status to processing
            self._update_task_status(task_id, "processing")

            # Extract data
            dialogue = task.get("dialogue", [])
            dialogue_id = task.get("dialogue_id")

            # Run evaluation; use a monotonic clock for the duration so the
            # measurement can't be skewed by wall-clock adjustments.
            start_time = time.perf_counter()
            # Convert list of dicts if necessary (evaluator expects list of dicts)
            result = self.evaluator.evaluate(dialogue, dialogue_id)
            duration = time.perf_counter() - start_time

            # Save result (result is a pydantic model — serialize it).
            self._save_result(task_id, result.model_dump(), duration)
            logger.info(f"Task {task_id} completed in {duration:.2f}s")
        except Exception as e:
            # logger.exception records the traceback, unlike logger.error.
            logger.exception(f"Task processing failed: {e}")
            if task_id:
                self._update_task_status(task_id, "failed", error=str(e))

    def _update_task_status(self, task_id: str, status: str, error: Optional[str] = None):
        """Update task status under key ``task:<task_id>`` (Redis backend only).

        Merges into any existing record. For the memory backend this is a
        no-op: if the worker runs in a separate process, in-memory state
        isn't shared, so there is nothing useful to update.
        """
        # For Redis backend
        if queue_manager.backend == "redis" and queue_manager.client:
            key = f"task:{task_id}"
            try:
                existing = queue_manager.client.get(key)
                data: Dict[str, Any] = {}
                if existing:
                    data = json.loads(existing)
                data["status"] = status
                data["updated_at"] = time.time()
                if error:
                    data["error"] = error
                queue_manager.client.setex(key, 86400, json.dumps(data))  # 24h expire
            except Exception as e:
                logger.error(f"Failed to update status in Redis: {e}")

    def _save_result(self, task_id: str, result: Dict, duration: float):
        """Save the final result under ``task:<task_id>`` with a 24h expiry.

        NOTE(review): unlike _update_task_status this overwrites the record
        rather than merging — presumably intentional, since the result
        supersedes interim status; confirm no other fields must survive.
        """
        if queue_manager.backend == "redis" and queue_manager.client:
            key = f"task:{task_id}"
            try:
                data = {
                    "status": "completed",
                    "result": result,
                    "duration": duration,
                    "completed_at": time.time(),
                }
                queue_manager.client.setex(key, 86400, json.dumps(data))
            except Exception as e:
                logger.error(f"Failed to save result to Redis: {e}")

    def stop(self):
        """Signal the worker loop to exit after the current iteration."""
        self.running = False
        logger.info("Worker stopping...")


if __name__ == "__main__":
    worker = TaskWorker()

    def signal_handler(sig, frame):
        worker.stop()
        sys.exit(0)

    # Shut down cleanly on Ctrl-C and on `kill` (SIGTERM).
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    worker.start()