"""
Celery worker configuration for AegisLM SaaS Backend.

Production-ready Celery setup with Redis broker, task monitoring, and error handling.
"""

import logging
import os
from datetime import datetime, timezone

from celery import Celery
from celery.signals import task_failure, task_postrun, task_prerun

from core.config import settings

logger = logging.getLogger(__name__)

# Create Celery app
celery_app = Celery(
    "aegislm_worker",
    broker=settings.REDIS_URL,
    backend=settings.REDIS_URL,
    include=["tasks.evaluation_task"],
)

# Retry policy for evaluation tasks. Celery has no global configuration keys
# for these values (they are Task attributes: default_retry_delay,
# max_retries, retry_backoff, retry_backoff_max), so they are applied on
# EvaluationTask below. They are also mirrored into celery_app.conf for
# backward compatibility only; Celery itself does not read those conf keys.
DEFAULT_RETRY_DELAY = 60  # seconds (1 minute)
MAX_RETRIES = 3
RETRY_BACKOFF = True
RETRY_BACKOFF_MAX = 300  # seconds (5 minutes max)

# Timeout (seconds) for the remote-control query used by worker_info.
_INSPECT_TIMEOUT = 1.0

# Configure Celery
celery_app.conf.update(
    # Task settings
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="UTC",
    enable_utc=True,
    # Worker settings - Optimized for concurrency control
    worker_prefetch_multiplier=1,
    task_acks_late=True,
    worker_max_tasks_per_child=50,  # Reduced for better memory management
    worker_concurrency=getattr(settings, "CELERY_WORKER_CONCURRENCY", 2),  # Limited concurrency
    # Rate limiting. Celery enforces this per worker instance and per task
    # type, not globally across the cluster.
    task_default_rate_limit=getattr(settings, "CELERY_TASK_RATE_LIMIT", "10/m"),  # 10 tasks per minute
    # Result settings
    result_expires=3600,  # 1 hour
    # NOTE(review): "master_name" is a Redis Sentinel option; with a plain
    # redis:// REDIS_URL it is presumably unused -- confirm the deployment.
    result_backend_transport_options={
        "master_name": "mymaster",
    },
    # Routing
    task_routes={
        "tasks.evaluation_task.run_evaluation_task": {"queue": "evaluation"},
        "tasks.evaluation_task.run_benchmark_task": {"queue": "benchmark"},
    },
    # Queue settings (legacy dict form: queue name -> exchange/routing_key)
    task_default_queue="default",
    task_queues={
        "default": {
            "exchange": "default",
            "routing_key": "default",
        },
        "evaluation": {
            "exchange": "evaluation",
            "routing_key": "evaluation",
        },
        "benchmark": {
            "exchange": "benchmark",
            "routing_key": "benchmark",
        },
    },
    # Monitoring
    worker_send_task_events=True,
    task_send_sent_event=True,
    # Error handling
    task_reject_on_worker_lost=True,
    task_ignore_result=False,
    # Retry keys below are NOT Celery settings (Celery ignores them); the
    # effective policy is set on EvaluationTask. Kept for compatibility.
    task_default_retry_delay=DEFAULT_RETRY_DELAY,
    task_max_retries=MAX_RETRIES,
    task_retry_backoff=RETRY_BACKOFF,
    task_retry_backoff_max=RETRY_BACKOFF_MAX,
    # Beat scheduler (if needed)
    beat_schedule={
        # NOTE(review): this schedules "tasks.evaluation_task.cleanup_expired_results",
        # but the cleanup task defined in this module is registered as
        # "tasks.cleanup_expired_results". Unless tasks.evaluation_task defines
        # its own task with this name, beat will send an unregistered task --
        # confirm which name is intended.
        "cleanup-expired-results": {
            "task": "tasks.evaluation_task.cleanup_expired_results",
            "schedule": 3600.0,  # Every hour
        },
    },
)


# Task monitoring signals
@task_prerun.connect
def task_prerun_handler(task_id, task, args, kwargs, **extras):
    """
    Handle task pre-run signal: log the start of a task.

    Args:
        task_id: Task ID
        task: Task object
        args: Task positional arguments
        kwargs: Task keyword arguments
        **extras: Extra signal arguments (e.g. ``sender``, ``signal``)
    """
    logger.info("Task %s[%s] started at %s", task.name, task_id, datetime.now(timezone.utc))

    if task.name == "tasks.evaluation_task.run_evaluation_task":
        # TODO: update the evaluation's database status to "running".
        pass


@task_postrun.connect
def task_postrun_handler(task_id, task, args, kwargs, retval, state, **extras):
    """
    Handle task post-run signal: log task completion and final state.

    Args:
        task_id: Task ID
        task: Task object
        args: Task positional arguments
        kwargs: Task keyword arguments
        retval: Return value of the task
        state: Final task state
        **extras: Extra signal arguments (e.g. ``sender``, ``signal``)
    """
    logger.info(
        "Task %s[%s] completed with state %s at %s",
        task.name,
        task_id,
        state,
        datetime.now(timezone.utc),
    )


@task_failure.connect
def task_failure_handler(task_id=None, exception=None, traceback=None, einfo=None, **kwargs):
    """
    Handle task failure signal: log the failed task and its exception.

    Celery dispatches ``task_failure`` with keyword arguments, and the
    exception is passed as ``exception`` (not ``error``). A required ``error``
    parameter would make every dispatch raise TypeError, which Celery logs
    and swallows, so the handler would never run.

    Args:
        task_id: Task ID
        exception: The exception raised by the task
        traceback: Traceback object
        einfo: Exception info
        **kwargs: Extra signal arguments (``sender``, ``args``, ``kwargs``, ...)
    """
    logger.error("Task %s failed: %s", task_id, exception)

    # TODO: update the evaluation's database status to "failed".


# Worker initialization
@celery_app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    """
    Set up periodic tasks after Celery configuration.

    Currently a no-op placeholder.

    Args:
        sender: Celery app
        **kwargs: Extra arguments
    """
    # Add periodic tasks here if needed
    pass


# Health check task
@celery_app.task(bind=True, name="tasks.health_check")
def health_check(self):
    """
    Simple health check task.

    Returns:
        dict: Health status with an ISO 8601 UTC timestamp.
    """
    return {
        "status": "healthy",
        "timestamp": datetime.now(timezone.utc).isoformat(),
        # NOTE(review): request.id is the id of this task invocation, not a
        # worker identifier; key kept for compatibility.
        "worker_id": self.request.id,
    }


# Worker info task
@celery_app.task(bind=True, name="tasks.worker_info")
def worker_info(self):
    """
    Get information about the worker executing this task.

    The active-task count is obtained with a remote-control ``active``
    query addressed to this worker only. It is best-effort: if the worker
    does not reply within the timeout (for example, a solo pool busy running
    this very task), or the hostname is unknown, the count is 0. The count
    includes this task itself when the worker replies.

    Returns:
        dict: Worker information
    """
    hostname = self.request.hostname

    # The task request context has no list of the worker's tasks (reading
    # ``self.request.tasks`` raised AttributeError), so ask the worker.
    active_tasks = 0
    if hostname:
        inspector = self.app.control.inspect(destination=[hostname], timeout=_INSPECT_TIMEOUT)
        replies = inspector.active() or {}
        active_tasks = len(replies.get(hostname) or [])

    return {
        # NOTE(review): request.id is this task's id, not a worker id;
        # key kept for compatibility.
        "worker_id": self.request.id,
        "hostname": hostname,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "active_tasks": active_tasks,
    }


# Task to cleanup expired results
@celery_app.task(name="tasks.cleanup_expired_results")
def cleanup_expired_results():
    """
    Clean up expired evaluation results.

    Currently a stub that cleans nothing.

    Returns:
        dict: Cleanup results
    """
    # TODO: implement cleanup logic
    return {
        "status": "completed",
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "cleaned_items": 0,
    }


# Custom task base class for evaluation tasks
class EvaluationTask(celery_app.Task):
    """
    Custom task base class for evaluation tasks.

    Carries the retry policy (Celery reads these as Task attributes, not as
    app configuration) and status hooks for evaluation-related tasks.
    """

    # Delay between retries when a task calls self.retry() without countdown.
    default_retry_delay = DEFAULT_RETRY_DELAY
    max_retries = MAX_RETRIES
    # Backoff settings only take effect for tasks that use autoretry_for.
    retry_backoff = RETRY_BACKOFF
    retry_backoff_max = RETRY_BACKOFF_MAX

    def on_success(self, retval, task_id, args, kwargs):
        """
        Handle task success.

        Args:
            retval: Return value
            task_id: Task ID
            args: Task arguments
            kwargs: Task keyword arguments
        """
        super().on_success(retval, task_id, args, kwargs)

        if self.name == "tasks.evaluation_task.run_evaluation_task":
            # TODO: update the evaluation's database status to "completed".
            pass

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """
        Handle task failure.

        Args:
            exc: Exception
            task_id: Task ID
            args: Task arguments
            kwargs: Task keyword arguments
            einfo: Exception info
        """
        super().on_failure(exc, task_id, args, kwargs, einfo)

        if self.name == "tasks.evaluation_task.run_evaluation_task":
            # TODO: update the evaluation's database status to "failed".
            pass

    def on_retry(self, exc, task_id, args, kwargs, einfo):
        """
        Handle task retry by logging the triggering exception.

        Args:
            exc: Exception
            task_id: Task ID
            args: Task arguments
            kwargs: Task keyword arguments
            einfo: Exception info
        """
        super().on_retry(exc, task_id, args, kwargs, einfo)
        logger.warning("Task %s retrying due to: %s", task_id, exc)


# Register custom task base. NOTE(review): @celery_app.task declarations are
# presumably materialized lazily at app finalization, so tasks above should
# also pick up this base -- confirm with celery_app.tasks at runtime.
celery_app.Task = EvaluationTask


# Worker startup
if __name__ == "__main__":
    celery_app.start()