Spaces:
Sleeping
Sleeping
File size: 2,744 Bytes
aa63765 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
"""
backend/workers/celeryconfig.py
Creates and configures the Celery app for the project.
Usage (development):
# start a worker:
celery -A backend.workers.celeryconfig worker --loglevel=info
# start beat:
celery -A backend.workers.celeryconfig beat --loglevel=info
Notes:
- Requires CELERY_BROKER_URL (no default broker is assumed)
- Tasks autodiscover from 'backend.workers' package
- This file also loads schedule from scheduler.py (SCHEDULE mapping)
"""
import os
from celery import Celery
from celery.schedules import crontab
import logging
# Basic logging
logging.basicConfig(level=os.getenv("LOG_LEVEL", "INFO"))
logger = logging.getLogger("celeryconfig")
BROKER_URL = os.getenv("CELERY_BROKER_URL")
if not BROKER_URL:
raise RuntimeError(
"CELERY_BROKER_URL is not set. Configure a broker such as amqp://, redis://, or sqs:// before starting workers."
)
RESULT_BACKEND = os.getenv("CELERY_RESULT_BACKEND", BROKER_URL)
if not RESULT_BACKEND:
raise RuntimeError(
"CELERY_RESULT_BACKEND is not set. Provide a backend URL or reuse CELERY_BROKER_URL."
)
celery_app = Celery(
"integrachat_workers",
broker=BROKER_URL,
backend=RESULT_BACKEND,
)
# Recommended worker options
celery_app.conf.update(
task_serializer="json",
result_serializer="json",
accept_content=["json"],
task_acks_late=True,
worker_prefetch_multiplier=1,
worker_max_tasks_per_child=100,
broker_pool_limit=10,
timezone="UTC",
enable_utc=True,
)
# Auto-discover tasks in the workers package
celery_app.autodiscover_tasks(["backend.workers"])
# Load schedule from scheduler.SCHEDULE and convert crontab-like entries
try:
from backend.workers.scheduler import SCHEDULE as CUSTOM_SCHEDULE
beat_schedule = {}
for name, cfg in CUSTOM_SCHEDULE.items():
task_name = cfg["task"]
schedule_cfg = cfg["schedule"]
args = cfg.get("args", ())
# Determine schedule type
if isinstance(schedule_cfg, dict) and schedule_cfg.get("type") == "crontab":
hour = schedule_cfg.get("hour", 0)
minute = schedule_cfg.get("minute", 0)
beat_schedule[name] = {"task": task_name, "schedule": crontab(minute=minute, hour=hour), "args": args}
else:
# fallback: expect a timedelta or seconds (for quick dev)
beat_schedule[name] = {"task": task_name, "schedule": schedule_cfg, "args": args}
celery_app.conf.beat_schedule = beat_schedule
logger.info("Loaded Celery beat schedule with %d jobs", len(beat_schedule))
except Exception as e:
logger.exception("Failed to load scheduler.SCHEDULE: %s", e)
# Export celery_app symbol for import by tasks
__all__ = ["celery_app"]
|