File size: 3,834 Bytes
1bc3f18
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import json
import os, shutil, tempfile , httpx
from celery_app import celery_app
from config import get_settings
from indexing.indexingController import IndexingController
from routes.schemas.Exam_Models import ExamResponse
from generation.ExamRagGenerator import ExamService
import logging

# Module-level logger used by the webhook helper and the Celery tasks below.
logger = logging.getLogger("Celery# ")

def send_to_webhook(payload: dict, webhook_url: str) -> None:
    """POST *payload* as JSON to *webhook_url* and log the outcome.

    Delivery is best-effort: any exception raised while sending or while
    logging the response is caught and logged, so this function never raises.

    Args:
        payload: JSON-serialisable body to send.
        webhook_url: Destination URL for the POST request.
    """
    try:
        logger.info("Sending webhook to %s", webhook_url)
        with httpx.Client(timeout=10) as client:
            response = client.post(webhook_url, json=payload)
        # An HTTP error reply means the receiver did not accept the payload;
        # surface it at WARNING instead of hiding it among INFO records.
        level = logging.WARNING if response.status_code >= 400 else logging.INFO
        logger.log(level, "Webhook status: %s", response.status_code)
        logger.log(level, "Webhook response: %s", response.text)
    except Exception as e:
        # Keep the traceback so connection/timeout problems can be diagnosed.
        logger.error("Webhook failed: %s", e, exc_info=True)

def _send_failure_callback(exam_id: str, task_id: str, error_message: str) -> None:
    """Report a failed exam task to the configured callback URL.

    The notification is best-effort: any exception raised while resolving the
    settings or sending the request is logged and suppressed, so this function
    never raises.

    Args:
        exam_id: Identifier of the exam, sent as ``exam_id``.
        task_id: Identifier of the task, sent as ``task_id``.
        error_message: Description of the failure, sent as ``error``.
    """
    try:
        httpx.post(
            get_settings().CALLBACK_URL,
            json={
                "exam_id": exam_id,
                "status": "failed",
                "error": error_message,
                "task_id": task_id,
            },
            timeout=15,
        )
    except Exception:
        # Do not let a broken callback mask the original failure, but leave a
        # trace so undelivered notifications can be investigated.
        logger.warning(
            "Failure callback for exam %s (task %s) could not be delivered",
            exam_id,
            task_id,
            exc_info=True,
        )

@celery_app.task(
    bind=True,
    autoretry_for=(ConnectionError, TimeoutError),
    retry_backoff=5,
    retry_backoff_max=300,
    retry_jitter=True,
    retry_kwargs={"max_retries": 5},
)
def generate_exam_task(self, request_dict: dict):
    """Generate an exam, log it, and push it to the callback webhook.

    The task is registered to auto-retry on ``ConnectionError`` and
    ``TimeoutError`` (at most 5 retries, with backoff and jitter).

    Args:
        request_dict: Exam request data handed to ``ExamService.exam_task``.

    Returns:
        The generated exam as a plain dict (``ExamResponse.model_dump()``).
    """
    exam_response: ExamResponse = ExamService().exam_task(request_dict)

    # A plain dict is needed for both the log dump and the webhook body.
    serialized_exam = exam_response.model_dump()

    # Dump the complete exam, pretty-printed, into the worker log.
    pretty_exam = json.dumps(serialized_exam, indent=2, ensure_ascii=False)
    logger.info("Full exam output:\n%s", pretty_exam)

    # Deliver the result to the configured callback endpoint.
    send_to_webhook(serialized_exam, get_settings().CALLBACK_URL)
    return serialized_exam

# Retry budget for process_file_task. Shared by the decorator and the task body
# so the task can tell whether another attempt will follow a ConnectionError.
_PROCESS_FILE_MAX_RETRIES = 3


@celery_app.task(
    bind=True,
    autoretry_for=(ConnectionError,),
    retry_backoff=10,
    retry_kwargs={"max_retries": _PROCESS_FILE_MAX_RETRIES},
)
def process_file_task(
    self,
    temp_path: str,
    original_filename: str,
    username: str | None,
    course: str | None,
):
    """Index an uploaded file and report the outcome to the callback URL.

    The upload at ``temp_path`` is copied into a private worker directory,
    indexed through ``IndexingController.process_file``, and a "completed"
    payload is posted to ``CALLBACK_URL``.

    A ``ConnectionError`` is re-raised untouched while retries remain, and the
    upload is kept on disk so the retried attempt can still read it. On any
    other error, or once the retry budget is used up, a "failed" payload is
    posted (best-effort) and the exception is re-raised. The upload is removed
    only when no further attempt is expected.

    Args:
        temp_path: Path of the uploaded file to index.
        original_filename: Name reported in the callback payloads and passed
            to the indexing controller.
        username: Passed to the indexing controller and echoed in the payload.
        course: Passed to the indexing controller and echoed in the payload.

    Returns:
        The "completed" payload dict that was posted to the callback URL.

    Raises:
        Exception: Whatever the copy, indexing, or completion callback raised.
    """
    worker_tmp = tempfile.mkdtemp(prefix="celery_")
    worker_file = os.path.join(worker_tmp, os.path.basename(temp_path))
    # True only when Celery is expected to run this task again; the upload at
    # temp_path must survive until that attempt.
    retry_pending = False
    try:
        shutil.copy2(temp_path, worker_file)
        if not os.path.exists(worker_file):
            raise FileNotFoundError(f"File not found in worker: {worker_file}")

        indexing_controller = IndexingController()
        result = indexing_controller.process_file(
            worker_file,
            original_filename,
            username,
            course,
        )

        payload = {
            "filename": original_filename,
            "num_chunks": result["num_chunks"],
            "collection": get_settings().QDRANT_COLLECTION,
            "status": "completed",
            "task_id": self.request.id,
            "username": username,
            "course": course,
        }

        httpx.post(
            get_settings().CALLBACK_URL,
            json=payload,
            timeout=10,
        )

        return payload
    except Exception as e:
        if (
            isinstance(e, ConnectionError)
            and self.request.retries < _PROCESS_FILE_MAX_RETRIES
        ):
            # Another attempt will follow: stay silent towards the callback
            # and keep the upload so the retry has something to read.
            retry_pending = True
            raise
        try:
            httpx.post(
                get_settings().CALLBACK_URL,
                json={
                    "filename": original_filename,
                    "status": "failed",
                    "error": str(e),
                    "task_id": self.request.id,
                },
                timeout=10,
            )
        except Exception:
            # Best-effort notification; never mask the original error.
            logger.warning(
                "Failure callback for %s (task %s) could not be delivered",
                original_filename,
                self.request.id,
                exc_info=True,
            )
        raise
    finally:
        # The worker copy lives inside worker_tmp, so one rmtree covers both.
        shutil.rmtree(worker_tmp, ignore_errors=True)
        if not retry_pending:
            try:
                os.remove(temp_path)
            except FileNotFoundError:
                pass
            except OSError:
                logger.warning(
                    "Could not remove uploaded file %s",
                    temp_path,
                    exc_info=True,
                )