Spaces:
Sleeping
Sleeping
import asyncio
import base64
import difflib
import io
import json
import os
import platform
import re
import subprocess
import sys
import tempfile
import uuid
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import List, Tuple, Optional, Dict, Any

import cv2
import easyocr
import numpy as np
import pandas as pd
import pytesseract
import google.generativeai as genai
from dotenv import load_dotenv
from flask import Flask, request, jsonify
from flask_cors import CORS
from fuzzywuzzy import fuzz
from pdf2image import convert_from_path, convert_from_bytes
from PIL import Image
# Import the SupabaseHandler
from supabase import create_client, Client
# Point pytesseract at the Tesseract binary. Load .env first so a
# TESSERACT_CMD defined there is honored — previously load_dotenv() only
# ran later in the module, AFTER this lookup, so a .env-provided value
# was silently ignored. Calling load_dotenv() twice is harmless (it does
# not override variables that are already set).
load_dotenv()
_tesseract_cmd = os.getenv("TESSERACT_CMD")
if _tesseract_cmd:
    pytesseract.pytesseract.tesseract_cmd = _tesseract_cmd
elif platform.system() == "Windows":
    # Default install location of the Windows Tesseract build; on other
    # platforms the binary is expected to be on PATH.
    pytesseract.pytesseract.tesseract_cmd = r"C:\Program Files\Tesseract-OCR\tesseract.exe"
| def _get_poppler_path(): | |
| env_path = os.getenv("POPPLER_PATH") | |
| if env_path: | |
| return env_path | |
| if platform.system() == "Windows": | |
| # Check common install locations | |
| candidates = [ | |
| r'C:\Program Files\poppler\Library\bin', | |
| r'C:\Program Files\poppler\poppler-24.08.0\Library\bin', | |
| ] | |
| # Also scan for any versioned poppler directory | |
| poppler_base = r'C:\Program Files\poppler' | |
| if os.path.isdir(poppler_base): | |
| for entry in os.listdir(poppler_base): | |
| candidate = os.path.join(poppler_base, entry, 'Library', 'bin') | |
| if candidate not in candidates: | |
| candidates.append(candidate) | |
| for path in candidates: | |
| if os.path.isdir(path): | |
| return path | |
| return None | |
# Load .env values (GEMINI_API_KEY, SUPABASE_*, etc.) before reading them.
load_dotenv()
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
genai.configure(api_key=GEMINI_API_KEY)
app = Flask(__name__)
CORS(app)  # allow cross-origin requests from the frontend
# EasyOCR English reader; model loading is slow but happens once at import.
reader = easyocr.Reader(['en'])
# Global variables to store processing results
ocr_extracted_texts = []  # OCR text accumulated across requests
last_processed_question_paper_object = None
last_processed_omr_key = None  # Global variable to store OMR answer key
last_processed_omr_results = None  # Global variable to store OMR processing results
# NOTE(review): "porcessed" is a typo for "processed", but the name may be
# referenced elsewhere in the file — rename only in a coordinated change.
porcessed_omr_results = []
OMR_IMAGES = []
class SupabaseHandler:
    """Thin wrapper around the Supabase client for persisting and fetching
    evaluation results in the ``evaluation_results`` table."""

    def __init__(self):
        """Create the Supabase client from SUPABASE_URL / SUPABASE_ANON_KEY.

        Raises:
            ValueError: if either environment variable is missing.
        """
        url: str = os.getenv("SUPABASE_URL")
        key: str = os.getenv("SUPABASE_ANON_KEY")
        if not url or not key:
            raise ValueError("Supabase URL and ANON_KEY must be set in environment variables")
        self.supabase: Client = create_client(url, key)

    def store_evaluation_result(self, teacher_email, evaluation_data, exam_name=None):
        """
        Store evaluation result in Supabase with a unique key and exam name.
        Returns the unique key for retrieval, or None on failure.
        """
        try:
            # Generate unique key
            unique_key = str(uuid.uuid4())
            storage_data = {
                "unique_key": unique_key,
                "teacher_email": teacher_email,
                "evaluation_data": evaluation_data,
                "exam_name": exam_name,  # Add exam name field
                # Bug fix: datetime.utcnow() is deprecated (Python 3.12+) and
                # produced a naive timestamp; store a timezone-aware UTC one.
                "created_at": datetime.now(timezone.utc).isoformat(),
                "total_students": evaluation_data.get("total_students", 0)
            }
            # Insert into Supabase
            result = self.supabase.table("evaluation_results").insert(storage_data).execute()
            if result.data:
                print(f"Successfully stored evaluation result with key: {unique_key} for exam: {exam_name}")
                return unique_key
            print("Failed to store evaluation result")
            return None
        except Exception as e:
            print(f"Error storing evaluation result: {str(e)}")
            return None

    def get_evaluation_result(self, unique_key):
        """
        Retrieve evaluation result by unique key; None if absent or on error.
        """
        try:
            result = self.supabase.table("evaluation_results").select("*").eq("unique_key", unique_key).execute()
            if result.data and len(result.data) > 0:
                return result.data[0]
            return None
        except Exception as e:
            print(f"Error retrieving evaluation result: {str(e)}")
            return None

    def get_teacher_evaluations(self, teacher_email):
        """
        Get all evaluation results for a specific teacher, newest first;
        returns [] on error or when none exist.
        """
        try:
            result = self.supabase.table("evaluation_results").select("unique_key", "created_at", "total_students", "exam_name").eq("teacher_email", teacher_email).order("created_at", desc=True).execute()
            return result.data if result.data else []
        except Exception as e:
            print(f"Error retrieving teacher evaluations: {str(e)}")
            return []
class QuestionPaper:
    """Holds the parsed questions and answers of an uploaded question paper."""

    def __init__(self, path=None):
        self.questions = []
        self.answers = []
        self.path = path

    def clean_answers(self):
        """Drop answers that are blank, exactly equal to known header/footer
        noise, or that contain one of the noise patterns (case-insensitive)."""
        unwanted_patterns = [
            "Time: 15 MinutesMarks: 20",
            "Time: 15 Minutes Marks: 20",
            "GENERAL KNOWLEDGE QUESTION PAPER WITH ANSWERS",
            "GENERAL KNOWLEDGE QUESTION PAPER",
        ]
        kept = []
        for raw in self.answers:
            candidate = raw.strip()
            if not candidate or candidate in unwanted_patterns:
                continue
            # Reject answers that merely embed one of the noise strings.
            if any(p and re.search(re.escape(p), raw, re.IGNORECASE) for p in unwanted_patterns):
                continue
            kept.append(candidate)
        self.answers = kept

    def add_question(self, question_text):
        """Append one question string."""
        self.questions.append(question_text)

    def add_answer(self, answer_text):
        """Append one answer string."""
        self.answers.append(answer_text)

    def to_dict(self):
        """JSON-serializable snapshot of the paper."""
        return {'questions': self.questions, 'answers': self.answers}
class OMRAnswerKey:
    """Answer key for an OMR exam: correct options, marking scheme, metadata
    and (optionally) the complete per-question data."""

    def __init__(self):
        self.answers = {}  # question number -> correct option ('A'-'D')
        self.total_marks = 0
        self.marks_per_question = 1
        self.negative_marking = 0
        self.title = ""
        self.duration = ""
        self.total_questions = 0
        self.path = None
        self.questions = []  # formatted "<n>. <text>" strings
        self.question_data = []  # complete per-question dicts with options

    def __str__(self):
        return f"OMR Answer Key: {self.title}\nTotal Questions: {self.total_questions}\nAnswers: {self.answers}"

    def set_answers(self, answers: dict):
        """Set the answer key with question numbers as keys and correct options
        (A,B,C,D) as values. Entries with any other option are silently dropped."""
        self.answers = {int(k): v.upper() for k, v in answers.items() if v.upper() in ['A', 'B', 'C', 'D']}
        self.total_questions = len(self.answers)

    def set_marking_scheme(self, marks_per_question: float, negative_marking: float = 0):
        """Set the marking scheme and recompute total_marks from the current
        question count (call after set_answers/set_question_data)."""
        self.marks_per_question = marks_per_question
        self.negative_marking = negative_marking
        self.total_marks = self.total_questions * marks_per_question

    def set_metadata(self, title: str, duration: str):
        """Set metadata for the answer key"""
        self.title = title
        self.duration = duration

    def set_question_data(self, question_data):
        """Store complete question data including options; rebuilds questions,
        answers and total_questions from it."""
        self.question_data = question_data
        self.questions = [f"{q['number']}. {q['question']}" for q in question_data]
        self.answers = {q['number']: q['answer'] for q in question_data if q['answer']}
        self.total_questions = len(question_data)

    def get_question_details(self, question_number):
        """Return the full data dict for a question number, or None.

        Bug fix: removed a leftover debug print that logged every comparison
        on each lookup.
        """
        target = str(question_number)
        for q in self.question_data:
            if str(q['number']) == target:
                return q
        return None

    def to_dict(self):
        """JSON-serializable snapshot of the answer key."""
        return {
            'title': self.title,
            'duration': self.duration,
            'total_questions': self.total_questions,
            'answers': self.answers,
            'total_marks': self.total_marks,
            'marks_per_question': self.marks_per_question,
            'negative_marking': self.negative_marking,
            'questions': self.questions,
            'question_data': self.question_data  # Include complete question data
        }
def parse_question_paper_text(text):
    """Split raw OCR text into parallel lists of questions and answers.

    A question is any line starting with a number followed by '.' or ')'.
    The line immediately after a question is taken as its answer unless it
    is itself a question, in which case an empty placeholder is recorded.
    Header/footer noise lines are discarded first. The two returned lists
    are padded so they always have equal length.
    """
    # Patterns for headers/footers that should never become Q&A content.
    noise_patterns = [
        r'GENERAL KNOWLEDGE QUESTION PAPER.*',
        r'Time:\s*\d+\s*Minutes.*Marks:\s*\d+',
        r'Time:\s*\d+\s*MinutesMarks:\s*\d+',
        r'^\s*$'  # Empty lines
    ]
    raw_lines = [ln.strip() for ln in text.split('\n') if ln.strip()]
    lines = [
        ln for ln in raw_lines
        if not any(re.match(p, ln, re.IGNORECASE) for p in noise_patterns)
    ]

    # A question starts with a number followed by '.' or ')'.
    question_re = re.compile(r'^\d+\s*[.)]\s*(.+)')
    questions, answers = [], []
    idx = 0
    while idx < len(lines):
        line = lines[idx].strip()
        if question_re.match(line):
            questions.append(line)  # keep the full line, number included
            if idx + 1 < len(lines):
                following = lines[idx + 1].strip()
                if not question_re.match(following):
                    # Next line is not a question -> treat it as the answer.
                    answers.append(following)
                    idx += 2
                    continue
                answers.append("")  # consecutive questions: answer missing
            else:
                answers.append("")  # trailing question with nothing after it
            idx += 1
        else:
            # Stray line: pair it with the previous question if one is still
            # waiting for an answer; otherwise drop it.
            if len(questions) > len(answers):
                answers.append(line)
            idx += 1

    # Pad so both lists always have the same length.
    while len(answers) < len(questions):
        answers.append("")
    while len(questions) < len(answers):
        questions.append(f"Question {len(questions) + 1}")
    return questions, answers
def improved_clean_and_parse_ocr_text(ocr_text):
    """Extract individual answers from raw OCR text.

    First tries to split on numbered markers ("1.", "2)") in the raw text;
    if none are found, falls back to line-by-line extraction. OCR noise
    characters are stripped from each extracted answer, and answers of one
    character or fewer are discarded.

    Bug fix: the numbered-marker pattern was previously applied AFTER the
    character cleanup, which deletes '.' and ')' — so that branch could
    never match. The pattern now runs against the raw text.
    """
    # Characters considered OCR noise (note: includes '.' and ')').
    noise_chars = r'[|@~¥#$%^&*()_+=\[\]{}\\:";\'<>?,./]'
    individual_answers = []

    # Try numbered markers on the RAW text.
    numbered_pattern = re.compile(r'(\d+)\s*[.)]\s*([^0-9]+?)(?=\d+\s*[.)]|$)', re.MULTILINE | re.DOTALL)
    matches = numbered_pattern.findall(ocr_text)
    if matches:
        for _number, answer in matches:
            answer = re.sub(noise_chars, ' ', answer).strip()
            if answer and len(answer) > 1:
                individual_answers.append(answer)
    else:
        # Fallback: clean the whole text, then take each non-empty line,
        # removing any leading question number.
        cleaned_text = re.sub(noise_chars, ' ', ocr_text)
        for line in (ln.strip() for ln in cleaned_text.split('\n') if ln.strip()):
            cleaned_line = re.sub(r'^\d+\s*[.)]\s*', '', line).strip()
            if cleaned_line and len(cleaned_line) > 1:
                individual_answers.append(cleaned_line)
    return individual_answers
def find_best_match(student_answer, correct_answers, threshold=0.6):
    """Return (best_matching_answer, score) for a student answer.

    Each candidate is scored with three case-insensitive similarity
    metrics — difflib ratio, fuzzy ratio and fuzzy partial ratio — and the
    highest of the three is kept. If the overall best score is below
    ``threshold``, (None, best_score) is returned instead.
    """
    best_match, best_score = None, 0
    student_lower = student_answer.lower()
    for candidate in correct_answers:
        candidate_lower = candidate.lower()
        combined = max(
            difflib.SequenceMatcher(None, student_lower, candidate_lower).ratio(),
            fuzz.ratio(student_lower, candidate_lower) / 100.0,
            fuzz.partial_ratio(student_lower, candidate_lower) / 100.0,
        )
        if combined > best_score:
            best_score, best_match = combined, candidate
    if best_score >= threshold:
        return best_match, best_score
    return None, best_score
def extract_roll_number(student_answer_path):
    """Pull a roll/registration number out of a student answer sheet.

    OCRs the image with Tesseract and tries several labelled patterns
    ("Roll No:", "Reg No:", ...). If none match, the first run of two or
    more digits on the sheet is used. Returns "Unknown" when nothing is
    found or OCR fails.
    """
    try:
        sheet_text = pytesseract.image_to_string(Image.open(student_answer_path))
        labelled_patterns = (
            r'(?i)roll\s*no\s*[:\-]?\s*(\w+)',
            r'(?i)roll\s*number\s*[:\-]?\s*(\w+)',
            r'(?i)roll\s*[:\-]?\s*(\w+)',
            r'(?i)reg\s*no\s*[:\-]?\s*(\w+)',
            r'(?i)registration\s*[:\-]?\s*(\w+)',
        )
        for pattern in labelled_patterns:
            hit = re.search(pattern, sheet_text)
            if hit:
                return hit.group(1).strip()
        # Fall back to the first significant digit sequence on the sheet.
        digit_runs = re.findall(r'\b\d{2,}\b', sheet_text)
        if digit_runs:
            return digit_runs[0]
        return "Unknown"
    except Exception as e:
        print(f"Error extracting roll number: {str(e)}")
        return "Unknown"
| # OMR Section | |
@dataclass
class BubbleLocation:
    """Stores information about each bubble (position, size, fill state).

    Bug fix: the class declared bare field annotations without the
    @dataclass decorator, so ``BubbleLocation(q_num, option, center, r)``
    (as used by the OMR reader) raised TypeError — no __init__ was
    generated. The decorator restores the intended constructor.
    """
    question_num: int
    option: str
    center: Tuple[int, int]
    radius: int
    filled: bool = False
    fill_ratio: float = 0.0
class CorrectedOMRReader:
    """Reads a 50-question OMR sheet from an image and extracts marked answers.

    Layout assumption: three columns of questions (Q1-20, Q21-40, Q41-50)
    with four options (A-D) per question. Candidate bubble circles are
    found by combining Hough-circle detection, template matching, contour
    analysis and, as a last resort, a fixed-grid assumption; each bubble's
    darkness relative to its surroundings decides whether it is filled.
    """
    def __init__(self, image_path: str = None, image_array: np.ndarray = None):
        """Initialize the OMR Reader with an image (BGR array or file path)."""
        if image_array is not None:
            self.image = image_array
            self.image_path = None
        elif image_path is not None:
            self.image = cv2.imread(image_path)
            self.image_path = image_path
        else:
            raise ValueError("Either image_array or image_path must be provided")
        if self.image is None:
            raise ValueError("Could not load image")
        self.gray = cv2.cvtColor(self.image, cv2.COLOR_BGR2GRAY)
        self.height, self.width = self.gray.shape
        self.bubbles = []  # list of BubbleLocation, populated by process()
        self.answers = {}  # question number -> option letter or "---"
        # Expected grid parameters
        self.expected_radius = 15  # Approximate bubble radius
        self.grid_params = {
            'rows': 20,  # Maximum rows
            'cols': 3,  # 3 columns of questions
            'options': 4  # 4 options per question (A, B, C, D)
        }
    def preprocess_for_detection(self):
        """Preprocess specifically for bubble DETECTION (not fill detection)"""
        blurred = cv2.GaussianBlur(self.gray, (3, 3), 0)
        # Bright background -> white, then inverted so marks become white.
        _, thresh = cv2.threshold(blurred, 200, 255, cv2.THRESH_BINARY)
        self.detection_thresh = cv2.bitwise_not(thresh)
        return self.detection_thresh
    def find_bubble_grid(self):
        """Find bubble locations using grid detection"""
        bubbles = []
        # Several Hough parameter sets are tried and their results merged,
        # since a single setting misses bubbles on some scans.
        param_sets = [
            {'dp': 1.0, 'minDist': 20, 'param1': 50, 'param2': 28, 'minRadius': 10, 'maxRadius': 20},
            {'dp': 1.1, 'minDist': 22, 'param1': 45, 'param2': 25, 'minRadius': 11, 'maxRadius': 19},
            {'dp': 1.2, 'minDist': 25, 'param1': 40, 'param2': 30, 'minRadius': 9, 'maxRadius': 21},
        ]
        for params in param_sets:
            circles = cv2.HoughCircles(
                self.gray,
                cv2.HOUGH_GRADIENT,
                dp=params['dp'],
                minDist=params['minDist'],
                param1=params['param1'],
                param2=params['param2'],
                minRadius=params['minRadius'],
                maxRadius=params['maxRadius']
            )
            if circles is not None:
                circles = np.round(circles[0, :]).astype("int")
                for (x, y, r) in circles:
                    # Skip circles within 15px of an already-kept bubble.
                    is_dup = False
                    for bub in bubbles:
                        if np.sqrt((x - bub[0])**2 + (y - bub[1])**2) < 15:
                            is_dup = True
                            break
                    if not is_dup:
                        bubbles.append((x, y, r))
        print(f" Found {len(bubbles)} bubbles with Hough Circles")
        # 180 is just under the 200 bubbles expected (50 questions x 4
        # options); below that, supplement with template matching.
        if len(bubbles) < 180:
            template_bubbles = self.template_matching_detection()
            bubbles.extend(template_bubbles)
            print(f" Added {len(template_bubbles)} bubbles with template matching")
        return bubbles
    def template_matching_detection(self):
        """Use template matching to find bubble locations"""
        bubbles = []
        # Synthetic template: an empty circle outline on a black square.
        template_size = 30
        template = np.zeros((template_size, template_size), dtype=np.uint8)
        cv2.circle(template, (template_size//2, template_size//2), 12, 255, 2)
        result = cv2.matchTemplate(self.gray, template, cv2.TM_CCOEFF_NORMED)
        threshold = 0.5
        locations = np.where(result >= threshold)
        for pt in zip(*locations[::-1]):
            center_x = pt[0] + template_size // 2
            center_y = pt[1] + template_size // 2
            # Suppress matches within 20px of an already-accepted center.
            too_close = False
            for (bx, by, _) in bubbles:
                if np.sqrt((center_x - bx)**2 + (center_y - by)**2) < 20:
                    too_close = True
                    break
            if not too_close:
                bubbles.append((center_x, center_y, 12))
        return bubbles
    def detect_bubbles_by_contours(self):
        """Detect bubbles using contours - focusing on circular shapes"""
        bubbles = []
        # Try multiple Canny thresholds and merge deduplicated results.
        edge_params = [(30, 100), (50, 150), (20, 80)]
        for low, high in edge_params:
            edges = cv2.Canny(self.gray, low, high)
            contours, _ = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
            for contour in contours:
                area = cv2.contourArea(contour)
                if 150 < area < 900:
                    (x, y), radius = cv2.minEnclosingCircle(contour)
                    perimeter = cv2.arcLength(contour, True)
                    if perimeter > 0:
                        # Circularity is 1.0 for a perfect circle.
                        circularity = 4 * np.pi * area / (perimeter * perimeter)
                        if circularity > 0.6 and 8 < radius < 22:
                            is_dup = False
                            for bub in bubbles:
                                if np.sqrt((x - bub[0])**2 + (y - bub[1])**2) < 15:
                                    is_dup = True
                                    break
                            if not is_dup:
                                bubbles.append((int(x), int(y), int(radius)))
        return bubbles
    def organize_and_filter_bubbles(self, all_bubbles):
        """Deduplicate candidate bubbles and group them into horizontal rows.

        Returns a list of rows (each a list of (x, y, r) tuples sorted by x);
        rows with fewer than 4 bubbles are discarded.
        """
        if not all_bubbles:
            return []
        # Pass 1: drop near-duplicates (centers closer than 15px).
        filtered_bubbles = []
        for bubble in all_bubbles:
            is_duplicate = False
            for existing in filtered_bubbles:
                dist = np.sqrt((bubble[0] - existing[0])**2 + (bubble[1] - existing[1])**2)
                if dist < 15:
                    is_duplicate = True
                    break
            if not is_duplicate:
                filtered_bubbles.append(bubble)
        # Pass 2: sweep top-to-bottom; a new row starts whenever a bubble's
        # y deviates more than row_threshold from the current row's mean y.
        filtered_bubbles.sort(key=lambda b: (b[1], b[0]))
        rows = []
        current_row = []
        row_threshold = 20
        for bubble in filtered_bubbles:
            if not current_row:
                current_row.append(bubble)
            else:
                avg_y = np.mean([b[1] for b in current_row])
                if abs(bubble[1] - avg_y) < row_threshold:
                    current_row.append(bubble)
                else:
                    if len(current_row) >= 4:
                        current_row.sort(key=lambda b: b[0])
                        rows.append(current_row)
                    current_row = [bubble]
        if len(current_row) >= 4:
            current_row.sort(key=lambda b: b[0])
            rows.append(current_row)
        return rows
    def map_to_questions(self, bubble_rows):
        """Map bubble rows to question numbers and options.

        Column layout: left third -> Q1-20, middle -> Q21-40, right ->
        Q41-50 (the right column only has 10 rows). Within a column the
        four leftmost bubbles become options A-D in x order.
        """
        mapped_bubbles = []
        options = ['A', 'B', 'C', 'D']
        if not bubble_rows:
            return mapped_bubbles
        # Column boundaries as fractions of the sheet width.
        col1_max = self.width * 0.35
        col2_max = self.width * 0.68
        for row_idx, row in enumerate(bubble_rows[:20]):
            col1 = [b for b in row if b[0] < col1_max]
            col2 = [b for b in row if col1_max <= b[0] < col2_max]
            col3 = [b for b in row if b[0] >= col2_max]
            if len(col1) >= 4:
                col1_sorted = sorted(col1, key=lambda b: b[0])[:4]
                q_num = row_idx + 1
                for opt_idx, bubble in enumerate(col1_sorted):
                    mapped_bubbles.append(BubbleLocation(q_num, options[opt_idx], (bubble[0], bubble[1]), bubble[2]))
            if len(col2) >= 4:
                col2_sorted = sorted(col2, key=lambda b: b[0])[:4]
                q_num = row_idx + 21
                for opt_idx, bubble in enumerate(col2_sorted):
                    mapped_bubbles.append(BubbleLocation(q_num, options[opt_idx], (bubble[0], bubble[1]), bubble[2]))
            if row_idx < 10 and len(col3) >= 4:
                col3_sorted = sorted(col3, key=lambda b: b[0])[:4]
                q_num = row_idx + 41
                for opt_idx, bubble in enumerate(col3_sorted):
                    mapped_bubbles.append(BubbleLocation(q_num, options[opt_idx], (bubble[0], bubble[1]), bubble[2]))
        return mapped_bubbles
    def analyze_bubble_fill(self, bubble: BubbleLocation):
        """Decide whether a bubble is filled by comparing its interior
        intensity against the surrounding ring, with absolute fallbacks.

        Sets bubble.darkness_score (a dynamic attribute, not declared on
        BubbleLocation) and bubble.filled; returns the filled flag.
        """
        # Mean intensity inside the bubble (slightly shrunken mask).
        mask = np.zeros(self.gray.shape, dtype=np.uint8)
        cv2.circle(mask, bubble.center, max(bubble.radius - 5, 5), 255, -1)
        mean_val = cv2.mean(self.gray, mask=mask)[0]
        # Mean intensity of a ring just outside the bubble (local background).
        large_ring_mask = np.zeros(self.gray.shape, dtype=np.uint8)
        cv2.circle(large_ring_mask, bubble.center, bubble.radius + 10, 255, -1)
        cv2.circle(large_ring_mask, bubble.center, bubble.radius + 5, 0, -1)
        surrounding_mean = cv2.mean(self.gray, mask=large_ring_mask)[0]
        bubble.darkness_score = surrounding_mean - mean_val
        darkness_threshold = 50
        absolute_darkness_threshold = 150  # 150
        bubble.filled = (bubble.darkness_score > darkness_threshold) and (mean_val < absolute_darkness_threshold)
        # Fallbacks: high variance + moderately dark, or very dark overall,
        # also count as filled (catches partial/uneven pencil marks).
        pixels = self.gray[mask > 0]
        if len(pixels) > 0:
            std_dev = np.std(pixels)
            if std_dev > 25 and mean_val < 170:
                bubble.filled = True
        if mean_val < 120:
            bubble.filled = True
        return bubble.filled
    def process(self):
        """Main processing pipeline"""
        print("Starting corrected OMR processing...")
        print("Detecting bubble locations...")
        # Merge candidates from Hough circles (plus template matching,
        # triggered inside find_bubble_grid) and contour detection.
        all_bubbles = []
        circles = self.find_bubble_grid()
        all_bubbles.extend(circles)
        contour_bubbles = self.detect_bubbles_by_contours()
        all_bubbles.extend(contour_bubbles)
        print(f" Contour bubbles found: {len(contour_bubbles)}")
        print(f"Total bubbles detected: {len(all_bubbles)}")
        # Still short of the ~200 expected: fall back to the fixed grid.
        if len(all_bubbles) < 180:
            print("Not enough bubbles detected, using grid-based approach...")
            grid_bubbles = self.detect_by_grid_assumption()
            all_bubbles.extend(grid_bubbles)
            print(f"Added {len(grid_bubbles)} bubbles from grid assumption")
        print("Organizing bubbles into grid...")
        bubble_rows = self.organize_and_filter_bubbles(all_bubbles)
        print(f"Organized into {len(bubble_rows)} rows")
        print("Mapping bubbles to questions...")
        self.bubbles = self.map_to_questions(bubble_rows)
        print(f"Mapped {len(self.bubbles)} bubble locations")
        print("Analyzing filled bubbles...")
        for bubble in self.bubbles:
            self.analyze_bubble_fill(bubble)
        print("Extracting final answers...")
        self.extract_answers()
        return self.answers
    def detect_by_grid_assumption(self):
        """Generate bubble positions from a hard-coded grid layout.

        NOTE(review): the pixel offsets assume a specific sheet scan size —
        confirm against the actual template/resolution in use.
        """
        bubbles = []
        col_starts = [60, 360, 660]
        bubble_spacing_x = 45
        bubble_spacing_y = 28
        start_y = 50
        for col_idx, col_x in enumerate(col_starts):
            # First two columns hold 20 questions each, the third only 10.
            num_rows = 20 if col_idx < 2 else 10
            for row in range(num_rows):
                y = start_y + row * bubble_spacing_y
                for opt in range(4):
                    x = col_x + opt * bubble_spacing_x
                    exists = False
                    for existing in bubbles:
                        if np.sqrt((x - existing[0])**2 + (y - existing[1])**2) < 20:
                            exists = True
                            break
                    if not exists:
                        bubbles.append((x, y, 13))
        return bubbles
    def extract_answers(self):
        """Reduce per-bubble fill flags to one answer per question.

        "---" marks an unanswered question; when several bubbles are
        filled, the one with the highest darkness score wins.
        """
        questions = defaultdict(list)
        for bubble in self.bubbles:
            questions[bubble.question_num].append(bubble)
        self.answers = {}
        for q_num in sorted(questions.keys()):
            q_bubbles = questions[q_num]
            filled = [b for b in q_bubbles if b.filled]
            if not filled:
                self.answers[q_num] = "---"
            elif len(filled) == 1:
                self.answers[q_num] = filled[0].option
            else:
                # Multiple marks: keep the darkest bubble.
                filled.sort(key=lambda b: b.darkness_score, reverse=True)
                self.answers[q_num] = filled[0].option
        return self.answers
    def visualize_results(self):
        """Return a copy of the image with detected bubbles annotated
        (green circle + Q/option label = filled, light red = empty)."""
        result_img = self.image.copy()
        for bubble in self.bubbles:
            if bubble.filled:
                cv2.circle(result_img, bubble.center, bubble.radius, (0, 255, 0), 2)
                text = f"Q{bubble.question_num}:{bubble.option}"
                cv2.putText(result_img, text,
                            (bubble.center[0] - 25, bubble.center[1] - bubble.radius - 5),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.3, (0, 0, 255), 1)
            else:
                cv2.circle(result_img, bubble.center, bubble.radius, (100, 100, 255), 1)
        return result_img
    def display_results(self):
        """Print the detected answers as a three-column table plus a summary."""
        print("\n" + "="*60)
        print("DETECTED ANSWERS")
        print("="*60)
        for i in range(1, 21):
            row_str = ""
            ans1 = self.answers.get(i, "---")
            row_str += f"Q{i:2d}: {ans1:^4} | "
            if i + 20 <= 40:
                ans2 = self.answers.get(i + 20, "---")
                row_str += f"Q{i+20:2d}: {ans2:^4} | "
            else:
                row_str += " " * 13 + "| "
            if i + 40 <= 50:
                ans3 = self.answers.get(i + 40, "---")
                row_str += f"Q{i+40:2d}: {ans3:^4}"
            print(row_str)
        print("\n" + "="*60)
        print("SUMMARY")
        print("="*60)
        answered = sum(1 for v in self.answers.values() if v != "---")
        print(f"Questions detected: {len(self.answers)}")
        print(f"Answered: {answered}")
        print(f"Unanswered: {len(self.answers) - answered}")
def process_single_image(image_data) -> Dict[str, Any]:
    """Process one OMR sheet image and return a JSON-ready result dict.

    Args:
        image_data: either a base64-encoded image string or a file-like
            object (direct upload).

    Returns:
        Dict with "success", per-question "answers" (unanswered questions
        reported as the string "null", matching the API docs) and a
        "summary" always covering exactly 50 questions. On failure the
        same shape is returned with success=False and an "error" message.
    """
    try:
        if isinstance(image_data, str):
            # Base64 encoded image
            image_bytes = base64.b64decode(image_data)
            image = Image.open(io.BytesIO(image_bytes))
        else:
            # Direct file upload
            image = Image.open(image_data)
        image_array = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)

        omr_reader = CorrectedOMRReader(image_array=image_array)
        answers = omr_reader.process()

        total_questions = 50  # the sheet layout always has 50 questions
        # Bug fix: CorrectedOMRReader marks unanswered questions with the
        # string "---", never None, so the old `is not None` check counted
        # every question as answered and leaked "---" into the JSON. Treat
        # both "---" and a missing entry as unanswered and emit "null".
        formatted_answers = {}
        answered = 0
        for q_num in range(1, total_questions + 1):
            answer = answers.get(q_num)
            if answer is None or answer == "---":
                formatted_answers[str(q_num)] = "null"
            else:
                formatted_answers[str(q_num)] = answer
                answered += 1
        return {
            "success": True,
            "answers": formatted_answers,
            "summary": {
                "total_questions": total_questions,
                "answered": answered,
                "unanswered": total_questions - answered
            }
        }
    except Exception as e:
        # Never propagate — callers expect a uniform JSON shape.
        return {
            "success": False,
            "error": str(e),
            "answers": {},
            "summary": {
                "total_questions": 50,
                "answered": 0,
                "unanswered": 50
            }
        }
def health_check():
    """Liveness probe: report that the OMR API process is up.

    NOTE(review): no @app.route decorator is visible here — confirm the
    route is registered elsewhere (the docs endpoint describes GET /health).
    """
    payload = {
        "status": "healthy",
        "message": "OMR API is running"
    }
    return jsonify(payload)
def home():
    """Landing endpoint: describe the API's routes and response shape.

    NOTE(review): no @app.route decorator is visible here — confirm the
    route is registered elsewhere.
    """
    docs = {
        "message": "OMR Processing API",
        "version": "1.0",
        "endpoints": {
            "/process_omr": {
                "method": "POST",
                "description": "Process OMR answer sheets",
                "accepts": [
                    "Multipart form data with 'images' field",
                    "JSON with base64 encoded images in 'images' array"
                ],
                "returns": "JSON with detected answers and summary"
            },
            "/health": {
                "method": "GET",
                "description": "Health check endpoint"
            }
        },
        "example_response": {
            "success": True,
            "answers": {"1": "A", "2": "B", "3": "null"},
            "summary": {"total_questions": 50, "answered": 45, "unanswered": 5}
        }
    }
    return jsonify(docs)
| # <-----------------> | |
def easyocr_image():
    """Flask handler: run EasyOCR over every uploaded 'images' file.

    Each extracted text is appended both to the JSON response list and to
    the module-level ocr_extracted_texts accumulator. A per-image failure
    is recorded in-place as an error string rather than aborting the batch.
    NOTE(review): no @app.route decorator is visible here — confirm the
    route is registered elsewhere.
    """
    if 'images' not in request.files:
        return jsonify({'error': 'No image files provided'}), 400
    images = request.files.getlist('images')
    extracted_texts = []
    for image_file in images:
        try:
            # Persist the upload to a temporary file so OpenCV can read it.
            with tempfile.NamedTemporaryFile(delete=False, suffix='.png') as temp_image_file:
                image_file.save(temp_image_file.name)
                temp_path = temp_image_file.name
            try:
                # Bug fix: read via a context manager so the file handle is
                # closed promptly — the original `open(...).read()` leaked
                # the handle, which can break os.unlink on Windows.
                with open(temp_path, 'rb') as fh:
                    image_np = np.frombuffer(fh.read(), np.uint8)
                image = cv2.imdecode(image_np, cv2.IMREAD_COLOR)
                # Perform OCR
                result = reader.readtext(image)
                # Extract text from the result
                text = " ".join([item[1] for item in result])
                extracted_texts.append(text)
                ocr_extracted_texts.append(text)
            finally:
                # Clean up temp file
                if os.path.exists(temp_path):
                    os.unlink(temp_path)
        except Exception as e:
            extracted_texts.append(f"Error processing image with EasyOCR: {str(e)}")
    return jsonify({'extracted_texts': extracted_texts})
def tesseract_image():
    """Flask handler: OCR each uploaded 'images' file with Tesseract.

    Stripped text for each image goes both into the JSON response list and
    the module-level ocr_extracted_texts accumulator; a per-file failure is
    recorded as an error string instead of aborting the batch.
    NOTE(review): no @app.route decorator is visible here — confirm the
    route is registered elsewhere.
    """
    if 'images' not in request.files:
        return jsonify({'error': 'No image files provided'}), 400
    uploaded_files = request.files.getlist('images')
    extracted_texts = []
    for upload in uploaded_files:
        try:
            # Persist the upload to disk so PIL can open it by path.
            with tempfile.NamedTemporaryFile(delete=False, suffix='.png') as tmp:
                upload.save(tmp.name)
                tmp_path = tmp.name
            try:
                with Image.open(tmp_path) as pil_image:
                    ocr_text = pytesseract.image_to_string(pil_image).strip()
                extracted_texts.append(ocr_text)
                ocr_extracted_texts.append(ocr_text)
            finally:
                # Always remove the temporary file.
                if os.path.exists(tmp_path):
                    os.unlink(tmp_path)
        except Exception as e:
            extracted_texts.append(f"Error processing image with Tesseract: {str(e)}")
    return jsonify({'extracted_texts': extracted_texts})
def process_question_paper():
    """Flask handler: OCR an uploaded question paper (PDF or image),
    parse it into questions/answers, cache the result in the module-level
    last_processed_question_paper_object and return it as JSON.
    NOTE(review): no @app.route decorator is visible here — confirm the
    route is registered elsewhere.
    """
    global last_processed_question_paper_object
    if 'file' not in request.files:
        return jsonify({'error': 'No file provided'}), 400
    file = request.files['file']
    if file.filename == '':
        return jsonify({'error': 'No file selected'}), 400
    question_paper = QuestionPaper()
    try:
        # Uploaded papers are kept under <app root>/Images (fixed names,
        # so each upload overwrites the previous one).
        images_dir = os.path.join(app.root_path, 'Images')
        os.makedirs(images_dir, exist_ok=True)
        is_pdf = file.filename.lower().endswith('.pdf')
        saved_name = "question_paper.pdf" if is_pdf else "question_paper.png"
        saved_path = os.path.join(images_dir, saved_name)
        file.save(saved_path)
        question_paper.path = saved_path
        if is_pdf:
            # Render each PDF page to an image, OCR it, and concatenate.
            pages = convert_from_path(saved_path, poppler_path=_get_poppler_path())
            full_text = "".join(pytesseract.image_to_string(page) + "\n" for page in pages)
        else:
            full_text = pytesseract.image_to_string(Image.open(saved_path))
        # Parse the OCR output into Q&A pairs.
        questions, answers = parse_question_paper_text(full_text)
        question_paper.questions = questions
        question_paper.answers = answers
        # Strip residual header/footer noise from the parsed answers.
        question_paper.clean_answers()
        # Cache for later evaluation requests.
        last_processed_question_paper_object = question_paper
        return jsonify(question_paper.to_dict())
    except Exception as e:
        return jsonify({'error': str(e)}), 500
def gemini_evaluate_answer_sheet_with_roll(question_paper_path, student_answer_path, questions, correct_answers, paddle_results):
    """
    Evaluate entire answer sheet using Gemini and extract roll number.

    Sends the question-paper image, the student's answer-sheet image, the
    expected answers, and the PaddleOCR pre-evaluation to Gemini, and asks it
    to double-check the OCR result.

    Returns:
        (roll_number, evaluations) where evaluations is a list of
        {"question_number": int, "status": "Correct"|"Wrong"|"Missing"}.
        On any failure every question gets status "Error" and the roll number
        falls back to OCR extraction via extract_roll_number().

    NOTE(review): the `questions` parameter is not read by this function —
    kept for interface compatibility with the caller.
    """
    try:
        model = genai.GenerativeModel('gemini-2.5-flash')
        # Create the expected answers list for the prompt ("1. answer" per line)
        expected_answers_text = "\n".join([f"{i+1}. {answer}" for i, answer in enumerate(correct_answers)])
        # Prompt text is sent to the model verbatim (spelling kept as-is).
        prompt_text = f"""You are an OCR Assitant for an evaluvation script.
You will be given an image of a question paper and an image of a student's handwritten answers along with traditional OCR evaluvations.
Your task is assist the traditional OCR in overcoming its limitation with handwritten text the image may have bad quality handwritten text which the OCR may fail to extract and evaluvate properly, this is where you come in.
Your task is to Just do a double check of the OCR results and correct any mistakes or missing answers. and provide the result in a structured way.
Expected correct answers:
{expected_answers_text}
Traditional OCR Evaluation Results:
{paddle_results}
Instructions:
- First, identify and extract the student's roll number from the answer sheet
- Compare the student's handwritten answers with the expected answers above
- Small spelling mistakes should be ignored and considered correct
- If an answer has been crossed out or strikethrough, consider it incorrect
- Be lenient with handwriting recognition issues
- Look for answers by question numbers (1, 2, 3, etc.)
Please evaluate ALL questions and respond in this EXACT JSON format:
{{
"roll_number": "extracted_roll_number_here",
"evaluations": [
{{"question_number": 1, "status": "Correct"}},
{{"question_number": 2, "status": "Wrong"}},
{{"question_number": 3, "status": "Missing"}},
...
]
}}
For roll_number: Look for patterns like "Roll No:", "Roll Number:", "Reg No:", or any number sequence that appears to be a student identifier.
For each question, use ONLY one of these three status values:
- "Correct" - if the student's answer matches the expected answer (allowing for minor spelling)
- "Wrong" - if the student's answer is clearly different from the expected answer
- "Missing" - if no answer is visible for this question number
Respond with ONLY the JSON format above, no other text.
! Note
Ignore texts like `GENERAL KNOWLEDGE QUESTION PAPER WITH ANSWERS` and the final output should only have actual questions.
"""
        # Handle PDF vs Image for question paper
        if question_paper_path.lower().endswith('.pdf'):
            # Convert PDF to images; only the first page is sent to Gemini
            pdf_images = convert_from_path(question_paper_path, poppler_path=_get_poppler_path())
            question_paper_img = pdf_images[0]  # Use first page
        else:
            question_paper_img = Image.open(question_paper_path)
        # Load student answer image
        student_answer_img = Image.open(student_answer_path)
        # Create multimodal content for the model: prompt + both images
        content = [prompt_text, question_paper_img, student_answer_img]
        response = model.generate_content(content)
        result_text = response.text.strip()
        print(f"Gemini response: {result_text}")
        # Try to parse JSON response (json also imported at module level)
        import json
        try:
            # Clean the response - sometimes Gemini adds markdown formatting
            if "```json" in result_text:
                result_text = result_text.split("```json")[1].split("```")[0].strip()
            elif "```" in result_text:
                result_text = result_text.split("```")[1].strip()
            parsed_result = json.loads(result_text)
            return parsed_result["roll_number"], parsed_result["evaluations"]
        except (json.JSONDecodeError, KeyError) as e:
            print(f"Failed to parse JSON response: {e}")
            print(f"Raw response: {result_text}")
            # Fallback - extract roll number using OCR and create default "Error" results
            roll_number = extract_roll_number(student_answer_path)
            return roll_number, [{"question_number": i+1, "status": "Error"} for i in range(len(correct_answers))]
    except Exception as e:
        print(f"Error in Gemini evaluation: {str(e)}")
        # Return error status for all questions with OCR extracted roll number
        roll_number = extract_roll_number(student_answer_path)
        return roll_number, [{"question_number": i+1, "status": "Error"} for i in range(len(correct_answers))]
def quick_match(correct_list, messy_student_list, min_score=80):
    """Fuzzy-match noisy OCR answer strings against the known correct answers.

    Returns a list of (raw_item, question_number, matched_answer, score)
    tuples; each question number is claimed by at most one OCR item.
    """
    from fuzzywuzzy import process
    import re

    matched = []
    claimed_questions = set()
    for raw_item in messy_student_list:
        # Drop any leading "12." / "12" question-number prefix from the item.
        candidate = re.sub(r'^\d+\.?\s*', '', str(raw_item)).strip()
        if not candidate or candidate == '-':
            continue
        best = process.extractOne(candidate, correct_list)
        if not best or best[1] < min_score:
            continue
        # Question number = 1-based position of the matched answer.
        question_number = correct_list.index(best[0]) + 1
        if question_number in claimed_questions:
            continue
        claimed_questions.add(question_number)
        matched.append((raw_item, question_number, best[0], best[1]))
    return matched
def process_with_paddle_ocr(image_path, correct_answers):
    """
    Process an image with PaddleOCR and fuzzy-match the extracted text against
    the correct answers.

    Returns:
        On success: quick_match()'s list of
        (raw_ocr_item, question_number, matched_answer, score) tuples.
        On any exception: the sentinel tuple (None, [], 0).
    NOTE(review): the two return shapes are inconsistent; the caller embeds the
    value opaquely in its response, but confirm before relying on its structure.
    NOTE(review): if PaddleOCR returns no result pages the function falls
    through and returns None implicitly.
    """
    try:
        # Initialize PaddleOCR (imported lazily — heavy optional dependency)
        from paddleocr import PaddleOCR
        print("Initializing PaddleOCR...")
        ocr = PaddleOCR(
            use_doc_orientation_classify=True,
            use_doc_unwarping=False,
            use_textline_orientation=False
        )
        print("PaddleOCR initialized.")
        # Read and process the image
        # result = ocr.ocr(image_path, cls=True)
        print("Preditcing")
        result = ocr.predict(image_path)
        print("PaddleOCR processing completed.")
        # print(f"PaddleOCR result: {result}")
        print("Correct Answers are:")
        print(correct_answers)
        for res in result:
            # Recognized text lines for this result page
            words = res["rec_texts"]
            print(f"PaddleOCR extracted words: {words}")
            # words = result["rec_texts"]
            # NOTE(review): returns after the first OCR result page — assumes a
            # single-image prediction; confirm multi-page results never occur.
            result = quick_match(correct_answers, words, min_score=85)
            print(f"PaddleOCR matched results: {result}")
            return result
    except Exception as e:
        print(f"Error in PaddleOCR processing: {str(e)}")
        return None, [], 0
# OCR Evaluation Endpoint
def evaluate_answers():
    """Evaluate every uploaded student answer sheet against the cached paper.

    Per sheet the pipeline is: PaddleOCR fuzzy matching first, then a Gemini
    pass that double-checks the OCR result and extracts the roll number.
    Per-student summaries are aggregated and stored in Supabase (best effort —
    storage failure is reported in the response, not raised).

    Requires process_question_paper() to have been called first so the
    module-level question paper cache is populated.
    """
    global ocr_extracted_texts
    if 'student_answers' not in request.files:
        return jsonify({"error": "Missing student answers"}), 400
    student_answer_files = request.files.getlist('student_answers')
    # Get teacher email and exam name from the request
    teacher_email = request.form.get('teacher_email', 'unknown@example.com')
    exam_name = request.form.get('exam_name', 'Untitled Exam')  # Get exam name from form data
    # Retrieve the question paper object cached by process_question_paper()
    question_paper = last_processed_question_paper_object
    if last_processed_question_paper_object is None:
        return jsonify({'error': 'Question paper not found or processed yet'}), 404
    student_answer_paths = []
    try:
        # Save student answer files temporarily (deleted in the finally block)
        for student_answer_file in student_answer_files:
            with tempfile.NamedTemporaryFile(delete=False, suffix='.png') as temp_ans_file:
                student_answer_file.save(temp_ans_file.name)
                student_answer_paths.append(temp_ans_file.name)
        # Process each student's answer sheet
        all_students_results = []
        if question_paper.path and os.path.exists(question_paper.path):
            print(f"Starting Gemini evaluation for exam: {exam_name} with {len(student_answer_paths)} students...")
            for idx, student_answer_path in enumerate(student_answer_paths):
                print(f"Processing answer sheet {idx + 1} with PaddleOCR...")
                # First process with PaddleOCR (fuzzy match vs expected answers)
                results = process_with_paddle_ocr(
                    student_answer_path,
                    question_paper.answers
                )
                # Gemini double-checks the OCR result and extracts the roll number
                roll_number, sheet_evaluations = gemini_evaluate_answer_sheet_with_roll(
                    question_paper.path,
                    student_answer_path,
                    question_paper.questions,
                    question_paper.answers,
                    results
                )
                # Process the results for this student; drop any question
                # numbers outside the parsed paper's range
                student_results = []
                for eval_result in sheet_evaluations:
                    question_num = eval_result["question_number"]
                    if 1 <= question_num <= len(question_paper.questions):
                        student_results.append({
                            'question_number': question_num,
                            'question_text': question_paper.questions[question_num - 1],
                            'correct_answer': question_paper.answers[question_num - 1],
                            'status': eval_result["status"]
                        })
                # Calculate summary for this student
                correct_count = sum(1 for result in student_results if result['status'] == 'Correct')
                total_questions = len(student_results)
                score_percentage = (correct_count / total_questions) * 100 if total_questions > 0 else 0
                student_summary = {
                    'roll_number': roll_number,
                    'total_questions': len(question_paper.answers),
                    'correct_answers': correct_count,
                    'wrong_answers': sum(1 for result in student_results if result['status'] == 'Wrong'),
                    'missing_answers': sum(1 for result in student_results if result['status'] == 'Missing'),
                    'error_answers': sum(1 for result in student_results if result['status'] == 'Error'),
                    'score_percentage': round(score_percentage, 2),
                    'evaluation_results': student_results,
                    'ocr_results': {
                        'extracted_text': results,
                    }
                }
                all_students_results.append(student_summary)
            final_results = {
                'exam_name': exam_name,  # Include exam name in results
                'total_students': len(student_answer_paths),
                'students_evaluated': all_students_results
            }
            # STORE THE RESULTS IN SUPABASE WITH EXAM NAME (best effort)
            try:
                supabase_handler = SupabaseHandler()
                unique_key = supabase_handler.store_evaluation_result(teacher_email, final_results, exam_name)
                if unique_key:
                    # Add the unique key to the response
                    final_results['unique_key'] = unique_key
                    final_results['storage_success'] = True
                    print(f"Results stored successfully with key: {unique_key} for exam: {exam_name}")
                else:
                    final_results['storage_success'] = False
                    final_results['storage_error'] = "Failed to store results in database"
                    print("Failed to store results in Supabase")
            except Exception as storage_error:
                print(f"Error storing results: {str(storage_error)}")
                final_results['storage_success'] = False
                final_results['storage_error'] = str(storage_error)
            return jsonify(final_results)
        else:
            # NOTE(review): this branch returns HTTP 200 with an error payload —
            # confirm whether a 4xx/5xx status was intended.
            return jsonify({
                'error': 'Question paper file not found for Gemini evaluation.'
            })
    except Exception as e:
        return jsonify({'error': str(e)}), 500
    finally:
        # Clean up temporary student answer files
        for path in student_answer_paths:
            try:
                if os.path.exists(path):
                    os.unlink(path)
            except PermissionError:
                pass  # File still locked on Windows; OS will clean up temp dir
| # Get Evaluation | |
def get_evaluation_result(unique_key):
    """Fetch a single stored evaluation result by its unique key."""
    try:
        record = SupabaseHandler().get_evaluation_result(unique_key)
        if not record:
            return jsonify({'error': 'Evaluation result not found'}), 404
        return jsonify({'success': True, 'data': record})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
| # Get Teacher Evaluation | |
def get_teacher_evaluations(teacher_email):
    """Return every stored evaluation belonging to the given teacher."""
    try:
        evaluations = SupabaseHandler().get_teacher_evaluations(teacher_email)
        return jsonify({
            'success': True,
            'data': evaluations,
            'total_evaluations': len(evaluations),
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500
| # Get OMR Answer Key | |
def get_omr_answer_key():
    """Return the most recently processed OMR answer key, if one exists."""
    global last_processed_omr_key
    if last_processed_omr_key is None:
        return jsonify({'error': 'No answer key has been processed yet'}), 404
    return jsonify({
        'success': True,
        'answer_key': last_processed_omr_key.to_dict(),
    })
def omr_gemini_process(error_questions, correct_answers, image_file):
    """
    Use Gemini to assist in evaluating OMR sheets, especially for error questions.

    Sends the OMR sheet image plus the correct answers and the list of question
    numbers the traditional OMR pipeline could not read. Gemini is asked to
    extract the roll number (and name) and to judge only the error questions.

    Returns:
        (roll_number, evaluations) where evaluations is a list of
        {"question_number": int, "status": "Correct"|"Wrong"|"Missing"}.
        On JSON-parse failure the roll number is OCR'd from the saved OMR image
        and every question gets status "Error"; on any other failure returns
        ("Unknown", [{"question_number": q, "status": "Error"} ...]) for the
        error questions only.
    """
    try:
        model = genai.GenerativeModel('gemini-2.5-flash')
        # Prompt text is sent to the model verbatim (spelling kept as-is).
        prompt_text = f"""
You are an OMR Assistant for an evaluvation script.
Your main purpose is to assist in the process.
Correct Answers to questions sorted by question number: {correct_answers}
Error Question numbers: {error_questions}
Your task:
- From the given image identify the student name and roll number
- if for some reason the traditional OMR Processing failed to detect some answers, those question numbers will be provided to you, you should look into those questions form the given image and correct answers.
- Only provide answer for the questions that are in the error list.
- You can ignore the rest of the question
- if Error question is empty, just extract the roll number and name
Please evaluate ALL questions and respond in this EXACT JSON format:
{{
"roll_number": "extracted_roll_number_here",
"evaluations": [
{{"question_number": 1, "status": "Correct"}},
{{"question_number": 2, "status": "Wrong"}},
{{"question_number": 3, "status": "Missing"}},
...
]
}}
"""
        student_answer_img = image_file
        # Multimodal content: prompt + OMR sheet image
        content = [prompt_text, student_answer_img]
        response = model.generate_content(content)
        result_text = response.text.strip()
        print(f"Gemini response: {result_text}")
        # json is also imported at module level; local import kept as-is
        import json
        try:
            # Clean the response - sometimes Gemini adds markdown formatting
            if "```json" in result_text:
                result_text = result_text.split("```json")[1].split("```")[0].strip()
            elif "```" in result_text:
                result_text = result_text.split("```")[1].strip()
            parsed_result = json.loads(result_text)
            return parsed_result["roll_number"], parsed_result["evaluations"]
        except (json.JSONDecodeError, KeyError) as e:
            print(f"Failed to parse JSON response: {e}")
            print(f"Raw response: {result_text}")
            # Fallback - extract roll number using OCR and create default "Error" results
            # NOTE(review): assumes the sheet was saved as OMRImage.jpg by the
            # upload endpoint — confirm the extension always matches.
            roll_number = extract_roll_number(os.path.join("OMRChecker", "inputs", "OMRImage.jpg"))
            return roll_number, [{"question_number": i+1, "status": "Error"} for i in range(len(correct_answers))]
    except Exception as e:
        print(f"Error in OMR Gemini processing: {str(e)}")
        return "Unknown", [{"question_number": q, "status": "Error"} for q in error_questions]
def evaluate_omr():
    """
    Evaluate OMR answers against the stored answer key.

    Requires both an answer key (last_processed_omr_key) and processed OMR
    sheets (last_processed_omr_results / porcessed_omr_results) to exist.
    Questions the traditional OMR pipeline could not read are re-checked via
    omr_gemini_process(); counts are adjusted with its verdicts. Results are
    stored in Supabase best-effort and a summary response is returned.
    """
    global last_processed_omr_key, last_processed_omr_results, porcessed_omr_results, OMR_IMAGES
    # Get teacher email and exam name from the request
    teacher_email = request.form.get('teacher_email', 'unknown@example.com')
    exam_name = request.form.get('exam_name', 'Untitled Exam')  # Get exam name from form data
    if not last_processed_omr_key:
        return jsonify({
            'error': 'No answer key has been processed. Please process an answer key first.'
        }), 400
    if not last_processed_omr_results:
        return jsonify({
            'error': 'No OMR sheet has been processed. Please process an OMR sheet first.'
        }), 400
    try:
        # Get the marked answers from the processed OMR
        # NOTE(review): this omr_data value is immediately shadowed by the loop
        # variable below and never used — looks like dead code.
        if isinstance(last_processed_omr_results, list):
            omr_data = last_processed_omr_results[0]  # Take first sheet if multiple
        else:
            omr_data = last_processed_omr_results
        student_datas = []
        # One iteration per processed sheet; OMR_IMAGES is index-aligned
        for idx, omr_data in enumerate(porcessed_omr_results):
            marked_answers = omr_data
            image_file = OMR_IMAGES[idx]
            # Get correct answers from answer key (only for questions that exist)
            correct_answers = last_processed_omr_key.answers
            total_questions_in_key = len(correct_answers)
            # Evaluate answers only for questions that exist in the answer key
            evaluation_details = []
            correct_count = 0
            wrong_count = 0
            missing_count = 0
            error_questions = []
            for q_num in sorted(correct_answers.keys()):
                print(f"Evaluating Question {q_num}")
                print(f"Correct Answer: {correct_answers[q_num]} | Marked Answer: {marked_answers.get(str(q_num))}")
                correct_ans = correct_answers[q_num]
                # marked_answers is keyed by the string form of the question number
                marked_ans = marked_answers.get(str(q_num))
                # Treat blank, multi-mark, and NaN cells as unreadable ("Missing")
                # and queue them for the Gemini re-check
                if marked_ans is None or marked_ans == '' or len(str(marked_ans)) > 1 or marked_ans == 'nan':
                    status = 'Missing'
                    error_questions.append(q_num)
                    missing_count += 1
                elif marked_ans.upper() == correct_ans.upper():
                    status = 'Correct'
                    correct_count += 1
                else:
                    status = 'Wrong'
                    wrong_count += 1
                evaluation_details.append({
                    'question_number': q_num,
                    'question_text': last_processed_omr_key.questions[q_num - 1] if q_num <= len(last_processed_omr_key.questions) else f"Question {q_num}",
                    'correct_answer': correct_ans,
                    'marked_answer': marked_ans if marked_ans != 'null' else None,
                    'status': status
                })
            # Gemini extracts the roll number and re-judges the error questions
            roll_no, gemini_result = omr_gemini_process(
                error_questions,
                last_processed_omr_key.answers,
                image_file
            )
            # Merge Gemini's verdicts back into the counts and details
            for err_idx in error_questions:
                for gemini_eval in gemini_result:
                    if gemini_eval["question_number"] == err_idx:
                        correct_ans = last_processed_omr_key.answers[err_idx]
                        marked_ans = None  # Since it was an error question
                        status = gemini_eval["status"]
                        # Each error question was counted as Missing above, so
                        # reclassification decrements missing_count
                        if status == "Correct":
                            correct_count += 1
                            # wrong_count -= 1  # Adjust wrong count
                            missing_count -= 1  # Adjust missing count
                        elif status == "Wrong":
                            wrong_count += 1
                            missing_count -= 1  # Adjust missing count
                        elif status == "Missing":
                            # NOTE(review): increments even though the question
                            # was already counted Missing — confirm intent.
                            missing_count += 1
                        # Update the evaluation details
                        for eval_detail in evaluation_details:
                            if eval_detail['question_number'] == err_idx:
                                eval_detail.update({
                                    'marked_answer': marked_ans,
                                    'status': status
                                })
                                break
                        break
            # Calculate score
            # NOTE(review): total_score/max_score are computed but not included
            # in student_summary, which reports a plain percentage instead.
            total_score = correct_count * last_processed_omr_key.marks_per_question
            if last_processed_omr_key.negative_marking > 0:
                total_score -= wrong_count * last_processed_omr_key.negative_marking
            max_score = total_questions_in_key * last_processed_omr_key.marks_per_question
            student_summary = {
                'roll_number': roll_no,
                'total_questions': len(last_processed_omr_key.answers),
                'correct_answers': correct_count,
                'wrong_answers': wrong_count,
                'missing_answers': missing_count,
                'error_answers': len(error_questions),
                'score_percentage': correct_count / len(last_processed_omr_key.answers) * 100 if len(last_processed_omr_key.answers) > 0 else 0,
                'evaluation_results': evaluation_details,
                'ocr_results': {
                    'extracted_text': gemini_result,
                }
            }
            student_datas.append(student_summary)
        # Format the data in the required structure for Supabase
        formatted_evaluation_data = {
            'exam_name': exam_name,  # Include exam name in results
            'total_students': len(student_datas),
            'students_evaluated': student_datas
        }
        # Store results in Supabase (optional — skip if credentials not configured)
        unique_key = None
        try:
            supabase_handler = SupabaseHandler()
            unique_key = supabase_handler.store_evaluation_result(teacher_email, formatted_evaluation_data, exam_name)
        except Exception as supa_err:
            print(f"Supabase storage skipped: {supa_err}")
        # Prepare answer key info
        answer_key_info = {
            "title": getattr(last_processed_omr_key, 'title', 'Untitled'),
            "marks_per_question": last_processed_omr_key.marks_per_question,
            "negative_marking": last_processed_omr_key.negative_marking
        }
        # Return response in the same format as stored in Supabase
        # NOTE(review): the per-student data is NOT included in the response —
        # the spread below is commented out; confirm whether that is intended.
        final_result = {
            "success": True,
            "unique_key": unique_key,
            #**formatted_evaluation_data, # Include all the formatted data
            "additional_info": {
                "answer_key_info": answer_key_info
            }
        }
        return jsonify(final_result)
    except Exception as e:
        return jsonify({
            "success": False,
            "error": f"Evaluation failed: {str(e)}"
        }), 500
def process_with_gemini(evaluation_details, evaluation_summary, omr_data):
    """
    Use Gemini to independently evaluate the OMR sheet and extract student details.

    omr_data must carry the sheet image either as base64 under 'image_data' or
    as a path under 'filename'. Returns a dict with 'student_info',
    'gemini_evaluation', and (on success) a computed 'summary'; every failure
    path returns a placeholder dict with a 'verification' section instead of
    raising.

    NOTE(review): evaluation_details and evaluation_summary are accepted but
    never read — kept for interface compatibility.
    """
    global last_processed_omr_key
    try:
        model = genai.GenerativeModel('gemini-2.5-flash')
        # Prepare the questions and correct answers for Gemini
        for i, (q_num, correct_answer) in enumerate(sorted(last_processed_omr_key.answers.items())):
            question_text = last_processed_omr_key.questions[i] if i < len(last_processed_omr_key.questions) else f"Question {q_num}"
            questions_and_answers += f"Question {q_num}: {question_text}\nCorrect Answer: {correct_answer}\n\n"
        # Prompt text is sent to the model verbatim
        prompt = f"""
You are a teacher grading an OMR answer sheet.
STUDENT INFO: Extract the student's name and roll number from the image.
GRADING TASK: For each question, identify which bubble (A, B, C, or D) is filled/darkened, then compare with the correct answer.
QUESTIONS AND CORRECT ANSWERS:
{questions_and_answers}
IMPORTANT: Look carefully at each row of bubbles. A filled bubble will be darkened/shaded, while empty bubbles will be white/clear.
Respond in this EXACT JSON format:
{{
"student_info": {{
"name": "extracted student name",
"roll_no": "extracted roll number"
}},
"gemini_evaluation": [
{{"question": 1, "marked_answer": "C", "correct_answer": "C", "status": "Correct"}},
{{"question": 2, "marked_answer": "D", "correct_answer": "D", "status": "Correct"}},
// ... continue for all questions
]
}}
For status: use "Correct", "Wrong", or "Missing" only.
For marked_answer: use "A", "B", "C", "D", or null if no bubble is clearly filled.
"""
        # Get the image - we need to retrieve it from the last processed OMR
        # Since we don't store the image directly, we'll need to work with what we have
        # For now, let's assume we have access to the image file
        # Check if we have image data stored
        if 'image_data' in omr_data:
            # If we have base64 image data
            image_data = omr_data['image_data']
            image_bytes = base64.b64decode(image_data)
            image = Image.open(io.BytesIO(image_bytes))
        elif 'filename' in omr_data:
            # Try to find the image file
            try:
                # Look for the image in common locations
                possible_paths = [
                    f"Images/{omr_data['filename']}",
                    f"temp/{omr_data['filename']}",
                    omr_data['filename']
                ]
                image = None
                for path in possible_paths:
                    if os.path.exists(path):
                        image = Image.open(path)
                        break
                if image is None:
                    # If we can't find the image, return a fallback result
                    return {
                        "student_info": {
                            "name": "Image not available",
                            "roll_number": "Image not available"
                        },
                        "verification": {
                            "evaluation_correct": "unknown",
                            "confidence": "low",
                            "discrepancies": ["Original image not available for verification"],
                            "notes": "Could not verify due to missing image file"
                        },
                        "gemini_evaluation": []
                    }
            except Exception as e:
                print(f"Error loading image: {str(e)}")
                return {
                    "student_info": {
                        "name": "Error loading image",
                        "roll_number": "Error loading image"
                    },
                    "verification": {
                        "evaluation_correct": "unknown",
                        "confidence": "low",
                        "discrepancies": [f"Error loading image: {str(e)}"],
                        "notes": "Image processing failed"
                    },
                    "gemini_evaluation": []
                }
        else:
            # No image reference available
            return {
                "student_info": {
                    "name": "No image data",
                    "roll_number": "No image data"
                },
                "verification": {
                    "evaluation_correct": "unknown",
                    "confidence": "low",
                    "discrepancies": ["No image data available"],
                    "notes": "Cannot verify without image"
                },
                "gemini_evaluation": []
            }
        # Generate content with Gemini
        response = model.generate_content([prompt, image])
        result_text = response.text.strip()
        print(f"Gemini raw response: {result_text}")
        # Parse the JSON response
        try:
            # Clean the response - remove markdown formatting if present
            if "```json" in result_text:
                result_text = result_text.split("```json")[1].split("```")[0].strip()
            elif "```" in result_text:
                result_text = result_text.split("```")[1].strip()
            parsed_result = json.loads(result_text)
            # Update summary counts and score based on the evaluation
            if 'gemini_evaluation' in parsed_result:
                correct_count = sum(1 for item in parsed_result['gemini_evaluation'] if item.get('status') == 'Correct')
                wrong_count = sum(1 for item in parsed_result['gemini_evaluation'] if item.get('status') == 'Wrong')
                missing_count = sum(1 for item in parsed_result['gemini_evaluation'] if item.get('status') == 'Missing')
                score = (correct_count * last_processed_omr_key.marks_per_question) - (wrong_count * last_processed_omr_key.negative_marking)
                max_score = len(last_processed_omr_key.answers) * last_processed_omr_key.marks_per_question
                parsed_result['summary'] = {
                    "total_questions": len(last_processed_omr_key.answers),
                    "correct_count": correct_count,
                    "wrong_count": wrong_count,
                    "missing_count": missing_count,
                    "score": score,
                    "max_score": max_score,
                    "percentage": round((score / max_score) * 100, 2) if max_score > 0 else 0
                }
            return parsed_result
        except json.JSONDecodeError as e:
            print(f"Failed to parse Gemini JSON response: {e}")
            print(f"Raw response: {result_text}")
            # Fallback response with extracted text attempt
            return {
                "student_info": {
                    "name": "Parse error",
                    "roll_number": "Parse error"
                },
                "verification": {
                    "evaluation_correct": "unknown",
                    "confidence": "low",
                    "discrepancies": ["Failed to parse Gemini response"],
                    "notes": f"JSON parse error: {str(e)}"
                },
                "gemini_evaluation": [],
                "raw_response": result_text  # Include raw response for debugging
            }
    except Exception as e:
        return {
            "student_info": {
                "name": "Processing error",
                "roll_number": "Processing error"
            },
            "verification": {
                "evaluation_correct": "unknown",
                "confidence": "low",
                "discrepancies": [f"Gemini processing error: {str(e)}"],
                "notes": "Failed to process with Gemini"
            },
            "gemini_evaluation": []
        }
def compare_evaluations(our_evaluation, gemini_evaluation):
    """Measure agreement between our automated evaluation and Gemini's.

    our_evaluation entries use 'question_number'; gemini entries use
    'question'. Only questions present in both are compared. Returns a
    summary dict with per-question differences and an agreement rate, or a
    "comparison unavailable" dict when Gemini produced nothing.
    """
    if not gemini_evaluation:
        return {
            "comparison_available": False,
            "reason": "Gemini evaluation not available"
        }

    # Index our results by question number for O(1) lookup.
    ours_by_question = {entry['question_number']: entry for entry in our_evaluation}

    agreements = 0
    compared = 0
    differences = []
    for gemini_entry in gemini_evaluation:
        q_num = gemini_entry.get('question')
        ours = ours_by_question.get(q_num)
        if ours is None:
            continue  # Gemini judged a question we never evaluated
        compared += 1
        gemini_status = gemini_entry.get('status')
        if ours['status'] == gemini_status:
            agreements += 1
            continue
        differences.append({
            "question": q_num,
            "our_evaluation": {
                "marked_answer": ours['marked_answer'],
                "status": ours['status'],
            },
            "gemini_evaluation": {
                "marked_answer": gemini_entry.get('marked_answer'),
                "status": gemini_status,
            },
        })

    rate = (agreements / compared) * 100 if compared > 0 else 0
    return {
        "comparison_available": True,
        "total_questions_compared": compared,
        "agreements": agreements,
        "differences_count": len(differences),
        "agreement_rate": round(rate, 2),
        "differences": differences
    }
| # Also need to modify the process_omr endpoint to store image data for later use | |
def process_omr_enhanced():
    """
    Enhanced OMR processing that stores image data for later Gemini processing.

    For each uploaded image: saves it into OMRChecker/inputs, runs the
    OMRChecker CLI as a subprocess, reads the resulting CSV, and records the
    per-question marks in module-level state (porcessed_omr_results,
    OMR_IMAGES, last_processed_omr_results) for the evaluation endpoint.
    """
    global last_processed_omr_results
    global OMR_IMAGES
    global porcessed_omr_results
    # Reset accumulated state for this batch of sheets
    OMR_IMAGES = []
    porcessed_omr_results = []
    try:
        results = []
        print("Starting OMR processing...")
        # Check if files were uploaded
        if 'images' in request.files:
            files = request.files.getlist('images')
            results = []
            for idx, file in enumerate(files):
                if file.filename == '':
                    continue
                print(f"===================================== Processing file {file.filename} =====================================")
                # Each sheet is saved to a fixed name (original extension kept)
                # so the OMRChecker input dir always holds exactly one image
                name, extension = os.path.splitext(file.filename)
                filename = os.path.join("OMRChecker", "inputs", "OMRImage" + extension)
                file.save(filename)
                OMR_IMAGES.append(Image.open(filename))
                # NOTE(review): subprocess exit status is not checked — a failed
                # OMRChecker run would surface only as a missing/stale CSV below.
                result = subprocess.run([sys.executable, os.path.join('OMRChecker', 'main.py'), '--inputDir=' + os.path.join('OMRChecker', 'inputs')])
                print("OMR Finished Processing Successfully")
                folder = os.path.join("outputs", "Results")
                csv_files = [f for f in os.listdir(folder) if f.endswith(".csv")]
                print("CSV FILES:", csv_files)
                result_file = os.path.join(folder, csv_files[0])
                print("Found Result File", result_file)
                df = pd.read_csv(result_file)
                # Convert to JSON
                data_json = df.to_json(orient="records")
                parsed_json = json.loads(data_json)
                columns_dict = df.to_dict(orient="list")
                print(columns_dict)
                # Keep only the qN columns, keyed by question number (string)
                questions_only = {k.replace("q", ""): v[0] for k, v in columns_dict.items() if k.startswith("q")}
                last_processed_omr_results = questions_only
                porcessed_omr_results.append(questions_only)
                # Delete the CSV so the next sheet's result is picked up fresh
                if os.path.exists(result_file):
                    os.remove(result_file)
                    print(f"{result_file} deleted")
            # NOTE(review): only the LAST sheet's parsed CSV rows are returned;
            # per-sheet data lives in the module-level globals. Confirm intent.
            return jsonify(parsed_json)
        else:
            return jsonify({
                "success": False,
                "error": "No images provided. Use 'images' field for file uploads.",
                "results": []
            }), 400
    except Exception as e:
        return jsonify({
            "success": False,
            "error": f"Server error: {str(e)}",
            "results": []
        }), 500
def get_question_details(question_number):
    """Look up one question's details in the last processed answer key.

    Returns a JSON payload with the stored question data, or a 404 error
    when no answer key has been processed yet or the question number is
    not present in it.
    """
    global last_processed_omr_key
    key = last_processed_omr_key
    if key is None:
        return jsonify({'error': 'No answer key has been processed yet'}), 404
    details = key.get_question_details(question_number)
    if details is None:
        return jsonify({'error': f'Question number {question_number} not found'}), 404
    return jsonify({'success': True, 'question_data': details})
def debug_parsing():
    """
    Debug endpoint: show how each stored OCR text is parsed into answers.
    """
    if not ocr_extracted_texts:
        return jsonify({'error': 'No OCR extracted texts available.'}), 400
    debug_results = [
        {
            'original_ocr_text': text,
            'parsed_answers': improved_clean_and_parse_ocr_text(text),
        }
        for text in ocr_extracted_texts
    ]
    return jsonify({'debug_results': debug_results})
def extract_omr_metadata(text: str) -> tuple:
    """Pull the paper title and time limit out of raw question-paper text.

    The title is the first line among the first five that is entirely
    uppercase and longer than ten characters; the duration comes from a
    "Time: N minutes/mins/min" pattern anywhere in the text.  Either value
    is returned as an empty string when not found.
    """
    title = ""
    # Titles conventionally appear near the top, in capitals.
    for candidate in text.split('\n')[:5]:
        stripped = candidate.strip()
        if stripped == stripped.upper() and len(stripped) > 10:
            title = stripped
            break
    match = re.search(r'Time:\s*(\d+)\s*(minutes|mins|min)', text, re.IGNORECASE)
    duration = f"{match.group(1)} minutes" if match else ""
    return title, duration
def extract_omr_answers(text: str) -> Tuple[Dict[int, str], List[str]]:
    """Parse questions, options and answers out of OCR'd question-paper text.

    Lines are scanned one at a time, starting once the first question
    ("1. ...") is seen; everything before it (headers, instructions) is
    ignored.  Three line shapes are recognised:

      * ``N. question text`` / ``N) question text`` -- starts a new question
      * ``X) option text`` / ``X. option text``     -- adds option X (A-D)
      * ``Answer: ...``                             -- records the answer;
        when the answer text matches one of the current question's option
        texts (case-insensitively), the option letter is stored instead.

    Args:
        text: raw multi-line text extracted from the question paper.

    Returns:
        A ``(answers, questions)`` pair: ``answers`` maps question number
        to its answer letter (or raw answer text when no option matched),
        and ``questions`` lists every parsed question as "N. text",
        including questions with no detected answer.
    """
    answers: Dict[int, str] = {}
    questions: List[str] = []
    question_data: List[Dict[str, Any]] = []
    current_dict: Optional[Dict[str, Any]] = None
    print("\nStarting answer extraction...")
    # Work on non-empty, stripped lines only.
    lines = [line.strip() for line in text.split('\n') if line.strip()]
    started = False
    for line in lines:
        print(f"Processing line: {line}")
        # Skip header lines until the first question appears; the "1." line
        # itself falls through and is parsed as a question below.
        if not started:
            if line.startswith('1.'):
                started = True
            else:
                continue
        # New question: "N." or "N)" prefix.
        question_match = re.match(r'^(\d+)[.)](.*?)$', line)
        if question_match:
            if current_dict:
                question_data.append(current_dict)
            current_dict = {
                'number': int(question_match.group(1)),
                'question': question_match.group(2).strip(),
                'options': {},
                'answer': None
            }
            continue
        # Option line: "A)" .. "D)" (or "A." etc.).
        option_match = re.match(r'^([A-D])[).](.*?)$', line)
        if option_match and current_dict is not None:
            current_dict['options'][option_match.group(1)] = option_match.group(2).strip()
            continue
        # Answer line: either a bare letter or the full option text.
        answer_match = re.match(r'^\s*Answer[:\s]*([A-D]|.+)$', line, re.IGNORECASE)
        if answer_match and current_dict is not None:
            answer = answer_match.group(1).strip()
            # Map a full-text answer back to its option letter when possible.
            for opt_letter, opt_text in current_dict['options'].items():
                if answer.lower() == opt_text.lower():
                    answer = opt_letter
                    break
            current_dict['answer'] = answer
            continue
    # Flush the last open question.
    if current_dict:
        question_data.append(current_dict)
    print("\nExtracted Question Data:")
    for q in question_data:
        print(f"\nQuestion {q['number']}:")
        print(f"Text: {q['question']}")
        print(f"Options: {q['options']}")
        print(f"Answer: {q['answer']}")
        if q['answer']:
            answers[q['number']] = q['answer']
        questions.append(f"{q['number']}. {q['question']}")
    print(f"\nExtracted {len(questions)} questions and {len(answers)} answers")
    print("Questions:", questions)
    print("Answers:", answers)
    return answers, questions
def debug_text_extraction(text: str):
    """Print the raw extracted text followed by each non-empty line, to help diagnose OCR/parsing issues."""
    print("=== Extracted Text ===")
    print(text)
    print("\n=== Line by Line Analysis ===")
    stripped_lines = (raw.strip() for raw in text.split('\n'))
    for content in stripped_lines:
        if content:
            print(f"Line: {content}")
def process_omr_answer_key():
    """
    Build and store the global OMR answer key from one of two inputs.

    1. File upload (multipart/form-data, 'file' field): a PDF or image of
       the question paper.  PDFs are rasterised with pdf2image/poppler and
       OCR'd page by page with Tesseract; images are OCR'd directly.
       Questions, options and answers are then parsed from the text.
       The marking scheme comes from the form fields 'marks_per_question'
       and 'negative_marking'.

    2. JSON body:
       {
           "answers": {"1": "A", "2": "B", ...},
           "title": "...",               # optional
           "duration": "...",            # optional
           "marks_per_question": 1.0,    # optional, defaults to 1
           "negative_marking": 0.0       # optional, defaults to 0
       }

    On success the key is stored in the module-global
    `last_processed_omr_key` and echoed back as JSON; bad input yields a
    400 and any processing failure a 500, both as JSON errors.
    """
    global last_processed_omr_key
    try:
        omr_key = OMRAnswerKey()
        # A file upload takes precedence over a JSON body.
        if 'file' in request.files:
            file = request.files['file']
            if file.filename == '':
                return jsonify({'error': 'No file selected'}), 400
            # Create Images directory if it doesn't exist
            images_dir = os.path.join(app.root_path, 'Images')
            os.makedirs(images_dir, exist_ok=True)
            if file.filename.lower().endswith('.pdf'):
                # Save and process PDF (always to the same fixed path, so
                # concurrent requests would overwrite each other's file).
                answer_key_path = os.path.join(images_dir, "omr_answer_key.pdf")
                file.save(answer_key_path)
                omr_key.path = answer_key_path
                # Convert PDF to images and extract text
                all_text = ""
                try:
                    print(f"\nProcessing PDF file: {answer_key_path}")
                    images_from_pdf = convert_from_path(
                        answer_key_path,
                        poppler_path=_get_poppler_path(),
                        dpi=300  # Increase DPI for better quality
                    )
                    print(f"Converted PDF to {len(images_from_pdf)} images")
                    for idx, page_image in enumerate(images_from_pdf):
                        print(f"\nProcessing page {idx + 1}")
                        # Preprocess the page for OCR: grayscale then Otsu
                        # binarisation, which improves Tesseract accuracy.
                        img_np = np.array(page_image)
                        # Convert to grayscale
                        gray = cv2.cvtColor(img_np, cv2.COLOR_RGB2GRAY)
                        # Apply thresholding to get black and white image
                        _, threshold = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
                        # Save processed image for debugging
                        debug_image_path = os.path.join(images_dir, f"debug_page_{idx + 1}.png")
                        cv2.imwrite(debug_image_path, threshold)
                        print(f"Saved processed image to {debug_image_path}")
                        # --oem 3: default LSTM engine; --psm 6: assume a
                        # single uniform block of text.
                        custom_config = r'--oem 3 --psm 6'
                        text = pytesseract.image_to_string(threshold, config=custom_config)
                        print(f"Extracted text length: {len(text)}")
                        all_text += text + "\n"
                    print("\nTotal extracted text length:", len(all_text))
                except Exception as e:
                    # Log and re-raise so the outer handler returns a 500.
                    print(f"Error during PDF processing: {str(e)}")
                    raise
                # Debug the extracted text
                print("\nDebugging PDF extraction:")
                debug_text_extraction(all_text)
                # Extract metadata and answers
                title, duration = extract_omr_metadata(all_text)
                answers, questions = extract_omr_answers(all_text)
                print("\nExtracted answers:", answers)
                omr_key.set_metadata(title, duration)
                omr_key.set_answers(answers)
                omr_key.questions = questions
            else:
                # Process as image (any non-PDF upload is assumed to be an
                # image; it is saved with a .png name regardless of format).
                answer_key_path = os.path.join(images_dir, "omr_answer_key.png")
                file.save(answer_key_path)
                omr_key.path = answer_key_path
                image = Image.open(answer_key_path)
                text = pytesseract.image_to_string(image)
                # Debug the extracted text
                print("\nDebugging Image extraction:")
                debug_text_extraction(text)
                # Extract metadata and answers
                title, duration = extract_omr_metadata(text)
                answers, questions = extract_omr_answers(text)
                print("\nStructured Extraction Results:")
                print("Title:", title)
                print("Duration:", duration)
                print("\nQuestions found:", len(questions))
                print("Answers found:", len(answers))
                print("\nAnswers:", answers)
                omr_key.set_metadata(title, duration)
                omr_key.set_answers(answers)
                omr_key.questions = questions
            # Marking scheme from form fields (defaults: 1 mark, no penalty).
            marks_per_question = float(request.form.get('marks_per_question', 1.0))
            negative_marking = float(request.form.get('negative_marking', 0.0))
        else:
            # Process JSON input
            if not request.is_json:
                return jsonify({'error': 'Request must be JSON or file upload'}), 400
            data = request.get_json()
            if 'answers' not in data:
                return jsonify({'error': 'Answer key must be provided'}), 400
            # Validate answer format: integer question numbers, answers A-D.
            answer_key = data['answers']
            for q_num, answer in answer_key.items():
                try:
                    q_num = int(q_num)
                    if not isinstance(answer, str) or answer.upper() not in ['A', 'B', 'C', 'D']:
                        return jsonify({
                            'error': f'Invalid answer format for question {q_num}. Must be A, B, C, or D'
                        }), 400
                except ValueError:
                    return jsonify({
                        'error': f'Question numbers must be integers, got {q_num}'
                    }), 400
            # Set the answers
            omr_key.set_answers(answer_key)
            # Set metadata if provided
            title = data.get('title', '')
            duration = data.get('duration', '')
            omr_key.set_metadata(title, duration)
            # Marking scheme from the JSON body (same defaults as above).
            marks_per_question = float(data.get('marks_per_question', 1.0))
            negative_marking = float(data.get('negative_marking', 0.0))
        # Apply the marking scheme chosen by whichever branch ran.
        omr_key.set_marking_scheme(marks_per_question, negative_marking)
        # Store globally so other endpoints can read the processed key.
        last_processed_omr_key = omr_key
        return jsonify({
            'success': True,
            'message': 'OMR answer key processed successfully',
            'answer_key': omr_key.to_dict()
        })
    except Exception as e:
        return jsonify({
            'error': f'Failed to process answer key: {str(e)}'
        }), 500
if __name__ == '__main__':
    # Bind on all interfaces; port and debug mode are environment-driven
    # (PORT defaults to 5000, FLASK_DEBUG to off) so deployments can
    # configure them without code changes.
    app.run(
        host="0.0.0.0",
        port=int(os.environ.get("PORT", 5000)),
        debug=os.environ.get("FLASK_DEBUG", "false").lower() == "true"
    )