"""Create and configure the Celery application for the project.

Module path: ``backend/workers/celeryconfig.py``

Usage (development)::

    # start a worker
    celery -A backend.workers.celeryconfig worker --loglevel=info
    # start beat
    celery -A backend.workers.celeryconfig beat --loglevel=info

Environment variables:

- ``CELERY_BROKER_URL`` (required): broker URL such as ``amqp://``,
  ``redis://`` or ``sqs://``. There is no default; importing this module
  without it raises ``RuntimeError``.
- ``CELERY_RESULT_BACKEND`` (optional): result backend URL. When unset or
  empty, ``CELERY_BROKER_URL`` is reused.
- ``LOG_LEVEL`` (optional, default ``INFO``): level name passed to
  ``logging.basicConfig``; matched case-insensitively.

Notes:

- Tasks are autodiscovered from the ``backend.workers`` package.
- The beat schedule is loaded from ``backend.workers.scheduler.SCHEDULE``
  (see ``_to_beat_entry`` for the accepted entry format). Loading is
  best-effort: failures are logged and never stop the app from being created.
"""

import logging
import os

from celery import Celery
from celery.schedules import crontab

# Basic logging. The level name is upper-cased because logging.basicConfig
# rejects lower-case names (e.g. LOG_LEVEL=debug) with ValueError.
logging.basicConfig(level=os.getenv("LOG_LEVEL", "INFO").upper())
logger = logging.getLogger("celeryconfig")

BROKER_URL = os.getenv("CELERY_BROKER_URL")
if not BROKER_URL:
    raise RuntimeError(
        "CELERY_BROKER_URL is not set. Configure a broker such as amqp://, "
        "redis://, or sqs:// before starting workers."
    )

# `or` instead of a getenv default so that an empty CELERY_RESULT_BACKEND also
# falls back to the broker, mirroring how an empty broker URL counts as unset.
RESULT_BACKEND = os.getenv("CELERY_RESULT_BACKEND") or BROKER_URL

celery_app = Celery(
    "integrachat_workers",
    broker=BROKER_URL,
    backend=RESULT_BACKEND,
)

# Recommended worker options
celery_app.conf.update(
    task_serializer="json",
    result_serializer="json",
    # Only JSON payloads are accepted, so pickle-encoded messages are refused.
    accept_content=["json"],
    # Acknowledge after the task has run, so a message is redelivered if the
    # worker dies mid-task; tasks should therefore tolerate being re-run.
    task_acks_late=True,
    # Limit how many messages each worker process reserves ahead of time.
    worker_prefetch_multiplier=1,
    # Replace each child process after 100 tasks (presumably to cap memory
    # growth from leaks in long-lived processes).
    worker_max_tasks_per_child=100,
    broker_pool_limit=10,
    timezone="UTC",
    enable_utc=True,
)

# Auto-discover tasks in the workers package
celery_app.autodiscover_tasks(["backend.workers"])


def _to_beat_entry(cfg):
    """Convert one ``scheduler.SCHEDULE`` entry into a Celery beat entry.

    ``cfg`` must provide ``"task"`` (the task name) and ``"schedule"``.
    ``"args"`` is optional (default ``()``); ``"kwargs"`` and ``"options"``
    are copied through only when present.

    ``cfg["schedule"]`` is either:

    - a dict with ``"type": "crontab"``, whose ``minute`` / ``hour`` default
      to ``0`` and whose ``day_of_week`` / ``day_of_month`` /
      ``month_of_year`` default to ``"*"`` (every value); or
    - anything else, which is passed through unchanged and must be a value
      Celery accepts as a schedule (e.g. a ``timedelta`` or seconds).

    Returns the beat-entry dict. Raises ``KeyError`` when ``"task"`` or
    ``"schedule"`` is missing, and propagates whatever ``crontab`` raises
    for invalid field values.
    """
    schedule_cfg = cfg["schedule"]
    if isinstance(schedule_cfg, dict) and schedule_cfg.get("type") == "crontab":
        schedule = crontab(
            minute=schedule_cfg.get("minute", 0),
            hour=schedule_cfg.get("hour", 0),
            day_of_week=schedule_cfg.get("day_of_week", "*"),
            day_of_month=schedule_cfg.get("day_of_month", "*"),
            month_of_year=schedule_cfg.get("month_of_year", "*"),
        )
    else:
        # fallback: expect a timedelta or seconds (for quick dev)
        schedule = schedule_cfg

    entry = {"task": cfg["task"], "schedule": schedule, "args": cfg.get("args", ())}
    for optional_key in ("kwargs", "options"):
        if optional_key in cfg:
            entry[optional_key] = cfg[optional_key]
    return entry


def _load_beat_schedule():
    """Build the beat schedule from ``backend.workers.scheduler.SCHEDULE``.

    Each entry is converted with ``_to_beat_entry``. An invalid entry is
    logged and skipped, so one bad entry does not drop the whole schedule.

    Returns the ``{name: entry}`` dict, or ``None`` (after logging) when the
    scheduler module cannot be imported or ``SCHEDULE`` has no ``.items()``.
    """
    try:
        # Imported lazily inside a logged boundary: the schedule is optional,
        # and a failure here must not prevent the app from being created.
        from backend.workers.scheduler import SCHEDULE as custom_schedule

        items = list(custom_schedule.items())
    except Exception as exc:
        logger.exception("Failed to load scheduler.SCHEDULE: %s", exc)
        return None

    beat_schedule = {}
    for name, cfg in items:
        try:
            beat_schedule[name] = _to_beat_entry(cfg)
        except Exception:
            # crontab() can raise TypeError, ValueError or celery's
            # ParseException; a missing key raises KeyError. Skip only this entry.
            logger.exception("Skipping invalid beat schedule entry %r", name)
    return beat_schedule


_beat_schedule = _load_beat_schedule()
if _beat_schedule is not None:
    celery_app.conf.beat_schedule = _beat_schedule
    logger.info("Loaded Celery beat schedule with %d jobs", len(_beat_schedule))

# Export celery_app symbol for import by tasks
__all__ = ["celery_app"]