import sys
import platform
import easyocr
from pdf2image import convert_from_path, convert_from_bytes
from flask import Flask, request, jsonify
from flask_cors import CORS
from dataclasses import dataclass
from typing import List, Tuple, Optional, Dict, Any
from collections import defaultdict
import numpy as np
import cv2
import pytesseract
from PIL import Image
import os
import tempfile
import difflib
import re
from fuzzywuzzy import fuzz
from dotenv import load_dotenv
import google.generativeai as genai
import asyncio
import base64
import io
import json
import pandas as pd
import subprocess
# Import the SupabaseHandler
import uuid
from datetime import datetime
from supabase import create_client, Client

# FIX: load .env BEFORE any os.getenv() calls below; previously load_dotenv()
# ran after the Tesseract/Gemini lookups, so .env-provided TESSERACT_CMD /
# POPPLER_PATH / GEMINI_API_KEY values were silently ignored.
load_dotenv()

# Tesseract binary: explicit env override first, then default Windows path.
_tesseract_cmd = os.getenv("TESSERACT_CMD")
if _tesseract_cmd:
    pytesseract.pytesseract.tesseract_cmd = _tesseract_cmd
elif platform.system() == "Windows":
    pytesseract.pytesseract.tesseract_cmd = r"C:\Program Files\Tesseract-OCR\tesseract.exe"


def _get_poppler_path():
    """Locate the Poppler ``bin`` directory used by pdf2image.

    Resolution order: POPPLER_PATH env var, then well-known Windows install
    locations (including any versioned folder under C:\\Program Files\\poppler).
    Returns None when nothing is found, in which case pdf2image falls back to
    whatever is on PATH.
    """
    env_path = os.getenv("POPPLER_PATH")
    if env_path:
        return env_path
    if platform.system() == "Windows":
        # Check common install locations
        candidates = [
            r'C:\Program Files\poppler\Library\bin',
            r'C:\Program Files\poppler\poppler-24.08.0\Library\bin',
        ]
        # Also scan for any versioned poppler directory
        poppler_base = r'C:\Program Files\poppler'
        if os.path.isdir(poppler_base):
            for entry in os.listdir(poppler_base):
                candidate = os.path.join(poppler_base, entry, 'Library', 'bin')
                if candidate not in candidates:
                    candidates.append(candidate)
        for path in candidates:
            if os.path.isdir(path):
                return path
    return None


GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
# FIX: configuring the SDK with api_key=None only defers a confusing failure
# to the first request; warn loudly at startup instead.
if GEMINI_API_KEY:
    genai.configure(api_key=GEMINI_API_KEY)
else:
    print("WARNING: GEMINI_API_KEY is not set; Gemini evaluation will fail.")

app = Flask(__name__)
CORS(app)

reader = easyocr.Reader(['en'])  # shared EasyOCR instance (model load is slow)

# Global variables to store processing results
ocr_extracted_texts = []
last_processed_question_paper_object = None
last_processed_omr_key = None      # Global variable to store OMR answer key
last_processed_omr_results = None  # Global variable to store OMR processing results
# Global buffers for OMR processing. NOTE(review): the misspelled name
# `porcessed_omr_results` is kept as-is for compatibility with any code
# outside this view that references it.
porcessed_omr_results = []
OMR_IMAGES = []


class SupabaseHandler:
    """Thin wrapper around the Supabase client for the `evaluation_results` table."""

    def __init__(self):
        # Raises ValueError early when credentials are missing rather than
        # failing on the first query.
        url: str = os.getenv("SUPABASE_URL")
        key: str = os.getenv("SUPABASE_ANON_KEY")
        if not url or not key:
            raise ValueError("Supabase URL and ANON_KEY must be set in environment variables")
        self.supabase: Client = create_client(url, key)

    def store_evaluation_result(self, teacher_email, evaluation_data, exam_name=None):
        """
        Store evaluation result in Supabase with a unique key and exam name.
        Returns the unique key for retrieval, or None on failure.
        """
        from datetime import timezone  # local import keeps this edit self-contained
        try:
            # Generate unique key
            unique_key = str(uuid.uuid4())
            # Prepare data for storage
            storage_data = {
                "unique_key": unique_key,
                "teacher_email": teacher_email,
                "evaluation_data": evaluation_data,
                "exam_name": exam_name,  # Add exam name field
                # FIX: datetime.utcnow() is naive and deprecated (3.12+);
                # store a timezone-aware UTC timestamp instead.
                "created_at": datetime.now(timezone.utc).isoformat(),
                "total_students": evaluation_data.get("total_students", 0)
            }
            # Insert into Supabase
            result = self.supabase.table("evaluation_results").insert(storage_data).execute()
            if result.data:
                print(f"Successfully stored evaluation result with key: {unique_key} for exam: {exam_name}")
                return unique_key
            print("Failed to store evaluation result")
            return None
        except Exception as e:
            print(f"Error storing evaluation result: {str(e)}")
            return None

    def get_evaluation_result(self, unique_key):
        """
        Retrieve evaluation result by unique key. Returns the row dict or None.
        """
        try:
            result = self.supabase.table("evaluation_results").select("*").eq("unique_key", unique_key).execute()
            if result.data and len(result.data) > 0:
                return result.data[0]
            return None
        except Exception as e:
            print(f"Error retrieving evaluation result: {str(e)}")
            return None

    def get_teacher_evaluations(self, teacher_email):
        """
        Get all evaluation results for a specific teacher, newest first.
        Returns [] on failure.
        """
        try:
            result = self.supabase.table("evaluation_results").select(
                "unique_key", "created_at", "total_students", "exam_name"
            ).eq("teacher_email", teacher_email).order("created_at", desc=True).execute()
            return result.data if result.data else []
        except Exception as e:
            print(f"Error retrieving teacher evaluations: {str(e)}")
            return []


class QuestionPaper:
    """Holds the parsed questions/answers of an uploaded question paper."""

    def __init__(self, path=None):
        self.questions = []  # full question lines, e.g. "1. What is ...?"
        self.answers = []    # answer text, parallel to self.questions
        self.path = path     # path of the saved upload (PDF or image)

    def clean_answers(self):
        """Drop empty answers and known header/banner text mistaken for answers."""
        # Remove unwanted patterns from answers
        unwanted_patterns = [
            "Time: 15 MinutesMarks: 20",
            "Time: 15 Minutes Marks: 20",
            "GENERAL KNOWLEDGE QUESTION PAPER WITH ANSWERS",
            "GENERAL KNOWLEDGE QUESTION PAPER",
        ]
        # Filter out unwanted answers
        cleaned_answers = []
        for answer in self.answers:
            if answer.strip() and answer.strip() not in unwanted_patterns:
                # Also reject answers that merely CONTAIN an unwanted banner
                is_unwanted = False
                for pattern in unwanted_patterns:
                    if pattern and re.search(re.escape(pattern), answer, re.IGNORECASE):
                        is_unwanted = True
                        break
                if not is_unwanted:
                    cleaned_answers.append(answer.strip())
        self.answers = cleaned_answers

    def add_question(self, question_text):
        self.questions.append(question_text)

    def add_answer(self, answer_text):
        self.answers.append(answer_text)

    def to_dict(self):
        return {
            'questions': self.questions,
            'answers': self.answers
        }


class OMRAnswerKey:
    """Answer key for an OMR exam: correct options, marking scheme, metadata."""

    def __init__(self):
        self.answers = {}            # question number -> correct option letter
        self.total_marks = 0
        self.marks_per_question = 1
        self.negative_marking = 0
        self.title = ""
        self.duration = ""
        self.total_questions = 0
        self.path = None
        self.questions = []          # formatted question strings, if provided
        self.question_data = []      # complete per-question dicts with options

    def __str__(self):
        return f"OMR Answer Key: {self.title}\nTotal Questions: {self.total_questions}\nAnswers: {self.answers}"

    def set_answers(self, answers: dict):
        """Set the answer key with question numbers as keys and correct options (A,B,C,D) as values.

        Entries whose option is not one of A-D (case-insensitive) are dropped.
        """
        self.answers = {int(k): v.upper() for k, v in answers.items() if v.upper() in ['A', 'B', 'C', 'D']}
        self.total_questions = len(self.answers)

    def set_marking_scheme(self, marks_per_question: float, negative_marking: float = 0):
        """Set the marking scheme; total_marks derives from current total_questions."""
        self.marks_per_question = marks_per_question
        self.negative_marking = negative_marking
        self.total_marks = self.total_questions * marks_per_question

    def set_metadata(self, title: str, duration: str):
        """Set metadata for the answer key"""
        self.title = title
        self.duration = duration

    def set_question_data(self, question_data):
        """Store complete question data including options.

        Each item is expected to carry 'number', 'question' and 'answer' keys;
        questions with a falsy answer are excluded from the key.
        """
        self.question_data = question_data
        self.questions = [f"{q['number']}. {q['question']}" for q in question_data]
        self.answers = {q['number']: q['answer'] for q in question_data if q['answer']}
        self.total_questions = len(question_data)

    def get_question_details(self, question_number):
        """Get complete details for a specific question (string-compared), or None."""
        for q in self.question_data:
            print(f"Checking question number: {q['number']} with {question_number}")
            if str(q['number']) == str(question_number):
                return q
        return None

    def to_dict(self):
        return {
            'title': self.title,
            'duration': self.duration,
            'total_questions': self.total_questions,
            'answers': self.answers,
            'total_marks': self.total_marks,
            'marks_per_question': self.marks_per_question,
            'negative_marking': self.negative_marking,
            'questions': self.questions,
            'question_data': self.question_data  # Include complete question data
        }
def parse_question_paper_text(text):
    """Split raw OCR text into parallel question/answer lists.

    A line starting with "<number>." or "<number>)" is treated as a question;
    the following non-question line (if any) becomes its answer. Known header
    banners (paper title, time/marks line) are discarded first. The returned
    lists always have equal length, padded with "" answers or placeholder
    question strings as needed.
    """
    raw_lines = [ln.strip() for ln in text.split('\n') if ln.strip()]

    # Headers/footers that must never be treated as content.
    ignore_patterns = [
        r'GENERAL KNOWLEDGE QUESTION PAPER.*',
        r'Time:\s*\d+\s*Minutes.*Marks:\s*\d+',
        r'Time:\s*\d+\s*MinutesMarks:\s*\d+',
        r'^\s*$'  # Empty lines
    ]
    content = [
        ln for ln in raw_lines
        if not any(re.match(p, ln, re.IGNORECASE) for p in ignore_patterns)
    ]

    question_pattern = r'^\d+\s*[.)]\s*(.+)'
    questions, answers = [], []
    idx, total = 0, len(content)
    while idx < total:
        line = content[idx].strip()
        if re.match(question_pattern, line):
            questions.append(f"{line}")  # keep the full line, number included
            nxt = content[idx + 1].strip() if idx + 1 < total else None
            if nxt is not None and not re.match(question_pattern, nxt):
                # Next line is not a question, so it is this question's answer.
                answers.append(nxt)
                idx += 2
            else:
                # Next line is another question (or there is none): no answer.
                answers.append("")
                idx += 1
        else:
            # Stray line: treat as an answer only when one is still owed.
            if len(questions) > len(answers):
                answers.append(line)
            idx += 1

    # Pad so both lists end up the same length.
    while len(answers) < len(questions):
        answers.append("")
    while len(questions) < len(answers):
        questions.append(f"Question {len(questions) + 1}")
    return questions, answers


def improved_clean_and_parse_ocr_text(ocr_text):
    """Normalise messy OCR output into a flat list of answer strings.

    Strips noisy punctuation first, then tries a "1. answer 2. answer"
    numbered layout; when none is found, falls back to one answer per line
    with any leading numbering removed. Single-character fragments are
    discarded.
    """
    # Remove special characters but keep important ones.
    cleaned_text = re.sub(r'[|@~¥#$%^&*()_+=\[\]{}\\:";\'<>?,./]', ' ', ocr_text)
    lines = [ln.strip() for ln in cleaned_text.split('\n') if ln.strip()]

    numbered_pattern = re.compile(
        r'(\d+)\s*[.)]\s*([^0-9]+?)(?=\d+\s*[.)]|$)',
        re.MULTILINE | re.DOTALL,
    )
    matches = numbered_pattern.findall(cleaned_text)

    if matches:
        # Numbered layout detected: take the text after each number.
        return [m[1].strip() for m in matches if len(m[1].strip()) > 1]

    # Fallback: line-by-line, dropping any leading "N." / "N)" prefix.
    stripped = (re.sub(r'^\d+\s*[.)]\s*', '', ln).strip() for ln in lines)
    return [s for s in stripped if s and len(s) > 1]
def find_best_match(student_answer, correct_answers, threshold=0.6):
    """Pick the correct answer most similar to *student_answer*.

    Each candidate is scored with three metrics (difflib sequence ratio,
    fuzz ratio, fuzz partial ratio) and the best single metric wins.
    Returns (match, score) when the best score reaches *threshold*,
    otherwise (None, score).
    """
    given = student_answer.lower()
    top_answer, top_score = None, 0
    for candidate in correct_answers:
        cand = candidate.lower()
        # Best of several similarity metrics: forgiving of partial overlap.
        score = max(
            difflib.SequenceMatcher(None, given, cand).ratio(),
            fuzz.ratio(given, cand) / 100.0,
            fuzz.partial_ratio(given, cand) / 100.0,
        )
        if score > top_score:
            top_score, top_answer = score, candidate
    if top_score >= threshold:
        return top_answer, top_score
    return None, top_score


def extract_roll_number(student_answer_path):
    """OCR the answer sheet and pull out a roll/registration number.

    Tries explicit "Roll No" / "Reg No" style labels first, then falls back
    to the first run of two or more digits anywhere on the page. Returns
    "Unknown" when nothing usable is found or OCR fails.
    """
    roll_patterns = (
        r'(?i)roll\s*no\s*[:\-]?\s*(\w+)',
        r'(?i)roll\s*number\s*[:\-]?\s*(\w+)',
        r'(?i)roll\s*[:\-]?\s*(\w+)',
        r'(?i)reg\s*no\s*[:\-]?\s*(\w+)',
        r'(?i)registration\s*[:\-]?\s*(\w+)',
    )
    try:
        page = Image.open(student_answer_path)
        ocr_text = pytesseract.image_to_string(page)
        for pattern in roll_patterns:
            hit = re.search(pattern, ocr_text)
            if hit:
                return hit.group(1).strip()
        # No explicit label: fall back to the first significant digit run.
        digit_runs = re.findall(r'\b\d{2,}\b', ocr_text)
        if digit_runs:
            return digit_runs[0]
        return "Unknown"
    except Exception as e:
        print(f"Error extracting roll number: {str(e)}")
        return "Unknown"
# OMR Section
@dataclass
class BubbleLocation:
    """Stores information about each bubble"""
    question_num: int        # 1-based question number this bubble belongs to
    option: str              # 'A' | 'B' | 'C' | 'D'
    center: Tuple[int, int]  # (x, y) pixel centre in the sheet image
    radius: int              # bubble radius in pixels
    filled: bool = False     # set by CorrectedOMRReader.analyze_bubble_fill
    fill_ratio: float = 0.0  # declared but not populated by current code
    # NOTE(review): analyze_bubble_fill() also attaches a dynamic
    # `darkness_score` attribute at runtime; it is not a declared field, and
    # extract_answers() relies on it for multi-mark tie-breaking.


class CorrectedOMRReader:
    """Detects the bubble grid of a 50-question, 3-column OMR sheet and
    extracts the marked option per question."""

    def __init__(self, image_path: str = None, image_array: np.ndarray = None):
        """Initialize the OMR Reader with an image (BGR array or file path)."""
        if image_array is not None:
            self.image = image_array
            self.image_path = None
        elif image_path is not None:
            self.image = cv2.imread(image_path)
            self.image_path = image_path
        else:
            raise ValueError("Either image_array or image_path must be provided")
        if self.image is None:
            raise ValueError("Could not load image")
        self.gray = cv2.cvtColor(self.image, cv2.COLOR_BGR2GRAY)
        self.height, self.width = self.gray.shape
        self.bubbles = []   # List[BubbleLocation], filled by process()
        self.answers = {}   # question number -> option letter or "---"
        # Expected grid parameters
        self.expected_radius = 15  # Approximate bubble radius
        self.grid_params = {
            'rows': 20,    # Maximum rows
            'cols': 3,     # 3 columns of questions
            'options': 4   # 4 options per question (A, B, C, D)
        }

    def preprocess_for_detection(self):
        """Preprocess specifically for bubble DETECTION (not fill detection).

        Produces an inverted binary image (bubble outlines white) and caches
        it on self.detection_thresh.
        """
        blurred = cv2.GaussianBlur(self.gray, (3, 3), 0)
        _, thresh = cv2.threshold(blurred, 200, 255, cv2.THRESH_BINARY)
        self.detection_thresh = cv2.bitwise_not(thresh)
        return self.detection_thresh

    def find_bubble_grid(self):
        """Find bubble locations using Hough-circle detection.

        Runs several parameter sets, de-duplicating circles closer than 15 px;
        falls back to template matching when fewer than 180 bubbles are found
        (a full sheet has 50 questions x 4 options = 200 bubbles).
        Returns a list of (x, y, radius) tuples.
        """
        bubbles = []
        param_sets = [
            {'dp': 1.0, 'minDist': 20, 'param1': 50, 'param2': 28, 'minRadius': 10, 'maxRadius': 20},
            {'dp': 1.1, 'minDist': 22, 'param1': 45, 'param2': 25, 'minRadius': 11, 'maxRadius': 19},
            {'dp': 1.2, 'minDist': 25, 'param1': 40, 'param2': 30, 'minRadius': 9, 'maxRadius': 21},
        ]
        for params in param_sets:
            circles = cv2.HoughCircles(
                self.gray, cv2.HOUGH_GRADIENT,
                dp=params['dp'], minDist=params['minDist'],
                param1=params['param1'], param2=params['param2'],
                minRadius=params['minRadius'], maxRadius=params['maxRadius']
            )
            if circles is not None:
                circles = np.round(circles[0, :]).astype("int")
                for (x, y, r) in circles:
                    # Skip circles already found by an earlier parameter set.
                    is_dup = False
                    for bub in bubbles:
                        if np.sqrt((x - bub[0])**2 + (y - bub[1])**2) < 15:
                            is_dup = True
                            break
                    if not is_dup:
                        bubbles.append((x, y, r))
        print(f" Found {len(bubbles)} bubbles with Hough Circles")
        if len(bubbles) < 180:
            template_bubbles = self.template_matching_detection()
            bubbles.extend(template_bubbles)
            print(f" Added {len(template_bubbles)} bubbles with template matching")
        return bubbles

    def template_matching_detection(self):
        """Use template matching to find bubble locations.

        Matches a synthetic ring template (radius 12, 2 px stroke) against the
        grayscale image; keeps matches >= 0.5 that are at least 20 px apart.
        Returns (x, y, 12) tuples.
        """
        bubbles = []
        template_size = 30
        template = np.zeros((template_size, template_size), dtype=np.uint8)
        cv2.circle(template, (template_size//2, template_size//2), 12, 255, 2)
        result = cv2.matchTemplate(self.gray, template, cv2.TM_CCOEFF_NORMED)
        threshold = 0.5
        locations = np.where(result >= threshold)
        for pt in zip(*locations[::-1]):  # np.where gives (rows, cols); reverse to (x, y)
            center_x = pt[0] + template_size // 2
            center_y = pt[1] + template_size // 2
            too_close = False
            for (bx, by, _) in bubbles:
                if np.sqrt((center_x - bx)**2 + (center_y - by)**2) < 20:
                    too_close = True
                    break
            if not too_close:
                bubbles.append((center_x, center_y, 12))
        return bubbles

    def detect_bubbles_by_contours(self):
        """Detect bubbles using contours - focusing on circular shapes.

        Runs Canny at three sensitivity levels and keeps contours whose area,
        circularity (4*pi*A/P^2 > 0.6) and enclosing radius look bubble-like.
        Returns (x, y, radius) tuples.
        """
        bubbles = []
        edge_params = [(30, 100), (50, 150), (20, 80)]
        for low, high in edge_params:
            edges = cv2.Canny(self.gray, low, high)
            contours, _ = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
            for contour in contours:
                area = cv2.contourArea(contour)
                if 150 < area < 900:
                    (x, y), radius = cv2.minEnclosingCircle(contour)
                    perimeter = cv2.arcLength(contour, True)
                    if perimeter > 0:
                        circularity = 4 * np.pi * area / (perimeter * perimeter)
                        if circularity > 0.6 and 8 < radius < 22:
                            is_dup = False
                            for bub in bubbles:
                                if np.sqrt((x - bub[0])**2 + (y - bub[1])**2) < 15:
                                    is_dup = True
                                    break
                            if not is_dup:
                                bubbles.append((int(x), int(y), int(radius)))
        return bubbles

    def organize_and_filter_bubbles(self, all_bubbles):
        """De-duplicate bubbles and group them into horizontal rows.

        Bubbles within 15 px of an earlier one are dropped; the remainder are
        sorted top-to-bottom and clustered into rows whose mean y differs by
        less than 20 px. Only rows with at least 4 bubbles (one question's
        options) are kept; each row is sorted left-to-right.
        Returns a list of rows, each a list of (x, y, r) tuples.
        """
        if not all_bubbles:
            return []
        filtered_bubbles = []
        for bubble in all_bubbles:
            is_duplicate = False
            for existing in filtered_bubbles:
                dist = np.sqrt((bubble[0] - existing[0])**2 + (bubble[1] - existing[1])**2)
                if dist < 15:
                    is_duplicate = True
                    break
            if not is_duplicate:
                filtered_bubbles.append(bubble)
        filtered_bubbles.sort(key=lambda b: (b[1], b[0]))
        rows = []
        current_row = []
        row_threshold = 20
        for bubble in filtered_bubbles:
            if not current_row:
                current_row.append(bubble)
            else:
                avg_y = np.mean([b[1] for b in current_row])
                if abs(bubble[1] - avg_y) < row_threshold:
                    current_row.append(bubble)
                else:
                    if len(current_row) >= 4:
                        current_row.sort(key=lambda b: b[0])
                        rows.append(current_row)
                    current_row = [bubble]
        if len(current_row) >= 4:
            current_row.sort(key=lambda b: b[0])
            rows.append(current_row)
        return rows

    def map_to_questions(self, bubble_rows):
        """Assign row bubbles to question numbers across three sheet columns.

        Layout assumption: column 1 (x < 35% width) holds Q1-20, column 2
        (35%-68%) holds Q21-40, column 3 (>= 68%) holds Q41-50 (first ten
        rows only). Within a column the four left-most bubbles map to options
        A-D. Returns a list of BubbleLocation.
        """
        mapped_bubbles = []
        options = ['A', 'B', 'C', 'D']
        if not bubble_rows:
            return mapped_bubbles
        col1_max = self.width * 0.35
        col2_max = self.width * 0.68
        for row_idx, row in enumerate(bubble_rows[:20]):
            col1 = [b for b in row if b[0] < col1_max]
            col2 = [b for b in row if col1_max <= b[0] < col2_max]
            col3 = [b for b in row if b[0] >= col2_max]
            if len(col1) >= 4:
                col1_sorted = sorted(col1, key=lambda b: b[0])[:4]
                q_num = row_idx + 1
                for opt_idx, bubble in enumerate(col1_sorted):
                    mapped_bubbles.append(BubbleLocation(q_num, options[opt_idx], (bubble[0], bubble[1]), bubble[2]))
            if len(col2) >= 4:
                col2_sorted = sorted(col2, key=lambda b: b[0])[:4]
                q_num = row_idx + 21
                for opt_idx, bubble in enumerate(col2_sorted):
                    mapped_bubbles.append(BubbleLocation(q_num, options[opt_idx], (bubble[0], bubble[1]), bubble[2]))
            if row_idx < 10 and len(col3) >= 4:
                col3_sorted = sorted(col3, key=lambda b: b[0])[:4]
                q_num = row_idx + 41
                for opt_idx, bubble in enumerate(col3_sorted):
                    mapped_bubbles.append(BubbleLocation(q_num, options[opt_idx], (bubble[0], bubble[1]), bubble[2]))
        return mapped_bubbles

    def analyze_bubble_fill(self, bubble: BubbleLocation):
        """Decide whether a bubble is filled and set bubble.filled.

        Combines three heuristics on the grayscale image:
        1. interior noticeably darker than a surrounding ring AND darker
           than an absolute threshold;
        2. high interior std-dev with a moderately dark mean (partial fill);
        3. very dark mean on its own.
        Also records bubble.darkness_score (ring mean - interior mean).
        """
        mask = np.zeros(self.gray.shape, dtype=np.uint8)
        cv2.circle(mask, bubble.center, max(bubble.radius - 5, 5), 255, -1)
        mean_val = cv2.mean(self.gray, mask=mask)[0]
        # Ring between radius+5 and radius+10 approximates the local paper tone.
        large_ring_mask = np.zeros(self.gray.shape, dtype=np.uint8)
        cv2.circle(large_ring_mask, bubble.center, bubble.radius + 10, 255, -1)
        cv2.circle(large_ring_mask, bubble.center, bubble.radius + 5, 0, -1)
        surrounding_mean = cv2.mean(self.gray, mask=large_ring_mask)[0]
        bubble.darkness_score = surrounding_mean - mean_val
        darkness_threshold = 50
        absolute_darkness_threshold = 150  # 150
        bubble.filled = (bubble.darkness_score > darkness_threshold) and (mean_val < absolute_darkness_threshold)
        pixels = self.gray[mask > 0]
        if len(pixels) > 0:
            std_dev = np.std(pixels)
            if std_dev > 25 and mean_val < 170:
                bubble.filled = True
        if mean_val < 120:
            bubble.filled = True
        return bubble.filled

    def process(self):
        """Main processing pipeline: detect, organise, map, analyse, extract.

        Returns self.answers: {question_number: option letter or "---"}.
        """
        print("Starting corrected OMR processing...")
        print("Detecting bubble locations...")
        all_bubbles = []
        circles = self.find_bubble_grid()
        all_bubbles.extend(circles)
        contour_bubbles = self.detect_bubbles_by_contours()
        all_bubbles.extend(contour_bubbles)
        print(f" Contour bubbles found: {len(contour_bubbles)}")
        print(f"Total bubbles detected: {len(all_bubbles)}")
        if len(all_bubbles) < 180:
            # Last resort: synthesise bubble positions from the expected layout.
            print("Not enough bubbles detected, using grid-based approach...")
            grid_bubbles = self.detect_by_grid_assumption()
            all_bubbles.extend(grid_bubbles)
            print(f"Added {len(grid_bubbles)} bubbles from grid assumption")
        print("Organizing bubbles into grid...")
        bubble_rows = self.organize_and_filter_bubbles(all_bubbles)
        print(f"Organized into {len(bubble_rows)} rows")
        print("Mapping bubbles to questions...")
        self.bubbles = self.map_to_questions(bubble_rows)
        print(f"Mapped {len(self.bubbles)} bubble locations")
        print("Analyzing filled bubbles...")
        for bubble in self.bubbles:
            self.analyze_bubble_fill(bubble)
        print("Extracting final answers...")
        self.extract_answers()
        return self.answers

    def detect_by_grid_assumption(self):
        """Synthesise bubble positions from hard-coded layout constants.

        NOTE(review): assumes a specific sheet resolution/geometry
        (column x-origins 60/360/660, 45 px option pitch, 28 px row pitch)
        — confirm against the actual scanned-sheet dimensions.
        """
        bubbles = []
        col_starts = [60, 360, 660]
        bubble_spacing_x = 45
        bubble_spacing_y = 28
        start_y = 50
        for col_idx, col_x in enumerate(col_starts):
            num_rows = 20 if col_idx < 2 else 10  # third column has only Q41-50
            for row in range(num_rows):
                y = start_y + row * bubble_spacing_y
                for opt in range(4):
                    x = col_x + opt * bubble_spacing_x
                    exists = False
                    for existing in bubbles:
                        if np.sqrt((x - existing[0])**2 + (y - existing[1])**2) < 20:
                            exists = True
                            break
                    if not exists:
                        bubbles.append((x, y, 13))
        return bubbles

    def extract_answers(self):
        """Reduce per-bubble fill flags to one answer per question.

        "---" marks an unanswered question; when several bubbles are filled,
        the one with the highest darkness_score wins.
        """
        questions = defaultdict(list)
        for bubble in self.bubbles:
            questions[bubble.question_num].append(bubble)
        self.answers = {}
        for q_num in sorted(questions.keys()):
            q_bubbles = questions[q_num]
            filled = [b for b in q_bubbles if b.filled]
            if not filled:
                self.answers[q_num] = "---"
            elif len(filled) == 1:
                self.answers[q_num] = filled[0].option
            else:
                filled.sort(key=lambda b: b.darkness_score, reverse=True)
                self.answers[q_num] = filled[0].option
        return self.answers

    def visualize_results(self):
        """Return a copy of the sheet with filled bubbles drawn green (and
        labelled) and empty bubbles outlined in light red."""
        result_img = self.image.copy()
        for bubble in self.bubbles:
            if bubble.filled:
                cv2.circle(result_img, bubble.center, bubble.radius, (0, 255, 0), 2)
                text = f"Q{bubble.question_num}:{bubble.option}"
                cv2.putText(result_img, text, (bubble.center[0] - 25, bubble.center[1] - bubble.radius - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.3, (0, 0, 255), 1)
            else:
                cv2.circle(result_img, bubble.center, bubble.radius, (100, 100, 255), 1)
        return result_img

    def display_results(self):
        """Print the detected answers as a three-column console table plus a
        summary of answered/unanswered counts."""
        print("\n" + "="*60)
        print("DETECTED ANSWERS")
        print("="*60)
        for i in range(1, 21):
            row_str = ""
            ans1 = self.answers.get(i, "---")
            row_str += f"Q{i:2d}: {ans1:^4} | "
            if i + 20 <= 40:
                ans2 = self.answers.get(i + 20, "---")
                row_str += f"Q{i+20:2d}: {ans2:^4} | "
            else:
                row_str += " " * 13 + "| "
            if i + 40 <= 50:
                ans3 = self.answers.get(i + 40, "---")
                row_str += f"Q{i+40:2d}: {ans3:^4}"
            print(row_str)
        print("\n" + "="*60)
        print("SUMMARY")
        print("="*60)
        answered = sum(1 for v in self.answers.values() if v != "---")
        print(f"Questions detected: {len(self.answers)}")
        print(f"Answered: {answered}")
        print(f"Unanswered: {len(self.answers) - answered}")
def process_single_image(image_data) -> Dict[str, Any]:
    """Process a single OMR sheet and return a JSON-ready result dict.

    Args:
        image_data: either a base64-encoded image string or a file-like
            object (e.g. a Flask upload).

    Returns:
        {"success": True, "answers": {"1": "A", ..., "n": "null"}, "summary": ...}
        or {"success": False, "error": ...} on failure.

    FIX: CorrectedOMRReader.extract_answers() marks unanswered questions with
    the sentinel "---" (never None), so the old `v is not None` count reported
    every detected question as answered, and "---" leaked into the API output
    instead of the documented "null".
    """
    try:
        # Convert image data to a BGR numpy array.
        if isinstance(image_data, str):
            # Base64 encoded image
            image_bytes = base64.b64decode(image_data)
            image = Image.open(io.BytesIO(image_bytes))
        else:
            # Direct file upload
            image = Image.open(image_data)
        # Normalise to 3-channel RGB so cvtColor also accepts RGBA/greyscale input.
        image_array = cv2.cvtColor(np.array(image.convert("RGB")), cv2.COLOR_RGB2BGR)

        # Process the image using CorrectedOMRReader.
        omr_reader = CorrectedOMRReader(image_array=image_array)
        answers = omr_reader.process()

        total_questions = 50  # sheet layout is fixed at 50 questions
        formatted_answers = {}
        answered = 0
        for q_num in range(1, total_questions + 1):
            answer = answers.get(q_num)
            if answer is None or answer == "---":
                formatted_answers[str(q_num)] = "null"
            else:
                formatted_answers[str(q_num)] = answer
                answered += 1

        return {
            "success": True,
            "answers": formatted_answers,
            "summary": {
                "total_questions": total_questions,
                "answered": answered,
                "unanswered": total_questions - answered
            }
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "answers": {},
            "summary": {
                "total_questions": 50,
                "answered": 0,
                "unanswered": 50
            }
        }
@app.route('/health', methods=['GET'])
def health_check():
    """Health check endpoint"""
    return jsonify({
        "status": "healthy",
        "message": "OMR API is running"
    })


@app.route('/', methods=['GET'])
def home():
    """Home endpoint with API documentation"""
    return jsonify({
        "message": "OMR Processing API",
        "version": "1.0",
        "endpoints": {
            "/process_omr": {
                "method": "POST",
                "description": "Process OMR answer sheets",
                "accepts": [
                    "Multipart form data with 'images' field",
                    "JSON with base64 encoded images in 'images' array"
                ],
                "returns": "JSON with detected answers and summary"
            },
            "/health": {
                "method": "GET",
                "description": "Health check endpoint"
            }
        },
        "example_response": {
            "success": True,
            "answers": {"1": "A", "2": "B", "3": "null"},
            "summary": {
                "total_questions": 50,
                "answered": 45,
                "unanswered": 5
            }
        }
    })


# <----------------->
@app.route('/easyocr', methods=['POST'])
def easyocr_image():
    """Run EasyOCR over each uploaded image.

    Expects multipart form data with an 'images' field; returns
    {'extracted_texts': [...]} with one entry per upload (error messages
    replace the text for uploads that fail). Also appends each text to the
    global ocr_extracted_texts buffer.
    """
    if 'images' not in request.files:
        return jsonify({'error': 'No image files provided'}), 400
    images = request.files.getlist('images')
    extracted_texts = []
    for image_file in images:
        try:
            # Save the image to a temporary file
            with tempfile.NamedTemporaryFile(delete=False, suffix='.png') as temp_image_file:
                image_file.save(temp_image_file.name)
                temp_path = temp_image_file.name
            try:
                # FIX: the file handle from open(...) was never closed;
                # use a context manager so it is released promptly.
                with open(temp_path, 'rb') as fh:
                    image_np = np.frombuffer(fh.read(), np.uint8)
                image = cv2.imdecode(image_np, cv2.IMREAD_COLOR)
                # Perform OCR
                result = reader.readtext(image)
                # Extract text from the result
                text = " ".join([item[1] for item in result])
                extracted_texts.append(text)
                ocr_extracted_texts.append(text)
            finally:
                # Clean up temp file
                if os.path.exists(temp_path):
                    os.unlink(temp_path)
        except Exception as e:
            extracted_texts.append(f"Error processing image with EasyOCR: {str(e)}")
    return jsonify({'extracted_texts': extracted_texts})
@app.route('/tesseract', methods=['POST'])
def tesseract_image():
    """Run Tesseract OCR over each uploaded image.

    Expects multipart form data with an 'images' field; returns
    {'extracted_texts': [...]} with one (stripped) text per upload, with
    error messages substituted for failed uploads. Each text is also pushed
    onto the global ocr_extracted_texts buffer.
    """
    if 'images' not in request.files:
        return jsonify({'error': 'No image files provided'}), 400
    extracted_texts = []
    for upload in request.files.getlist('images'):
        try:
            # Persist the upload so PIL/Tesseract can read it from disk.
            with tempfile.NamedTemporaryFile(delete=False, suffix='.png') as tmp:
                upload.save(tmp.name)
                tmp_path = tmp.name
            try:
                with Image.open(tmp_path) as img:
                    page_text = pytesseract.image_to_string(img).strip()
                extracted_texts.append(page_text)
                ocr_extracted_texts.append(page_text)
            finally:
                # Always remove the temporary file.
                if os.path.exists(tmp_path):
                    os.unlink(tmp_path)
        except Exception as e:
            extracted_texts.append(f"Error processing image with Tesseract: {str(e)}")
    return jsonify({'extracted_texts': extracted_texts})


@app.route('/process_question_paper', methods=['POST'])
def process_question_paper():
    """Accept a question-paper upload (PDF or image), OCR it, parse the
    question/answer pairs, cache the result in the module-level
    last_processed_question_paper_object, and echo it back as JSON."""
    global last_processed_question_paper_object
    if 'file' not in request.files:
        return jsonify({'error': 'No file provided'}), 400
    upload = request.files['file']
    if upload.filename == '':
        return jsonify({'error': 'No file selected'}), 400
    paper = QuestionPaper()
    try:
        # The upload is kept on disk so Gemini can re-read it later.
        images_dir = os.path.join(app.root_path, 'Images')
        os.makedirs(images_dir, exist_ok=True)
        is_pdf = upload.filename.lower().endswith('.pdf')
        saved_name = "question_paper.pdf" if is_pdf else "question_paper.png"
        saved_path = os.path.join(images_dir, saved_name)
        upload.save(saved_path)
        paper.path = saved_path
        if is_pdf:
            # Render every page and concatenate the OCR text.
            pages = convert_from_path(saved_path, poppler_path=_get_poppler_path())
            full_text = "".join(pytesseract.image_to_string(p) + "\n" for p in pages)
        else:
            full_text = pytesseract.image_to_string(Image.open(saved_path))
        paper.questions, paper.answers = parse_question_paper_text(full_text)
        paper.clean_answers()  # strip header/banner noise from the answers
        last_processed_question_paper_object = paper
        return jsonify(paper.to_dict())
    except Exception as e:
        return jsonify({'error': str(e)}), 500
def gemini_evaluate_answer_sheet_with_roll(question_paper_path, student_answer_path, questions, correct_answers, paddle_results):
    """
    Evaluate entire answer sheet using Gemini and extract roll number.

    Sends the question paper image, the student's answer-sheet image, the
    expected answers and the PaddleOCR pre-evaluation to Gemini, and asks for
    a strict JSON verdict per question plus the roll number.

    Returns (roll_number, evaluations) where evaluations is a list of
    {"question_number": int, "status": "Correct"|"Wrong"|"Missing"} dicts.
    On any failure the roll number is recovered via local OCR and every
    question gets status "Error".

    NOTE(review): the `questions` parameter is currently unused.
    """
    try:
        model = genai.GenerativeModel('gemini-2.5-flash')
        # Create the expected answers list for the prompt
        expected_answers_text = "\n".join([f"{i+1}. {answer}" for i, answer in enumerate(correct_answers)])
        prompt_text = f"""You are an OCR Assitant for an evaluvation script. You will be given an image of a question paper and an image of a student's handwritten answers along with traditional OCR evaluvations. Your task is assist the traditional OCR in overcoming its limitation with handwritten text the image may have bad quality handwritten text which the OCR may fail to extract and evaluvate properly, this is where you come in. Your task is to Just do a double check of the OCR results and correct any mistakes or missing answers. and provide the result in a structured way.

Expected correct answers:
{expected_answers_text}

Traditional OCR Evaluation Results:
{paddle_results}

Instructions:
- First, identify and extract the student's roll number from the answer sheet
- Compare the student's handwritten answers with the expected answers above
- Small spelling mistakes should be ignored and considered correct
- If an answer has been crossed out or strikethrough, consider it incorrect
- Be lenient with handwriting recognition issues
- Look for answers by question numbers (1, 2, 3, etc.)

Please evaluate ALL questions and respond in this EXACT JSON format:
{{
    "roll_number": "extracted_roll_number_here",
    "evaluations": [
        {{"question_number": 1, "status": "Correct"}},
        {{"question_number": 2, "status": "Wrong"}},
        {{"question_number": 3, "status": "Missing"}},
        ...
    ]
}}

For roll_number: Look for patterns like "Roll No:", "Roll Number:", "Reg No:", or any number sequence that appears to be a student identifier.

For each question, use ONLY one of these three status values:
- "Correct" - if the student's answer matches the expected answer (allowing for minor spelling)
- "Wrong" - if the student's answer is clearly different from the expected answer
- "Missing" - if no answer is visible for this question number

Respond with ONLY the JSON format above, no other text.

! Note Ignore texts like `GENERAL KNOWLEDGE QUESTION PAPER WITH ANSWERS` and the final output should only have actual questions.
"""
        # Handle PDF vs Image for question paper
        if question_paper_path.lower().endswith('.pdf'):
            # Convert PDF to images
            pdf_images = convert_from_path(question_paper_path, poppler_path=_get_poppler_path())
            question_paper_img = pdf_images[0]  # Use first page
        else:
            question_paper_img = Image.open(question_paper_path)
        # Load student answer image
        student_answer_img = Image.open(student_answer_path)
        # Create content for the model: prompt text plus both images.
        content = [prompt_text, question_paper_img, student_answer_img]
        response = model.generate_content(content)
        result_text = response.text.strip()
        print(f"Gemini response: {result_text}")
        # Try to parse JSON response
        import json  # NOTE: shadows the module-level import; kept as-is
        try:
            # Clean the response - sometimes Gemini adds markdown formatting
            if "```json" in result_text:
                result_text = result_text.split("```json")[1].split("```")[0].strip()
            elif "```" in result_text:
                result_text = result_text.split("```")[1].strip()
            parsed_result = json.loads(result_text)
            return parsed_result["roll_number"], parsed_result["evaluations"]
        except (json.JSONDecodeError, KeyError) as e:
            print(f"Failed to parse JSON response: {e}")
            print(f"Raw response: {result_text}")
            # Fallback - extract roll number using OCR and create default "Error" results
            roll_number = extract_roll_number(student_answer_path)
            return roll_number, [{"question_number": i+1, "status": "Error"} for i in range(len(correct_answers))]
    except Exception as e:
        print(f"Error in Gemini evaluation: {str(e)}")
        # Return error status for all questions with OCR extracted roll number
        roll_number = extract_roll_number(student_answer_path)
        return roll_number, [{"question_number": i+1, "status": "Error"} for i in range(len(correct_answers))]
def quick_match(correct_list, messy_student_list, min_score=80):
    """Quick function to match messy student answers.

    Each messy OCR item (leading numbering stripped) is fuzzy-matched against
    the expected answers; the best hit at or above *min_score* claims that
    question number (first come, first served).

    Returns a list of (raw_item, question_number, matched_answer, score) tuples.
    """
    from fuzzywuzzy import process
    import re
    results = []
    used = set()
    for item in messy_student_list:
        # Extract content after any "N." style prefix.
        content = re.sub(r'^\d+\.?\s*', '', str(item)).strip()
        if content and content != '-':
            # Find best match among the expected answers.
            match = process.extractOne(content, correct_list)
            if match and match[1] >= min_score:
                q_num = correct_list.index(match[0]) + 1
                if q_num not in used:  # one claim per question
                    used.add(q_num)
                    results.append((item, q_num, match[0], match[1]))
    return results


def process_with_paddle_ocr(image_path, correct_answers):
    """
    Run PaddleOCR on a student answer-sheet image and fuzzy-match the
    recognised text against the expected answers.

    Returns:
        list[tuple]: (raw_item, question_number, matched_answer, score)
        tuples from quick_match(); [] when nothing was recognised or OCR failed.

    FIX: the error path previously returned (None, [], 0) while the success
    path returned a plain list, leaving callers with two shapes to handle;
    the error path now returns [] consistently. Also, `words` is initialised
    so an empty predict() result no longer raises NameError.
    """
    try:
        # Initialize PaddleOCR (local import: heavyweight, optional dependency)
        from paddleocr import PaddleOCR
        print("Initializing PaddleOCR...")
        ocr = PaddleOCR(
            use_doc_orientation_classify=True,
            use_doc_unwarping=False,
            use_textline_orientation=False
        )
        print("PaddleOCR initialized.")
        print("Preditcing")
        result = ocr.predict(image_path)
        print("PaddleOCR processing completed.")
        print("Correct Answers are:")
        print(correct_answers)
        words = []
        for res in result:
            # NOTE: mirrors the original behaviour of keeping the words of the
            # LAST result entry (predict() typically yields a single entry).
            words = res["rec_texts"]
            print(f"PaddleOCR extracted words: {words}")
        matched = quick_match(correct_answers, words, min_score=85)
        print(f"PaddleOCR matched results: {matched}")
        return matched
    except Exception as e:
        print(f"Error in PaddleOCR processing: {str(e)}")
        return []


# OCR Evaluvation Endpoint
@app.route('/evaluate_answers', methods=['POST'])
def evaluate_answers():
    global ocr_extracted_texts
    if 'student_answers' not in request.files:
        return jsonify({"error": "Missing student answers"}), 400
    student_answer_files = request.files.getlist('student_answers')
    # Get teacher email and exam name from the request
    teacher_email = request.form.get('teacher_email', 'unknown@example.com')
    exam_name = request.form.get('exam_name', 'Untitled Exam')  # Get exam name from form data
    # Retrieve the question paper object
    question_paper = last_processed_question_paper_object
    if last_processed_question_paper_object is None:
        return jsonify({'error': 'Question paper not found or processed yet'}), 404
yet'}), 404 student_answer_paths = [] try: # Save student answer files temporarily for student_answer_file in student_answer_files: with tempfile.NamedTemporaryFile(delete=False, suffix='.png') as temp_ans_file: student_answer_file.save(temp_ans_file.name) student_answer_paths.append(temp_ans_file.name) # Process each student's answer sheet all_students_results = [] if question_paper.path and os.path.exists(question_paper.path): print(f"Starting Gemini evaluation for exam: {exam_name} with {len(student_answer_paths)} students...") for idx, student_answer_path in enumerate(student_answer_paths): print(f"Processing answer sheet {idx + 1} with PaddleOCR...") # First process with PaddleOCR results = process_with_paddle_ocr( student_answer_path, question_paper.answers ) roll_number, sheet_evaluations = gemini_evaluate_answer_sheet_with_roll( question_paper.path, student_answer_path, question_paper.questions, question_paper.answers, results ) # Process the results for this student student_results = [] for eval_result in sheet_evaluations: question_num = eval_result["question_number"] if 1 <= question_num <= len(question_paper.questions): student_results.append({ 'question_number': question_num, 'question_text': question_paper.questions[question_num - 1], 'correct_answer': question_paper.answers[question_num - 1], 'status': eval_result["status"] }) # Calculate summary for this student correct_count = sum(1 for result in student_results if result['status'] == 'Correct') total_questions = len(student_results) score_percentage = (correct_count / total_questions) * 100 if total_questions > 0 else 0 student_summary = { 'roll_number': roll_number, 'total_questions': len(question_paper.answers), 'correct_answers': correct_count, 'wrong_answers': sum(1 for result in student_results if result['status'] == 'Wrong'), 'missing_answers': sum(1 for result in student_results if result['status'] == 'Missing'), 'error_answers': sum(1 for result in student_results if result['status'] == 
'Error'), 'score_percentage': round(score_percentage, 2), 'evaluation_results': student_results, 'ocr_results': { 'extracted_text': results, } } all_students_results.append(student_summary) final_results = { 'exam_name': exam_name, # Include exam name in results 'total_students': len(student_answer_paths), 'students_evaluated': all_students_results } # STORE THE RESULTS IN SUPABASE WITH EXAM NAME try: supabase_handler = SupabaseHandler() unique_key = supabase_handler.store_evaluation_result(teacher_email, final_results, exam_name) if unique_key: # Add the unique key to the response final_results['unique_key'] = unique_key final_results['storage_success'] = True print(f"Results stored successfully with key: {unique_key} for exam: {exam_name}") else: final_results['storage_success'] = False final_results['storage_error'] = "Failed to store results in database" print("Failed to store results in Supabase") except Exception as storage_error: print(f"Error storing results: {str(storage_error)}") final_results['storage_success'] = False final_results['storage_error'] = str(storage_error) return jsonify(final_results) else: return jsonify({ 'error': 'Question paper file not found for Gemini evaluation.' 
}) except Exception as e: return jsonify({'error': str(e)}), 500 finally: # Clean up temporary student answer files for path in student_answer_paths: try: if os.path.exists(path): os.unlink(path) except PermissionError: pass # File still locked on Windows; OS will clean up temp dir # Get Evaluation @app.route('/get_evaluation_result/', methods=['GET']) def get_evaluation_result(unique_key): """ Get evaluation result by unique key """ try: supabase_handler = SupabaseHandler() result = supabase_handler.get_evaluation_result(unique_key) if result: return jsonify({ 'success': True, 'data': result }) else: return jsonify({ 'error': 'Evaluation result not found' }), 404 except Exception as e: return jsonify({'error': str(e)}), 500 # Get Teacher Evaluation @app.route('/get_teacher_evaluations/', methods=['GET']) def get_teacher_evaluations(teacher_email): """ Get all evaluation results for a specific teacher """ try: supabase_handler = SupabaseHandler() results = supabase_handler.get_teacher_evaluations(teacher_email) return jsonify({ 'success': True, 'data': results, 'total_evaluations': len(results) }) except Exception as e: return jsonify({'error': str(e)}), 500 # Get OMR Answer Key @app.route('/get_omr_answer_key', methods=['GET']) def get_omr_answer_key(): """Get the currently stored OMR answer key""" global last_processed_omr_key if last_processed_omr_key is None: return jsonify({ 'error': 'No answer key has been processed yet' }), 404 return jsonify({ 'success': True, 'answer_key': last_processed_omr_key.to_dict() }) def omr_gemini_process(error_questions, correct_answers, image_file): """ Use Gemini to assist in evaluating OMR sheets, especially for error questions """ try: model = genai.GenerativeModel('gemini-2.5-flash') prompt_text = f""" You are an OMR Assistant for an evaluvation script. Your main purpose is to assist in the process. 
Correct Answers to questions sorted by question number: {correct_answers} Error Question numbers: {error_questions} Your task: - From the given image identify the student name and roll number - if for some reason the traditional OMR Processing failed to detect some answers, those question numbers will be provided to you, you should look into those questions form the given image and correct answers. - Only provide answer for the questions that are in the error list. - You can ignore the rest of the question - if Error question is empty, just extract the roll number and name Please evaluate ALL questions and respond in this EXACT JSON format: {{ "roll_number": "extracted_roll_number_here", "evaluations": [ {{"question_number": 1, "status": "Correct"}}, {{"question_number": 2, "status": "Wrong"}}, {{"question_number": 3, "status": "Missing"}}, ... ] }} """ student_answer_img = image_file content = [prompt_text, student_answer_img] response = model.generate_content(content) result_text = response.text.strip() print(f"Gemini response: {result_text}") import json try: # Clean the response - sometimes Gemini adds markdown formatting if "```json" in result_text: result_text = result_text.split("```json")[1].split("```")[0].strip() elif "```" in result_text: result_text = result_text.split("```")[1].strip() parsed_result = json.loads(result_text) return parsed_result["roll_number"], parsed_result["evaluations"] except (json.JSONDecodeError, KeyError) as e: print(f"Failed to parse JSON response: {e}") print(f"Raw response: {result_text}") # Fallback - extract roll number using OCR and create default "Error" results roll_number = extract_roll_number(os.path.join("OMRChecker", "inputs", "OMRImage.jpg")) return roll_number, [{"question_number": i+1, "status": "Error"} for i in range(len(correct_answers))] except Exception as e: print(f"Error in OMR Gemini processing: {str(e)}") return "Unknown", [{"question_number": q, "status": "Error"} for q in error_questions] 
@app.route('/evaluate_omr', methods=['POST'])
def evaluate_omr():
    """Evaluate the processed OMR sheets against the stored answer key.

    Uses the classical OMR results first, asks Gemini to resolve any
    questions the detector could not read, then stores the per-student
    summaries in Supabase.  Requires /process_omr_answer_key and
    /process_omr to have been called first.
    """
    global last_processed_omr_key, last_processed_omr_results, porcessed_omr_results, OMR_IMAGES

    # Get teacher email and exam name from the request
    teacher_email = request.form.get('teacher_email', 'unknown@example.com')
    exam_name = request.form.get('exam_name', 'Untitled Exam')

    if not last_processed_omr_key:
        return jsonify({
            'error': 'No answer key has been processed. Please process an answer key first.'
        }), 400
    if not last_processed_omr_results:
        return jsonify({
            'error': 'No OMR sheet has been processed. Please process an OMR sheet first.'
        }), 400

    try:
        # (Removed dead code that pre-selected last_processed_omr_results[0];
        # the loop below always re-binds omr_data from porcessed_omr_results.)
        student_datas = []
        for idx, omr_data in enumerate(porcessed_omr_results):
            marked_answers = omr_data
            image_file = OMR_IMAGES[idx]

            # Correct answers from the key (only questions that exist in it).
            correct_answers = last_processed_omr_key.answers
            total_questions_in_key = len(correct_answers)

            evaluation_details = []
            correct_count = 0
            wrong_count = 0
            missing_count = 0
            error_questions = []  # questions the detector could not read

            for q_num in sorted(correct_answers.keys()):
                print(f"Evaluating Question {q_num}")
                print(f"Correct Answer: {correct_answers[q_num]} | Marked Answer: {marked_answers.get(str(q_num))}")
                correct_ans = correct_answers[q_num]
                marked_ans = marked_answers.get(str(q_num))
                # Empty, multi-mark, or NaN cells are treated as unreadable.
                if marked_ans is None or marked_ans == '' or len(str(marked_ans)) > 1 or marked_ans == 'nan':
                    status = 'Missing'
                    error_questions.append(q_num)
                    missing_count += 1
                elif marked_ans.upper() == correct_ans.upper():
                    status = 'Correct'
                    correct_count += 1
                else:
                    status = 'Wrong'
                    wrong_count += 1
                evaluation_details.append({
                    'question_number': q_num,
                    'question_text': last_processed_omr_key.questions[q_num - 1] if q_num <= len(last_processed_omr_key.questions) else f"Question {q_num}",
                    'correct_answer': correct_ans,
                    'marked_answer': marked_ans if marked_ans != 'null' else None,
                    'status': status
                })

            # Ask Gemini to resolve the unreadable questions and extract the
            # roll number from the sheet image.
            roll_no, gemini_result = omr_gemini_process(
                error_questions, last_processed_omr_key.answers, image_file
            )

            for err_idx in error_questions:
                for gemini_eval in gemini_result:
                    if gemini_eval["question_number"] == err_idx:
                        marked_ans = None  # it was an unreadable question
                        status = gemini_eval["status"]
                        # Re-balance counters: the question was already
                        # counted as Missing in the first pass.
                        if status == "Correct":
                            correct_count += 1
                            missing_count -= 1
                        elif status == "Wrong":
                            wrong_count += 1
                            missing_count -= 1
                        # BUGFIX: the "Missing" branch previously did
                        # missing_count += 1 again, double-counting a
                        # question already tallied as Missing above.

                        # Update the stored evaluation detail.
                        for eval_detail in evaluation_details:
                            if eval_detail['question_number'] == err_idx:
                                eval_detail.update({
                                    'marked_answer': marked_ans,
                                    'status': status
                                })
                                break
                        break

            # Score with optional negative marking.  NOTE(review): total_score
            # and max_score are computed but not included in the response -
            # kept for parity with the original; confirm whether to expose them.
            total_score = correct_count * last_processed_omr_key.marks_per_question
            if last_processed_omr_key.negative_marking > 0:
                total_score -= wrong_count * last_processed_omr_key.negative_marking
            max_score = total_questions_in_key * last_processed_omr_key.marks_per_question

            num_questions = len(last_processed_omr_key.answers)
            student_summary = {
                'roll_number': roll_no,
                'total_questions': num_questions,
                'correct_answers': correct_count,
                'wrong_answers': wrong_count,
                'missing_answers': missing_count,
                'error_answers': len(error_questions),
                'score_percentage': correct_count / num_questions * 100 if num_questions > 0 else 0,
                'evaluation_results': evaluation_details,
                'ocr_results': {
                    'extracted_text': gemini_result,
                }
            }
            student_datas.append(student_summary)

        # Format the data in the required structure for Supabase
        formatted_evaluation_data = {
            'exam_name': exam_name,  # Include exam name in results
            'total_students': len(student_datas),
            'students_evaluated': student_datas
        }

        # Store results in Supabase (optional — skip if credentials not configured)
        unique_key = None
        try:
            supabase_handler = SupabaseHandler()
            unique_key = supabase_handler.store_evaluation_result(teacher_email, formatted_evaluation_data, exam_name)
        except Exception as supa_err:
            print(f"Supabase storage skipped: {supa_err}")

        # Prepare answer key info
        answer_key_info = {
            "title": getattr(last_processed_omr_key, 'title', 'Untitled'),
            "marks_per_question": last_processed_omr_key.marks_per_question,
            "negative_marking": last_processed_omr_key.negative_marking
        }

        final_result = {
            "success": True,
            "unique_key": unique_key,
            # **formatted_evaluation_data,  # NOTE(review): evaluation data is
            # not echoed back in the response (only stored) - confirm with
            # API consumers before enabling.
            "additional_info": {
                "answer_key_info": answer_key_info
            }
        }
        return jsonify(final_result)
    except Exception as e:
        return jsonify({
            "success": False,
            "error": f"Evaluation failed: {str(e)}"
        }), 500


def process_with_gemini(evaluation_details, evaluation_summary, omr_data):
    """Use Gemini to independently evaluate an OMR sheet image.

    Args:
        evaluation_details: Our per-question evaluation (unused here except
            as context for callers; kept for interface stability).
        evaluation_summary: Our summary dict (same note as above).
        omr_data: Dict that may carry the sheet image as base64
            (``image_data``) or a ``filename`` to look up on disk.

    Returns:
        Parsed Gemini result dict with ``student_info``,
        ``gemini_evaluation`` and a computed ``summary``; on any failure a
        structured fallback dict with an explanatory ``verification`` block.
    """
    global last_processed_omr_key
    try:
        model = genai.GenerativeModel('gemini-2.5-flash')

        # Prepare the questions and correct answers for Gemini
        questions_and_answers = ""
        for i, (q_num, correct_answer) in enumerate(sorted(last_processed_omr_key.answers.items())):
            question_text = last_processed_omr_key.questions[i] if i < len(last_processed_omr_key.questions) else f"Question {q_num}"
            questions_and_answers += f"Question {q_num}: {question_text}\nCorrect Answer: {correct_answer}\n\n"

        prompt = f"""
        You are a teacher grading an OMR answer sheet.

        STUDENT INFO: Extract the student's name and roll number from the image.

        GRADING TASK: For each question, identify which bubble (A, B, C, or D) is filled/darkened, then compare with the correct answer.

        QUESTIONS AND CORRECT ANSWERS:
        {questions_and_answers}

        IMPORTANT: Look carefully at each row of bubbles. A filled bubble will be darkened/shaded, while empty bubbles will be white/clear.

        Respond in this EXACT JSON format:
        {{
            "student_info": {{
                "name": "extracted student name",
                "roll_no": "extracted roll number"
            }},
            "gemini_evaluation": [
                {{"question": 1, "marked_answer": "C", "correct_answer": "C", "status": "Correct"}},
                {{"question": 2, "marked_answer": "D", "correct_answer": "D", "status": "Correct"}},
                // ... continue for all questions
            ]
        }}

        For status: use "Correct", "Wrong", or "Missing" only.
        For marked_answer: use "A", "B", "C", "D", or null if no bubble is clearly filled.
        """

        # Resolve the sheet image: prefer inline base64 data, then a filename
        # searched in a few known locations.
        if 'image_data' in omr_data:
            image_bytes = base64.b64decode(omr_data['image_data'])
            image = Image.open(io.BytesIO(image_bytes))
        elif 'filename' in omr_data:
            try:
                possible_paths = [
                    f"Images/{omr_data['filename']}",
                    f"temp/{omr_data['filename']}",
                    omr_data['filename']
                ]
                image = None
                for path in possible_paths:
                    if os.path.exists(path):
                        image = Image.open(path)
                        break
                if image is None:
                    # Cannot verify without the original image.
                    return {
                        "student_info": {
                            "name": "Image not available",
                            "roll_number": "Image not available"
                        },
                        "verification": {
                            "evaluation_correct": "unknown",
                            "confidence": "low",
                            "discrepancies": ["Original image not available for verification"],
                            "notes": "Could not verify due to missing image file"
                        },
                        "gemini_evaluation": []
                    }
            except Exception as e:
                print(f"Error loading image: {str(e)}")
                return {
                    "student_info": {
                        "name": "Error loading image",
                        "roll_number": "Error loading image"
                    },
                    "verification": {
                        "evaluation_correct": "unknown",
                        "confidence": "low",
                        "discrepancies": [f"Error loading image: {str(e)}"],
                        "notes": "Image processing failed"
                    },
                    "gemini_evaluation": []
                }
        else:
            # No image reference available at all.
            return {
                "student_info": {
                    "name": "No image data",
                    "roll_number": "No image data"
                },
                "verification": {
                    "evaluation_correct": "unknown",
                    "confidence": "low",
                    "discrepancies": ["No image data available"],
                    "notes": "Cannot verify without image"
                },
                "gemini_evaluation": []
            }

        # Generate content with Gemini
        response = model.generate_content([prompt, image])
        result_text = response.text.strip()
        print(f"Gemini raw response: {result_text}")

        try:
            # Clean the response - remove markdown formatting if present
            if "```json" in result_text:
                result_text = result_text.split("```json")[1].split("```")[0].strip()
            elif "```" in result_text:
                result_text = result_text.split("```")[1].strip()
            parsed_result = json.loads(result_text)

            # Derive the summary (counts, score, percentage) from Gemini's
            # per-question statuses.
            if 'gemini_evaluation' in parsed_result:
                correct_count = sum(1 for item in parsed_result['gemini_evaluation'] if item.get('status') == 'Correct')
                wrong_count = sum(1 for item in parsed_result['gemini_evaluation'] if item.get('status') == 'Wrong')
                missing_count = sum(1 for item in parsed_result['gemini_evaluation'] if item.get('status') == 'Missing')
                score = (correct_count * last_processed_omr_key.marks_per_question) - (wrong_count * last_processed_omr_key.negative_marking)
                max_score = len(last_processed_omr_key.answers) * last_processed_omr_key.marks_per_question
                parsed_result['summary'] = {
                    "total_questions": len(last_processed_omr_key.answers),
                    "correct_count": correct_count,
                    "wrong_count": wrong_count,
                    "missing_count": missing_count,
                    "score": score,
                    "max_score": max_score,
                    "percentage": round((score / max_score) * 100, 2) if max_score > 0 else 0
                }
            return parsed_result
        except json.JSONDecodeError as e:
            print(f"Failed to parse Gemini JSON response: {e}")
            print(f"Raw response: {result_text}")
            # NOTE(review): fallbacks use key "roll_number" while the prompt
            # asks for "roll_no" - confirm which key consumers expect.
            return {
                "student_info": {
                    "name": "Parse error",
                    "roll_number": "Parse error"
                },
                "verification": {
                    "evaluation_correct": "unknown",
                    "confidence": "low",
                    "discrepancies": ["Failed to parse Gemini response"],
                    "notes": f"JSON parse error: {str(e)}"
                },
                "gemini_evaluation": [],
                "raw_response": result_text  # Include raw response for debugging
            }
    except Exception as e:
        return {
            "student_info": {
                "name": "Processing error",
                "roll_number": "Processing error"
            },
            "verification": {
                "evaluation_correct": "unknown",
                "confidence": "low",
                "discrepancies": [f"Gemini processing error: {str(e)}"],
                "notes": "Failed to process with Gemini"
            },
            "gemini_evaluation": []
        }


def compare_evaluations(our_evaluation, gemini_evaluation):
    """Compare the automated evaluation with Gemini's independent one.

    Args:
        our_evaluation: List of per-question dicts containing
            'question_number', 'status' and 'marked_answer'.
        gemini_evaluation: List of Gemini dicts containing 'question',
            'status' and 'marked_answer'.

    Returns:
        Dict with agreement statistics and the per-question differences;
        only questions present in both evaluations are compared.
    """
    if not gemini_evaluation:
        return {
            "comparison_available": False,
            "reason": "Gemini evaluation not available"
        }

    matches = 0
    differences = []
    total_compared = 0
    # Index our evaluation by question number for O(1) lookups.
    our_eval_lookup = {detail['question_number']: detail for detail in our_evaluation}

    for gemini_item in gemini_evaluation:
        q_num = gemini_item.get('question')
        if q_num not in our_eval_lookup:
            continue
        total_compared += 1
        our_status = our_eval_lookup[q_num]['status']
        gemini_status = gemini_item.get('status')
        if our_status == gemini_status:
            matches += 1
        else:
            differences.append({
                "question": q_num,
                "our_evaluation": {
                    "marked_answer": our_eval_lookup[q_num]['marked_answer'],
                    "status": our_status
                },
                "gemini_evaluation": {
                    "marked_answer": gemini_item.get('marked_answer'),
                    "status": gemini_status
                }
            })

    agreement_rate = (matches / total_compared) * 100 if total_compared > 0 else 0
    return {
        "comparison_available": True,
        "total_questions_compared": total_compared,
        "agreements": matches,
        "differences_count": len(differences),
        "agreement_rate": round(agreement_rate, 2),
        "differences": differences
    }


@app.route('/process_omr', methods=['POST'])
def process_omr_enhanced():
    """Run OMRChecker over each uploaded sheet image and cache the marked
    answers (plus the sheet images) for the /evaluate_omr endpoint."""
    global last_processed_omr_results
    global OMR_IMAGES
    global porcessed_omr_results
    OMR_IMAGES = []
    porcessed_omr_results = []
    try:
        print("Starting OMR processing...")
        # Guard clause: require file uploads under the 'images' field.
        if 'images' not in request.files:
            return jsonify({
                "success": False,
                "error": "No images provided. Use 'images' field for file uploads.",
                "results": []
            }), 400

        files = request.files.getlist('images')
        parsed_json = []  # BUGFIX: initialized so an all-empty upload list
                          # cannot trigger a NameError at the final return
        for idx, file in enumerate(files):
            if file.filename == '':
                continue
            print(f"===================================== Processing file {file.filename} =====================================")
            name, extension = os.path.splitext(file.filename)
            filename = os.path.join("OMRChecker", "inputs", "OMRImage" + extension)
            file.save(filename)
            # BUGFIX: load a detached copy so the PIL file handle is closed
            # before OMRChecker reads the same file (Windows file locking).
            with Image.open(filename) as sheet_img:
                OMR_IMAGES.append(sheet_img.copy())

            result = subprocess.run([sys.executable, os.path.join('OMRChecker', 'main.py'),
                                     '--inputDir=' + os.path.join('OMRChecker', 'inputs')])
            if result.returncode != 0:
                # Surface failures instead of silently reading stale CSVs.
                print(f"OMRChecker exited with code {result.returncode}")
            print("OMR Finished Processing Successfully")

            folder = os.path.join("outputs", "Results")
            csv_files = [f for f in os.listdir(folder) if f.endswith(".csv")]
            print("CSV FILES:", csv_files)
            if not csv_files:
                raise FileNotFoundError("OMRChecker produced no results CSV")
            result_file = os.path.join(folder, csv_files[0])
            print("Found Result File", result_file)

            df = pd.read_csv(result_file)
            # Convert to JSON records (response payload for this sheet).
            data_json = df.to_json(orient="records")
            parsed_json = json.loads(data_json)
            columns_dict = df.to_dict(orient="list")
            print(columns_dict)
            # Keep only the "qN" columns, keyed by bare question number.
            questions_only = {k.replace("q", ""): v[0] for k, v in columns_dict.items() if k.startswith("q")}
            last_processed_omr_results = questions_only
            porcessed_omr_results.append(questions_only)

            # Remove the CSV so the next sheet's results are picked up fresh.
            if os.path.exists(result_file):
                os.remove(result_file)
                print(f"{result_file} deleted")

        # NOTE(review): only the *last* sheet's raw records are returned,
        # matching the original behavior - confirm with API consumers.
        return jsonify(parsed_json)
    except Exception as e:
        return jsonify({
            "success": False,
            "error": f"Server error: {str(e)}",
            "results": []
        }), 500


@app.route('/get_question_details/<int:question_number>', methods=['GET'])
def get_question_details(question_number):
    """Get detailed information about a specific question.

    BUGFIX: restored the missing ``<int:question_number>`` URL variable so
    Flask can supply the view argument.
    """
    global last_processed_omr_key
    if last_processed_omr_key is None:
        return jsonify({
            'error': 'No answer key has been processed yet'
        }), 404
    question_data = last_processed_omr_key.get_question_details(question_number)
    if question_data is None:
        return jsonify({
            'error': f'Question number {question_number} not found'
        }), 404
    return jsonify({
        'success': True,
        'question_data': question_data
    })


@app.route('/debug_parsing', methods=['GET'])
def debug_parsing():
    """Debug endpoint: show how each stored OCR text parses into answers."""
    if not ocr_extracted_texts:
        return jsonify({'error': 'No OCR extracted texts available.'}), 400
    debug_results = []
    for ocr_text in ocr_extracted_texts:
        parsed_answers = improved_clean_and_parse_ocr_text(ocr_text)
        debug_results.append({
            'original_ocr_text': ocr_text,
            'parsed_answers': parsed_answers
        })
    return jsonify({'debug_results': debug_results})


def extract_omr_metadata(text: str) -> tuple:
    """Extract ``(title, duration)`` from question-paper text.

    Title heuristic: first line among the first five that is entirely
    upper-case and longer than 10 characters.  Duration: a
    ``Time: N minutes`` pattern anywhere in the text.
    """
    title = ""
    duration = ""
    lines = text.split('\n')
    for line in lines[:5]:  # Check first 5 lines
        stripped = line.strip()
        # All-caps check: upper-casing leaves the line unchanged.
        if stripped.upper() == stripped and len(stripped) > 10:
            title = stripped
            break
    time_pattern = r'Time:\s*(\d+)\s*(minutes|mins|min)'
    duration_match = re.search(time_pattern, text, re.IGNORECASE)
    if duration_match:
        duration = f"{duration_match.group(1)} minutes"
    return title, duration


def extract_omr_answers(text: str) -> tuple:
    """Parse questions, options and answers out of OCR'd question-paper text.

    Expected per-question layout::

        1. Question text
        A) option text
        B) option text
        Answer: B        (or the full option text)

    Returns:
        ``(answers, questions)``: ``answers`` maps question number to the
        answer letter (or raw answer text when no option matches) and
        ``questions`` is a list of ``"N. question text"`` strings.  Only
        questions that have an answer are included.
    """
    answers = {}
    questions = []
    question_data = []
    print("\nStarting answer extraction...")

    # Split text into non-empty, stripped lines.
    lines = [line.strip() for line in text.split('\n') if line.strip()]

    started = False
    current_dict = None
    for line in lines:
        print(f"Processing line: {line}")
        # Skip header lines until the first question ("1. ...") appears.
        if not started:
            if line.startswith('1.'):
                started = True
            else:
                continue

        # New question line?
        question_match = re.match(r'^(\d+)[.)](.*?)$', line)
        if question_match:
            # Save previous question if exists
            if current_dict:
                question_data.append(current_dict)
            q_num = int(question_match.group(1))
            q_text = question_match.group(2).strip()
            current_dict = {
                'number': q_num,
                'question': q_text,
                'options': {},
                'answer': None
            }
            continue

        # Option line (A) / A. style)?
        option_match = re.match(r'^([A-D])[).](.*?)$', line)
        if option_match and current_dict is not None:
            current_dict['options'][option_match.group(1)] = option_match.group(2).strip()
            continue

        # Answer line?  May hold a letter or the full option text; map a
        # full-text answer back to its option letter when possible.
        answer_match = re.match(r'^\s*Answer[:\s]*([A-D]|.+)$', line, re.IGNORECASE)
        if answer_match and current_dict is not None:
            answer = answer_match.group(1).strip()
            for opt_letter, opt_text in current_dict['options'].items():
                if answer.lower() == opt_text.lower():
                    answer = opt_letter
                    break
            current_dict['answer'] = answer
            continue

    # Add last question
    if current_dict:
        question_data.append(current_dict)

    print("\nExtracted Question Data:")
    for q in question_data:
        print(f"\nQuestion {q['number']}:")
        print(f"Text: {q['question']}")
        print(f"Options: {q['options']}")
        print(f"Answer: {q['answer']}")
        # Add to return format only when an answer was found.
        if q['answer']:
            answers[q['number']] = q['answer']
            questions.append(f"{q['number']}. {q['question']}")

    print(f"\nExtracted {len(questions)} questions and {len(answers)} answers")
    print("Questions:", questions)
    print("Answers:", answers)
    return answers, questions


def debug_text_extraction(text: str):
    """Print the raw extracted text and a line-by-line breakdown (debug aid)."""
    print("=== Extracted Text ===")
    print(text)
    print("\n=== Line by Line Analysis ===")
    for line in text.split('\n'):
        if line.strip():
            print(f"Line: {line.strip()}")


@app.route('/process_omr_answer_key', methods=['POST'])
def process_omr_answer_key():
    """
    Process OMR answer key from either:
    1. JSON format with direct answers
    2. PDF/Image of question paper with answers marked

    For JSON format:
    {
        "answers": {"1": "A", "2": "B", ...},
        "marks_per_question": 1.0,  # optional, defaults to 1
        "negative_marking": 0.0     # optional, defaults to 0
    }

    For PDF/Image: multipart/form-data with 'file' field containing the
    question paper.
    """
    global last_processed_omr_key
    try:
        omr_key = OMRAnswerKey()
        # Check if file upload or JSON
        if 'file' in request.files:
            file = request.files['file']
            if file.filename == '':
                return jsonify({'error': 'No file selected'}), 400

            # Create Images directory if it doesn't exist
            images_dir = os.path.join(app.root_path, 'Images')
            os.makedirs(images_dir, exist_ok=True)

            if file.filename.lower().endswith('.pdf'):
                # Save and process PDF
                answer_key_path = os.path.join(images_dir, "omr_answer_key.pdf")
                file.save(answer_key_path)
                omr_key.path = answer_key_path

                # Convert PDF to images and extract text page by page.
                all_text = ""
                try:
                    print(f"\nProcessing PDF file: {answer_key_path}")
                    images_from_pdf = convert_from_path(
                        answer_key_path,
                        poppler_path=_get_poppler_path(),
                        dpi=300  # Increase DPI for better quality
                    )
                    print(f"Converted PDF to {len(images_from_pdf)} images")
                    for idx, page_image in enumerate(images_from_pdf):
                        print(f"\nProcessing page {idx + 1}")
                        # Preprocess for better OCR: grayscale + Otsu binarization.
                        img_np = np.array(page_image)
                        gray = cv2.cvtColor(img_np, cv2.COLOR_RGB2GRAY)
                        _, threshold = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
                        # Save processed image for debugging
                        debug_image_path = os.path.join(images_dir, f"debug_page_{idx + 1}.png")
                        cv2.imwrite(debug_image_path, threshold)
                        print(f"Saved processed image to {debug_image_path}")
                        # Configure Tesseract parameters for better accuracy
                        custom_config = r'--oem 3 --psm 6'
                        text = pytesseract.image_to_string(threshold, config=custom_config)
                        print(f"Extracted text length: {len(text)}")
                        all_text += text + "\n"
                    print("\nTotal extracted text length:", len(all_text))
                except Exception as e:
                    print(f"Error during PDF processing: {str(e)}")
                    raise

                # Debug the extracted text
                print("\nDebugging PDF extraction:")
                debug_text_extraction(all_text)

                # Extract metadata and answers
                title, duration = extract_omr_metadata(all_text)
                answers, questions = extract_omr_answers(all_text)
                print("\nExtracted answers:", answers)
                omr_key.set_metadata(title, duration)
                omr_key.set_answers(answers)
                omr_key.questions = questions
            else:
                # Process as image
                answer_key_path = os.path.join(images_dir, "omr_answer_key.png")
                file.save(answer_key_path)
                omr_key.path = answer_key_path
                image = Image.open(answer_key_path)
                text = pytesseract.image_to_string(image)

                # Debug the extracted text
                print("\nDebugging Image extraction:")
                debug_text_extraction(text)

                # Extract metadata and answers
                title, duration = extract_omr_metadata(text)
                answers, questions = extract_omr_answers(text)
                print("\nStructured Extraction Results:")
                print("Title:", title)
                print("Duration:", duration)
                print("\nQuestions found:", len(questions))
                print("Answers found:", len(answers))
                print("\nAnswers:", answers)
                omr_key.set_metadata(title, duration)
                omr_key.set_answers(answers)
                omr_key.questions = questions

            # Marking scheme from form data (defaults: 1 mark, no negative).
            marks_per_question = float(request.form.get('marks_per_question', 1.0))
            negative_marking = float(request.form.get('negative_marking', 0.0))
        else:
            # Process JSON input
            if not request.is_json:
                return jsonify({'error': 'Request must be JSON or file upload'}), 400
            data = request.get_json()
            if 'answers' not in data:
                return jsonify({'error': 'Answer key must be provided'}), 400

            # Validate answer format
            answer_key = data['answers']
            for q_num, answer in answer_key.items():
                try:
                    q_num = int(q_num)
                    if not isinstance(answer, str) or answer.upper() not in ['A', 'B', 'C', 'D']:
                        return jsonify({
                            'error': f'Invalid answer format for question {q_num}. Must be A, B, C, or D'
                        }), 400
                except ValueError:
                    return jsonify({
                        'error': f'Question numbers must be integers, got {q_num}'
                    }), 400

            # Set the answers and optional metadata.
            omr_key.set_answers(answer_key)
            title = data.get('title', '')
            duration = data.get('duration', '')
            omr_key.set_metadata(title, duration)

            # Marking scheme from JSON body.
            marks_per_question = float(data.get('marks_per_question', 1.0))
            negative_marking = float(data.get('negative_marking', 0.0))

        # Set marking scheme and publish the key for other endpoints.
        omr_key.set_marking_scheme(marks_per_question, negative_marking)
        last_processed_omr_key = omr_key
        return jsonify({
            'success': True,
            'message': 'OMR answer key processed successfully',
            'answer_key': omr_key.to_dict()
        })
    except Exception as e:
        return jsonify({
            'error': f'Failed to process answer key: {str(e)}'
        }), 500


if __name__ == '__main__':
    app.run(
        host="0.0.0.0",
        port=int(os.environ.get("PORT", 5000)),
        debug=os.environ.get("FLASK_DEBUG", "false").lower() == "true"
    )