Spaces:
Paused
Paused
| import json | |
| import os, shutil, tempfile , httpx | |
| from celery_app import celery_app | |
| from config import get_settings | |
| from indexing.indexingController import IndexingController | |
| from routes.schemas.Exam_Models import ExamResponse | |
| from generation.ExamRagGenerator import ExamService | |
| import logging | |
| logger = logging.getLogger("Celery# ") | |
def send_to_webhook(payload: dict, webhook_url: str):
    """POST ``payload`` as JSON to ``webhook_url`` on a best-effort basis.

    The HTTP status code and response body are written to the module logger.
    Any exception raised while sending or logging is caught and reported via
    ``logger.error``; nothing is propagated to the caller.

    Args:
        payload: JSON-serialisable body of the request.
        webhook_url: Destination URL for the POST.

    Returns:
        None.
    """
    try:
        logger.info(f"Sending webhook to {webhook_url}")
        # A short-lived client with a 10 second timeout, closed on exit.
        with httpx.Client(timeout=10) as session:
            reply = session.post(webhook_url, json=payload)
            logger.info(f"Webhook status: {reply.status_code}")
            logger.info(f"Webhook response: {reply.text}")
    except Exception as exc:
        logger.error("Webhook failed: %s", exc)
def _send_failure_callback(exam_id: str, task_id: str, error_message: str):
    """Notify the configured callback URL that an exam task failed.

    The notification is best-effort: this function never raises. Unlike a
    silent swallow, a delivery problem is recorded on the module logger so
    that lost failure notifications can be diagnosed.

    Args:
        exam_id: Identifier placed under the ``"exam_id"`` key of the body.
        task_id: Identifier placed under the ``"task_id"`` key of the body.
        error_message: Text placed under the ``"error"`` key of the body.

    Returns:
        None.
    """
    failure_payload = {
        "exam_id": exam_id,
        "status": "failed",
        "error": error_message,
        "task_id": task_id,
    }
    try:
        # Settings lookup stays inside the try so a configuration problem
        # cannot escape from this best-effort notifier either.
        httpx.post(
            get_settings().CALLBACK_URL,
            json=failure_payload,
            timeout=15,
        )
    except Exception as exc:
        logger.error(
            "Failure callback for exam %s (task %s) could not be sent: %s",
            exam_id,
            task_id,
            exc,
        )
def generate_exam_task(self, request_dict: dict):
    """Generate an exam, log it, push it to the callback webhook and return it.

    NOTE(review): ``self`` is not used in this body. The signature suggests a
    bound Celery task, but no task decorator is visible here - confirm how
    this function is registered.

    Args:
        request_dict: Passed unchanged to ``ExamService.exam_task``.

    Returns:
        The ``model_dump()`` output of the generated ``ExamResponse``.
    """
    generated: ExamResponse = ExamService().exam_task(request_dict)

    # The dumped form is shared by the log line, the webhook and the result.
    exam_payload = generated.model_dump()

    logger.info(
        "Full exam output:\n%s",
        json.dumps(exam_payload, indent=2, ensure_ascii=False),
    )

    send_to_webhook(exam_payload, get_settings().CALLBACK_URL)
    return exam_payload
def process_file_task(self, temp_path: str, original_filename: str, username: str | None, course: str | None,):
    """Index an uploaded file and report the outcome to the callback URL.

    The file at ``temp_path`` is copied into a private ``celery_``-prefixed
    temporary directory, the original is removed, and the copy is handed to
    ``IndexingController.process_file``. A "completed" payload is then sent
    to ``CALLBACK_URL``; on failure a "failed" payload is sent instead.

    NOTE(review): this body reads ``self.request.id``, so it presumably runs
    as a bound Celery task, but no task decorator is visible here - confirm.
    NOTE(review): ``temp_path`` is deleted once copied (and again during
    cleanup), so re-running this task with the same arguments would fail at
    the copy step - confirm no retry policy relies on the ``ConnectionError``
    re-raise below.

    Args:
        temp_path: Path of the uploaded file to ingest; removed by this task.
        original_filename: Name forwarded to the indexer and the callbacks.
        username: Forwarded to the indexer and echoed in the success payload.
        course: Forwarded to the indexer and echoed in the success payload.

    Returns:
        The success payload dict that was sent to the callback URL.

    Raises:
        ConnectionError: Re-raised untouched, without a failure callback.
        Exception: Any other error from copying or indexing is re-raised
            after a best-effort "failed" callback.
    """
    worker_tmp = tempfile.mkdtemp(prefix="celery_")
    worker_file = os.path.join(worker_tmp, os.path.basename(temp_path))
    try:
        shutil.copy2(temp_path, worker_file)
        if not os.path.exists(worker_file):
            raise FileNotFoundError(f"File not found in worker: {worker_file}")
        # The private copy is in place, so the shared upload can go.
        os.remove(temp_path)

        result = IndexingController().process_file(
            worker_file,
            original_filename,
            username,
            course,
        )
        payload = {
            "filename": original_filename,
            "num_chunks": result["num_chunks"],
            "collection": get_settings().QDRANT_COLLECTION,
            "status": "completed",
            "task_id": self.request.id,
            "username": username,
            "course": course,
        }
        callback_url = get_settings().CALLBACK_URL
    except ConnectionError:
        raise
    except Exception as e:
        # Best-effort failure notice; the original error is what propagates.
        try:
            httpx.post(
                get_settings().CALLBACK_URL,
                json={
                    "filename": original_filename,
                    "status": "failed",
                    "error": str(e),
                    "task_id": self.request.id,
                },
                timeout=10,
            )
        except Exception as callback_err:
            logger.error(
                "Failure callback for %s could not be sent: %s",
                original_filename,
                callback_err,
            )
        raise
    else:
        # Indexing already succeeded: a webhook outage must not turn this
        # into a reported failure, so delivery goes through the best-effort
        # helper, which logs errors instead of raising.
        send_to_webhook(payload, callback_url)
        return payload
    finally:
        # Each cleanup step is independent so one failure cannot skip the
        # other. Removing the directory also removes worker_file.
        shutil.rmtree(worker_tmp, ignore_errors=True)
        try:
            os.remove(temp_path)
        except FileNotFoundError:
            pass  # already removed after the copy succeeded
        except OSError as cleanup_err:
            logger.warning(
                "Could not remove temporary upload %s: %s",
                temp_path,
                cleanup_err,
            )