# NOTE: removed hosting-platform log residue ("Spaces:" / "Runtime error") that was
# accidentally captured above this module and is not valid Python source.
| """Celery configuration for task processing.""" | |
| import logging | |
| from celery import Celery | |
| from celery.signals import task_failure, task_success, worker_ready, worker_shutdown | |
| from app.core.config import settings | |
| # Configure logging | |
| logger = logging.getLogger(__name__) | |
| # Celery configuration | |
# Celery application instance. Redis (settings.REDIS_URL) serves as BOTH the
# message broker and the result backend.
celery_app = Celery(
    "dataset_impacts",
    broker=settings.REDIS_URL,
    backend=settings.REDIS_URL,
)

# Global Celery configuration: JSON-only serialization, UTC timestamps,
# reliability-oriented acknowledgement/retry behaviour.
celery_app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="UTC",
    enable_utc=True,
    worker_concurrency=settings.WORKER_CONCURRENCY,
    task_acks_late=True,  # Tasks are acknowledged after execution, not on receipt
    task_reject_on_worker_lost=True,  # Tasks are re-queued if worker dies mid-execution
    task_time_limit=3600,  # Hard limit: 1 hour per task (worker kills the task)
    task_soft_time_limit=3000,  # Soft limit: 50 minutes — raises SoftTimeLimitExceeded so the task can clean up before the hard kill
    worker_prefetch_multiplier=1,  # Single prefetch - improves fair distribution of tasks
    broker_connection_retry=True,
    broker_connection_retry_on_startup=True,
    broker_connection_max_retries=10,
    broker_pool_limit=10,  # Broker connection pool size
    result_expires=60 * 60 * 24,  # Results expire after 24 hours
    task_track_started=True,  # Report a STARTED state when a worker picks up a task
)
| # Set up task routes for different task types | |
# Route tasks to dedicated queues by module path, so dataset processing and
# maintenance work can be consumed/scaled by separate worker pools.
celery_app.conf.task_routes = {
    "app.tasks.dataset_tasks.*": {"queue": "dataset_impacts"},
    "app.tasks.maintenance.*": {"queue": "maintenance"},
}

# Default retry policy for tasks that call self.retry() without overrides.
celery_app.conf.task_default_retry_delay = 30  # seconds between retry attempts
celery_app.conf.task_max_retries = 3

# Celery Beat schedule for periodic tasks (requires a beat process to be running).
celery_app.conf.beat_schedule = {
    "cleanup-stale-tasks": {
        "task": "app.tasks.maintenance.cleanup_stale_tasks",
        "schedule": 3600.0,  # Run every hour
    },
    "health-check": {
        "task": "app.tasks.maintenance.health_check",
        "schedule": 300.0,  # Run every 5 minutes
    },
    "refresh-hf-datasets-cache": {
        "task": "app.tasks.dataset_tasks.refresh_hf_datasets_cache",
        "schedule": 3600.0,  # Run every hour
    },
}
| # Signal handlers for monitoring and logging | |
@task_failure.connect
def task_failure_handler(sender=None, task_id=None, exception=None, **kwargs):
    """Log failed tasks.

    Connected to Celery's ``task_failure`` signal — without the ``connect``
    decorator this handler (and the signal import on the file's import line)
    was dead code and never fired.
    """
    # sender can be None on some failure paths; guard before reading .name
    # (mirrors the guard already used in task_success_handler).
    task_name = sender.name if sender else "Unknown"
    logger.error(f"Task {task_id} {task_name} failed: {exception}")
@task_success.connect
def task_success_handler(sender=None, result=None, **kwargs):
    """Log successful tasks.

    Connected to Celery's ``task_success`` signal; previously the handler was
    defined but never registered, so it was never invoked.
    """
    # sender may be None depending on how the signal is dispatched.
    task_name = sender.name if sender else "Unknown"
    logger.info(f"Task {task_name} completed successfully")
@worker_ready.connect
def worker_ready_handler(**kwargs):
    """Log when a worker comes online.

    Connected to Celery's ``worker_ready`` signal; the decorator was missing,
    so this handler never ran.
    """
    # 'hostname' may be absent from kwargs; .get() logs None rather than raising.
    logger.info(f"Celery worker ready: {kwargs.get('hostname')}")
@worker_shutdown.connect
def worker_shutdown_handler(**kwargs):
    """Log when a worker is shutting down.

    Connected to Celery's ``worker_shutdown`` signal; the decorator was
    missing, so this handler never ran.
    """
    # 'hostname' may be absent from kwargs; .get() logs None rather than raising.
    logger.info(f"Celery worker shutting down: {kwargs.get('hostname')}")
def get_celery_app():
    """Return the module-level Celery app, ensuring tasks are registered.

    Importing ``app.tasks`` pulls in every task module as a side effect so
    the tasks register themselves with ``celery_app``. An import failure is
    logged but does not prevent the app from being returned.
    """
    try:
        import app.tasks  # noqa: F401 — imported purely for its registration side effect
    except ImportError as exc:
        logger.error(f"Error importing tasks: {exc}")
    else:
        logger.info(f"Tasks successfully imported; registered {len(celery_app.tasks)} tasks")
    return celery_app