"""Celery tasks for exam generation and file indexing, with webhook callbacks."""

import json
import logging
import os
import shutil
import tempfile

import httpx

from celery_app import celery_app
from config import get_settings
from indexing.indexingController import IndexingController
from routes.schemas.Exam_Models import ExamResponse
from generation.ExamRagGenerator import ExamService

logger = logging.getLogger("Celery# ")

# Single source of truth for process_file_task's retry limit: used by the task
# decorator and by the "will this attempt be retried?" check inside the task.
_PROCESS_FILE_MAX_RETRIES = 3


def send_to_webhook(payload: dict, webhook_url: str):
    """POST ``payload`` as JSON to ``webhook_url`` and log the outcome.

    Best-effort: any exception is logged and swallowed, so a failing webhook
    never fails the caller.
    """
    try:
        logger.info("Sending webhook to %s", webhook_url)
        with httpx.Client(timeout=10) as client:
            response = client.post(webhook_url, json=payload)
            logger.info("Webhook status: %s", response.status_code)
            logger.info("Webhook response: %s", response.text)
    except Exception as e:
        logger.error("Webhook failed: %s", e)


def _send_failure_callback(exam_id: str, task_id: str, error_message: str):
    """Report a failed exam task to the configured ``CALLBACK_URL``.

    Best-effort: a delivery error is logged (not raised) so it cannot mask
    the failure that is being reported.
    """
    try:
        httpx.post(
            get_settings().CALLBACK_URL,
            json={
                "exam_id": exam_id,
                "status": "failed",
                "error": error_message,
                "task_id": task_id,
            },
            timeout=15,
        )
    except Exception as e:
        logger.warning(
            "Failure callback for exam %s could not be delivered: %s", exam_id, e
        )


@celery_app.task(
    bind=True,
    autoretry_for=(ConnectionError, TimeoutError),
    retry_backoff=5,
    retry_backoff_max=300,
    retry_jitter=True,
    retry_kwargs={"max_retries": 5},
)
def generate_exam_task(self, request_dict: dict):
    """Generate an exam, log it, and deliver it to the callback webhook.

    Args:
        request_dict: Request data passed unchanged to ``ExamService.exam_task``.

    Returns:
        The generated exam as a plain dict (``ExamResponse.model_dump()``).

    ``ConnectionError`` and ``TimeoutError`` are retried automatically by
    Celery according to the decorator options.
    """
    # NOTE(review): failures of this task are not reported to the webhook;
    # _send_failure_callback looks intended for that but the exam id's
    # location in request_dict is not visible here - confirm before wiring.
    service = ExamService()

    # Generate exam
    exam_response: ExamResponse = service.exam_task(request_dict)

    # Convert to dict for logging & webhook
    exam_dict = exam_response.model_dump()

    # Pretty-print full exam in Celery logs
    logger.info(
        "Full exam output:\n%s",
        json.dumps(exam_dict, indent=2, ensure_ascii=False),
    )

    # Send to webhook
    webhook_url = get_settings().CALLBACK_URL
    send_to_webhook(exam_dict, webhook_url)

    return exam_dict


def _notify_file_failure(
    original_filename: str, task_id: str | None, error_message: str
) -> None:
    """Best-effort "failed" callback for a file-processing task."""
    try:
        httpx.post(
            get_settings().CALLBACK_URL,
            json={
                "filename": original_filename,
                "status": "failed",
                "error": error_message,
                "task_id": task_id,
            },
            timeout=10,
        )
    except Exception as e:
        logger.warning(
            "Failure callback for file %s could not be delivered: %s",
            original_filename,
            e,
        )


def _remove_source_file(path: str) -> None:
    """Delete the uploaded source file; a missing file is not an error."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass
    except OSError as e:
        logger.warning("Could not remove source file %s: %s", path, e)


@celery_app.task(
    bind=True,
    autoretry_for=(ConnectionError,),
    retry_backoff=10,
    retry_kwargs={"max_retries": _PROCESS_FILE_MAX_RETRIES},
)
def process_file_task(
    self,
    temp_path: str,
    original_filename: str,
    username: str | None,
    course: str | None,
):
    """Index an uploaded file and report the result to ``CALLBACK_URL``.

    The file at ``temp_path`` is copied into a private worker directory,
    indexed through ``IndexingController.process_file``, and a "completed"
    payload is posted to the callback URL.

    Args:
        temp_path: Path of the uploaded file to process.
        original_filename: Filename reported in callbacks and passed to the
            indexing controller.
        username: Passed to the indexing controller and echoed in the payload.
        course: Passed to the indexing controller and echoed in the payload.

    Returns:
        The "completed" payload dict that was posted to the callback URL.

    Raises:
        ConnectionError: Re-raised so Celery's ``autoretry_for`` can retry.
        Exception: Any other error is re-raised after a best-effort "failed"
            callback.

    The source file is kept while a retry is still pending and removed once
    the task has succeeded or failed for good.
    """
    worker_tmp = tempfile.mkdtemp(prefix="celery_")
    worker_file = os.path.join(worker_tmp, os.path.basename(temp_path))
    keep_source = False

    try:
        # copy2 raises on failure, so no separate existence check is needed.
        shutil.copy2(temp_path, worker_file)

        result = IndexingController().process_file(
            worker_file,
            original_filename,
            username,
            course,
        )

        payload = {
            "filename": original_filename,
            "num_chunks": result["num_chunks"],
            "collection": get_settings().QDRANT_COLLECTION,
            "status": "completed",
            "task_id": self.request.id,
            "username": username,
            "course": course,
        }
        httpx.post(
            get_settings().CALLBACK_URL,
            json=payload,
            timeout=10,
        )
        return payload

    except ConnectionError as e:
        # Celery retries only when run by a worker and while the retry budget
        # is not used up; in that case the next attempt needs the source file.
        will_retry = (
            not self.request.called_directly
            and self.request.retries < _PROCESS_FILE_MAX_RETRIES
        )
        if will_retry:
            keep_source = True
        else:
            # Final attempt: report the failure instead of failing silently.
            _notify_file_failure(original_filename, self.request.id, str(e))
        raise

    except Exception as e:
        _notify_file_failure(original_filename, self.request.id, str(e))
        raise

    finally:
        # worker_file lives inside worker_tmp, so one rmtree covers both.
        shutil.rmtree(worker_tmp, ignore_errors=True)
        if not keep_source:
            _remove_source_file(temp_path)