# Author: KarenYYH
# Initial commit - HR Evaluation API v2
# commit c8b1f17
import json
import logging
import os
import signal
import sys
import time
from typing import Any, Dict, Optional
# Make the project root importable so `services.*` resolves when this file
# is executed directly as a script (the worker runs as its own process).
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from services.queue_manager import queue_manager
from services.evaluator import get_evaluator
# Process-wide logging setup plus a module-level logger for the worker.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("worker")
class TaskWorker:
    """Background worker that evaluates queued dialogue tasks.

    Dequeues JSON task payloads from `queue_manager` on `queue_name`,
    runs the shared evaluator on each dialogue, and persists status and
    results under the Redis key ``task:<task_id>`` with a 24h TTL.
    """

    # How long (seconds) task status/result records live in Redis.
    RESULT_TTL_SECONDS = 86400

    def __init__(self, queue_name: str = "evaluation_tasks"):
        # Queue to listen on; must match the producer side of the API.
        self.queue_name = queue_name
        # Loop flag; flipped by stop() (e.g. from a signal handler).
        self.running = False
        # Set in start(); None until models are preloaded.
        self.evaluator = None

    def start(self):
        """Run the blocking dequeue/process loop until stop() is called.

        Preloads the evaluator first so the initial task does not pay the
        model-initialization cost; returns early if loading fails.
        """
        logger.info(f"Worker starting, listening on '{self.queue_name}'...")
        self.running = True
        try:
            logger.info("Preloading models...")
            self.evaluator = get_evaluator()
            # Force eager initialization of the otherwise-lazy evaluator.
            # NOTE(review): relies on a private method of the evaluator.
            self.evaluator._lazy_init()
            logger.info("Models loaded successfully.")
        except Exception:
            # logger.exception records the full traceback, not just str(e).
            logger.exception("Failed to load models")
            return
        while self.running:
            try:
                task_json = queue_manager.dequeue(self.queue_name)
                if task_json:
                    self.process_task(task_json)
                else:
                    # Queue empty: short idle wait to avoid busy-polling.
                    time.sleep(0.1)
            except KeyboardInterrupt:
                self.stop()
            except Exception:
                logger.exception("Worker loop error")
                # Back off so a persistent failure doesn't spin the CPU.
                time.sleep(1)

    def process_task(self, task_json: str):
        """Decode and run a single task, recording status transitions.

        Marks the task "processing", evaluates it, then saves the result
        as "completed"; on any failure marks it "failed" with the error.
        """
        task_id = None
        try:
            task = json.loads(task_json)
            task_id = task.get("task_id")
            if not task_id:
                logger.error("Received task without task_id")
                return
            logger.info(f"Processing task {task_id}...")
            self._update_task_status(task_id, "processing")
            # Payload: list-of-dicts dialogue plus an optional dialogue_id
            # (the evaluator expects a list of dicts).
            dialogue = task.get("dialogue", [])
            dialogue_id = task.get("dialogue_id")
            start_time = time.time()
            result = self.evaluator.evaluate(dialogue, dialogue_id)
            duration = time.time() - start_time
            # `result` is a pydantic model; persist its dict form.
            self._save_result(task_id, result.model_dump(), duration)
            logger.info(f"Task {task_id} completed in {duration:.2f}s")
        except Exception as e:
            logger.exception(f"Task processing failed: {e}")
            # Only possible once the payload has yielded a task_id.
            if task_id:
                self._update_task_status(task_id, "failed", error=str(e))

    def _update_task_status(self, task_id: str, status: str, error: Optional[str] = None):
        """Write/merge a task's status record into Redis.

        No-op for non-Redis backends: with the in-memory backend a worker
        running in a separate process cannot share state with the API.
        """
        if queue_manager.backend == "redis" and queue_manager.client:
            key = f"task:{task_id}"
            try:
                # Merge into any existing record so earlier fields survive.
                existing = queue_manager.client.get(key)
                data = json.loads(existing) if existing else {}
                data["status"] = status
                data["updated_at"] = time.time()
                if error:
                    data["error"] = error
                queue_manager.client.setex(key, self.RESULT_TTL_SECONDS, json.dumps(data))
            except Exception:
                logger.exception("Failed to update status in Redis")

    def _save_result(self, task_id: str, result: Dict, duration: float):
        """Persist the final result record for a completed task (Redis only)."""
        if queue_manager.backend == "redis" and queue_manager.client:
            key = f"task:{task_id}"
            try:
                data = {
                    "status": "completed",
                    "result": result,
                    "duration": duration,
                    "completed_at": time.time(),
                }
                queue_manager.client.setex(key, self.RESULT_TTL_SECONDS, json.dumps(data))
            except Exception:
                logger.exception("Failed to save result to Redis")

    def stop(self):
        """Ask the worker loop to exit after the current iteration."""
        self.running = False
        logger.info("Worker stopping...")
if __name__ == "__main__":
    # Script entry point: run a worker until SIGINT/SIGTERM arrives.
    task_worker = TaskWorker()

    def _handle_shutdown(signum, frame):
        # Stop the loop cleanly, then terminate the process.
        task_worker.stop()
        sys.exit(0)

    for _sig in (signal.SIGINT, signal.SIGTERM):
        signal.signal(_sig, _handle_shutdown)

    task_worker.start()