# Source: ALM-2 / backend/workers/celery_worker.py
# (uploaded by ACA050 as part of "Upload 520 files", commit 2ed8996)
"""
Celery worker configuration for AegisLM SaaS Backend.
Production-ready Celery setup with Redis broker,
task monitoring, and error handling.
"""
import os
from celery import Celery
from celery.signals import task_prerun, task_postrun, task_failure
from datetime import datetime
from core.config import settings
# Celery application for the AegisLM worker. The same Redis URL serves as both
# the message broker and the result backend; the modules listed in ``include``
# are imported by the worker at startup so their tasks get registered.
celery_app = Celery(
    main="aegislm_worker",
    include=["tasks.evaluation_task"],
    broker=settings.REDIS_URL,
    backend=settings.REDIS_URL,
)
# --- Serialization and clock: JSON payloads only, timestamps in UTC --------
celery_app.conf.update(
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    enable_utc=True,
    timezone="UTC",
)

# --- Worker concurrency and throughput --------------------------------------
# Each worker process reserves one message at a time and acknowledges it only
# after the task has run.
celery_app.conf.update(
    worker_prefetch_multiplier=1,
    task_acks_late=True,
    # Recycle each pool child after 50 tasks to bound memory growth.
    worker_max_tasks_per_child=50,
    # Both limits can be overridden from the application settings object.
    worker_concurrency=getattr(settings, 'CELERY_WORKER_CONCURRENCY', 2),
    task_default_rate_limit=getattr(settings, 'CELERY_TASK_RATE_LIMIT', '10/m'),
)

# --- Result storage ----------------------------------------------------------
celery_app.conf.update(
    task_ignore_result=False,
    result_expires=3600,  # seconds (1 hour)
    # NOTE(review): "master_name" is only meaningful for a Redis Sentinel
    # result backend; confirm whether REDIS_URL can point at Sentinel.
    result_backend_transport_options={
        "master_name": "mymaster",
    },
)

# --- Routing: evaluation and benchmark jobs get dedicated queues -------------
celery_app.conf.update(
    task_default_queue="default",
    task_routes={
        "tasks.evaluation_task.run_evaluation_task": {"queue": "evaluation"},
        "tasks.evaluation_task.run_benchmark_task": {"queue": "benchmark"},
    },
    # Every queue uses an exchange and routing key named after itself.
    task_queues={
        queue_name: {"exchange": queue_name, "routing_key": queue_name}
        for queue_name in ("default", "evaluation", "benchmark")
    },
)

# --- Monitoring events -------------------------------------------------------
celery_app.conf.update(
    worker_send_task_events=True,
    task_send_sent_event=True,
)

# --- Failure handling and retry policy ---------------------------------------
celery_app.conf.update(
    task_reject_on_worker_lost=True,
    # NOTE(review): the four keys below are not built-in Celery settings, so
    # Celery does not apply them to tasks on its own; they only take effect
    # where task code reads them from ``celery_app.conf``.
    task_default_retry_delay=60,  # seconds
    task_max_retries=3,
    task_retry_backoff=True,
    task_retry_backoff_max=300,  # seconds
)

# --- Periodic jobs (celery beat) ---------------------------------------------
celery_app.conf.update(
    beat_schedule={
        "cleanup-expired-results": {
            # NOTE(review): this module registers its cleanup task as
            # "tasks.cleanup_expired_results"; confirm tasks.evaluation_task
            # really defines the task named here, or beat will send an
            # unregistered task.
            "task": "tasks.evaluation_task.cleanup_expired_results",
            "schedule": 3600.0,  # seconds (hourly)
        },
    },
)
# Task monitoring signals
@task_prerun.connect
def task_prerun_handler(task_id, task, args, kwargs, **extras):
    """Print a start line for every task this worker is about to run.

    Receiver for Celery's ``task_prerun`` signal.

    Args:
        task_id: ID of the task about to run.
        task: The task object being executed.
        args: Positional arguments of the task call.
        kwargs: Keyword arguments of the task call.
        **extras: Remaining signal arguments (such as ``sender`` and ``signal``).
    """
    started_at = datetime.utcnow()
    print(f"Task {task.name}[{task_id}] started at {started_at}")
    # TODO: for "tasks.evaluation_task.run_evaluation_task", mark the
    # corresponding evaluation as running in the database.
@task_postrun.connect
def task_postrun_handler(task_id, task, args, kwargs, retval, state, **extras):
    """Print a completion line, including the final state, after a task runs.

    Receiver for Celery's ``task_postrun`` signal.

    Args:
        task_id: ID of the task that just ran.
        task: The task object that was executed.
        args: Positional arguments of the task call.
        kwargs: Keyword arguments of the task call.
        retval: Value returned by the task (unused here).
        state: Final state of the task, included in the printed line.
        **extras: Remaining signal arguments (such as ``sender`` and ``signal``).
    """
    finished_at = datetime.utcnow()
    message = f"Task {task.name}[{task_id}] completed with state {state} at {finished_at}"
    print(message)
@task_failure.connect
def task_failure_handler(task_id=None, exception=None, traceback=None, einfo=None,
                         *, error=None, **kwargs):
    """Print a line whenever a task fails.

    Receiver for Celery's ``task_failure`` signal. Celery passes the raised
    exception under the ``exception`` keyword, together with ``task_id``,
    ``args``, ``kwargs``, ``traceback``, ``einfo``, ``sender`` and ``signal``.
    The keyword-only ``error`` is still accepted for callers that invoke this
    handler directly with the old parameter name.

    Args:
        task_id: ID of the failed task.
        exception: The exception raised by the task.
        traceback: Traceback object of the failure.
        einfo: Celery ``ExceptionInfo`` for the failure.
        error: Deprecated alias for ``exception``.
        **kwargs: Remaining signal arguments (including the task's own
            ``args``/``kwargs``, ``sender`` and ``signal``).
    """
    # Prefer the keyword Celery actually sends; fall back to the legacy name.
    failure = exception if exception is not None else error
    print(f"Task {task_id} failed: {failure}")
    # TODO: mark the corresponding evaluation as failed in the database.
# Worker initialization
@celery_app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    """Hook for registering periodic tasks once the app has been configured.

    Intentionally a no-op for now: the hourly cleanup job is declared
    statically through ``beat_schedule`` in the app configuration.

    Args:
        sender: The configured Celery app.
        **kwargs: Any additional signal arguments.
    """
    # Dynamic schedules (e.g. ``sender.add_periodic_task(...)``) go here.
# Health check task
@celery_app.task(bind=True, name="tasks.health_check")
def health_check(self):
    """Confirm that a worker can receive and execute tasks.

    Returns:
        dict: ``status`` (always ``"healthy"``), ``timestamp`` (naive UTC time
        in ISO-8601 form) and ``worker_id``.
    """
    # NOTE(review): despite the key name, ``worker_id`` carries this task
    # request's ID (``self.request.id``), not an identifier of the worker.
    payload = dict(status="healthy")
    payload["timestamp"] = datetime.utcnow().isoformat()
    payload["worker_id"] = self.request.id
    return payload
# Worker info task
@celery_app.task(bind=True, name="tasks.worker_info")
def worker_info(self):
    """Return basic information about the worker executing this task.

    Returns:
        dict: ``worker_id`` (the ID of this task request), ``hostname`` of the
        executing worker, ``timestamp`` (naive UTC time in ISO-8601 form), and
        ``active_tasks``. ``active_tasks`` is ``None`` when the request
        context does not expose a task list.
    """
    # Celery's task request context defines no ``tasks`` attribute, so reading
    # it directly raises AttributeError. Read it defensively and report None
    # ("unknown") rather than failing the whole task.
    request_tasks = getattr(self.request, "tasks", None)
    return {
        "worker_id": self.request.id,
        "hostname": self.request.hostname,
        "timestamp": datetime.utcnow().isoformat(),
        "active_tasks": len(request_tasks) if request_tasks is not None else None,
    }
# Task to cleanup expired results
@celery_app.task(name="tasks.cleanup_expired_results")
def cleanup_expired_results():
    """Placeholder task for purging expired evaluation results.

    No cleanup is performed yet, so ``cleaned_items`` is always 0.

    Returns:
        dict: ``status`` (``"completed"``), ``timestamp`` (naive UTC time in
        ISO-8601 form) and ``cleaned_items``.
    """
    # TODO: implement the actual cleanup logic.
    finished_at = datetime.utcnow().isoformat()
    return dict(status="completed", timestamp=finished_at, cleaned_items=0)
# Custom task base class for evaluation tasks
class EvaluationTask(celery_app.Task):
    """Base task class for evaluation-related tasks.

    Supplies the intended retry policy plus success/failure/retry hooks
    shared by evaluation tasks.
    """

    # Celery does not read ``task_default_retry_delay``, ``task_max_retries``
    # or ``task_retry_backoff*`` from the app configuration (they are not
    # built-in settings), so the intended retry policy is declared here as task
    # attributes. Options passed to an individual ``@task(...)`` decorator
    # still take precedence over these class-level defaults.
    default_retry_delay = 60  # seconds before a ``self.retry()`` re-run
    max_retries = 3
    retry_backoff = True  # only consulted for tasks declared with autoretry_for
    retry_backoff_max = 300  # cap on the exponential backoff, in seconds

    def on_success(self, retval, task_id, args, kwargs):
        """Handle successful completion of a task.

        Args:
            retval: Value returned by the task.
            task_id: ID of the task.
            args: Positional arguments of the task call.
            kwargs: Keyword arguments of the task call.
        """
        super().on_success(retval, task_id, args, kwargs)
        # TODO: for "tasks.evaluation_task.run_evaluation_task", mark the
        # evaluation as completed in the database.

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Handle a task that raised an exception.

        Args:
            exc: The exception raised by the task.
            task_id: ID of the task.
            args: Positional arguments of the task call.
            kwargs: Keyword arguments of the task call.
            einfo: Celery ``ExceptionInfo`` for the failure.
        """
        super().on_failure(exc, task_id, args, kwargs, einfo)
        # TODO: for "tasks.evaluation_task.run_evaluation_task", mark the
        # evaluation as failed in the database.

    def on_retry(self, exc, task_id, args, kwargs, einfo):
        """Print a line when a task is scheduled for retry.

        Args:
            exc: The exception that triggered the retry.
            task_id: ID of the task.
            args: Positional arguments of the task call.
            kwargs: Keyword arguments of the task call.
            einfo: Celery ``ExceptionInfo`` for the retry.
        """
        super().on_retry(exc, task_id, args, kwargs, einfo)
        print(f"Task {task_id} retrying due to: {exc}")
# Register custom task base: tasks created by this app use EvaluationTask as
# their default base class.
# NOTE(review): the ``@celery_app.task`` tasks above are presumably created
# lazily at app finalization and so also pick up this base — confirm the app
# cannot be finalized before this line executes.
celery_app.Task = EvaluationTask

# Worker startup: running this module directly hands control to Celery's
# command-line entry point.
if __name__ == "__main__":
    celery_app.start()