# celery_app.py from celery import Celery import redis from urllib.parse import urlparse from config import get_settings settings = get_settings() def parse_redis_host(raw: str, default_port: int) -> tuple[str, int]: """ Accept any of these formats for REDIS_HOST: - "my-host.upstash.io" - "my-host.upstash.io:6379" - "https://my-host.upstash.io" - "rediss://my-host.upstash.io:6379" Always returns (hostname, port). """ raw = raw.strip() # If there's a scheme, use urlparse if "://" in raw: parsed = urlparse(raw) host = parsed.hostname or raw port = parsed.port or default_port elif ":" in raw: host, port_str = raw.rsplit(":", 1) port = int(port_str) else: host = raw port = default_port return host, port REDIS_HOST, REDIS_PORT = parse_redis_host(settings.REDIS_HOST, settings.REDIS_PORT) REDIS_PASSWORD = settings.REDIS_PASSWORD if REDIS_PASSWORD: BROKER_URL = f"rediss://:{REDIS_PASSWORD}@{REDIS_HOST}:{REDIS_PORT}/0?ssl_cert_reqs=CERT_NONE" BACKEND_URL = f"rediss://:{REDIS_PASSWORD}@{REDIS_HOST}:{REDIS_PORT}/0?ssl_cert_reqs=CERT_NONE" else: BROKER_URL = f"redis://{REDIS_HOST}:{REDIS_PORT}/0" BACKEND_URL = f"redis://{REDIS_HOST}:{REDIS_PORT}/0" celery_app = Celery( "assistant_worker", broker=BROKER_URL, backend=BACKEND_URL, include=["generation.ExamAnswer"] ) celery_app.conf.update( task_serializer="json", accept_content=["json"], result_serializer="json", task_track_started=True, task_time_limit=60 * 60, broker_use_ssl={"ssl_cert_reqs": "CERT_NONE"} if REDIS_PASSWORD else None, redis_backend_use_ssl={"ssl_cert_reqs": "CERT_NONE"} if REDIS_PASSWORD else None, ) import worker.tasks from generation.ExamAnswer import grade_exam_task def clear_redis_backend(): if REDIS_PASSWORD: r = redis.Redis( host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD, ssl=True, ssl_cert_reqs=None, db=0, ) else: r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=0) r.flushdb() print("Redis result backend cleared!") @celery_app.on_after_configure.connect def setup(sender, **kwargs): clear_redis_backend()