EXAM_RAG_API / worker /tasks.py
MinaNasser's picture
1st
1bc3f18
import json
import os, shutil, tempfile , httpx
from celery_app import celery_app
from config import get_settings
from indexing.indexingController import IndexingController
from routes.schemas.Exam_Models import ExamResponse
from generation.ExamRagGenerator import ExamService
import logging
# Module-level logger used by the helpers and tasks in this file; its records
# are emitted under the logger name "Celery# ".
logger = logging.getLogger("Celery# ")
def send_to_webhook(payload: dict, webhook_url: str):
    """POST ``payload`` as JSON to ``webhook_url`` on a best-effort basis.

    Delivery problems never propagate to the caller: any exception raised
    while sending is logged together with its traceback and then swallowed.
    The function always returns ``None``.

    Args:
        payload: JSON-serialisable body of the request.
        webhook_url: Destination URL of the webhook.
    """
    try:
        logger.info("Sending webhook to %s", webhook_url)
        with httpx.Client(timeout=10) as client:
            response = client.post(webhook_url, json=payload)
            # httpx only raises for HTTP error statuses when
            # raise_for_status() is called, so a rejected delivery has to be
            # detected from the status code; otherwise it would be logged as
            # if it had succeeded.
            if response.status_code >= 400:
                logger.warning(
                    "Webhook rejected with status %s: %s",
                    response.status_code,
                    response.text,
                )
            else:
                logger.info("Webhook status: %s", response.status_code)
                logger.info("Webhook response: %s", response.text)
    except Exception as e:
        # Deliberately broad: a broken webhook must not fail the calling task.
        logger.error("Webhook failed: %s", e, exc_info=True)
def _send_failure_callback(exam_id: str, task_id: str, error_message: str):
    """Notify the configured callback URL that an exam task failed.

    The notification is best-effort: if it cannot be delivered the problem is
    logged and the function returns normally, so it is safe to call from an
    ``except`` block without masking the original error.

    Args:
        exam_id: Identifier of the exam, sent as ``"exam_id"``.
        task_id: Identifier of the task, sent as ``"task_id"``.
        error_message: Description of the failure, sent as ``"error"``.
    """
    failure_payload = {
        "exam_id": exam_id,
        "status": "failed",
        "error": error_message,
        "task_id": task_id,
    }
    try:
        httpx.post(
            get_settings().CALLBACK_URL,
            json=failure_payload,
            timeout=15,
        )
    except Exception as exc:
        # Still swallowed on purpose, but no longer silently: an undelivered
        # failure notice leaves the receiver waiting, so leave a trace.
        logger.warning(
            "Failure callback for exam %s (task %s) could not be delivered: %s",
            exam_id,
            task_id,
            exc,
        )
@celery_app.task(
    bind=True,
    autoretry_for=(ConnectionError, TimeoutError),
    retry_backoff=5,
    retry_backoff_max=300,
    retry_jitter=True,
    retry_kwargs={"max_retries": 5},
)
def generate_exam_task(self, request_dict: dict):
    """Generate an exam, log it, and push it to the configured callback URL.

    ``ConnectionError`` and ``TimeoutError`` are re-raised untouched so the
    ``autoretry_for`` policy on the decorator can retry the task. Any other
    exception triggers a best-effort failure callback and is then re-raised.

    Args:
        request_dict: Exam request, passed unchanged to
            ``ExamService.exam_task``.

    Returns:
        The generated exam as a dictionary (``model_dump`` in JSON mode).

    Raises:
        Exception: Whatever exam generation or serialisation raised.
    """
    try:
        exam_response: ExamResponse = ExamService().exam_task(request_dict)
        # JSON mode keeps the dictionary safe for json.dumps, for the webhook
        # body and for the task result, even if the model holds values that
        # are not JSON-native.
        exam_dict = exam_response.model_dump(mode="json")
    except (ConnectionError, TimeoutError):
        # Retryable: leave it to autoretry_for, no failure notice yet.
        raise
    except Exception as exc:
        # NOTE(review): assumes the request carries its identifier under the
        # "exam_id" key - confirm against the caller. A missing key is sent
        # as null.
        exam_id = (
            request_dict.get("exam_id") if isinstance(request_dict, dict) else None
        )
        _send_failure_callback(exam_id, self.request.id, str(exc))
        raise

    # Skip building the (potentially large) pretty-printed dump when INFO
    # records would be discarded anyway.
    if logger.isEnabledFor(logging.INFO):
        logger.info(
            "Full exam output:\n%s",
            json.dumps(exam_dict, indent=2, ensure_ascii=False),
        )

    send_to_webhook(exam_dict, get_settings().CALLBACK_URL)
    return exam_dict
# Retry budget of process_file_task. Shared by the decorator and the cleanup
# logic so both agree on which attempt is the last one.
_PROCESS_FILE_MAX_RETRIES = 3


@celery_app.task(
    bind=True,
    autoretry_for=(ConnectionError,),
    retry_backoff=10,
    retry_kwargs={"max_retries": _PROCESS_FILE_MAX_RETRIES},
)
def process_file_task(
    self,
    temp_path: str,
    original_filename: str,
    username: str | None,
    course: str | None,
):
    """Index an uploaded file and report the outcome to the callback URL.

    The upload at ``temp_path`` is copied into a private temporary directory
    and the copy is handed to ``IndexingController.process_file``. The
    original upload is removed once the task is finished, i.e. on success, on
    a non-retryable failure, or when the retry budget is exhausted. It is
    kept while a retry is still pending so the next attempt can find it.

    Args:
        temp_path: Path of the uploaded file to index.
        original_filename: Name of the file, forwarded to the indexer and
            echoed in the callbacks.
        username: Forwarded to the indexer and echoed in the success payload.
        course: Forwarded to the indexer and echoed in the success payload.

    Returns:
        The success payload that was sent to the callback URL.

    Raises:
        ConnectionError: Re-raised untouched so ``autoretry_for`` can retry.
        Exception: Any other error, after a best-effort "failed" callback.
    """
    worker_tmp = tempfile.mkdtemp(prefix="celery_")
    worker_file = os.path.join(worker_tmp, os.path.basename(temp_path))
    retry_pending = False
    try:
        # copy2 raises if the copy cannot be made, so no separate existence
        # check is needed afterwards.
        shutil.copy2(temp_path, worker_file)

        result = IndexingController().process_file(
            worker_file,
            original_filename,
            username,
            course,
        )
        payload = {
            "filename": original_filename,
            "num_chunks": result["num_chunks"],
            "collection": get_settings().QDRANT_COLLECTION,
            "status": "completed",
            "task_id": self.request.id,
            "username": username,
            "course": course,
        }
        # Best-effort delivery: the file is already indexed at this point, so
        # an unreachable callback must not turn the result into a failure.
        send_to_webhook(payload, get_settings().CALLBACK_URL)
        return payload
    except ConnectionError:
        # Celery will re-run the task with the same arguments unless this was
        # the last allowed attempt; keep the upload for that next run.
        retry_pending = self.request.retries < _PROCESS_FILE_MAX_RETRIES
        raise
    except Exception as e:
        try:
            httpx.post(
                get_settings().CALLBACK_URL,
                json={
                    "filename": original_filename,
                    "status": "failed",
                    "error": str(e),
                    "task_id": self.request.id,
                },
                timeout=10,
            )
        except Exception as callback_error:
            # The original error is what matters to the caller; just record
            # that the failure notice itself did not get through.
            logger.warning(
                "Failure callback for %s could not be delivered: %s",
                original_filename,
                callback_error,
            )
        raise
    finally:
        # The worker copy lives inside worker_tmp, so removing the directory
        # removes it too. ignore_errors keeps a cleanup hiccup from skipping
        # the removal of the upload below.
        shutil.rmtree(worker_tmp, ignore_errors=True)
        if not retry_pending:
            try:
                os.remove(temp_path)
            except FileNotFoundError:
                # Already gone (e.g. never existed): nothing to clean up.
                pass
            except OSError as cleanup_error:
                logger.warning(
                    "Could not remove upload %s: %s", temp_path, cleanup_error
                )