| """ |
| Celery worker configuration for AegisLM SaaS Backend. |
| |
| Production-ready Celery setup with Redis broker, |
| task monitoring, and error handling. |
| """ |
|
|
| import os |
| from celery import Celery |
| from celery.signals import task_prerun, task_postrun, task_failure |
| from datetime import datetime |
|
|
| from core.config import settings |
|
|
|
|
| |
# Application instance used by this module's tasks and signal handlers.
# One Redis URL (settings.REDIS_URL) serves as both the message broker and
# the result backend; "tasks.evaluation_task" is imported when the worker
# starts so the tasks defined there get registered.
celery_app = Celery(
    "aegislm_worker",
    broker=settings.REDIS_URL,
    backend=settings.REDIS_URL,
    include=["tasks.evaluation_task"],
)
|
|
| |
celery_app.conf.update(
    # --- Serialization and time handling: JSON only, UTC everywhere ---
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    timezone="UTC",
    enable_utc=True,

    # --- Worker execution ---
    # Prefetch one message at a time and acknowledge only after the task
    # has run; child processes are recycled after 50 tasks.
    worker_prefetch_multiplier=1,
    task_acks_late=True,
    worker_max_tasks_per_child=50,
    worker_concurrency=getattr(settings, "CELERY_WORKER_CONCURRENCY", 2),

    # --- Rate limiting (overridable through settings) ---
    task_default_rate_limit=getattr(settings, "CELERY_TASK_RATE_LIMIT", "10/m"),

    # --- Result backend ---
    # Stored results expire after 3600 seconds.
    result_expires=3600,
    # NOTE(review): "master_name" looks like a Redis Sentinel option; it is
    # presumably unused with a plain redis:// URL -- confirm.
    result_backend_transport_options={
        "master_name": "mymaster",
    },

    # --- Routing: each heavy task type gets its own queue ---
    task_routes={
        "tasks.evaluation_task.run_evaluation_task": {"queue": "evaluation"},
        "tasks.evaluation_task.run_benchmark_task": {"queue": "benchmark"},
    },
    task_default_queue="default",
    # Every queue uses its own name as exchange and routing key.
    task_queues={
        queue_name: {"exchange": queue_name, "routing_key": queue_name}
        for queue_name in ("default", "evaluation", "benchmark")
    },

    # --- Monitoring events ---
    worker_send_task_events=True,
    task_send_sent_event=True,

    # --- Failure handling ---
    task_reject_on_worker_lost=True,
    task_ignore_result=False,
    # NOTE(review): the four keys below do not appear in Celery's documented
    # configuration settings. Retry defaults are Task attributes
    # (default_retry_delay, max_retries, retry_backoff, retry_backoff_max),
    # so these values are likely stored but never applied -- confirm.
    task_default_retry_delay=60,
    task_max_retries=3,
    task_retry_backoff=True,
    task_retry_backoff_max=300,

    # --- Periodic tasks (celery beat) ---
    # NOTE(review): the cleanup task defined in this module is registered as
    # "tasks.cleanup_expired_results"; verify that a task with the name used
    # below exists in tasks.evaluation_task.
    beat_schedule={
        "cleanup-expired-results": {
            "task": "tasks.evaluation_task.cleanup_expired_results",
            "schedule": 3600.0,  # seconds
        },
    },
)
|
|
|
|
| |
@task_prerun.connect
def task_prerun_handler(task_id, task, args, kwargs, **extras):
    """
    Print a start-of-task line when the ``task_prerun`` signal fires.

    Args:
        task_id: ID of the task about to run
        task: Task object; only its ``name`` is read
        args: Task positional arguments (unused)
        kwargs: Task keyword arguments (unused)
        **extras: Any further signal arguments (unused)
    """
    task_name = task.name
    started_at = datetime.utcnow()
    print(f"Task {task_name}[{task_id}] started at {started_at}")
    # No task-specific pre-run handling is implemented at the moment.
|
|
|
|
@task_postrun.connect
def task_postrun_handler(task_id, task, args, kwargs, retval, state, **extras):
    """
    Print an end-of-task line when the ``task_postrun`` signal fires.

    Args:
        task_id: ID of the task that just ran
        task: Task object; only its ``name`` is read
        args: Task positional arguments (unused)
        kwargs: Task keyword arguments (unused)
        retval: Value returned by the task (unused)
        state: Final state of the task, included in the message
        **extras: Any further signal arguments (unused)
    """
    task_name = task.name
    finished_at = datetime.utcnow()
    print(
        f"Task {task_name}[{task_id}] completed with state {state} at {finished_at}"
    )
|
|
|
|
@task_failure.connect
def task_failure_handler(task_id=None, exception=None, traceback=None, einfo=None, **kwargs):
    """
    Print a failure line when the ``task_failure`` signal fires.

    Celery dispatches this signal with keyword arguments only and passes the
    raised exception as ``exception``. The parameter used to be a required
    argument called ``error``, which Celery never supplies, so every dispatch
    raised ``TypeError`` inside the signal machinery and nothing was printed.
    All parameters now have defaults so the handler cannot fail on a missing
    argument.

    Args:
        task_id: ID of the failed task
        exception: Exception instance raised by the task
        traceback: Traceback object (unused)
        einfo: Exception info (unused)
        **kwargs: Remaining signal arguments (e.g. ``sender``, ``signal`` and
            the task's own ``args``/``kwargs``). A legacy ``error=`` keyword
            is honoured when ``exception`` is not supplied.
    """
    # Prefer the name Celery uses; fall back to the old keyword for any
    # direct caller that still passes ``error=...``.
    error = exception if exception is not None else kwargs.get("error")
    print(f"Task {task_id} failed: {error}")
| |
| |
| |
|
|
|
|
| |
@celery_app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    """
    Hook invoked once the Celery app has been configured.

    Currently a no-op: nothing is registered here.

    Args:
        sender: Celery app that emitted the signal
        **kwargs: Extra signal arguments (unused)
    """
    return None
|
|
|
|
| |
@celery_app.task(bind=True, name="tasks.health_check")
def health_check(self):
    """
    Report that the worker is able to execute tasks.

    Returns:
        dict: ``status`` (always "healthy"), ``timestamp`` (naive UTC time in
        ISO 8601 format) and ``worker_id``. Note that ``worker_id`` carries
        ``self.request.id``, i.e. the ID of this task invocation.
    """
    checked_at = datetime.utcnow()
    payload = {"status": "healthy"}
    payload["timestamp"] = checked_at.isoformat()
    payload["worker_id"] = self.request.id
    return payload
|
|
|
|
| |
@celery_app.task(bind=True, name="tasks.worker_info")
def worker_info(self):
    """
    Get information about the worker executing this task.

    Returns:
        dict: ``worker_id`` (``self.request.id``, i.e. the ID of this task
        invocation), ``hostname`` (from the request), ``timestamp`` (naive
        UTC time in ISO 8601 format) and ``active_tasks`` (int).
    """
    request = self.request
    # Celery's request context does not define a ``tasks`` attribute, so the
    # previous direct access raised AttributeError on every call. Read it
    # defensively: the count is 0 unless something populates ``request.tasks``.
    # A real figure would have to come from the worker inspection API
    # (``app.control.inspect().active()``).
    running = getattr(request, "tasks", None) or ()
    return {
        "worker_id": request.id,
        "hostname": request.hostname,
        "timestamp": datetime.utcnow().isoformat(),
        "active_tasks": len(running),
    }
|
|
|
|
| |
@celery_app.task(name="tasks.cleanup_expired_results")
def cleanup_expired_results():
    """
    Cleanup expired evaluation results.

    Placeholder: nothing is deleted yet, so ``cleaned_items`` is always 0.

    Returns:
        dict: ``status``, ``timestamp`` (naive UTC time in ISO 8601 format)
        and ``cleaned_items``
    """
    finished_at = datetime.utcnow().isoformat()
    return dict(status="completed", timestamp=finished_at, cleaned_items=0)
|
|
|
|
| |
class EvaluationTask(celery_app.Task):
    """
    Custom task base class.

    It is installed as ``celery_app.Task`` right after the class body, which
    makes it the default base for tasks created through ``celery_app.task``.
    """

    # Retry policy defaults. The configuration carries the same numbers under
    # ``task_default_retry_delay`` / ``task_max_retries`` /
    # ``task_retry_backoff`` / ``task_retry_backoff_max``, but those are not
    # Celery settings and are never applied. Declaring them as Task
    # attributes makes the policy effective while still letting an individual
    # task override any of them in its decorator.
    default_retry_delay = 60   # seconds before a retry when no countdown is given
    max_retries = 3
    retry_backoff = True       # only used by tasks that declare autoretry_for
    retry_backoff_max = 300    # upper bound, in seconds, for the backoff delay

    def on_success(self, retval, task_id, args, kwargs):
        """
        Handle task success.

        Delegates to the base implementation; no extra handling yet.

        Args:
            retval: Return value
            task_id: Task ID
            args: Task arguments
            kwargs: Task keyword arguments
        """
        super().on_success(retval, task_id, args, kwargs)

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """
        Handle task failure.

        Delegates to the base implementation; no extra handling yet.

        Args:
            exc: Exception
            task_id: Task ID
            args: Task arguments
            kwargs: Task keyword arguments
            einfo: Exception info
        """
        super().on_failure(exc, task_id, args, kwargs, einfo)

    def on_retry(self, exc, task_id, args, kwargs, einfo):
        """
        Handle task retry by printing the reason after the base handling.

        Args:
            exc: Exception that triggered the retry
            task_id: Task ID
            args: Task arguments
            kwargs: Task keyword arguments
            einfo: Exception info
        """
        super().on_retry(exc, task_id, args, kwargs, einfo)
        print(f"Task {task_id} retrying due to: {exc}")


# Make EvaluationTask the default base class for this app's tasks.
celery_app.Task = EvaluationTask
|
|
|
|
| |
if __name__ == "__main__":
    # Running this file directly hands control to the Celery app's own
    # entry point.
    celery_app.start()
|
|