Spaces:
Sleeping
Sleeping
"""
backend/workers/celeryconfig.py

Create and configure the Celery app for the project.

Usage (development):
    # start a worker:
    celery -A backend.workers.celeryconfig worker --loglevel=info
    # start beat:
    celery -A backend.workers.celeryconfig beat --loglevel=info

Notes:
    - Requires CELERY_BROKER_URL (no default broker is assumed).
    - CELERY_RESULT_BACKEND is optional and falls back to the broker URL.
    - Tasks are autodiscovered from the 'backend.workers' package (Celery
      looks for a 'tasks' submodule there).
    - The beat schedule is loaded from backend/workers/scheduler.py (its
      SCHEDULE mapping).
"""
| import os | |
| from celery import Celery | |
| from celery.schedules import crontab | |
| import logging | |
# Basic logging. LOG_LEVEL is matched case-insensitively ("debug", " Warning ").
# An unrecognised value falls back to INFO with a warning instead of making
# logging.basicConfig raise ValueError while this module is being imported.
def _parse_log_level(name):
    """Return the numeric logging level for *name*, or None if it is unknown.

    Surrounding whitespace is ignored and matching is case-insensitive, so
    ``"debug"`` and ``" DEBUG "`` both resolve to ``logging.DEBUG``.
    """
    # Given a registered level *name*, getLevelName returns its number;
    # for anything else it returns the string "Level <name>".
    level = logging.getLevelName(name.strip().upper())
    return level if isinstance(level, int) else None


_LOG_LEVEL_SETTING = os.getenv("LOG_LEVEL", "INFO")
_log_level = _parse_log_level(_LOG_LEVEL_SETTING)
logging.basicConfig(level=logging.INFO if _log_level is None else _log_level)
logger = logging.getLogger("celeryconfig")
if _log_level is None:
    logger.warning("Unknown LOG_LEVEL %r; falling back to INFO", _LOG_LEVEL_SETTING)
# Broker connection URL (required). A missing, empty, or whitespace-only
# CELERY_BROKER_URL aborts startup with an explicit message.
BROKER_URL = (os.getenv("CELERY_BROKER_URL") or "").strip()
if not BROKER_URL:
    raise RuntimeError(
        "CELERY_BROKER_URL is not set. Configure a broker such as amqp://, redis://, or sqs:// before starting workers."
    )

# Result backend URL (optional). When CELERY_RESULT_BACKEND is unset, empty, or
# whitespace-only, the broker URL is reused, so this value is never empty.
# NOTE(review): reusing the broker URL only works when its scheme is also a
# Celery result backend (e.g. redis://). Celery ships no sqs:// result backend,
# and for RabbitMQ (amqp://) its docs point to rpc:// instead -- set
# CELERY_RESULT_BACKEND explicitly when using those brokers.
RESULT_BACKEND = (os.getenv("CELERY_RESULT_BACKEND") or "").strip() or BROKER_URL
# The application object shared by every worker and beat process that loads
# this module; it is the only name exported via __all__.
celery_app = Celery(
    "integrachat_workers",
    backend=RESULT_BACKEND,
    broker=BROKER_URL,
)

# Worker and serialisation settings, grouped by concern.
celery_app.conf.update(
    # Payloads and results are JSON, and nothing else is accepted.
    accept_content=["json"],
    task_serializer="json",
    result_serializer="json",
    # Acknowledge a message only after its task has run, and prefetch at
    # most one message per worker process.
    task_acks_late=True,
    worker_prefetch_multiplier=1,
    # Recycle each child process after it has executed 100 tasks.
    worker_max_tasks_per_child=100,
    # Cap on pooled broker connections.
    broker_pool_limit=10,
    # All times are UTC.
    enable_utc=True,
    timezone="UTC",
)

# Task registration: Celery lazily imports the 'tasks' submodule (its default
# related_name) of each listed package.
celery_app.autodiscover_tasks(packages=["backend.workers"])
# Load the periodic-job table from backend.workers.scheduler.SCHEDULE and
# convert it into Celery's beat_schedule format.
#
# Each SCHEDULE value is a dict with:
#   "task"     - task name (required)
#   "schedule" - either {"type": "crontab", ...} or a value handed to beat
#                unchanged (e.g. a timedelta or a number of seconds, for dev)
#   "args"     - positional args (optional, default ())
#   "kwargs"   - keyword args (optional; forwarded only when present)
#   "options"  - apply_async options (optional; forwarded only when present)
# A crontab dict may set minute/hour (each defaults to 0, i.e. daily at 00:00)
# and day_of_week/day_of_month/month_of_year (default: crontab's own "*").
_CRONTAB_FIELDS = ("minute", "hour", "day_of_week", "day_of_month", "month_of_year")
_OPTIONAL_ENTRY_KEYS = ("kwargs", "options")


def _to_beat_entry(cfg):
    """Convert one SCHEDULE value into a beat_schedule entry dict.

    Raises:
        KeyError: if "task" or "schedule" is missing from *cfg*.
        Any error raised by ``crontab`` for invalid field values propagates.
    """
    schedule_cfg = cfg["schedule"]
    if isinstance(schedule_cfg, dict) and schedule_cfg.get("type") == "crontab":
        fields = {"minute": 0, "hour": 0}
        fields.update(
            (key, schedule_cfg[key]) for key in _CRONTAB_FIELDS if key in schedule_cfg
        )
        schedule = crontab(**fields)
    else:
        # Fallback: pass the value through as-is (timedelta or seconds).
        schedule = schedule_cfg
    entry = {"task": cfg["task"], "schedule": schedule, "args": cfg.get("args", ())}
    entry.update((key, cfg[key]) for key in _OPTIONAL_ENTRY_KEYS if key in cfg)
    return entry


try:
    from backend.workers.scheduler import SCHEDULE as CUSTOM_SCHEDULE

    _schedule_items = list(CUSTOM_SCHEDULE.items())
except Exception as e:
    # A missing or broken scheduler module must not stop workers from
    # starting; beat_schedule is simply left unset.
    logger.exception("Failed to load scheduler.SCHEDULE: %s", e)
else:
    beat_schedule = {}
    for name, cfg in _schedule_items:
        try:
            beat_schedule[name] = _to_beat_entry(cfg)
        except Exception:
            # Skip only the malformed job so the remaining jobs still run.
            logger.exception("Skipping invalid beat schedule entry %r", name)
    celery_app.conf.beat_schedule = beat_schedule
    logger.info("Loaded Celery beat schedule with %d jobs", len(beat_schedule))

# Export celery_app symbol for import by tasks
__all__ = ["celery_app"]