diff --git "a/app.py" "b/app.py" --- "a/app.py" +++ "b/app.py" @@ -1,21 +1,67 @@ +# app.py import os import io import re -import uuid -from datetime import datetime -from typing import Dict, Any, List +import json +from typing import Dict, Any, List, Tuple import requests from fastapi import FastAPI, UploadFile, File, Form, HTTPException +from fastapi.middleware.cors import CORSMiddleware from PIL import Image, ImageOps, ImageFilter import pytesseract +from dotenv import load_dotenv +load_dotenv() + +# Optional extractors for DOCX/PDF +try: + from docx import Document # python-docx +except Exception: + Document = None + +try: + from pypdf import PdfReader +except Exception: + PdfReader = None + +try: + from pdf2image import convert_from_bytes # requires poppler +except Exception: + convert_from_bytes = None + from sklearn.feature_extraction.text import TfidfVectorizer from sklearn.metrics.pairwise import cosine_similarity -# ✅ OpenAI SDK (pip install openai) -from openai import OpenAI +# ✅ Gemini SDK +try: + from google import genai +except Exception as e: + genai = None + print(f"[WARN] google-genai import failed: {e}") + +# ✅ Google Cloud Vision SDK (for better handwritten OCR) +try: + from google.cloud import vision + from google.cloud.vision_v1 import types + google_vision_available = True +except Exception as e: + google_vision_available = False + print(f"[WARN] google-cloud-vision import failed: {e}") + + +# ========================================================= +# ✅ FASTAPI APP INSTANCE +# ========================================================= +app = FastAPI() +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) # ========================================================= # ✅ TESSERACT PATH @@ -33,15 +79,487 @@ ERP_BASE = os.getenv("ERP_BASE", "https://erp.triz.co.in/lms_data") STORAGE_BASE = os.getenv("STORAGE_BASE", "https://erp.triz.co.in/storage/student/") ERP_TOKEN = os.getenv("ERP_TOKEN", "") + +# ========================================================= +# ✅ GEMINI CONFIG +# ========================================================= +GOOGLE_API_KEY = (os.getenv("GOOGLE_API_KEY") or "").strip() +GEMINI_MODEL = (os.getenv("GEMINI_MODEL", "models/gemini-flash-lite-latest") or "").strip() +if GEMINI_MODEL and not GEMINI_MODEL.startswith("models/"): + GEMINI_MODEL = "models/" + GEMINI_MODEL + +# ========================================================= +# ✅ GOOGLE CLOUD VISION CONFIG (for handwritten OCR) +# ========================================================= +GOOGLE_CLOUD_VISION_API_KEY = (os.getenv("GCV_API_KEY") or "").strip() +# Fall back to Gemini API key if no separate Vision key provided +if not GOOGLE_CLOUD_VISION_API_KEY and GOOGLE_API_KEY: + GOOGLE_CLOUD_VISION_API_KEY = GOOGLE_API_KEY + +vision_client = None +if google_vision_available and GOOGLE_CLOUD_VISION_API_KEY: + try: + # Use API key authentication + vision_client = vision.ImageAnnotatorClient(client_options={ + 'api_key': GOOGLE_CLOUD_VISION_API_KEY + }) + print("[INFO] Google Cloud Vision client initialized") + except Exception as e: + print(f"[WARN] Google Cloud Vision init failed: {e}") + +gemini_client = None +GEMINI_LAST_ERROR = "" + + +def _init_gemini_client() -> None: + global gemini_client, GEMINI_LAST_ERROR + + if gemini_client is not None: + return + + if not genai: + GEMINI_LAST_ERROR = "google-genai not installed / import failed" + gemini_client = None + return + + if not GOOGLE_API_KEY: + GEMINI_LAST_ERROR = "GOOGLE_API_KEY not set" + gemini_client = None + return + + try: + gemini_client = genai.Client(api_key=GOOGLE_API_KEY) + GEMINI_LAST_ERROR = "" + print("[INFO] Gemini client initialized") + except Exception as e: + gemini_client = None + GEMINI_LAST_ERROR = str(e) + print(f"[WARN] Gemini init failed: {GEMINI_LAST_ERROR}") + + +_init_gemini_client() + + +def parse_gemini_error(error_msg: str) -> dict: + msg = (error_msg or "").strip() + lower = msg.lower() + + if "service_disabled" in lower or "generativelanguage.googleapis.com" in lower: + return {"ok": False, "error_type": "GEMINI_SERVICE_DISABLED", "message": msg} + + if "api key" in lower or "invalid" in lower or "permission" in lower or "unauthorized" in lower: + return {"ok": False, "error_type": "GEMINI_KEY_OR_PERMISSION_ERROR", "message": msg} + + return {"ok": False, "error_type": "GEMINI_ERROR", "message": msg} + + +def generate_gemini_response( + prompt: str, + system_prompt: str = "", + max_tokens: int = 650, + temperature: float = 0.3, +) -> str: + global GEMINI_LAST_ERROR + + if gemini_client is None: + if not GEMINI_LAST_ERROR: + GEMINI_LAST_ERROR = "Gemini client not initialized" + return "" + + try: + contents = [] + if system_prompt: + contents.append(system_prompt) + contents.append(prompt) + + resp = gemini_client.models.generate_content( + model=GEMINI_MODEL, + contents=contents, + config={"temperature": temperature, "max_output_tokens": max_tokens}, + ) + text = (getattr(resp, "text", "") or "").strip() + if text: + GEMINI_LAST_ERROR = "" + return text + except Exception as e: + GEMINI_LAST_ERROR = str(e) + print(f"[ERROR] Gemini call failed: {GEMINI_LAST_ERROR}") + return "" + +import time + +def generate_gemini_with_retry(prompt: str, system_prompt: str, max_tokens=450, temperature=0.3, retries=3) -> str: + last = "" + for i in range(retries): + text = generate_gemini_response( + prompt=prompt, + system_prompt=system_prompt, + max_tokens=max_tokens, + temperature=temperature, + ) + if text: + return text + last = GEMINI_LAST_ERROR + # small backoff + time.sleep(1 + i) + return "" + +def cheap_overlap_score(student_text: str, prompt: str) -> int: + # remove tiny words + def tokens(s): + return {w for w in re.findall(r"[a-zA-Z]{4,}", (s or "").lower())} + s = tokens(student_text) + p = tokens(prompt) + if not s or not p: + return 0 + overlap = len(s & p) / max(1, len(p)) + # map to a sane range + return int(round(min(0.6, overlap) * 100)) # cap at 60 + + # ========================================================= -# ✅ OPENAI CONFIG +# ✅ SMALL UTILS # ========================================================= -OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "") -OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-4o-mini") # you can change -client = OpenAI(api_key=OPENAI_API_KEY) if OPENAI_API_KEY else None +def _norm(s: str) -> str: + return re.sub(r"\s+", " ", (s or "").strip().lower()) + + +def cosine_sim(a: str, b: str) -> float: + a = (a or "").strip().lower() + b = (b or "").strip().lower() + if not a or not b: + return 0.0 + vec = TfidfVectorizer().fit([a, b]) + X = vec.transform([a, b]) + return float(cosine_similarity(X[0], X[1])[0][0]) + + +def normalize_level(level: str) -> str: + l = (level or "").strip().lower() + if l in ("easy",): + return "Easy" + if l in ("hard",): + return "Hard" + if l in ("meadium", "mediam", "medium"): + return "Medium" + return "Medium" + + +def level_policy(student_level: str) -> dict: + lvl = normalize_level(student_level).lower() + if lvl == "easy": + return {"w_sim": 0.8, "w_cov": 0.2, "verified": 65, "partial": 40, "kp_thr": 0.25} + if lvl == "hard": + return {"w_sim": 0.4, "w_cov": 0.6, "verified": 85, "partial": 65, "kp_thr": 0.40} + return {"w_sim": 0.6, "w_cov": 0.4, "verified": 75, "partial": 55, "kp_thr": 0.20} + + +def mcq_partial_credit(student_level: str) -> dict: + """ + Returns partial credit percentage for MCQ questions based on student level. + This allows easier students to get partial marks even if they get some questions wrong. + + Returns dict with: + - credit_per_question: percentage earned per correct answer + - passing_threshold: minimum percentage needed to pass + """ + lvl = normalize_level(student_level).lower() + if lvl == "easy": + # Easy students get 50% credit per correct answer + return {"credit_per_question": 50, "passing_threshold": 50} + if lvl == "hard": + # Hard students need 100% - no partial credit + return {"credit_per_question": 100, "passing_threshold": 100} + # Medium students get 75% credit per correct answer + return {"credit_per_question": 75, "passing_threshold": 75} + + +def keypoint_coverage(student_text: str, key_points: List[str], kp_threshold: float) -> Tuple[List[str], List[str], float]: + covered, missing = [], [] + for kp in key_points: + kp = (kp or "").strip() + if not kp: + continue + s = cosine_sim(kp, student_text) + if s >= kp_threshold: + covered.append(kp) + else: + missing.append(kp) + + total = len(covered) + len(missing) + coverage = (len(covered) / total) if total else 0.0 + return covered, missing, coverage + + +# ========================================================= +# ✅ QUESTION TYPE INFERENCE + MCQ PARSING +# ========================================================= +def infer_question_type_from_prompt(prompt: str) -> str: + p = _norm(prompt) + + # Explicit markers - check for (mcq) first since it's common in parentheses + if re.search(r"\(mcq\)", p) or re.search(r"\btype\s*:\s*mcq\b", p) or re.search(r"\bquestion_type\s*:\s*mcq\b", p): + return "mcq" + if re.search(r"\btype\s*:\s*narrative\b", p) or re.search(r"\bquestion_type\s*:\s*narrative\b", p): + return "narrative" + + # Heuristic: options A/B/C/D exist -> likely MCQ + if re.search(r"\b(a|b|c|d)\s*[\)\.]\s+", p) or "option a" in p or "option b" in p: + return "mcq" + + return "narrative" + + +def parse_questions_from_prompt(prompt: str) -> List[Dict[str, Any]]: + """ + Parse individual questions from the prompt, detecting MCQ vs Narrative for each. + Returns list of dicts with: qid, type, question_text, correct_answer (for MCQ) + """ + questions = [] + # Match patterns like "Q1:", "Q2.", "Question 1:", etc. + q_pattern = re.compile(r'(Q\s*\d+[.:]\s*|Question\s*\d+[.:]\s*)(.*?)(?=(Q\s*\d|Question\s*\d|$))', re.IGNORECASE | re.DOTALL) + + # Alternative: split by Q1, Q2, etc. + lines = prompt.split('\n') + current_q = None + current_type = None + current_qid = None + current_correct = None + + for line in lines: + line = line.strip() + if not line: + continue + + # Detect new question + q_match = re.match(r'^(Q\s*\d+|Question\s*\d+)[.:]\s*(.*)', line, re.IGNORECASE) + if q_match: + # Save previous question if exists + if current_q is not None: + questions.append({ + 'qid': current_qid, + 'type': current_type, + 'question': current_q, + 'correct_answer': current_correct + }) + # Start new question + current_qid = q_match.group(1).strip() + remaining = q_match.group(2).strip() + current_q = remaining + current_type = None + current_correct = None + + # Check if this is MCQ or Narrative + line_lower = line.lower() + if '(mcq)' in line_lower or 'multiple choice' in line_lower or 'type: mcq' in line_lower: + current_type = 'mcq' + elif 'narrative' in line_lower or 'type: narrative' in line_lower: + current_type = 'narrative' + else: + # This line belongs to current question + if current_q is not None: + current_q += ' ' + line + + # Check for type markers + line_lower = line.lower() + if current_type is None: + if '(mcq)' in line_lower or 'multiple choice' in line_lower or 'type: mcq' in line_lower: + current_type = 'mcq' + elif 'narrative' in line_lower or 'type: narrative' in line_lower: + current_type = 'narrative' + + # Check for correct answer (for MCQ) + if current_type == 'mcq': + # First check: is this line "Correct Answer(s):" with nothing after it? + # If so, we need to look for the answer on the next line + if re.search(r'^correct\s*answer\s*\(?s\)?\s*[:\.]?\s*$', line, re.IGNORECASE): + # Set flag to look for answer on next line + current_q += ' [CORRECT_ANSWER_PENDING]' + continue + + # Check if we have a pending correct answer marker + if '[CORRECT_ANSWER_PENDING]' in current_q: + # This line should contain the answer like "A. Devdatta" + letter_match = re.search(r'^([A-D])\.?\s*', line) + if letter_match: + current_correct = letter_match.group(1).upper() + # Remove the pending marker from question text + current_q = current_q.replace(' [CORRECT_ANSWER_PENDING]', '') + continue + else: + # Not a letter, remove the pending marker + current_q = current_q.replace(' [CORRECT_ANSWER_PENDING]', '') + + # Look for "Correct Answer(s):" or "Correct:" or "Answer:" in same line + # Support formats: "Correct Answer(s): A.", "Correct: B", "Answer: C" + correct_match = re.search(r'(?:Correct\s*(?:Answer)?|Answer)[:.]\s*(?:[A-D]\.?\s*)?(.+)', line, re.IGNORECASE) + if correct_match and not current_correct: + # Extract just the letter (A, B, C, or D) + correct_text = correct_match.group(1).strip() + letter_match = re.search(r'^([A-D])\b', correct_text) + if letter_match: + current_correct = letter_match.group(1).upper() + else: + # Try to extract first letter + current_correct = correct_text[0].upper() if correct_text else None + + # Don't forget the last question + if current_q is not None: + questions.append({ + 'qid': current_qid, + 'type': current_type, + 'question': current_q, + 'correct_answer': current_correct + }) + + # If no questions parsed, fall back to old behavior + if not questions: + qtype = infer_question_type_from_prompt(prompt) + return [{'qid': 'Q1', 'type': qtype, 'question': prompt, 'correct_answer': None}] + + return questions + + +def extract_mcq_choice(text: str) -> str: + """ + Extract chosen option from student text: + supports: A, (B), Option C, Ans: D, Answer: B + """ + t = _norm(text) + + m = re.search(r"\b(answer|ans|selected)\s*[:\-]?\s*\(?\s*([a-d])\s*\)?\b", t) + if m: + return m.group(2) + + m2 = re.search(r"\boption\s*([a-d])\b", t) + if m2: + return m2.group(1) + + m3 = re.search(r"^\(?\s*([a-d])\s*\)?$", t.strip()) + if m3: + return m3.group(1) + + # last-resort: find first standalone A/B/C/D + m4 = re.search(r"\b([a-d])\b", t) + if m4: + return m4.group(1) + + return "" + + +def extract_mcq_answers_with_qid(text: str) -> Dict[str, str]: + """ + Extract MCQ answers WITH question numbers from student text. + This handles shuffled answers where question numbers are needed to match. + + Supports patterns like: + - "Q1: A, Q2: C, Q3: B" + - "Q1. A Q2. C Q3. B" + - "1) A 2) C 3) B" + - "Answer 1: A Answer 2: C Answer 3: B" + - "Q1 A Q2 C Q3 B" (space separated) + + Returns dict like: {"Q1": "A", "Q2": "C", "Q3": "B"} + """ + results = {} + t = (text or "").strip() + + if not t: + return results + + # Pattern 1: Q1: A, Q2. B, Q3 - C, Question 4: D + pattern1 = re.compile(r'(Q(?:uestion)?\s*(\d+))[:.\-\s]+([a-dA-D])', re.IGNORECASE) + for match in pattern1.finditer(t): + qnum = match.group(2) + answer = match.group(3).upper() + results[f"Q{qnum}"] = answer + + # Pattern 2: 1) A, 2) B, 3: C (numbered without Q prefix) + pattern2 = re.compile(r'(?:^|\s)(\d+)\s*[\):\.]\s*([a-dA-D])(?:\s|$)', re.IGNORECASE) + for match in pattern2.finditer(t): + qnum = match.group(1) + answer = match.group(2).upper() + # Only add if not already found (Q pattern takes priority) + if f"Q{qnum}" not in results: + results[f"Q{qnum}"] = answer + + # Pattern 3: "Answer for Q1 is A", "Answer to question 2: B" + pattern3 = re.compile(r'(?:answer|ans)\s*(?:for|to)?\s*(?:Q(?:uestion)?\s*)?(\d+)\s*(?:is|was)?\s*[:\-]?\s*([a-dA-D])', re.IGNORECASE) + for match in pattern3.finditer(t): + qnum = match.group(1) + answer = match.group(2).upper() + if f"Q{qnum}" not in results: + results[f"Q{qnum}"] = answer + + # Pattern 4: Line by line format like "Q1 A" or "1 A" on same line + pattern4 = re.compile(r'(?:^|\n)\s*(Q(?:uestion)?\s*)?(\d+)\s+([a-dA-D])\s*(?:\n|\s{2,}|$)', re.IGNORECASE) + for match in pattern4.finditer(t): + qnum = match.group(2) + answer = match.group(3).upper() + if f"Q{qnum}" not in results: + results[f"Q{qnum}"] = answer + + return results -app = FastAPI(title="Homework Validation System (LLM Remarks)") +def extract_correct_mcq_from_prompt(prompt: str) -> str: + """ + This is IMPORTANT: + Your prompt must contain correct option somewhere like: + - Correct: B + - Answer: C + - correct_option: D + - Correct Answer(s): A. Devdatta + or JSON: {"correct_option":"B"} + + Supports formats: + - "Correct Answer: A" + - "Correct Answer(s): A. Devdatta" + - "Correct: B" + - "Answer: C" + """ + p = (prompt or "").strip() + if not p: + return "" + + # JSON prompt support + if p.startswith("{") and p.endswith("}"): + try: + obj = json.loads(p) + for k in ("correct_option", "correct", "answer", "ans"): + v = obj.get(k) + if isinstance(v, str) and v.strip(): + return extract_mcq_choice(v) + except Exception: + pass + + # Text prompt support - new format: "Correct Answer(s): A. Devdatta" or "Correct Answer: B" + t = _norm(p) + + # Pattern 1: "Correct Answer(s): A. ..." or "Correct Answer: B. ..." + # This handles format like "Correct Answer(s): A. Devdatta" or "Correct Answer(s): + # A. Devdatta" + m1 = re.search(r"correct\s*answer\s*\(?s\)?\s*[:\.]\s*([a-d])\.?\s*", t) + if m1: + return m1.group(1) + + # Pattern 1b: Handle multi-line format where answer is on next line like: + # "Correct Answer(s):\n A. Devdatta" + m1b = re.search(r"correct\s*answer\s*\(?s\)?\s*[:\.]\s*\n\s*([a-d])\.?", t) + if m1b: + return m1b.group(1) + + # Pattern 1c: Handle format with option text after letter like "Correct Answer(s): A. Devdatta" + m1c = re.search(r"correct\s*answer\s*\(?s\)?\s*[:\.]\s*([a-d])\.", t) + if m1c: + return m1c.group(1) + + # Pattern 2: "Correct: A" or "Answer: B" (original pattern) + m = re.search(r"\b(correct|answer|ans)\s*[:\-]?\s*\(?\s*([a-d])\s*\)?\b", t) + if m: + return m.group(2) + + return "" # ========================================================= @@ -67,376 +585,1291 @@ def fetch_student_record(homework_id: int, student_id: int) -> Dict[str, Any]: return data[0] -def fetch_teacher_image_by_homework_id(homework_id: int) -> str: - data = _erp_get({"table": "homework", "filters[id]": homework_id}) - if not data: - raise HTTPException(status_code=404, detail="No ERP homework record found for this homework_id") - - row = data[0] - for key in ("image", "teacher_image", "reference_image", "solution_image"): - v = (row.get(key) or "").strip() - if v: - return v - - raise HTTPException( - status_code=422, - detail="Teacher image missing in ERP for this homework_id (image/teacher_image/reference_image/solution_image all empty).", - ) - - -# ========================================================= -# ✅ DOWNLOAD -# ========================================================= -def _looks_like_html(b: bytes) -> bool: - head = (b[:300] or b"").lower() - return (b" bytes: - headers = {} - if ERP_TOKEN: - headers["Authorization"] = f"Bearer {ERP_TOKEN}" - - r = requests.get(url, headers=headers, timeout=30) - r.raise_for_status() - b = r.content or b"" - - if _looks_like_html(b): - raise HTTPException(status_code=502, detail="Teacher image URL returned HTML (not an image). Storage may require auth.") - - return b +def fetch_student_level_from_erp(row: Dict[str, Any]) -> str: + """ + ERP field name is not guaranteed; try common ones. + """ + for k in ("student_level", "level", "difficulty", "difficulty_level"): + v = row.get(k) + if isinstance(v, str) and v.strip(): + return normalize_level(v) + return "Medium" # ========================================================= -# ✅ OCR +# ✅ OCR + TEXT EXTRACTION - IMPROVED FOR HANDWRITTEN # ========================================================= def _preprocess_for_ocr(img: Image.Image) -> Image.Image: + """ + Enhanced preprocessing for better OCR on handwritten images. + Includes adaptive thresholding, noise removal, and contrast enhancement. + """ + # Convert to grayscale img = img.convert("L") - img = ImageOps.autocontrast(img) - + w, h = img.size - if max(w, h) < 1600: - scale = 1600 / max(w, h) - img = img.resize((int(w * scale), int(h * scale))) - - img = img.filter(ImageFilter.SHARPEN) - img = img.point(lambda p: 255 if p > 170 else 0) + + # Scale up for better detail (especially for handwritten) + if max(w, h) < 2000: + scale = 2000 / max(w, h) + new_w = int(w * scale) + new_h = int(h * scale) + img = img.resize((new_w, new_h), Image.LANCZOS) + + # Apply adaptive thresholding for better handwritten recognition + from PIL import ImageFilter + + # Try multiple preprocessing approaches and use the best + img_enhanced = img + + # Method 1: Increase contrast significantly + img_contrast = img.point(lambda p: 255 if p > 180 else int(p * 1.5)) + + # Method 2: Apply sharpening twice for handwritten + img_sharp = img.filter(ImageFilter.SHARPEN) + img_sharp = img_sharp.filter(ImageFilter.SHARPEN) + + # Method 3: Apply unsharp mask for edge enhancement + img_unsharp = img.filter(ImageFilter.UnsharpMask(radius=2, percent=150, threshold=3)) + + # Use the sharpened version as primary + img = img_sharp + + # Apply binary threshold with lower cutoff to capture lighter handwriting + img = img.point(lambda p: 255 if p > 160 else 0) + return img +def _extract_text_google_vision(image_bytes: bytes) -> str: + """ + Extract text using Google Cloud Vision API - much better for handwriting. + Returns empty string if API is not available. + """ + global vision_client + + if not vision_client: + return "" + + try: + # Create image object + image = vision.Image(content=image_bytes) + + # Use document text detection for better handwriting + response = vision_client.document_text_detection(image=image) + + if response.texts: + return "\n".join([t.description for t in response.texts]) + return "" + except Exception as e: + print(f"[WARN] Google Vision OCR failed: {e}") + return "" + + def extract_text_from_image(image_bytes: bytes, filename: str = "unknown") -> str: - """Extract text from image bytes with validation.""" - # Validate that we have actual image data if not image_bytes or len(image_bytes) < 50: - raise HTTPException(status_code=400, detail=f"Invalid file: '{filename}' - file is empty or too small") - - # Check for common image magic bytes + raise HTTPException(status_code=400, detail=f"Invalid file: '{filename}' - empty/too small") + valid_image_signatures = { - b'\xff\xd8\xff': 'JPEG', - b'\x89PNG\r\n\x1a\n': 'PNG', - b'GIF87a': 'GIF', - b'GIF89a': 'GIF', - b'BM': 'BMP', + b"\xff\xd8\xff": "JPEG", + b"\x89PNG\r\n\x1a\n": "PNG", + b"GIF87a": "GIF", + b"GIF89a": "GIF", + b"BM": "BMP", } + is_valid = any(image_bytes.startswith(sig) for sig in valid_image_signatures) + if not is_valid: + head = image_bytes[:12] + raise HTTPException(status_code=400, detail=f"Invalid image format: '{filename}' (header={head})") + + # First try Google Cloud Vision (better for handwriting) + if vision_client: + gv_text = _extract_text_google_vision(image_bytes) + if gv_text and len(gv_text.strip()) > 10: + return _clean_extracted_text(gv_text) - is_valid_image = False - detected_type = None - for sig, img_type in valid_image_signatures.items(): - if image_bytes[:len(sig)] == sig: - is_valid_image = True - detected_type = img_type - break - - if not is_valid_image: - # Try to identify what was actually sent - file_header = image_bytes[:20] - raise HTTPException( - status_code=400, - detail=f"Invalid image format: '{filename}' - not a valid image file (detected: {file_header[:10]}). Supported formats: JPEG, PNG, GIF, BMP" - ) - + # Fallback to Tesseract with improved preprocessing try: img = Image.open(io.BytesIO(image_bytes)) except Exception as e: - raise HTTPException(status_code=400, detail=f"Invalid image: '{filename}' - cannot read image file: {str(e)}") + raise HTTPException(status_code=400, detail=f"Invalid image '{filename}': {e}") img = _preprocess_for_ocr(img) - try: - text = pytesseract.image_to_string(img, lang="eng", config="--oem 3 --psm 6") - except pytesseract.TesseractNotFoundError: - raise HTTPException(status_code=500, detail="Tesseract OCR not found. Install it / fix path.") - except Exception as e: - raise HTTPException(status_code=500, detail=f"OCR failed: {e}") - - text = (text or "").strip() + # Try multiple OCR configurations for better handwritten recognition + ocr_configs = [ + "--oem 3 --psm 6", # Default + "--oem 3 --psm 4", # Treat as single column + "--oem 1 --psm 3", # Fully automatic + ] + + best_text = "" + best_confidence = 0 + + for config in ocr_configs: + try: + text = pytesseract.image_to_string(img, lang="eng", config=config) + if text and len(text.strip()) > len(best_text.strip()): + best_text = text + except Exception: + continue + + if not best_text: + # Fallback to default if all fail + try: + best_text = pytesseract.image_to_string(img, lang="eng", config="--oem 3 --psm 6") + except pytesseract.TesseractNotFoundError: + raise HTTPException(status_code=500, detail="Tesseract OCR not found. Install it / fix path.") + except Exception as e: + raise HTTPException(status_code=500, detail=f"OCR failed: {e}") + + text = (best_text or "").strip() text = re.sub(r"[ \t]+", " ", text) return text -# ========================================================= -# ✅ SIMILARITY -# ========================================================= -def cosine_sim(a: str, b: str) -> float: - a = (a or "").strip().lower() - b = (b or "").strip().lower() - if not a or not b: - return 0.0 - vec = TfidfVectorizer().fit([a, b]) - X = vec.transform([a, b]) - return float(cosine_similarity(X[0], X[1])[0][0]) +def _clean_extracted_text(text: str) -> str: + text = (text or "").strip() + text = re.sub(r"[ \t]+", " ", text) + text = re.sub(r"\n{3,}", "\n\n", text) + return text.strip() -def grade_similarity(teacher_text: str, student_text: str, threshold: float) -> Dict[str, Any]: - teacher_text = (teacher_text or "").strip() - student_text = (student_text or "").strip() +def extract_text_from_docx(docx_bytes: bytes, filename: str = "unknown.docx") -> str: + if Document is None: + raise HTTPException(status_code=500, detail="DOCX support not installed. Add 'python-docx'.") + try: + doc = Document(io.BytesIO(docx_bytes)) + parts = [] + for p in doc.paragraphs: + if p.text and p.text.strip(): + parts.append(p.text.strip()) + for t in doc.tables: + for row in t.rows: + cells = [c.text.strip() for c in row.cells if c.text and c.text.strip()] + if cells: + parts.append(" | ".join(cells)) + return _clean_extracted_text("\n".join(parts)) + except Exception as e: + raise HTTPException(status_code=400, detail=f"Unable to read DOCX '{filename}': {e}") + + +def extract_text_from_pdf(pdf_bytes: bytes, filename: str = "unknown.pdf") -> Dict[str, Any]: + used_ocr = False + extracted = "" + + if PdfReader is not None: + try: + reader = PdfReader(io.BytesIO(pdf_bytes)) + parts = [] + for page in reader.pages: + t = page.extract_text() or "" + if t.strip(): + parts.append(t) + extracted = _clean_extracted_text("\n\n".join(parts)) + except Exception: + extracted = "" + + if len(extracted) < 50: + if convert_from_bytes is None: + return {"text": extracted, "used_ocr": False, "needs_ocr": True} + try: + used_ocr = True + # Higher DPI for better handwritten OCR + pages = convert_from_bytes(pdf_bytes, dpi=300) + page_texts = [] + for img in pages: + # Use the improved preprocessing + img = _preprocess_for_ocr(img) + + # Try multiple OCR configs + for config in ["--oem 3 --psm 6", "--oem 3 --psm 4", "--oem 1 --psm 3"]: + try: + t = pytesseract.image_to_string(img, lang="eng", config=config) or "" + if t.strip() and len(t.strip()) > 20: + page_texts.append(t) + break + except: + continue + + if page_texts: + extracted = _clean_extracted_text("\n\n".join(page_texts)) + else: + # Final fallback with default config + img = pages[0] if pages else None + if img: + img = _preprocess_for_ocr(img) + extracted = pytesseract.image_to_string(img, lang="eng", config="--oem 3 --psm 6") or "" + except Exception as e: + return {"text": extracted, "used_ocr": used_ocr, "needs_ocr": True, "ocr_error": str(e)} + + return {"text": extracted, "used_ocr": used_ocr, "needs_ocr": False} + + +async def extract_text_from_upload(file: UploadFile) -> Dict[str, Any]: + filename = getattr(file, "filename", "") or "upload" + content_type = (getattr(file, "content_type", "") or "").lower() + data = await file.read() + + if not data or len(data) < 20: + return {"text": "", "kind": "unknown", "used_ocr": False, "needs_ocr": False, "error": "empty"} + + ext = (os.path.splitext(filename)[1] or "").lower() + + is_image = content_type.startswith("image/") or ext in {".png", ".jpg", ".jpeg", ".bmp", ".gif", ".webp"} + is_pdf = (content_type == "application/pdf") or ext == ".pdf" + is_docx = (content_type in { + "application/vnd.openxmlformats-officedocument.wordprocessingml.document", + "application/msword" + }) or ext in {".docx", ".doc"} + + if is_image: + try: + return {"text": _clean_extracted_text(extract_text_from_image(data, filename=filename)), + "kind": "image", "used_ocr": True, "needs_ocr": False} + except HTTPException as e: + return {"text": "", "kind": "image", "used_ocr": True, "needs_ocr": False, "error": e.detail} + + if is_docx: + try: + return {"text": _clean_extracted_text(extract_text_from_docx(data, filename=filename)), + "kind": "docx", "used_ocr": False, "needs_ocr": False} + except HTTPException as e: + return {"text": "", "kind": "docx", "used_ocr": False, "needs_ocr": False, "error": e.detail} + + if is_pdf: + info = extract_text_from_pdf(data, filename=filename) + return {"text": info.get("text", ""), "kind": "pdf", + "used_ocr": bool(info.get("used_ocr", False)), + "needs_ocr": bool(info.get("needs_ocr", False)), + "ocr_error": info.get("ocr_error")} + + # fallback: try as image + try: + return {"text": _clean_extracted_text(extract_text_from_image(data, filename=filename)), + "kind": "unknown_as_image", "used_ocr": True, "needs_ocr": False} + except Exception: + return {"text": "", "kind": "unknown", "used_ocr": False, "needs_ocr": False, + "error": f"Unsupported file type: {content_type or ext or 'unknown'}"} - if not teacher_text: - return {"status": "NO_TEACHER_TEXT", "overall_score": None, "threshold": float(threshold)} - if not student_text: - return {"status": "NO_STUDENT_TEXT", "overall_score": None, "threshold": float(threshold)} - sim = cosine_sim(student_text, teacher_text) - return {"status": "EVALUATED", "overall_score": sim, "threshold": float(threshold)} # ========================================================= -# ✅ FALLBACK MESSAGES - Score-appropriate and varied +# ✅ ROUTES # ========================================================= -HIGH_SCORE_MESSAGES = [ - "Excellent work! Your {:.0%} score shows great understanding of the material!", - "Amazing job! You achieved {:.0%} - your hard work is paying off!", - "Outstanding performance with {:.0%} - keep up the fantastic work!", - "Brilliant! {:.0%} demonstrates excellent grasp of concepts!", - "Perfect! {:.0%} shows you've mastered this topic completely!", - "Wonderful! {:.0%} reflects your dedication and smart work!", - "Spot on! {:.0%} shows you've understood everything perfectly!", - "Spectacular! {:.0%} - you're doing an amazing job!", - "Marvelous! {:.0%} shows exceptional understanding!", - "Superb! {:.0%} - your efforts have truly paid off!", - "Fantastic! {:.0%} demonstrates your excellent grasp!", - "Incredible! {:.0%} - you're exceeding all expectations!", - "Remarkable! {:.0%} shows true mastery of the subject!", - "Brilliant work! {:.0%} reflects your hard work and talent!", - "Spectacular! {:.0%} shows you're a natural!", - "Outstanding! {:.0%} - you're crushing it!", - "Magnificent! {:.0%} shows incredible dedication!", - "Excellent! {:.0%} - you're making amazing progress!", - "Superb work! {:.0%} shows your commitment!", - "First class! {:.0%} - you're doing wonderfully!", -] - -MEDIUM_SCORE_MESSAGES = [ - "Good effort! Your score of {:.0%} shows decent understanding. Review missed parts to improve!", - "Solid work at {:.0%}. Focus on areas where you lost marks for next time!", - "You're making progress at {:.0%}. Keep practicing the topics you missed!", - "Nice try at {:.0%}. A bit more study will help you reach full marks!", - "Nice work! {:.0%} shows potential - review and improve!", - "Good progress! {:.0%} - keep pushing forward!", - "Decent attempt at {:.0%}. Some areas need more attention!", - "Good start at {:.0%}. Build on this foundation!", - "Promising {:.0%}. Spend more time on challenging topics!", - "Nearly there! {:.0%} - almost perfect, keep trying!", - "Keep going! {:.0%} shows you're on the right track!", - "Good improvement! {:.0%} - continue this positive trend!", - "Nice effort! {:.0%} - review what you missed and grow!", - "Well done! {:.0%} - a little more practice will help!", - "Good job! {:.0%} - focus on weak areas next time!", - "Promising! {:.0%} - you're getting closer to mastery!", - "Keep studying! {:.0%} - every bit of effort counts!", - "Nice work! {:.0%} - identify gaps and fill them!", - "Growing! {:.0%} - you're making steady progress!", - "Focused! {:.0%} - keep refining your understanding!", -] - -LOW_SCORE_MESSAGES = [ - "Your score of {:.0%} shows you need to review the material more carefully.", - "Keep trying! {:.0%} means there's room for improvement. Review the teacher's answers!", - "Your submission scored {:.0%}. Please review the correct answers and try again!", - "At {:.0%}, you'll need to study the material more thoroughly before resubmitting.", - "{:.0%} suggests more practice is needed. Go through the concepts again!", - "{:.0%} is a starting point. Focus on understanding the basics!", - "{:.0%} indicates you should revisit the topics covered. Don't give up!", - "{:.0%} means it's time for extra study. Review and try again!", - "{:.0%} - please review the lesson materials and resubmit!", - "{:.0%} shows you need more practice. Keep working at it!", - "{:.0%} - every expert was once a beginner. Keep learning!", - "{:.0%} - identify what you missed and study those areas!", - "{:.0%} - review the reference materials carefully!", - "{:.0%} - don't be discouraged, persistence pays off!", - "{:.0%} - take time to understand each concept step by step!", - "{:.0%} - practice makes perfect. Try again soon!", - "{:.0%} - this is an opportunity to learn and grow!", - "{:.0%} - focus on understanding, not just memorizing!", - "{:.0%} - put in more time and effort to improve!", - "{:.0%} - review, practice, and you'll get better!", -] +@app.get("/health") +def health(): + return {"status": "ok"} -# ========================================================= -# ✅ LLM REMARK (for individual image evaluation) -# ========================================================= -def generate_llm_remark( - teacher_text: str, - student_text: str, - sim_score: float, - threshold: float, - completion_status: str = "N", - unique_seed: str = "" -) -> str: - """ - Generate AI-generated remark using OpenAI API for individual image evaluation. - unique_seed ensures different outputs even for identical inputs. - """ - if client is None: - return "AI remark generation unavailable (OpenAI API key not configured)." - - # Keep excerpts reasonable - teacher_excerpt = (teacher_text or "")[:800] - student_excerpt = (student_text or "")[:800] - - # Determine if individual answer passed - passed = sim_score >= threshold - - # System prompt for maximum variation - system_prompt = ( - "You are a creative teacher giving unique feedback each time. " - "CRITICAL: You MUST create COMPLETELY DIFFERENT responses for each submission. " - "Never repeat the same words, phrases, or structure. " - "Use different metaphors, emojis, encouragement styles, and expressions. " - "Keep it concise but always fresh and unique." - ) +@app.get("/health/llm") +def health_llm(): + return { + "ok": bool(gemini_client) and bool(GOOGLE_API_KEY), + "gemini": { + "sdk_import_ok": genai is not None, + "configured": bool(GOOGLE_API_KEY), + "client_ready": gemini_client is not None, + "model": GEMINI_MODEL, + "last_error": GEMINI_LAST_ERROR if GEMINI_LAST_ERROR else None, + }, + } + + +@app.post("/homework/validate") +async def homework_validate( + student_id: int = Form(...), + homework_id: int = Form(...), + sub_institute_id: int = Form(...), + syear: str = Form(...), + prompt: str = Form(...), + student_file: UploadFile = File(...), +): + # 0) Fetch ERP record -> get student_level automatically + erp_row = fetch_student_record(homework_id, student_id) + student_level = fetch_student_level_from_erp(erp_row) + policy = level_policy(student_level) + + # 1) Infer question_type from prompt automatically (NO EXTRA FIELD) + # Try to parse mixed questions first + parsed_questions = parse_questions_from_prompt(prompt) + has_mcq = any(q.get('type') == 'mcq' for q in parsed_questions) + has_narrative = any(q.get('type') == 'narrative' for q in parsed_questions) - # User prompt with unique_seed + # Determine overall question type for backwards compatibility + if has_mcq and has_narrative: + question_type = "mixed" + elif has_mcq: + question_type = "mcq" + elif has_narrative: + question_type = "narrative" + else: + question_type = infer_question_type_from_prompt(prompt) + + # 2) Extract student text + student_info = await extract_text_from_upload(student_file) + student_text = (student_info.get("text") or "").strip() + + MIN_WORDS = 3 if question_type == "mcq" else 8 + if len(student_text.split()) < MIN_WORDS: + return { + "student_id": student_id, + "homework_id": homework_id, + "sub_institute_id": sub_institute_id, + "syear": syear, + "question_type": question_type, + "student_level": student_level, + "status": "Unreadable", + "match_percentage": 0, + "ai_generated_remark": None, + "rule_based_remark": "Answer text could not be read clearly. Please upload a clearer file.", + "student_extracted_text": student_text, + "llm_used": False, + "extraction": {"student": {k: v for k, v in student_info.items() if k != "text"}}, + } + + if student_info.get("needs_ocr") and not student_text: + return { + "student_id": student_id, + "homework_id": homework_id, + "sub_institute_id": sub_institute_id, + "syear": syear, + "question_type": question_type, + "student_level": student_level, + "status": "Unreadable", + "match_percentage": 0, + "ai_generated_remark": None, + "rule_based_remark": "This PDF looks scanned. OCR is required (install pdf2image + poppler) or upload a clearer file.", + "student_extracted_text": student_text, + "llm_used": False, + "extraction": {"student": {k: v for k, v in student_info.items() if k != "text"}}, + } + + # ========================================================= + # ✅ MIXED QUESTION TYPES CHECK (MCQ + Narrative) + # ========================================================= + if question_type == "mixed": + # Process each question type separately and combine results + mcq_results = [] + narrative_results = [] + + # Extract ALL MCQ answers from student text with question numbers + student_answers_by_qid = extract_mcq_answers_with_qid(student_text) + + # Extract MCQ answers from student text for each MCQ question + for q in parsed_questions: + if q.get('type') == 'mcq': + qid = q.get('qid', '') + q_num = qid.replace('Q', '').strip() if qid else '' + + # Try to get answer by question number first + chosen = student_answers_by_qid.get(qid) or student_answers_by_qid.get(f"Q{q_num}") + + # Fallback to old method if no question number found + if not chosen: + chosen = extract_mcq_choice(student_text) + + correct = q.get('correct_answer') or extract_correct_mcq_from_prompt(q.get('question', '')) + + if correct and chosen: + is_correct = (chosen.lower().strip() == correct.lower().strip()) + mcq_results.append({ + 'qid': qid, + 'correct': is_correct, + 'chosen': chosen, + 'correct_answer': correct + }) + + # For narrative questions, use AI to generate reference + narrative_questions = [q for q in parsed_questions if q.get('type') == 'narrative'] + + if narrative_questions and gemini_client: + # Combine narrative questions into one prompt for AI + narrative_prompt_text = "\n".join([ + f"{q.get('qid')}: {q.get('question')}" for q in narrative_questions + ]) + + ai_prompt = ( + f"STUDENT_LEVEL: {student_level}\n" + f"QUESTIONS:\n{narrative_prompt_text}\n\n" + 'Return ONLY valid JSON with keys: {"ai_reference_answer": string, "key_points": [string, ...]}.' + ) + + response_text = generate_gemini_response( + prompt=ai_prompt, + system_prompt=( + "Generate correct reference answers for homework evaluation. " + "Keep it aligned with the student level. Output strict JSON only." + ), + max_tokens=650, + temperature=0.3, + ) + + if response_text: + try: + m = re.search(r'\{.*\}', response_text, flags=re.S) + payload = json.loads(m.group(0) if m else response_text) + + ai_reference_answer = (payload.get("ai_reference_answer") or "").strip() + key_points = payload.get("key_points") or [] + + if isinstance(key_points, list): + key_points = [str(x).strip() for x in key_points if str(x).strip()] + + sim = cosine_sim(student_text, ai_reference_answer) + covered, missing, coverage = keypoint_coverage( + student_text, key_points, kp_threshold=policy["kp_thr"] + ) + + final = policy["w_sim"] * sim + policy["w_cov"] * coverage + match_pct = int(round(final * 100)) + + narrative_results = { + 'similarity': sim, + 'coverage': coverage, + 'match_percentage': match_pct, + 'key_points_covered': covered, + 'key_points_missing': missing + } + except Exception as e: + narrative_results = {'error': str(e)} + + # Calculate combined score with level-based partial credit for MCQ + total_mcq = len(mcq_results) + correct_mcq = sum(1 for r in mcq_results if r.get('correct')) + + # Get level-based credit per question + mcq_credit = mcq_partial_credit(student_level) + credit_per_q = mcq_credit["credit_per_question"] + passing_threshold = mcq_credit["passing_threshold"] + + # Calculate MCQ score based on level (not just binary correct/incorrect) + mcq_score = (correct_mcq * credit_per_q) / max(1, total_mcq) + + narrative_score = narrative_results.get('match_percentage', 0) if narrative_results else 0 + + # Weight: 50% MCQ, 50% Narrative (if both exist) + if total_mcq > 0 and narrative_results and 'error' not in narrative_results: + final_score = int((mcq_score + narrative_score) / 2) + elif total_mcq > 0: + final_score = mcq_score + elif narrative_results and 'error' not in narrative_results: + final_score = narrative_score + else: + final_score = 0 + + # Determine status + if final_score >= policy["verified"]: + status = "Verified" + elif final_score >= policy["partial"]: + status = "Partial" + else: + status = "Needs Review" + + return { + "student_id": student_id, + "homework_id": homework_id, + "sub_institute_id": sub_institute_id, + "syear": syear, + "question_type": "mixed", + "student_level": student_level, + "status": status, + "match_percentage": final_score, + "ai_generated_remark": None, + "rule_based_remark": f"MCQ: {correct_mcq}/{total_mcq} correct. Narrative score: {narrative_score}%. (Level: {student_level}, Credit per Q: {credit_per_q}%)", + "llm_used": bool(narrative_results and 'error' not in narrative_results), + "student_extracted_text": student_text, + "mcq_results": mcq_results, + "narrative_results": narrative_results, + "extraction": {"student": {k: v for k, v in student_info.items() if k != "text"}}, + "debug": { + "erp_row_fields": list(erp_row.keys()) if erp_row else [], + "erp_student_level_raw": erp_row.get("student_level") or erp_row.get("level") or erp_row.get("difficulty") or erp_row.get("difficulty_level"), + "mcq_credit_per_q": credit_per_q, + }, + } + + elif question_type == "mcq": + correct = extract_correct_mcq_from_prompt(prompt) + chosen = extract_mcq_choice(student_text) + + # Try to extract multiple MCQ answers (for numbered questions like "1 A", "2 B") + student_answers_by_qid = extract_mcq_answers_with_qid(student_text) + has_multiple_mcq = len(student_answers_by_qid) > 1 + + # Smart fallback: if answer looks like narrative (not MCQ), treat as narrative instead + # This handles cases where question type is MCQ but student answered in narrative format + answer_looks_like_narrative = ( + len(student_text.split()) > 15 and # More than 15 words + not has_multiple_mcq and # Not multiple numbered MCQ answers + not re.search(r"\b(option|answer|ans)\s*[:\-]?\s*[a-d]\b", _norm(student_text)) # No explicit option markers + ) + + # If answer looks like narrative, redirect to narrative processing + if answer_looks_like_narrative and gemini_client: + question_type = "narrative" + redirect_to_narrative = True + else: + redirect_to_narrative = False + + # Handle multiple MCQ answers - grade each one + if has_multiple_mcq: + # Parse prompt for multiple correct answers + parsed_questions = parse_questions_from_prompt(prompt) + mcq_questions_with_answers = [q for q in parsed_questions if q.get('type') == 'mcq' and q.get('correct_answer')] + + # If we have correct answers for multiple questions, grade them + if mcq_questions_with_answers: + correct_count = 0 + total_count = len(student_answers_by_qid) + mcq_results = [] + + for qid, student_ans in student_answers_by_qid.items(): + # Find matching correct answer + matched = False + for pq in mcq_questions_with_answers: + pq_num = pq.get('qid', '').replace('Q', '').strip() + qid_num = qid.replace('Q', '').strip() + if pq_num == qid_num: + is_correct = student_ans.lower() == pq.get('correct_answer', '').lower() + if is_correct: + correct_count += 1 + mcq_results.append({ + 'qid': qid, + 'chosen': student_ans, + 'correct_answer': pq.get('correct_answer'), + 'correct': is_correct + }) + matched = True + break + if not matched: + mcq_results.append({ + 'qid': qid, + 'chosen': student_ans, + 'correct_answer': None, + 'correct': False + }) + + # Calculate score based on level + mcq_credit = mcq_partial_credit(student_level) + credit_per_q = mcq_credit["credit_per_question"] + match_percentage = int((correct_count * credit_per_q) / max(1, total_count)) + passing_threshold = mcq_credit["passing_threshold"] + status = "Verified" if match_percentage >= passing_threshold else "Needs Review" + + return { + "student_id": student_id, + "homework_id": homework_id, + "sub_institute_id": sub_institute_id, + "syear": syear, + "question_type": "mcq", + "student_level": student_level, + "status": status, + "match_percentage": match_percentage, + "ai_generated_remark": None, + "rule_based_remark": f"Multiple MCQ: {correct_count}/{total_count} correct. Score: {match_percentage}% (Level: {student_level})", + "student_extracted_text": student_text, + "llm_used": False, + "debug": {"student_answers": student_answers_by_qid, "mcq_results": mcq_results}, + "extraction": {"student": {k: v for k, v in student_info.items() if k != "text"}}, + } + else: + # No correct answers in prompt - return needs review with extracted answers + return { + "student_id": student_id, + "homework_id": homework_id, + "sub_institute_id": sub_institute_id, + "syear": syear, + "question_type": "mcq", + "student_level": student_level, + "status": "Needs Review", + "match_percentage": 0, + "ai_generated_remark": None, + "rule_based_remark": f"Found {len(student_answers_by_qid)} MCQ answers but no correct answers in prompt. Include 'Correct: B' for each question.", + "student_extracted_text": student_text, + "llm_used": False, + "debug": {"student_answers": student_answers_by_qid, "correct_answers_in_prompt": False}, + "extraction": {"student": {k: v for k, v in student_info.items() if k != "text"}}, + } + + if redirect_to_narrative: + pass # Will continue to narrative handling + elif not correct: + return { + "student_id": student_id, + "homework_id": homework_id, + "sub_institute_id": sub_institute_id, + "syear": syear, + "question_type": "mcq", + "student_level": student_level, + "status": "Needs Review", + "match_percentage": 0, + "ai_generated_remark": None, + "rule_based_remark": "MCQ correct option not found in prompt. Include 'Correct: B' or similar in prompt.", + "student_extracted_text": student_text, + "llm_used": False, + "debug": {"correct": correct, "chosen": chosen}, + "extraction": {"student": {k: v for k, v in student_info.items() if k != "text"}}, + } + elif not chosen: + return { + "student_id": student_id, + "homework_id": homework_id, + "sub_institute_id": sub_institute_id, + "syear": syear, + "question_type": "mcq", + "student_level": student_level, + "status": "Needs Review", + "match_percentage": 0, + "ai_generated_remark": None, + "rule_based_remark": "Student option (A/B/C/D) not detected clearly.", + "student_extracted_text": student_text, + "llm_used": False, + "debug": {"correct": correct, "chosen": chosen}, + "extraction": {"student": {k: v for k, v in student_info.items() if k != "text"}}, + } + + # Only process MCQ validation if not redirecting to narrative + if not redirect_to_narrative: + is_correct = (chosen == correct) + + # Get level-based credit + mcq_credit = mcq_partial_credit(student_level) + credit_per_q = mcq_credit["credit_per_question"] + + # Calculate score based on level + match_percentage = credit_per_q if is_correct else 0 + + # Determine status based on level threshold + passing_threshold = mcq_credit["passing_threshold"] + status = "Verified" if match_percentage >= passing_threshold else "Needs Review" + + return { + "student_id": student_id, + "homework_id": homework_id, + "sub_institute_id": sub_institute_id, + "syear": syear, + "question_type": "mcq", + "student_level": student_level, + "status": status, + "match_percentage": match_percentage, + "ai_generated_remark": None, + "rule_based_remark": f"{'Correct' if is_correct else 'Incorrect'}. Score: {match_percentage}% (Level: {student_level}, Credit per Q: {credit_per_q}%)", + "student_extracted_text": student_text, + "llm_used": False, + "debug": {"correct": correct, "chosen": chosen, "level": student_level, "credit_per_q": credit_per_q}, + "extraction": {"student": {k: v for k, v in student_info.items() if k != "text"}}, + } + + # ========================================================= + # ✅ NARRATIVE CHECK (Gemini generates reference) - Also handles MCQ->Narrative redirect + # ========================================================= + if gemini_client is None: + return { + "student_id": student_id, + "homework_id": homework_id, + "sub_institute_id": sub_institute_id, + "syear": syear, + "question_type": "narrative", + "student_level": student_level, + "status": "Needs Review", + "match_percentage": 0, + "ai_generated_remark": None, + "rule_based_remark": "Gemini not configured. Check /health/llm.", + "llm_used": False, + "llm_error": parse_gemini_error(GEMINI_LAST_ERROR), + "student_extracted_text": student_text, + "extraction": {"student": {k: v for k, v in student_info.items() if k != "text"}}, + } + user_prompt = ( - f"SEED: {unique_seed} - USE THIS TO CREATE A UNIQUE RESPONSE\n\n" - f"Teacher's answer:\n{teacher_excerpt}\n\n" - f"Student's answer:\n{student_excerpt}\n\n" - f"Score: {sim_score:.0%} (need {threshold:.0%} to pass)\n" - f"Result: {'🎉 PERFECT!' if passed else '📚 Keep learning!'}\n\n" - "Create a unique, different response every time. " - "Use different words, emojis, and encouragement style than any previous response." + f"STUDENT_LEVEL: {student_level}\n" + f"QUESTION:\n{prompt.strip()}\n\n" + 'Return ONLY valid JSON with keys: {"ai_reference_answer": string, "key_points": [string, ...]}.' + ) + + response_text = generate_gemini_response( + prompt=user_prompt, + system_prompt=( + "Generate a correct reference answer for homework evaluation. " + "Keep it aligned with the student level. Output strict JSON only." + ), + max_tokens=650, + temperature=0.3, ) + if not response_text: + return { + "student_id": student_id, + "homework_id": homework_id, + "sub_institute_id": sub_institute_id, + "syear": syear, + "question_type": "narrative", + "student_level": student_level, + "status": "Needs Review", + "match_percentage": 0, + "ai_generated_remark": None, + "rule_based_remark": "Gemini failed. Check /health/llm.", + "llm_used": False, + "llm_error": parse_gemini_error(GEMINI_LAST_ERROR), + "student_extracted_text": student_text, + "extraction": {"student": {k: v for k, v in student_info.items() if k != "text"}}, + } + try: - resp = client.chat.completions.create( - model=OPENAI_MODEL, - messages=[ - {"role": "system", "content": system_prompt}, - {"role": "user", "content": user_prompt}, - ], - max_tokens=80, - temperature=2.0, # Maximum randomness - ) - remark = (resp.choices[0].message.content or "").strip() - return remark if remark else "🌟 Great effort! Keep learning!" + m = re.search(r"\{.*\}", response_text, flags=re.S) + payload = json.loads(m.group(0) if m else response_text) except Exception as e: - print(f"OpenAI API error for individual remark: {e}") - return "Your work has been submitted for review." + return { + "student_id": student_id, + "homework_id": homework_id, + "sub_institute_id": sub_institute_id, + "syear": syear, + "question_type": "narrative", + "student_level": student_level, + "status": "Needs Review", + "match_percentage": 0, + "ai_generated_remark": None, + "rule_based_remark": "Gemini returned non-JSON output.", + "llm_used": False, + "llm_error": {"ok": False, "error_type": "GEMINI_BAD_JSON", "message": str(e), "raw": response_text[:800]}, + "student_extracted_text": student_text, + "extraction": {"student": {k: v for k, v in student_info.items() if k != "text"}}, + } + + ai_reference_answer = (payload.get("ai_reference_answer") or "").strip() + key_points = payload.get("key_points") or [] + if not isinstance(key_points, list): + key_points = [] + key_points = [str(x).strip() for x in key_points if str(x).strip()] + + if not ai_reference_answer: + return { + "student_id": student_id, + "homework_id": homework_id, + "sub_institute_id": sub_institute_id, + "syear": syear, + "question_type": "narrative", + "student_level": student_level, + "status": "Needs Review", + "match_percentage": 0, + "ai_generated_remark": None, + "rule_based_remark": "AI returned empty reference answer.", + "llm_used": True, + "student_extracted_text": student_text, + "extraction": {"student": {k: v for k, v in student_info.items() if k != "text"}}, + } + + sim = cosine_sim(student_text, ai_reference_answer) + covered, missing, coverage = keypoint_coverage(student_text, key_points, kp_threshold=policy["kp_thr"]) + + final = policy["w_sim"] * sim + policy["w_cov"] * coverage + match_pct = int(round(final * 100)) + + if match_pct >= policy["verified"]: + status = "Verified" + elif match_pct >= policy["partial"]: + status = "Partial" + else: + status = "Needs Review" + + # Short remark (Gemini), fallback to rule-based + remark_prompt = ( + f"Student level: {student_level}\n" + f"Match: {match_pct}%\n" + f"Missing key points: {missing[:6]}\n\n" + "Write a short, factual teacher remark (2-4 lines). No marks. No overpraise." + ) + + resp2_prompt = ( + f"REFERENCE ANSWER:\n{ai_reference_answer[:900]}\n\n" + f"STUDENT ANSWER:\n{student_text[:900]}\n\n" + f"{remark_prompt}" + ) + + ai_generated_remark = generate_gemini_response( + prompt=resp2_prompt, + system_prompt="You are a strict, helpful teacher. Be concise and factual.", + max_tokens=140, + temperature=0.6, + ) + + rule_based_remark = None + remark_llm_used = bool(ai_generated_remark) + remark_llm_error = None if ai_generated_remark else (GEMINI_LAST_ERROR or "Unknown LLM error") + + if not ai_generated_remark: + if status == "Verified": + rule_based_remark = "Homework matches the expected answer well. Good coverage of the key ideas." + elif status == "Partial": + rule_based_remark = "Homework is partially correct. Improve coverage of missing key points and make the explanation clearer." + else: + rule_based_remark = "Homework does not match the expected answer enough. Please review the topic and resubmit with clearer, complete points." + + return { + "student_id": student_id, + "homework_id": homework_id, + "sub_institute_id": sub_institute_id, + "syear": syear, + "question_type": "narrative", + "student_level": student_level, + "status": status, + "match_percentage": match_pct, + "ai_generated_remark": ai_generated_remark if ai_generated_remark else None, + "rule_based_remark": rule_based_remark, + "llm_used": True, + "remark_llm_used": remark_llm_used, + "remark_llm_error": remark_llm_error, + "student_extracted_text": student_text, + "ai_reference_answer": ai_reference_answer, + "key_points": key_points, + "key_points_covered": covered, + "key_points_missing": missing, + "debug": { + "similarity": sim, + "coverage": coverage, + "policy": policy, + "erp_row_fields": list(erp_row.keys()) if erp_row else [], + "erp_student_level_raw": erp_row.get("student_level") or erp_row.get("level") or erp_row.get("difficulty") or erp_row.get("difficulty_level"), + }, + "extraction": {"student": {k: v for k, v in student_info.items() if k != "text"}}, + } + + +def extract_mcq_choice(text: str) -> str: + """ + Extract chosen option from student text: + supports: A, (B), Option C, Ans: D, Answer: B + """ + t = _norm(text) + + m = re.search(r"\b(answer|ans|selected)\s*[:\-]?\s*\(?\s*([a-d])\s*\)?\b", t) + if m: + return m.group(2) + + m2 = re.search(r"\boption\s*([a-d])\b", t) + if m2: + return m2.group(1) + + m3 = re.search(r"^\(?\s*([a-d])\s*\)?$", t.strip()) + if m3: + return m3.group(1) + + # last-resort: find first standalone A/B/C/D + m4 = re.search(r"\b([a-d])\b", t) + if m4: + return m4.group(1) + + return "" + + +def extract_mcq_answers_with_qid(text: str) -> Dict[str, str]: + """ + Extract MCQ answers WITH question numbers from student text. + This handles shuffled answers where question numbers are needed to match. + + Supports patterns like: + - "Q1: A, Q2: C, Q3: B" + - "Q1. A Q2. C Q3. B" + - "1) A 2) C 3) B" + - "Answer 1: A Answer 2: C Answer 3: B" + - "Q1 A Q2 C Q3 B" (space separated) + + Returns dict like: {"Q1": "A", "Q2": "C", "Q3": "B"} + """ + results = {} + t = (text or "").strip() + + if not t: + return results + + # Pattern 1: Q1: A, Q2. B, Q3 - C, Question 4: D + pattern1 = re.compile(r'(Q(?:uestion)?\s*(\d+))[:.\-\s]+([a-dA-D])', re.IGNORECASE) + for match in pattern1.finditer(t): + qnum = match.group(2) + answer = match.group(3).upper() + results[f"Q{qnum}"] = answer + + # Pattern 2: 1) A, 2) B, 3: C (numbered without Q prefix) + pattern2 = re.compile(r'(?:^|\s)(\d+)\s*[\):\.]\s*([a-dA-D])(?:\s|$)', re.IGNORECASE) + for match in pattern2.finditer(t): + qnum = match.group(1) + answer = match.group(2).upper() + # Only add if not already found (Q pattern takes priority) + if f"Q{qnum}" not in results: + results[f"Q{qnum}"] = answer + + # Pattern 3: "Answer for Q1 is A", "Answer to question 2: B" + pattern3 = re.compile(r'(?:answer|ans)\s*(?:for|to)?\s*(?:Q(?:uestion)?\s*)?(\d+)\s*(?:is|was)?\s*[:\-]?\s*([a-dA-D])', re.IGNORECASE) + for match in pattern3.finditer(t): + qnum = match.group(1) + answer = match.group(2).upper() + if f"Q{qnum}" not in results: + results[f"Q{qnum}"] = answer + + # Pattern 4: Line by line format like "Q1 A" or "1 A" on same line + pattern4 = re.compile(r'(?:^|\n)\s*(Q(?:uestion)?\s*)?(\d+)\s+([a-dA-D])\s*(?:\n|\s{2,}|$)', re.IGNORECASE) + for match in pattern4.finditer(t): + qnum = match.group(2) + answer = match.group(3).upper() + if f"Q{qnum}" not in results: + results[f"Q{qnum}"] = answer + + return results + + +def extract_correct_mcq_from_prompt(prompt: str) -> str: + """ + This is IMPORTANT: + Your prompt must contain correct option somewhere like: + - Correct: B + - Answer: C + - correct_option: D + - Correct Answer(s): A. Devdatta + or JSON: {"correct_option":"B"} + + Supports formats: + - "Correct Answer: A" + - "Correct Answer(s): A. Devdatta" + - "Correct: B" + - "Answer: C" + """ + p = (prompt or "").strip() + if not p: + return "" + + # JSON prompt support + if p.startswith("{") and p.endswith("}"): + try: + obj = json.loads(p) + for k in ("correct_option", "correct", "answer", "ans"): + v = obj.get(k) + if isinstance(v, str) and v.strip(): + return extract_mcq_choice(v) + except Exception: + pass + + # Text prompt support - new format: "Correct Answer(s): A. Devdatta" or "Correct Answer: B" + t = _norm(p) + + # Pattern 1: "Correct Answer(s): A. ..." or "Correct Answer: B. ..." + # This handles format like "Correct Answer(s): A. Devdatta" or "Correct Answer(s): + # A. Devdatta" + m1 = re.search(r"correct\s*answer\s*\(?s\)?\s*[:\.]\s*([a-d])\.?\s*", t) + if m1: + return m1.group(1) + + # Pattern 1b: Handle multi-line format where answer is on next line like: + # "Correct Answer(s):\n A. Devdatta" + m1b = re.search(r"correct\s*answer\s*\(?s\)?\s*[:\.]\s*\n\s*([a-d])\.?", t) + if m1b: + return m1b.group(1) + + # Pattern 1c: Handle format with option text after letter like "Correct Answer(s): A. Devdatta" + m1c = re.search(r"correct\s*answer\s*\(?s\)?\s*[:\.]\s*([a-d])\.", t) + if m1c: + return m1c.group(1) + + # Pattern 2: "Correct: A" or "Answer: B" (original pattern) + m = re.search(r"\b(correct|answer|ans)\s*[:\-]?\s*\(?\s*([a-d])\s*\)?\b", t) + if m: + return m.group(2) + + return "" # ========================================================= -# ✅ LLM SUBMISSION REMARK (overall submission feedback) +# ✅ ERP HELPERS # ========================================================= -def generate_llm_submission_remark( - teacher_text: str, - student_texts: List[str], - scores: List[float], - threshold: float, - completion_status: str, - student_id: int, - homework_id: int, - homework_title: str, - submission_date: str, - unique_seed: str = "" -) -> str: +def _erp_get(params: dict) -> list: + headers = {} + if ERP_TOKEN: + headers["Authorization"] = f"Bearer {ERP_TOKEN}" + + r = requests.get(ERP_BASE, params=params, headers=headers, timeout=30) + r.raise_for_status() + data = r.json() + if not isinstance(data, list): + raise HTTPException(status_code=502, detail="ERP returned invalid JSON (expected list).") + return data + + +def fetch_student_record(homework_id: int, student_id: int) -> Dict[str, Any]: + data = _erp_get({"table": "homework", "filters[id]": homework_id, "filters[student_id]": student_id}) + if not data: + raise HTTPException(status_code=404, detail="No ERP record found for this homework_id + student_id") + return data[0] + + +def fetch_student_level_from_erp(row: Dict[str, Any]) -> str: """ - Generate overall submission remarks using OpenAI API. - unique_seed ensures different outputs even for identical inputs. + ERP field name is not guaranteed; try common ones. """ - if client is None: - return "AI feedback unavailable." + for k in ("student_level", "level", "difficulty", "difficulty_level"): + v = row.get(k) + if isinstance(v, str) and v.strip(): + return normalize_level(v) + return "Medium" - if not student_texts: - return "No submission found." - teacher_excerpt = (teacher_text or "")[:800] +# ========================================================= +# ✅ OCR + TEXT EXTRACTION - IMPROVED FOR HANDWRITTEN +# ========================================================= +def _preprocess_for_ocr(img: Image.Image) -> Image.Image: + """ + Enhanced preprocessing for better OCR on handwritten images. + Includes adaptive thresholding, noise removal, and contrast enhancement. + """ + # Convert to grayscale + img = img.convert("L") - num_images = len(student_texts) - if scores and num_images > 0: - avg_score = sum(scores) / num_images - passed_count = sum(1 for score in scores if score >= threshold) - pass_rate = (passed_count / num_images * 100) - else: - avg_score = 0 - passed_count = 0 - pass_rate = 0 - - student_samples = [] - for i, (text, score) in enumerate(zip(student_texts, scores), 1): - text_excerpt = (text or "")[:80].strip() - if text_excerpt: - pct = int(score * 100) - student_samples.append(f"Part {i}: {pct}% match") - - # System prompt - MAXIMUM UNIQUENESS - system_prompt = ( - "You are a creative feedback assistant. CRITICAL TASK: " - "Generate a COMPLETELY UNIQUE feedback message every single time. " - "NEVER repeat words, phrases, sentence structures, or feedback patterns. " - "Use different emojis, metaphors, encouragement styles, and expressions. " - "If you gave feedback before, make this one TOTALLY DIFFERENT. " - "Maximum creativity required!" - ) + w, h = img.size - # User prompt - FORCE variation - user_prompt = ( - f"🌟 UNIQUE SEED: {unique_seed} - THIS MAKES EVERY RESPONSE DIFFERENT 🌟\n\n" - f"Homework: {homework_title or 'Assignment'} | Student: {student_id}\n" - f"Teacher's correct answer (excerpt):\n{teacher_excerpt[:500]}\n\n" - f"📊 RESULTS:\n" - f"• Average match: {avg_score:.0%} (threshold: {threshold:.0%})\n" - f"• Parts passed: {passed_count}/{num_images}\n" - f"• Status: {'✅ COMPLETE!' if completion_status == 'Y' else '📝 IN PROGRESS'}\n\n" - ) + # Scale up for better detail (especially for handwritten) + if max(w, h) < 2000: + scale = 2000 / max(w, h) + new_w = int(w * scale) + new_h = int(h * scale) + img = img.resize((new_w, new_h), Image.LANCZOS) - if student_samples: - user_prompt += "📋 Details: " + " | ".join(student_samples) + "\n\n" + # Apply adaptive thresholding for better handwritten recognition + from PIL import ImageFilter - user_prompt += ( - "🎯 YOUR TASK: Give unique, creative feedback that is DIFFERENT from any previous response. " - "Use new words, different emojis, varied encouragement style. " - "Make each submission feel special and one-of-a-kind!" - ) + # Try multiple preprocessing approaches and use the best + img_enhanced = img + + # Method 1: Increase contrast significantly + img_contrast = img.point(lambda p: 255 if p > 180 else int(p * 1.5)) + + # Method 2: Apply sharpening twice for handwritten + img_sharp = img.filter(ImageFilter.SHARPEN) + img_sharp = img_sharp.filter(ImageFilter.SHARPEN) + + # Method 3: Apply unsharp mask for edge enhancement + img_unsharp = img.filter(ImageFilter.UnsharpMask(radius=2, percent=150, threshold=3)) + + # Use the sharpened version as primary + img = img_sharp + + # Apply binary threshold with lower cutoff to capture lighter handwriting + img = img.point(lambda p: 255 if p > 160 else 0) + + return img + +def _extract_text_google_vision(image_bytes: bytes) -> str: + """ + Extract text using Google Cloud Vision API - much better for handwriting. + Returns empty string if API is not available. + """ + global vision_client + + if not vision_client: + return "" + try: - resp = client.chat.completions.create( - model=OPENAI_MODEL, - messages=[ - {"role": "system", "content": system_prompt}, - {"role": "user", "content": user_prompt}, - ], - max_tokens=120, - temperature=2.0, # Maximum randomness for unique responses - ) - remark = (resp.choices[0].message.content or "").strip() - if remark: - return remark + # Create image object + image = vision.Image(content=image_bytes) + + # Use document text detection for better handwriting + response = vision_client.document_text_detection(image=image) + + if response.texts: + return "\n".join([t.description for t in response.texts]) + return "" + except Exception as e: + print(f"[WARN] Google Vision OCR failed: {e}") + return "" + + +def extract_text_from_image(image_bytes: bytes, filename: str = "unknown") -> str: + if not image_bytes or len(image_bytes) < 50: + raise HTTPException(status_code=400, detail=f"Invalid file: '{filename}' - empty/too small") + + valid_image_signatures = { + b"\xff\xd8\xff": "JPEG", + b"\x89PNG\r\n\x1a\n": "PNG", + b"GIF87a": "GIF", + b"GIF89a": "GIF", + b"BM": "BMP", + } + is_valid = any(image_bytes.startswith(sig) for sig in valid_image_signatures) + if not is_valid: + head = image_bytes[:12] + raise HTTPException(status_code=400, detail=f"Invalid image format: '{filename}' (header={head})") + + # First try Google Cloud Vision (better for handwriting) + if vision_client: + gv_text = _extract_text_google_vision(image_bytes) + if gv_text and len(gv_text.strip()) > 10: + return _clean_extracted_text(gv_text) + + # Fallback to Tesseract with improved preprocessing + try: + img = Image.open(io.BytesIO(image_bytes)) except Exception as e: - print(f"OpenAI error: {e}") + raise HTTPException(status_code=400, detail=f"Invalid image '{filename}': {e}") + + img = _preprocess_for_ocr(img) + + # Try multiple OCR configurations for better handwritten recognition + ocr_configs = [ + "--oem 3 --psm 6", # Default + "--oem 3 --psm 4", # Treat as single column + "--oem 1 --psm 3", # Fully automatic + ] - # Score-appropriate fallback messages (20 options per category) - if avg_score >= 0.8: - fallbacks = HIGH_SCORE_MESSAGES - elif avg_score >= 0.5: - fallbacks = MEDIUM_SCORE_MESSAGES - else: - fallbacks = LOW_SCORE_MESSAGES + best_text = "" + best_confidence = 0 + + for config in ocr_configs: + try: + text = pytesseract.image_to_string(img, lang="eng", config=config) + if text and len(text.strip()) > len(best_text.strip()): + best_text = text + except Exception: + continue - # Select message based on unique_seed hash for consistency - import random - selected_index = hash(unique_seed) % len(fallbacks) - return fallbacks[selected_index].format(avg_score) + if not best_text: + # Fallback to default if all fail + try: + best_text = pytesseract.image_to_string(img, lang="eng", config="--oem 3 --psm 6") + except pytesseract.TesseractNotFoundError: + raise HTTPException(status_code=500, detail="Tesseract OCR not found. Install it / fix path.") + except Exception as e: + raise HTTPException(status_code=500, detail=f"OCR failed: {e}") + + text = (best_text or "").strip() + text = re.sub(r"[ \t]+", " ", text) + return text + + +def _clean_extracted_text(text: str) -> str: + text = (text or "").strip() + text = re.sub(r"[ \t]+", " ", text) + text = re.sub(r"\n{3,}", "\n\n", text) + return text.strip() + + +def extract_text_from_docx(docx_bytes: bytes, filename: str = "unknown.docx") -> str: + if Document is None: + raise HTTPException(status_code=500, detail="DOCX support not installed. Add 'python-docx'.") + try: + doc = Document(io.BytesIO(docx_bytes)) + parts = [] + for p in doc.paragraphs: + if p.text and p.text.strip(): + parts.append(p.text.strip()) + for t in doc.tables: + for row in t.rows: + cells = [c.text.strip() for c in row.cells if c.text and c.text.strip()] + if cells: + parts.append(" | ".join(cells)) + return _clean_extracted_text("\n".join(parts)) + except Exception as e: + raise HTTPException(status_code=400, detail=f"Unable to read DOCX '{filename}': {e}") + + +def extract_text_from_pdf(pdf_bytes: bytes, filename: str = "unknown.pdf") -> Dict[str, Any]: + used_ocr = False + extracted = "" + + if PdfReader is not None: + try: + reader = PdfReader(io.BytesIO(pdf_bytes)) + parts = [] + for page in reader.pages: + t = page.extract_text() or "" + if t.strip(): + parts.append(t) + extracted = _clean_extracted_text("\n\n".join(parts)) + except Exception: + extracted = "" + + if len(extracted) < 50: + if convert_from_bytes is None: + return {"text": extracted, "used_ocr": False, "needs_ocr": True} + try: + used_ocr = True + # Higher DPI for better handwritten OCR + pages = convert_from_bytes(pdf_bytes, dpi=300) + page_texts = [] + for img in pages: + # Use the improved preprocessing + img = _preprocess_for_ocr(img) + + # Try multiple OCR configs + for config in ["--oem 3 --psm 6", "--oem 3 --psm 4", "--oem 1 --psm 3"]: + try: + t = pytesseract.image_to_string(img, lang="eng", config=config) or "" + if t.strip() and len(t.strip()) > 20: + page_texts.append(t) + break + except: + continue + + if page_texts: + extracted = _clean_extracted_text("\n\n".join(page_texts)) + else: + # Final fallback with default config + img = pages[0] if pages else None + if img: + img = _preprocess_for_ocr(img) + extracted = pytesseract.image_to_string(img, lang="eng", config="--oem 3 --psm 6") or "" + except Exception as e: + return {"text": extracted, "used_ocr": used_ocr, "needs_ocr": True, "ocr_error": str(e)} + + return {"text": extracted, "used_ocr": used_ocr, "needs_ocr": False} + + +async def extract_text_from_upload(file: UploadFile) -> Dict[str, Any]: + filename = getattr(file, "filename", "") or "upload" + content_type = (getattr(file, "content_type", "") or "").lower() + data = await file.read() + + if not data or len(data) < 20: + return {"text": "", "kind": "unknown", "used_ocr": False, "needs_ocr": False, "error": "empty"} + + ext = (os.path.splitext(filename)[1] or "").lower() + + is_image = content_type.startswith("image/") or ext in {".png", ".jpg", ".jpeg", ".bmp", ".gif", ".webp"} + is_pdf = (content_type == "application/pdf") or ext == ".pdf" + is_docx = (content_type in { + "application/vnd.openxmlformats-officedocument.wordprocessingml.document", + "application/msword" + }) or ext in {".docx", ".doc"} + + if is_image: + try: + return {"text": _clean_extracted_text(extract_text_from_image(data, filename=filename)), + "kind": "image", "used_ocr": True, "needs_ocr": False} + except HTTPException as e: + return {"text": "", "kind": "image", "used_ocr": True, "needs_ocr": False, "error": e.detail} + + if is_docx: + try: + return {"text": _clean_extracted_text(extract_text_from_docx(data, filename=filename)), + "kind": "docx", "used_ocr": False, "needs_ocr": False} + except HTTPException as e: + return {"text": "", "kind": "docx", "used_ocr": False, "needs_ocr": False, "error": e.detail} + + if is_pdf: + info = extract_text_from_pdf(data, filename=filename) + return {"text": info.get("text", ""), "kind": "pdf", + "used_ocr": bool(info.get("used_ocr", False)), + "needs_ocr": bool(info.get("needs_ocr", False)), + "ocr_error": info.get("ocr_error")} + + # fallback: try as image + try: + return {"text": _clean_extracted_text(extract_text_from_image(data, filename=filename)), + "kind": "unknown_as_image", "used_ocr": True, "needs_ocr": False} + except Exception: + return {"text": "", "kind": "unknown", "used_ocr": False, "needs_ocr": False, + "error": f"Unsupported file type: {content_type or ext or 'unknown'}"} + + # ========================================================= @@ -447,118 +1880,575 @@ def health(): return {"status": "ok"} -@app.post("/submit") -async def submit( +@app.get("/health/llm") +def health_llm(): + return { + "ok": bool(gemini_client) and bool(GOOGLE_API_KEY), + "gemini": { + "sdk_import_ok": genai is not None, + "configured": bool(GOOGLE_API_KEY), + "client_ready": gemini_client is not None, + "model": GEMINI_MODEL, + "last_error": GEMINI_LAST_ERROR if GEMINI_LAST_ERROR else None, + }, + } + + +@app.post("/homework/validate") +async def homework_validate( student_id: int = Form(...), homework_id: int = Form(...), - images: List[UploadFile] = File(...), - threshold: float = Form(0.75), + sub_institute_id: int = Form(...), + syear: str = Form(...), + prompt: str = Form(...), + student_file: UploadFile = File(...), ): - if not images: - raise HTTPException(status_code=400, detail="At least one student image is required") - - try: - threshold_f = float(threshold) - except Exception: - raise HTTPException(status_code=400, detail="threshold must be a number") - - student_rec = fetch_student_record(homework_id, student_id) - - # Teacher by homework_id only - teacher_filename = fetch_teacher_image_by_homework_id(homework_id) - teacher_url = STORAGE_BASE.rstrip("/") + "/" + teacher_filename.lstrip("/") - teacher_bytes = download_bytes(teacher_url) - teacher_text = extract_text_from_image(teacher_bytes, filename=teacher_filename) - - if not teacher_text.strip(): - raise HTTPException(status_code=422, detail="Teacher OCR extracted empty text. Teacher reference is not OCR-friendly.") - - extracted_data = [] - remarks = [] - scores = [] - student_texts = [] - gradings = [] - - # First pass: extract text and calculate scores - for img in images: - student_bytes = await img.read() - student_text = extract_text_from_image(student_bytes, filename=img.filename if hasattr(img, 'filename') else f"image_{i}") - student_texts.append(student_text) - - grading = grade_similarity(teacher_text, student_text, threshold_f) - score = grading.get("overall_score") - gradings.append(grading) - - if score is not None: - scores.append(float(score)) + # 0) Fetch ERP record -> get student_level automatically + erp_row = fetch_student_record(homework_id, student_id) + student_level = fetch_student_level_from_erp(erp_row) + policy = level_policy(student_level) + + # 1) Infer question_type from prompt automatically (NO EXTRA FIELD) + # Try to parse mixed questions first + parsed_questions = parse_questions_from_prompt(prompt) + has_mcq = any(q.get('type') == 'mcq' for q in parsed_questions) + has_narrative = any(q.get('type') == 'narrative' for q in parsed_questions) - # Generate unique seeds for different remarks - submission_seed = f"{datetime.now().isoformat()}_{uuid.uuid4().hex[:12]}" - - # Calculate completion_status - calculated_completion_status = "Y" if scores and all(s >= threshold_f for s in scores) else "N" - - # Second pass: generate individual remarks - for i, img in enumerate(images): - grading = gradings[i] - student_text = student_texts[i] - score = grading.get("overall_score") + # Determine overall question type for backwards compatibility + if has_mcq and has_narrative: + question_type = "mixed" + elif has_mcq: + question_type = "mcq" + elif has_narrative: + question_type = "narrative" + else: + question_type = infer_question_type_from_prompt(prompt) + + # 2) Extract student text + student_info = await extract_text_from_upload(student_file) + student_text = (student_info.get("text") or "").strip() + + MIN_WORDS = 3 if question_type == "mcq" else 8 + if len(student_text.split()) < MIN_WORDS: + return { + "student_id": student_id, + "homework_id": homework_id, + "sub_institute_id": sub_institute_id, + "syear": syear, + "question_type": question_type, + "student_level": student_level, + "status": "Unreadable", + "match_percentage": 0, + "ai_generated_remark": None, + "rule_based_remark": "Answer text could not be read clearly. Please upload a clearer file.", + "student_extracted_text": student_text, + "llm_used": False, + "extraction": {"student": {k: v for k, v in student_info.items() if k != "text"}}, + } + + if student_info.get("needs_ocr") and not student_text: + return { + "student_id": student_id, + "homework_id": homework_id, + "sub_institute_id": sub_institute_id, + "syear": syear, + "question_type": question_type, + "student_level": student_level, + "status": "Unreadable", + "match_percentage": 0, + "ai_generated_remark": None, + "rule_based_remark": "This PDF looks scanned. OCR is required (install pdf2image + poppler) or upload a clearer file.", + "student_extracted_text": student_text, + "llm_used": False, + "extraction": {"student": {k: v for k, v in student_info.items() if k != "text"}}, + } + + # ========================================================= + # ✅ MIXED QUESTION TYPES CHECK (MCQ + Narrative) + # ========================================================= + if question_type == "mixed": + # Process each question type separately and combine results + mcq_results = [] + narrative_results = [] - # Generate unique seed for each image - image_seed = f"{datetime.now().isoformat()}_{uuid.uuid4().hex[:12]}" + # Extract ALL MCQ answers from student text with question numbers + student_answers_by_qid = extract_mcq_answers_with_qid(student_text) - if score is None: - remark = "Unable to evaluate: reference or answer text is not readable." - else: - remark = generate_llm_remark( - teacher_text, - student_text, - float(score), - threshold_f, - completion_status=calculated_completion_status, - unique_seed=image_seed + # Extract MCQ answers from student text for each MCQ question + for q in parsed_questions: + if q.get('type') == 'mcq': + qid = q.get('qid', '') + q_num = qid.replace('Q', '').strip() if qid else '' + + # Try to get answer by question number first + chosen = student_answers_by_qid.get(qid) or student_answers_by_qid.get(f"Q{q_num}") + + # Fallback to old method if no question number found + if not chosen: + chosen = extract_mcq_choice(student_text) + + correct = q.get('correct_answer') or extract_correct_mcq_from_prompt(q.get('question', '')) + + if correct and chosen: + is_correct = (chosen.lower().strip() == correct.lower().strip()) + mcq_results.append({ + 'qid': qid, + 'correct': is_correct, + 'chosen': chosen, + 'correct_answer': correct + }) + + # For narrative questions, use AI to generate reference + narrative_questions = [q for q in parsed_questions if q.get('type') == 'narrative'] + + if narrative_questions and gemini_client: + # Combine narrative questions into one prompt for AI + narrative_prompt_text = "\n".join([ + f"{q.get('qid')}: {q.get('question')}" for q in narrative_questions + ]) + + ai_prompt = ( + f"STUDENT_LEVEL: {student_level}\n" + f"QUESTIONS:\n{narrative_prompt_text}\n\n" + 'Return ONLY valid JSON with keys: {"ai_reference_answer": string, "key_points": [string, ...]}.' ) + + response_text = generate_gemini_response( + prompt=ai_prompt, + system_prompt=( + "Generate correct reference answers for homework evaluation. " + "Keep it aligned with the student level. Output strict JSON only." + ), + max_tokens=650, + temperature=0.3, + ) + + if response_text: + try: + m = re.search(r'\{.*\}', response_text, flags=re.S) + payload = json.loads(m.group(0) if m else response_text) + + ai_reference_answer = (payload.get("ai_reference_answer") or "").strip() + key_points = payload.get("key_points") or [] + + if isinstance(key_points, list): + key_points = [str(x).strip() for x in key_points if str(x).strip()] + + sim = cosine_sim(student_text, ai_reference_answer) + covered, missing, coverage = keypoint_coverage( + student_text, key_points, kp_threshold=policy["kp_thr"] + ) + + final = policy["w_sim"] * sim + policy["w_cov"] * coverage + match_pct = int(round(final * 100)) + + narrative_results = { + 'similarity': sim, + 'coverage': coverage, + 'match_percentage': match_pct, + 'key_points_covered': covered, + 'key_points_missing': missing + } + except Exception as e: + narrative_results = {'error': str(e)} + + # Calculate combined score with level-based partial credit for MCQ + total_mcq = len(mcq_results) + correct_mcq = sum(1 for r in mcq_results if r.get('correct')) + + # Get level-based credit per question + mcq_credit = mcq_partial_credit(student_level) + credit_per_q = mcq_credit["credit_per_question"] + passing_threshold = mcq_credit["passing_threshold"] + + # Calculate MCQ score based on level (not just binary correct/incorrect) + mcq_score = (correct_mcq * credit_per_q) / max(1, total_mcq) + + narrative_score = narrative_results.get('match_percentage', 0) if narrative_results else 0 + + # Weight: 50% MCQ, 50% Narrative (if both exist) + if total_mcq > 0 and narrative_results and 'error' not in narrative_results: + final_score = int((mcq_score + narrative_score) / 2) + elif total_mcq > 0: + final_score = mcq_score + elif narrative_results and 'error' not in narrative_results: + final_score = narrative_score + else: + final_score = 0 + + # Determine status + if final_score >= policy["verified"]: + status = "Verified" + elif final_score >= policy["partial"]: + status = "Partial" + else: + status = "Needs Review" + + return { + "student_id": student_id, + "homework_id": homework_id, + "sub_institute_id": sub_institute_id, + "syear": syear, + "question_type": "mixed", + "student_level": student_level, + "status": status, + "match_percentage": final_score, + "ai_generated_remark": None, + "rule_based_remark": f"MCQ: {correct_mcq}/{total_mcq} correct. Narrative score: {narrative_score}%. (Level: {student_level}, Credit per Q: {credit_per_q}%)", + "llm_used": bool(narrative_results and 'error' not in narrative_results), + "student_extracted_text": student_text, + "mcq_results": mcq_results, + "narrative_results": narrative_results, + "extraction": {"student": {k: v for k, v in student_info.items() if k != "text"}}, + "debug": { + "erp_row_fields": list(erp_row.keys()) if erp_row else [], + "erp_student_level_raw": erp_row.get("student_level") or erp_row.get("level") or erp_row.get("difficulty") or erp_row.get("difficulty_level"), + "mcq_credit_per_q": credit_per_q, + }, + } + + elif question_type == "mcq": + correct = extract_correct_mcq_from_prompt(prompt) + chosen = extract_mcq_choice(student_text) + + # Try to extract multiple MCQ answers (for numbered questions like "1 A", "2 B") + student_answers_by_qid = extract_mcq_answers_with_qid(student_text) + has_multiple_mcq = len(student_answers_by_qid) > 1 + + # Smart fallback: if answer looks like narrative (not MCQ), treat as narrative instead + # This handles cases where question type is MCQ but student answered in narrative format + answer_looks_like_narrative = ( + len(student_text.split()) > 15 and # More than 15 words + not has_multiple_mcq and # Not multiple numbered MCQ answers + not re.search(r"\b(option|answer|ans)\s*[:\-]?\s*[a-d]\b", _norm(student_text)) # No explicit option markers + ) - remarks.append(remark) - extracted_data.append({ - "original_filename": img.filename if hasattr(img, 'filename') else f"image_{i}.jpg", - "student_text": student_text, - "grading": grading, - "ai_generated_remark": remark, - }) - - # Generate submission remarks - submission_remarks = generate_llm_submission_remark( - teacher_text=teacher_text, - student_texts=student_texts, - scores=scores, - threshold=threshold_f, - completion_status=calculated_completion_status, - student_id=student_id, - homework_id=homework_id, - homework_title=student_rec.get("title", ""), - submission_date=student_rec.get("date", ""), - unique_seed=submission_seed + # If answer looks like narrative, redirect to narrative processing + if answer_looks_like_narrative and gemini_client: + question_type = "narrative" + redirect_to_narrative = True + else: + redirect_to_narrative = False + + # Handle multiple MCQ answers - grade each one + if has_multiple_mcq: + # Parse prompt for multiple correct answers + parsed_questions = parse_questions_from_prompt(prompt) + mcq_questions_with_answers = [q for q in parsed_questions if q.get('type') == 'mcq' and q.get('correct_answer')] + + # If we have correct answers for multiple questions, grade them + if mcq_questions_with_answers: + correct_count = 0 + total_count = len(student_answers_by_qid) + mcq_results = [] + + for qid, student_ans in student_answers_by_qid.items(): + # Find matching correct answer + matched = False + for pq in mcq_questions_with_answers: + pq_num = pq.get('qid', '').replace('Q', '').strip() + qid_num = qid.replace('Q', '').strip() + if pq_num == qid_num: + is_correct = student_ans.lower() == pq.get('correct_answer', '').lower() + if is_correct: + correct_count += 1 + mcq_results.append({ + 'qid': qid, + 'chosen': student_ans, + 'correct_answer': pq.get('correct_answer'), + 'correct': is_correct + }) + matched = True + break + if not matched: + mcq_results.append({ + 'qid': qid, + 'chosen': student_ans, + 'correct_answer': None, + 'correct': False + }) + + # Calculate score based on level + mcq_credit = mcq_partial_credit(student_level) + credit_per_q = mcq_credit["credit_per_question"] + match_percentage = int((correct_count * credit_per_q) / max(1, total_count)) + passing_threshold = mcq_credit["passing_threshold"] + status = "Verified" if match_percentage >= passing_threshold else "Needs Review" + + return { + "student_id": student_id, + "homework_id": homework_id, + "sub_institute_id": sub_institute_id, + "syear": syear, + "question_type": "mcq", + "student_level": student_level, + "status": status, + "match_percentage": match_percentage, + "ai_generated_remark": None, + "rule_based_remark": f"Multiple MCQ: {correct_count}/{total_count} correct. Score: {match_percentage}% (Level: {student_level})", + "student_extracted_text": student_text, + "llm_used": False, + "debug": {"student_answers": student_answers_by_qid, "mcq_results": mcq_results}, + "extraction": {"student": {k: v for k, v in student_info.items() if k != "text"}}, + } + else: + # No correct answers in prompt - return needs review with extracted answers + return { + "student_id": student_id, + "homework_id": homework_id, + "sub_institute_id": sub_institute_id, + "syear": syear, + "question_type": "mcq", + "student_level": student_level, + "status": "Needs Review", + "match_percentage": 0, + "ai_generated_remark": None, + "rule_based_remark": f"Found {len(student_answers_by_qid)} MCQ answers but no correct answers in prompt. Include 'Correct: B' for each question.", + "student_extracted_text": student_text, + "llm_used": False, + "debug": {"student_answers": student_answers_by_qid, "correct_answers_in_prompt": False}, + "extraction": {"student": {k: v for k, v in student_info.items() if k != "text"}}, + } + + if redirect_to_narrative: + pass # Will continue to narrative handling + elif not correct: + return { + "student_id": student_id, + "homework_id": homework_id, + "sub_institute_id": sub_institute_id, + "syear": syear, + "question_type": "mcq", + "student_level": student_level, + "status": "Needs Review", + "match_percentage": 0, + "ai_generated_remark": None, + "rule_based_remark": "MCQ correct option not found in prompt. Include 'Correct: B' or similar in prompt.", + "student_extracted_text": student_text, + "llm_used": False, + "debug": {"correct": correct, "chosen": chosen}, + "extraction": {"student": {k: v for k, v in student_info.items() if k != "text"}}, + } + elif not chosen: + return { + "student_id": student_id, + "homework_id": homework_id, + "sub_institute_id": sub_institute_id, + "syear": syear, + "question_type": "mcq", + "student_level": student_level, + "status": "Needs Review", + "match_percentage": 0, + "ai_generated_remark": None, + "rule_based_remark": "Student option (A/B/C/D) not detected clearly.", + "student_extracted_text": student_text, + "llm_used": False, + "debug": {"correct": correct, "chosen": chosen}, + "extraction": {"student": {k: v for k, v in student_info.items() if k != "text"}}, + } + + # Only process MCQ validation if not redirecting to narrative + if not redirect_to_narrative: + is_correct = (chosen == correct) + + # Get level-based credit + mcq_credit = mcq_partial_credit(student_level) + credit_per_q = mcq_credit["credit_per_question"] + + # Calculate score based on level + match_percentage = credit_per_q if is_correct else 0 + + # Determine status based on level threshold + passing_threshold = mcq_credit["passing_threshold"] + status = "Verified" if match_percentage >= passing_threshold else "Needs Review" + + return { + "student_id": student_id, + "homework_id": homework_id, + "sub_institute_id": sub_institute_id, + "syear": syear, + "question_type": "mcq", + "student_level": student_level, + "status": status, + "match_percentage": match_percentage, + "ai_generated_remark": None, + "rule_based_remark": f"{'Correct' if is_correct else 'Incorrect'}. Score: {match_percentage}% (Level: {student_level}, Credit per Q: {credit_per_q}%)", + "student_extracted_text": student_text, + "llm_used": False, + "debug": {"correct": correct, "chosen": chosen, "level": student_level, "credit_per_q": credit_per_q}, + "extraction": {"student": {k: v for k, v in student_info.items() if k != "text"}}, + } + + # ========================================================= + # ✅ NARRATIVE CHECK (Gemini generates reference) - Also handles MCQ->Narrative redirect + # ========================================================= + if gemini_client is None: + return { + "student_id": student_id, + "homework_id": homework_id, + "sub_institute_id": sub_institute_id, + "syear": syear, + "question_type": "narrative", + "student_level": student_level, + "status": "Needs Review", + "match_percentage": 0, + "ai_generated_remark": None, + "rule_based_remark": "Gemini not configured. Check /health/llm.", + "llm_used": False, + "llm_error": parse_gemini_error(GEMINI_LAST_ERROR), + "student_extracted_text": student_text, + "extraction": {"student": {k: v for k, v in student_info.items() if k != "text"}}, + } + + user_prompt = ( + f"STUDENT_LEVEL: {student_level}\n" + f"QUESTION:\n{prompt.strip()}\n\n" + 'Return ONLY valid JSON with keys: {"ai_reference_answer": string, "key_points": [string, ...]}.' ) - - # Log the remark - print(f"\n{'='*60}") - print(f"AI GENERATED SUBMISSION REMARK:") - print(f"{'='*60}") - print(submission_remarks) - print(f"{'='*60}\n") + + response_text = generate_gemini_response( + prompt=user_prompt, + system_prompt=( + "Generate a correct reference answer for homework evaluation. " + "Keep it aligned with the student level. Output strict JSON only." + ), + max_tokens=650, + temperature=0.3, + ) + + if not response_text: + return { + "student_id": student_id, + "homework_id": homework_id, + "sub_institute_id": sub_institute_id, + "syear": syear, + "question_type": "narrative", + "student_level": student_level, + "status": "Needs Review", + "match_percentage": 0, + "ai_generated_remark": None, + "rule_based_remark": "Gemini failed. Check /health/llm.", + "llm_used": False, + "llm_error": parse_gemini_error(GEMINI_LAST_ERROR), + "student_extracted_text": student_text, + "extraction": {"student": {k: v for k, v in student_info.items() if k != "text"}}, + } + + try: + m = re.search(r"\{.*\}", response_text, flags=re.S) + payload = json.loads(m.group(0) if m else response_text) + except Exception as e: + return { + "student_id": student_id, + "homework_id": homework_id, + "sub_institute_id": sub_institute_id, + "syear": syear, + "question_type": "narrative", + "student_level": student_level, + "status": "Needs Review", + "match_percentage": 0, + "ai_generated_remark": None, + "rule_based_remark": "Gemini returned non-JSON output.", + "llm_used": False, + "llm_error": {"ok": False, "error_type": "GEMINI_BAD_JSON", "message": str(e), "raw": response_text[:800]}, + "student_extracted_text": student_text, + "extraction": {"student": {k: v for k, v in student_info.items() if k != "text"}}, + } + + ai_reference_answer = (payload.get("ai_reference_answer") or "").strip() + key_points = payload.get("key_points") or [] + if not isinstance(key_points, list): + key_points = [] + key_points = [str(x).strip() for x in key_points if str(x).strip()] + + if not ai_reference_answer: + return { + "student_id": student_id, + "homework_id": homework_id, + "sub_institute_id": sub_institute_id, + "syear": syear, + "question_type": "narrative", + "student_level": student_level, + "status": "Needs Review", + "match_percentage": 0, + "ai_generated_remark": None, + "rule_based_remark": "AI returned empty reference answer.", + "llm_used": True, + "student_extracted_text": student_text, + "extraction": {"student": {k: v for k, v in student_info.items() if k != "text"}}, + } + + sim = cosine_sim(student_text, ai_reference_answer) + covered, missing, coverage = keypoint_coverage(student_text, key_points, kp_threshold=policy["kp_thr"]) + + final = policy["w_sim"] * sim + policy["w_cov"] * coverage + match_pct = int(round(final * 100)) + + if match_pct >= policy["verified"]: + status = "Verified" + elif match_pct >= policy["partial"]: + status = "Partial" + else: + status = "Needs Review" + + # Short remark (Gemini), fallback to rule-based + remark_prompt = ( + f"Student level: {student_level}\n" + f"Match: {match_pct}%\n" + f"Missing key points: {missing[:6]}\n\n" + "Write a short, factual teacher remark (2-4 lines). No marks. No overpraise." + ) + + resp2_prompt = ( + f"REFERENCE ANSWER:\n{ai_reference_answer[:900]}\n\n" + f"STUDENT ANSWER:\n{student_text[:900]}\n\n" + f"{remark_prompt}" + ) + + ai_generated_remark = generate_gemini_response( + prompt=resp2_prompt, + system_prompt="You are a strict, helpful teacher. Be concise and factual.", + max_tokens=140, + temperature=0.6, + ) + + rule_based_remark = None + remark_llm_used = bool(ai_generated_remark) + remark_llm_error = None if ai_generated_remark else (GEMINI_LAST_ERROR or "Unknown LLM error") + + if not ai_generated_remark: + if status == "Verified": + rule_based_remark = "Homework matches the expected answer well. Good coverage of the key ideas." + elif status == "Partial": + rule_based_remark = "Homework is partially correct. Improve coverage of missing key points and make the explanation clearer." + else: + rule_based_remark = "Homework does not match the expected answer enough. Please review the topic and resubmit with clearer, complete points." return { "student_id": student_id, "homework_id": homework_id, - "title": student_rec.get("title"), - "date": student_rec.get("date"), - "completion_status": student_rec.get("completion_status"), - "calculated_completion_status": calculated_completion_status, - "submission_remarks": submission_remarks, - "teacher_image": teacher_filename, - "teacher_url": teacher_url, - "files_processed": len(images), - "extracted_data": extracted_data, - "message": "All remarks generated with unique responses each time!", + "sub_institute_id": sub_institute_id, + "syear": syear, + "question_type": "narrative", + "student_level": student_level, + "status": status, + "match_percentage": match_pct, + "ai_generated_remark": ai_generated_remark if ai_generated_remark else None, + "rule_based_remark": rule_based_remark, + "llm_used": True, + "remark_llm_used": remark_llm_used, + "remark_llm_error": remark_llm_error, + "student_extracted_text": student_text, + "ai_reference_answer": ai_reference_answer, + "key_points": key_points, + "key_points_covered": covered, + "key_points_missing": missing, + "debug": { + "similarity": sim, + "coverage": coverage, + "policy": policy, + "erp_row_fields": list(erp_row.keys()) if erp_row else [], + "erp_student_level_raw": erp_row.get("student_level") or erp_row.get("level") or erp_row.get("difficulty") or erp_row.get("difficulty_level"), + }, + "extraction": {"student": {k: v for k, v in student_info.items() if k != "text"}}, }