EXAM_RAG_API / celery_app.py
MinaNasser's picture
11th
4f96034
# celery_app.py
from celery import Celery
import redis
from urllib.parse import urlparse
from config import get_settings
settings = get_settings()
def parse_redis_host(raw: str, default_port: int) -> tuple[str, int]:
"""
Accept any of these formats for REDIS_HOST:
- "my-host.upstash.io"
- "my-host.upstash.io:6379"
- "https://my-host.upstash.io"
- "rediss://my-host.upstash.io:6379"
Always returns (hostname, port).
"""
raw = raw.strip()
# If there's a scheme, use urlparse
if "://" in raw:
parsed = urlparse(raw)
host = parsed.hostname or raw
port = parsed.port or default_port
elif ":" in raw:
host, port_str = raw.rsplit(":", 1)
port = int(port_str)
else:
host = raw
port = default_port
return host, port
REDIS_HOST, REDIS_PORT = parse_redis_host(settings.REDIS_HOST, settings.REDIS_PORT)
REDIS_PASSWORD = settings.REDIS_PASSWORD
if REDIS_PASSWORD:
BROKER_URL = f"rediss://:{REDIS_PASSWORD}@{REDIS_HOST}:{REDIS_PORT}/0?ssl_cert_reqs=CERT_NONE"
BACKEND_URL = f"rediss://:{REDIS_PASSWORD}@{REDIS_HOST}:{REDIS_PORT}/0?ssl_cert_reqs=CERT_NONE"
else:
BROKER_URL = f"redis://{REDIS_HOST}:{REDIS_PORT}/0"
BACKEND_URL = f"redis://{REDIS_HOST}:{REDIS_PORT}/0"
celery_app = Celery(
"assistant_worker",
broker=BROKER_URL,
backend=BACKEND_URL,
include=["generation.ExamAnswer"]
)
celery_app.conf.update(
task_serializer="json",
accept_content=["json"],
result_serializer="json",
task_track_started=True,
task_time_limit=60 * 60,
broker_use_ssl={"ssl_cert_reqs": "CERT_NONE"} if REDIS_PASSWORD else None,
redis_backend_use_ssl={"ssl_cert_reqs": "CERT_NONE"} if REDIS_PASSWORD else None,
)
import worker.tasks
from generation.ExamAnswer import grade_exam_task
def clear_redis_backend():
if REDIS_PASSWORD:
r = redis.Redis(
host=REDIS_HOST,
port=REDIS_PORT,
password=REDIS_PASSWORD,
ssl=True,
ssl_cert_reqs=None,
db=0,
)
else:
r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=0)
r.flushdb()
print("Redis result backend cleared!")
@celery_app.on_after_configure.connect
def setup(sender, **kwargs):
clear_redis_backend()