# ocr-omr-backend / app.py
# OCR/OMR evaluation backend (Flask) deployed to Hugging Face Spaces.
import sys
import platform
import easyocr
from pdf2image import convert_from_path, convert_from_bytes
from flask import Flask, request, jsonify
from flask_cors import CORS
from dataclasses import dataclass
from typing import List, Tuple, Optional, Dict, Any
from collections import defaultdict
import numpy as np
import cv2
import pytesseract
from PIL import Image
import os
import tempfile
import difflib
import re
from fuzzywuzzy import fuzz
from dotenv import load_dotenv
import google.generativeai as genai
import asyncio
import base64
import io
import json
import pandas as pd
import subprocess
# Import the SupabaseHandler
import uuid
from datetime import datetime
from supabase import create_client, Client
# Point pytesseract at the Tesseract binary.
# An explicit TESSERACT_CMD env var always wins; otherwise fall back to the
# default Windows install path. On Linux/macOS the binary is expected on PATH.
_tesseract_cmd = os.getenv("TESSERACT_CMD")
if _tesseract_cmd:
    pytesseract.pytesseract.tesseract_cmd = _tesseract_cmd
elif platform.system() == "Windows":
    pytesseract.pytesseract.tesseract_cmd = r"C:\Program Files\Tesseract-OCR\tesseract.exe"
def _get_poppler_path():
env_path = os.getenv("POPPLER_PATH")
if env_path:
return env_path
if platform.system() == "Windows":
# Check common install locations
candidates = [
r'C:\Program Files\poppler\Library\bin',
r'C:\Program Files\poppler\poppler-24.08.0\Library\bin',
]
# Also scan for any versioned poppler directory
poppler_base = r'C:\Program Files\poppler'
if os.path.isdir(poppler_base):
for entry in os.listdir(poppler_base):
candidate = os.path.join(poppler_base, entry, 'Library', 'bin')
if candidate not in candidates:
candidates.append(candidate)
for path in candidates:
if os.path.isdir(path):
return path
return None
# Load environment variables from a local .env file (no-op if absent).
load_dotenv()
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
genai.configure(api_key=GEMINI_API_KEY)
app = Flask(__name__)
CORS(app)
# Shared EasyOCR reader (English). Built once at import time because model
# loading is expensive.
reader = easyocr.Reader(['en'])
# Global variables to store processing results
ocr_extracted_texts = []  # OCR text accumulated by the /easyocr and /tesseract endpoints
last_processed_question_paper_object = None
last_processed_omr_key = None  # Global variable to store OMR answer key
last_processed_omr_results = None  # Global variable to store OMR processing results
porcessed_omr_results = []  # NOTE(review): name misspelled ("porcessed") — kept as-is for compatibility with callers
OMR_IMAGES = []
class SupabaseHandler:
    """Thin wrapper around the Supabase client for storing and retrieving
    exam evaluation results in the ``evaluation_results`` table."""

    def __init__(self):
        # Credentials come from the environment; fail fast if either is missing.
        url: str = os.getenv("SUPABASE_URL")
        key: str = os.getenv("SUPABASE_ANON_KEY")
        if not url or not key:
            raise ValueError("Supabase URL and ANON_KEY must be set in environment variables")
        self.supabase: Client = create_client(url, key)

    def store_evaluation_result(self, teacher_email, evaluation_data, exam_name=None):
        """
        Store evaluation result in Supabase with a unique key and exam name
        Returns the unique key for retrieval, or None on failure.
        """
        try:
            # Generate unique key
            unique_key = str(uuid.uuid4())
            # Prepare data for storage
            storage_data = {
                "unique_key": unique_key,
                "teacher_email": teacher_email,
                "evaluation_data": evaluation_data,
                "exam_name": exam_name,  # Add exam name field
                # NOTE(review): datetime.utcnow() is naive and deprecated in 3.12;
                # consider datetime.now(timezone.utc) — confirm DB column expectations.
                "created_at": datetime.utcnow().isoformat(),
                "total_students": evaluation_data.get("total_students", 0)
            }
            # Insert into Supabase
            result = self.supabase.table("evaluation_results").insert(storage_data).execute()
            if result.data:
                print(f"Successfully stored evaluation result with key: {unique_key} for exam: {exam_name}")
                return unique_key
            else:
                print("Failed to store evaluation result")
                return None
        except Exception as e:
            # Best-effort: storage failures are logged and reported as None.
            print(f"Error storing evaluation result: {str(e)}")
            return None

    def get_evaluation_result(self, unique_key):
        """
        Retrieve evaluation result by unique key.
        Returns the first matching row dict, or None when absent / on error.
        """
        try:
            result = self.supabase.table("evaluation_results").select("*").eq("unique_key", unique_key).execute()
            if result.data and len(result.data) > 0:
                return result.data[0]
            else:
                return None
        except Exception as e:
            print(f"Error retrieving evaluation result: {str(e)}")
            return None

    def get_teacher_evaluations(self, teacher_email):
        """
        Get all evaluation results for a specific teacher,
        newest first, fetching only summary columns.
        """
        try:
            result = self.supabase.table("evaluation_results").select("unique_key", "created_at", "total_students", "exam_name").eq("teacher_email", teacher_email).order("created_at", desc=True).execute()
            if result.data:
                return result.data
            else:
                return []
        except Exception as e:
            print(f"Error retrieving teacher evaluations: {str(e)}")
            return []
class QuestionPaper:
    """Container for the questions and expected answers parsed from a
    question-paper upload, plus the path the upload was saved to."""

    def __init__(self, path=None):
        self.questions = []
        self.answers = []
        self.path = path

    def clean_answers(self):
        """Drop header/footer artefacts that OCR sometimes captures as answers."""
        unwanted_patterns = [
            "Time: 15 MinutesMarks: 20",
            "Time: 15 Minutes Marks: 20",
            "GENERAL KNOWLEDGE QUESTION PAPER WITH ANSWERS",
            "GENERAL KNOWLEDGE QUESTION PAPER",
        ]

        def _keep(raw):
            text = raw.strip()
            if not text or text in unwanted_patterns:
                return False
            # Reject anything that merely *contains* a header phrase too.
            return not any(
                pat and re.search(re.escape(pat), raw, re.IGNORECASE)
                for pat in unwanted_patterns
            )

        self.answers = [a.strip() for a in self.answers if _keep(a)]

    def add_question(self, question_text):
        """Append one question string."""
        self.questions.append(question_text)

    def add_answer(self, answer_text):
        """Append one answer string."""
        self.answers.append(answer_text)

    def to_dict(self):
        """JSON-ready representation of the parsed paper."""
        return {'questions': self.questions, 'answers': self.answers}
class OMRAnswerKey:
    """Answer key for an OMR exam: per-question correct options plus the
    marking scheme and exam metadata."""

    def __init__(self):
        self.answers = {}        # question number -> correct option letter
        self.total_marks = 0
        self.marks_per_question = 1
        self.negative_marking = 0
        self.title = ""
        self.duration = ""
        self.total_questions = 0
        self.path = None
        self.questions = []      # formatted "N. question" strings
        self.question_data = []  # full question dicts (number/question/answer, ...)

    def __str__(self):
        return (f"OMR Answer Key: {self.title}\n"
                f"Total Questions: {self.total_questions}\n"
                f"Answers: {self.answers}")

    def set_answers(self, answers: dict):
        """Set the answer key with question numbers as keys and correct options (A,B,C,D) as values"""
        valid_options = ('A', 'B', 'C', 'D')
        self.answers = {
            int(num): opt.upper()
            for num, opt in answers.items()
            if opt.upper() in valid_options
        }
        self.total_questions = len(self.answers)

    def set_marking_scheme(self, marks_per_question: float, negative_marking: float = 0):
        """Set the marking scheme for the answer key"""
        self.marks_per_question = marks_per_question
        self.negative_marking = negative_marking
        self.total_marks = self.total_questions * marks_per_question

    def set_metadata(self, title: str, duration: str):
        """Set metadata for the answer key"""
        self.title = title
        self.duration = duration

    def set_question_data(self, question_data):
        """Store complete question data including options"""
        self.question_data = question_data
        self.questions = [f"{q['number']}. {q['question']}" for q in question_data]
        self.answers = {q['number']: q['answer'] for q in question_data if q['answer']}
        self.total_questions = len(question_data)

    def get_question_details(self, question_number):
        """Return the stored dict for one question, or None if absent."""
        target = str(question_number)
        for q in self.question_data:
            print(f"Checking question number: {q['number']} with {question_number}")
            if str(q['number']) == target:
                return q
        return None

    def to_dict(self):
        """JSON-ready representation of the full key."""
        return {
            'title': self.title,
            'duration': self.duration,
            'total_questions': self.total_questions,
            'answers': self.answers,
            'total_marks': self.total_marks,
            'marks_per_question': self.marks_per_question,
            'negative_marking': self.negative_marking,
            'questions': self.questions,
            'question_data': self.question_data,
        }
def parse_question_paper_text(text):
    """Split OCR'd question-paper text into parallel question/answer lists.

    A question line starts with a number followed by '.' or ')'. The line
    immediately after a question is treated as its answer unless it is
    itself a question. Header/footer noise is discarded first, and the two
    result lists are padded to equal length before returning.
    """
    ignore_res = [
        r'GENERAL KNOWLEDGE QUESTION PAPER.*',
        r'Time:\s*\d+\s*Minutes.*Marks:\s*\d+',
        r'Time:\s*\d+\s*MinutesMarks:\s*\d+',
        r'^\s*$',  # empty lines
    ]
    q_re = re.compile(r'^\d+\s*[.)]\s*(.+)')

    # Strip blank lines and known header/footer noise.
    lines = []
    for raw in text.split('\n'):
        line = raw.strip()
        if not line:
            continue
        if any(re.match(p, line, re.IGNORECASE) for p in ignore_res):
            continue
        lines.append(line)

    questions, answers = [], []
    idx = 0
    while idx < len(lines):
        line = lines[idx].strip()
        if q_re.match(line):
            questions.append(line)  # keep the number prefix on the question
            nxt = lines[idx + 1].strip() if idx + 1 < len(lines) else None
            if nxt is not None and not q_re.match(nxt):
                # The following non-question line is this question's answer.
                answers.append(nxt)
                idx += 2
            else:
                # Next line is another question (or end of text): no answer.
                answers.append("")
                idx += 1
        else:
            # Stray line: pair it with the previous question when one is waiting.
            if len(questions) > len(answers):
                answers.append(line)
            idx += 1

    # Pad so both lists come out the same length.
    while len(answers) < len(questions):
        answers.append("")
    while len(questions) < len(answers):
        questions.append(f"Question {len(questions) + 1}")
    return questions, answers
def improved_clean_and_parse_ocr_text(ocr_text):
    """Extract individual answers from raw OCR text.

    First strips OCR noise characters, then tries to split on numbered
    items like ``1. answer`` / ``2) answer``; if no numbering is found it
    falls back to one answer per non-trivial line.

    Bug fix: the noise-stripping character class previously removed '.'
    and ')' as well, which destroyed the question-number delimiters and
    made both the numbered-pattern branch and the fallback's
    number-stripping regex unreachable. Those two characters are now kept.
    """
    # Remove special characters but keep '.' and ')' (needed to detect numbering).
    cleaned_text = re.sub(r'[|@~¥#$%^&*(_+=\[\]{}\\:";\'<>?,/]', ' ', ocr_text)
    # Split by newlines and filter out empty strings.
    lines = [line.strip() for line in cleaned_text.split('\n') if line.strip()]

    individual_answers = []
    # Preferred path: explicit "N." / "N)" item markers.
    numbered_pattern = re.compile(
        r'(\d+)\s*[.)]\s*([^0-9]+?)(?=\d+\s*[.)]|$)', re.MULTILINE | re.DOTALL
    )
    matches = numbered_pattern.findall(cleaned_text)
    if matches:
        for _number, answer in matches:
            answer = answer.strip()
            if answer and len(answer) > 1:
                individual_answers.append(answer)
    else:
        # Fallback: treat each line as an answer, dropping any leading numbering.
        for line in lines:
            cleaned_line = re.sub(r'^\d+\s*[.)]\s*', '', line).strip()
            if cleaned_line and len(cleaned_line) > 1:
                individual_answers.append(cleaned_line)
    return individual_answers
def find_best_match(student_answer, correct_answers, threshold=0.6):
    """Return ``(best_match, score)`` for *student_answer* against the list
    of *correct_answers*, or ``(None, score)`` when no candidate reaches
    *threshold*.

    Each candidate's score is the max of difflib's sequence ratio and
    fuzzywuzzy's full/partial ratios (all normalised to 0-1).
    """
    student_lc = student_answer.lower()

    def _score(candidate):
        cand_lc = candidate.lower()
        return max(
            difflib.SequenceMatcher(None, student_lc, cand_lc).ratio(),
            fuzz.ratio(student_lc, cand_lc) / 100.0,
            fuzz.partial_ratio(student_lc, cand_lc) / 100.0,
        )

    best_match, best_score = None, 0
    for candidate in correct_answers:
        score = _score(candidate)
        if score > best_score:
            best_score, best_match = score, candidate

    if best_score >= threshold:
        return best_match, best_score
    return None, best_score
def extract_roll_number(student_answer_path):
    """OCR the answer sheet at *student_answer_path* and pull out a roll
    number, trying explicit labels first, then any 2+ digit sequence.
    Returns "Unknown" when nothing is found or OCR fails.
    """
    label_patterns = (
        r'(?i)roll\s*no\s*[:\-]?\s*(\w+)',
        r'(?i)roll\s*number\s*[:\-]?\s*(\w+)',
        r'(?i)roll\s*[:\-]?\s*(\w+)',
        r'(?i)reg\s*no\s*[:\-]?\s*(\w+)',
        r'(?i)registration\s*[:\-]?\s*(\w+)',
    )
    try:
        sheet_text = pytesseract.image_to_string(Image.open(student_answer_path))
        for pattern in label_patterns:
            match = re.search(pattern, sheet_text)
            if match:
                return match.group(1).strip()
        # No explicit label: fall back to the first multi-digit sequence.
        digit_runs = re.findall(r'\b\d{2,}\b', sheet_text)
        return digit_runs[0] if digit_runs else "Unknown"
    except Exception as e:
        print(f"Error extracting roll number: {str(e)}")
        return "Unknown"
# OMR Section
@dataclass
class BubbleLocation:
    """Stores information about each bubble"""
    question_num: int        # 1-based question number the bubble belongs to
    option: str              # option letter: 'A', 'B', 'C' or 'D'
    center: Tuple[int, int]  # (x, y) pixel centre in the sheet image
    radius: int              # bubble radius in pixels
    filled: bool = False     # set by CorrectedOMRReader.analyze_bubble_fill
    fill_ratio: float = 0.0  # reserved; not populated anywhere in this file
    # NOTE: CorrectedOMRReader.analyze_bubble_fill also attaches a dynamic
    # `darkness_score` attribute used for tie-breaking between marks.
class CorrectedOMRReader:
    """Reads a scanned OMR answer sheet laid out as 3 columns of A-D
    bubbles (Q1-20, Q21-40, Q41-50) and extracts one answer per question.

    Pipeline (see process()): detect candidate bubbles with several
    strategies, organise them into rows, map rows to question numbers,
    score each bubble's darkness, then pick the marked option.
    """

    def __init__(self, image_path: str = None, image_array: np.ndarray = None):
        """Initialize the OMR Reader with an image (BGR array or file path)."""
        if image_array is not None:
            self.image = image_array
            self.image_path = None
        elif image_path is not None:
            self.image = cv2.imread(image_path)
            self.image_path = image_path
        else:
            raise ValueError("Either image_array or image_path must be provided")
        if self.image is None:
            raise ValueError("Could not load image")
        self.gray = cv2.cvtColor(self.image, cv2.COLOR_BGR2GRAY)
        self.height, self.width = self.gray.shape
        self.bubbles = []  # BubbleLocation objects, populated by process()
        self.answers = {}  # question number -> option letter or "---"
        # Expected grid parameters
        self.expected_radius = 15  # Approximate bubble radius (pixels)
        self.grid_params = {
            'rows': 20,  # Maximum rows
            'cols': 3,  # 3 columns of questions
            'options': 4  # 4 options per question (A, B, C, D)
        }

    def preprocess_for_detection(self):
        """Preprocess specifically for bubble DETECTION (not fill detection).

        NOTE(review): not invoked by process(); appears to be kept for
        external/debug use — confirm before removing.
        """
        blurred = cv2.GaussianBlur(self.gray, (3, 3), 0)
        # Bright background -> white, then invert so outlines become white.
        _, thresh = cv2.threshold(blurred, 200, 255, cv2.THRESH_BINARY)
        self.detection_thresh = cv2.bitwise_not(thresh)
        return self.detection_thresh

    def find_bubble_grid(self):
        """Find bubble locations using Hough circle detection.

        Several parameter sets are tried and merged; circles within 15px
        of an already-kept bubble are dropped as duplicates. If fewer than
        180 bubbles are found (200 expected = 50 questions x 4 options),
        template matching tops up the result.
        """
        bubbles = []
        param_sets = [
            {'dp': 1.0, 'minDist': 20, 'param1': 50, 'param2': 28, 'minRadius': 10, 'maxRadius': 20},
            {'dp': 1.1, 'minDist': 22, 'param1': 45, 'param2': 25, 'minRadius': 11, 'maxRadius': 19},
            {'dp': 1.2, 'minDist': 25, 'param1': 40, 'param2': 30, 'minRadius': 9, 'maxRadius': 21},
        ]
        for params in param_sets:
            circles = cv2.HoughCircles(
                self.gray,
                cv2.HOUGH_GRADIENT,
                dp=params['dp'],
                minDist=params['minDist'],
                param1=params['param1'],
                param2=params['param2'],
                minRadius=params['minRadius'],
                maxRadius=params['maxRadius']
            )
            if circles is not None:
                circles = np.round(circles[0, :]).astype("int")
                for (x, y, r) in circles:
                    is_dup = False
                    for bub in bubbles:
                        if np.sqrt((x - bub[0])**2 + (y - bub[1])**2) < 15:
                            is_dup = True
                            break
                    if not is_dup:
                        bubbles.append((x, y, r))
        print(f" Found {len(bubbles)} bubbles with Hough Circles")
        if len(bubbles) < 180:
            # NOTE: template results are not deduped against Hough results
            # here; organize_and_filter_bubbles removes <15px duplicates later.
            template_bubbles = self.template_matching_detection()
            bubbles.extend(template_bubbles)
            print(f" Added {len(template_bubbles)} bubbles with template matching")
        return bubbles

    def template_matching_detection(self):
        """Use template matching to find bubble locations.

        The template is a synthetic 2px-thick circle outline of radius 12;
        matches closer than 20px to an accepted match are skipped.
        """
        bubbles = []
        template_size = 30
        template = np.zeros((template_size, template_size), dtype=np.uint8)
        cv2.circle(template, (template_size//2, template_size//2), 12, 255, 2)
        result = cv2.matchTemplate(self.gray, template, cv2.TM_CCOEFF_NORMED)
        threshold = 0.5
        locations = np.where(result >= threshold)
        for pt in zip(*locations[::-1]):  # iterate match points in (x, y) order
            center_x = pt[0] + template_size // 2
            center_y = pt[1] + template_size // 2
            too_close = False
            for (bx, by, _) in bubbles:
                if np.sqrt((center_x - bx)**2 + (center_y - by)**2) < 20:
                    too_close = True
                    break
            if not too_close:
                bubbles.append((center_x, center_y, 12))
        return bubbles

    def detect_bubbles_by_contours(self):
        """Detect bubbles using contours - focusing on circular shapes.

        Runs Canny at three sensitivities; a contour is accepted when its
        area (150-900), circularity (4*pi*A/P^2 > 0.6) and enclosing
        radius (8-22px) all look bubble-like.
        """
        bubbles = []
        edge_params = [(30, 100), (50, 150), (20, 80)]
        for low, high in edge_params:
            edges = cv2.Canny(self.gray, low, high)
            contours, _ = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
            for contour in contours:
                area = cv2.contourArea(contour)
                if 150 < area < 900:
                    (x, y), radius = cv2.minEnclosingCircle(contour)
                    perimeter = cv2.arcLength(contour, True)
                    if perimeter > 0:
                        circularity = 4 * np.pi * area / (perimeter * perimeter)
                        if circularity > 0.6 and 8 < radius < 22:
                            is_dup = False
                            for bub in bubbles:
                                if np.sqrt((x - bub[0])**2 + (y - bub[1])**2) < 15:
                                    is_dup = True
                                    break
                            if not is_dup:
                                bubbles.append((int(x), int(y), int(radius)))
        return bubbles

    def organize_and_filter_bubbles(self, all_bubbles):
        """Deduplicate candidate bubbles and group them into horizontal rows.

        Bubbles whose centres are within 15px are merged (first one wins);
        rows are built top-to-bottom with a 20px y-tolerance against the
        row's running mean, and kept only when they contain at least 4
        bubbles (one full option group).
        """
        if not all_bubbles:
            return []
        filtered_bubbles = []
        for bubble in all_bubbles:
            is_duplicate = False
            for existing in filtered_bubbles:
                dist = np.sqrt((bubble[0] - existing[0])**2 + (bubble[1] - existing[1])**2)
                if dist < 15:
                    is_duplicate = True
                    break
            if not is_duplicate:
                filtered_bubbles.append(bubble)
        # Sort top-to-bottom, then left-to-right, and cut into rows.
        filtered_bubbles.sort(key=lambda b: (b[1], b[0]))
        rows = []
        current_row = []
        row_threshold = 20  # max y-distance from the row's running mean
        for bubble in filtered_bubbles:
            if not current_row:
                current_row.append(bubble)
            else:
                avg_y = np.mean([b[1] for b in current_row])
                if abs(bubble[1] - avg_y) < row_threshold:
                    current_row.append(bubble)
                else:
                    if len(current_row) >= 4:
                        current_row.sort(key=lambda b: b[0])
                        rows.append(current_row)
                    current_row = [bubble]
        # Flush the last row.
        if len(current_row) >= 4:
            current_row.sort(key=lambda b: b[0])
            rows.append(current_row)
        return rows

    def map_to_questions(self, bubble_rows):
        """Assign each row's bubbles to question numbers by x-position.

        Column 1 (< 35% of width) -> Q1-20, column 2 -> Q21-40, column 3
        (>= 68% of width, first 10 rows only) -> Q41-50. Within a column
        the 4 left-most bubbles map to options A-D in x order.
        """
        mapped_bubbles = []
        options = ['A', 'B', 'C', 'D']
        if not bubble_rows:
            return mapped_bubbles
        col1_max = self.width * 0.35
        col2_max = self.width * 0.68
        for row_idx, row in enumerate(bubble_rows[:20]):
            col1 = [b for b in row if b[0] < col1_max]
            col2 = [b for b in row if col1_max <= b[0] < col2_max]
            col3 = [b for b in row if b[0] >= col2_max]
            if len(col1) >= 4:
                col1_sorted = sorted(col1, key=lambda b: b[0])[:4]
                q_num = row_idx + 1
                for opt_idx, bubble in enumerate(col1_sorted):
                    mapped_bubbles.append(BubbleLocation(q_num, options[opt_idx], (bubble[0], bubble[1]), bubble[2]))
            if len(col2) >= 4:
                col2_sorted = sorted(col2, key=lambda b: b[0])[:4]
                q_num = row_idx + 21
                for opt_idx, bubble in enumerate(col2_sorted):
                    mapped_bubbles.append(BubbleLocation(q_num, options[opt_idx], (bubble[0], bubble[1]), bubble[2]))
            if row_idx < 10 and len(col3) >= 4:
                col3_sorted = sorted(col3, key=lambda b: b[0])[:4]
                q_num = row_idx + 41
                for opt_idx, bubble in enumerate(col3_sorted):
                    mapped_bubbles.append(BubbleLocation(q_num, options[opt_idx], (bubble[0], bubble[1]), bubble[2]))
        return mapped_bubbles

    def analyze_bubble_fill(self, bubble: BubbleLocation):
        """Decide whether *bubble* is filled; sets and returns bubble.filled.

        Compares the mean grey level inside the bubble with a ring just
        outside it (relative darkness), then applies two fallbacks: high
        pixel variance with a moderately dark mean (partial shading), or a
        very dark mean overall. Also attaches a dynamic ``darkness_score``
        attribute used later for tie-breaking between multiple marks.
        """
        # Mean intensity inside the bubble (shrunk mask avoids the outline).
        mask = np.zeros(self.gray.shape, dtype=np.uint8)
        cv2.circle(mask, bubble.center, max(bubble.radius - 5, 5), 255, -1)
        mean_val = cv2.mean(self.gray, mask=mask)[0]
        # Mean intensity of a ring just outside the bubble (local background).
        large_ring_mask = np.zeros(self.gray.shape, dtype=np.uint8)
        cv2.circle(large_ring_mask, bubble.center, bubble.radius + 10, 255, -1)
        cv2.circle(large_ring_mask, bubble.center, bubble.radius + 5, 0, -1)
        surrounding_mean = cv2.mean(self.gray, mask=large_ring_mask)[0]
        # How much darker the interior is than its surroundings.
        bubble.darkness_score = surrounding_mean - mean_val
        darkness_threshold = 50
        absolute_darkness_threshold = 150  # 150
        bubble.filled = (bubble.darkness_score > darkness_threshold) and (mean_val < absolute_darkness_threshold)
        # Fallback 1: noisy but moderately dark interior (partial shading).
        pixels = self.gray[mask > 0]
        if len(pixels) > 0:
            std_dev = np.std(pixels)
            if std_dev > 25 and mean_val < 170:
                bubble.filled = True
        # Fallback 2: very dark interior is always considered filled.
        if mean_val < 120:
            bubble.filled = True
        return bubble.filled

    def process(self):
        """Main processing pipeline"""
        print("Starting corrected OMR processing...")
        print("Detecting bubble locations...")
        all_bubbles = []
        circles = self.find_bubble_grid()
        all_bubbles.extend(circles)
        contour_bubbles = self.detect_bubbles_by_contours()
        all_bubbles.extend(contour_bubbles)
        print(f" Contour bubbles found: {len(contour_bubbles)}")
        print(f"Total bubbles detected: {len(all_bubbles)}")
        if len(all_bubbles) < 180:
            # Still short of the expected 200: synthesise from the fixed layout.
            print("Not enough bubbles detected, using grid-based approach...")
            grid_bubbles = self.detect_by_grid_assumption()
            all_bubbles.extend(grid_bubbles)
            print(f"Added {len(grid_bubbles)} bubbles from grid assumption")
        print("Organizing bubbles into grid...")
        bubble_rows = self.organize_and_filter_bubbles(all_bubbles)
        print(f"Organized into {len(bubble_rows)} rows")
        print("Mapping bubbles to questions...")
        self.bubbles = self.map_to_questions(bubble_rows)
        print(f"Mapped {len(self.bubbles)} bubble locations")
        print("Analyzing filled bubbles...")
        for bubble in self.bubbles:
            self.analyze_bubble_fill(bubble)
        print("Extracting final answers...")
        self.extract_answers()
        return self.answers

    def detect_by_grid_assumption(self):
        """Fallback: synthesise bubble positions from a fixed layout when
        detection finds too few bubbles.

        NOTE(review): the hard-coded x-origins/spacings assume a specific
        scan resolution — confirm against real sheet scans.
        """
        bubbles = []
        col_starts = [60, 360, 660]  # x origin of each answer column
        bubble_spacing_x = 45
        bubble_spacing_y = 28
        start_y = 50
        for col_idx, col_x in enumerate(col_starts):
            num_rows = 20 if col_idx < 2 else 10  # third column holds Q41-50 only
            for row in range(num_rows):
                y = start_y + row * bubble_spacing_y
                for opt in range(4):
                    x = col_x + opt * bubble_spacing_x
                    exists = False
                    for existing in bubbles:
                        if np.sqrt((x - existing[0])**2 + (y - existing[1])**2) < 20:
                            exists = True
                            break
                    if not exists:
                        bubbles.append((x, y, 13))
        return bubbles

    def extract_answers(self):
        """Reduce per-bubble fill flags to one answer per question.

        No filled bubble -> "---"; multiple filled bubbles -> the one with
        the highest darkness_score wins.
        """
        questions = defaultdict(list)
        for bubble in self.bubbles:
            questions[bubble.question_num].append(bubble)
        self.answers = {}
        for q_num in sorted(questions.keys()):
            q_bubbles = questions[q_num]
            filled = [b for b in q_bubbles if b.filled]
            if not filled:
                self.answers[q_num] = "---"
            elif len(filled) == 1:
                self.answers[q_num] = filled[0].option
            else:
                filled.sort(key=lambda b: b.darkness_score, reverse=True)
                self.answers[q_num] = filled[0].option
        return self.answers

    def visualize_results(self):
        """Return a copy of the sheet with filled bubbles outlined green
        (with Q/option labels) and unfilled bubbles outlined light red."""
        result_img = self.image.copy()
        for bubble in self.bubbles:
            if bubble.filled:
                cv2.circle(result_img, bubble.center, bubble.radius, (0, 255, 0), 2)
                text = f"Q{bubble.question_num}:{bubble.option}"
                cv2.putText(result_img, text,
                    (bubble.center[0] - 25, bubble.center[1] - bubble.radius - 5),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.3, (0, 0, 255), 1)
            else:
                cv2.circle(result_img, bubble.center, bubble.radius, (100, 100, 255), 1)
        return result_img

    def display_results(self):
        """Print the detected answers as a 3-column table plus a summary."""
        print("\n" + "="*60)
        print("DETECTED ANSWERS")
        print("="*60)
        for i in range(1, 21):
            row_str = ""
            ans1 = self.answers.get(i, "---")
            row_str += f"Q{i:2d}: {ans1:^4} | "
            if i + 20 <= 40:
                ans2 = self.answers.get(i + 20, "---")
                row_str += f"Q{i+20:2d}: {ans2:^4} | "
            else:
                row_str += " " * 13 + "| "
            if i + 40 <= 50:
                ans3 = self.answers.get(i + 40, "---")
                row_str += f"Q{i+40:2d}: {ans3:^4}"
            print(row_str)
        print("\n" + "="*60)
        print("SUMMARY")
        print("="*60)
        answered = sum(1 for v in self.answers.values() if v != "---")
        print(f"Questions detected: {len(self.answers)}")
        print(f"Answered: {answered}")
        print(f"Unanswered: {len(self.answers) - answered}")
def process_single_image(image_data) -> Dict[str, Any]:
    """Process a single OMR sheet and return a JSON-ready result dict.

    Args:
        image_data: either a base64-encoded image string or a file-like
            object (e.g. an uploaded file).

    Returns:
        dict with "success", per-question "answers" (keys "1".."50",
        unanswered questions as "null") and a "summary" block.

    Bug fix: CorrectedOMRReader.extract_answers marks unanswered
    questions with the sentinel "---" (never None), so the previous
    ``is not None`` checks counted every question as answered and never
    emitted "null". Both None and "---" are now treated as unanswered.
    """
    try:
        # Decode the input into an OpenCV BGR array.
        if isinstance(image_data, str):
            # Base64 encoded image
            image_bytes = base64.b64decode(image_data)
            image = Image.open(io.BytesIO(image_bytes))
        else:
            # Direct file upload
            image = Image.open(image_data)
        image_array = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)

        omr_reader = CorrectedOMRReader(image_array=image_array)
        answers = omr_reader.process()

        total_questions = 50  # sheets always carry 50 questions
        unanswered_sentinels = (None, "---")
        formatted_answers = {}
        answered = 0
        for q_num in range(1, total_questions + 1):
            answer = answers.get(q_num)
            if answer in unanswered_sentinels:
                formatted_answers[str(q_num)] = "null"
            else:
                formatted_answers[str(q_num)] = answer
                answered += 1

        return {
            "success": True,
            "answers": formatted_answers,
            "summary": {
                "total_questions": total_questions,
                "answered": answered,
                "unanswered": total_questions - answered,
            },
        }
    except Exception as e:
        # Any failure yields a well-formed error payload.
        return {
            "success": False,
            "error": str(e),
            "answers": {},
            "summary": {
                "total_questions": 50,
                "answered": 0,
                "unanswered": 50,
            },
        }
@app.route('/health', methods=['GET'])
def health_check():
    """Liveness probe: confirms the API process is up."""
    payload = {
        "status": "healthy",
        "message": "OMR API is running",
    }
    return jsonify(payload)
@app.route('/', methods=['GET'])
def home():
    """Root endpoint: self-describing API documentation."""
    endpoint_docs = {
        "/process_omr": {
            "method": "POST",
            "description": "Process OMR answer sheets",
            "accepts": [
                "Multipart form data with 'images' field",
                "JSON with base64 encoded images in 'images' array",
            ],
            "returns": "JSON with detected answers and summary",
        },
        "/health": {
            "method": "GET",
            "description": "Health check endpoint",
        },
    }
    sample_response = {
        "success": True,
        "answers": {"1": "A", "2": "B", "3": "null"},
        "summary": {"total_questions": 50, "answered": 45, "unanswered": 5},
    }
    return jsonify({
        "message": "OMR Processing API",
        "version": "1.0",
        "endpoints": endpoint_docs,
        "example_response": sample_response,
    })
# <----------------->
@app.route('/easyocr', methods=['POST'])
def easyocr_image():
    """Run EasyOCR over each uploaded image and return the extracted text.

    Fixes: the previous version staged each upload in a temp file and read
    it back via an ``open(...)`` whose handle was never closed (a file
    descriptor leak). The image bytes are now decoded straight from the
    upload stream, which also removes the temp-file round trip.
    """
    if 'images' not in request.files:
        return jsonify({'error': 'No image files provided'}), 400
    extracted_texts = []
    for image_file in request.files.getlist('images'):
        try:
            # Decode the uploaded bytes directly into an OpenCV image.
            data = np.frombuffer(image_file.read(), np.uint8)
            image = cv2.imdecode(data, cv2.IMREAD_COLOR)
            # Perform OCR and join recognised fragments into one string.
            result = reader.readtext(image)
            text = " ".join(item[1] for item in result)
            extracted_texts.append(text)
            ocr_extracted_texts.append(text)  # global log, used by evaluation
        except Exception as e:
            extracted_texts.append(f"Error processing image with EasyOCR: {str(e)}")
    return jsonify({'extracted_texts': extracted_texts})
@app.route('/tesseract', methods=['POST'])
def tesseract_image():
    """Run Tesseract OCR over each uploaded image and return the text."""
    if 'images' not in request.files:
        return jsonify({'error': 'No image files provided'}), 400
    extracted_texts = []
    for image_file in request.files.getlist('images'):
        try:
            # Stage the upload on disk; PIL/Tesseract read it from there.
            with tempfile.NamedTemporaryFile(delete=False, suffix='.png') as tmp:
                image_file.save(tmp.name)
                temp_path = tmp.name
            try:
                with Image.open(temp_path) as image:
                    text = pytesseract.image_to_string(image).strip()
                extracted_texts.append(text)
                ocr_extracted_texts.append(text)  # global log, used by evaluation
            finally:
                # Always remove the staging file.
                if os.path.exists(temp_path):
                    os.unlink(temp_path)
        except Exception as e:
            extracted_texts.append(f"Error processing image with Tesseract: {str(e)}")
    return jsonify({'extracted_texts': extracted_texts})
@app.route('/process_question_paper', methods=['POST'])
def process_question_paper():
    """Accept a question-paper upload (PDF or image), OCR it, parse out the
    questions/answers, cache the result globally, and return it as JSON."""
    global last_processed_question_paper_object
    if 'file' not in request.files:
        return jsonify({'error': 'No file provided'}), 400
    file = request.files['file']
    if file.filename == '':
        return jsonify({'error': 'No file selected'}), 400

    question_paper = QuestionPaper()
    try:
        # Uploads are kept under <app root>/Images for later Gemini checks.
        images_dir = os.path.join(app.root_path, 'Images')
        os.makedirs(images_dir, exist_ok=True)

        is_pdf = file.filename.lower().endswith('.pdf')
        saved_name = "question_paper.pdf" if is_pdf else "question_paper.png"
        saved_path = os.path.join(images_dir, saved_name)
        file.save(saved_path)
        question_paper.path = saved_path

        if is_pdf:
            # OCR every page of the PDF and concatenate the text.
            pages = convert_from_path(saved_path, poppler_path=_get_poppler_path())
            all_text = ""
            for page_image in pages:
                all_text += pytesseract.image_to_string(page_image) + "\n"
        else:
            all_text = pytesseract.image_to_string(Image.open(saved_path))

        questions, answers = parse_question_paper_text(all_text)
        question_paper.questions = questions
        question_paper.answers = answers
        # Strip any header/footer noise that slipped into the answers.
        question_paper.clean_answers()
        # Cache for subsequent evaluation requests.
        last_processed_question_paper_object = question_paper
        return jsonify(question_paper.to_dict())
    except Exception as e:
        return jsonify({'error': str(e)}), 500
def gemini_evaluate_answer_sheet_with_roll(question_paper_path, student_answer_path, questions, correct_answers, paddle_results):
    """
    Evaluate entire answer sheet using Gemini and extract roll number.

    Sends the question-paper image, the student's answer-sheet image and
    the traditional OCR results to Gemini, asking it to double-check the
    OCR and return per-question Correct/Wrong/Missing statuses plus the
    student's roll number.

    Returns:
        tuple: (roll_number, evaluations) where evaluations is a list of
        {"question_number": int, "status": str} dicts. On any failure the
        status for every question is "Error" and the roll number falls
        back to plain OCR extraction via extract_roll_number().

    NOTE(review): the `questions` parameter is unused in this function.
    """
    try:
        model = genai.GenerativeModel('gemini-2.5-flash')
        # Create the expected answers list for the prompt
        expected_answers_text = "\n".join([f"{i+1}. {answer}" for i, answer in enumerate(correct_answers)])
        prompt_text = f"""You are an OCR Assitant for an evaluvation script.
You will be given an image of a question paper and an image of a student's handwritten answers along with traditional OCR evaluvations.
Your task is assist the traditional OCR in overcoming its limitation with handwritten text the image may have bad quality handwritten text which the OCR may fail to extract and evaluvate properly, this is where you come in.
Your task is to Just do a double check of the OCR results and correct any mistakes or missing answers. and provide the result in a structured way.
Expected correct answers:
{expected_answers_text}
Traditional OCR Evaluation Results:
{paddle_results}
Instructions:
- First, identify and extract the student's roll number from the answer sheet
- Compare the student's handwritten answers with the expected answers above
- Small spelling mistakes should be ignored and considered correct
- If an answer has been crossed out or strikethrough, consider it incorrect
- Be lenient with handwriting recognition issues
- Look for answers by question numbers (1, 2, 3, etc.)
Please evaluate ALL questions and respond in this EXACT JSON format:
{{
"roll_number": "extracted_roll_number_here",
"evaluations": [
{{"question_number": 1, "status": "Correct"}},
{{"question_number": 2, "status": "Wrong"}},
{{"question_number": 3, "status": "Missing"}},
...
]
}}
For roll_number: Look for patterns like "Roll No:", "Roll Number:", "Reg No:", or any number sequence that appears to be a student identifier.
For each question, use ONLY one of these three status values:
- "Correct" - if the student's answer matches the expected answer (allowing for minor spelling)
- "Wrong" - if the student's answer is clearly different from the expected answer
- "Missing" - if no answer is visible for this question number
Respond with ONLY the JSON format above, no other text.
! Note
Ignore texts like `GENERAL KNOWLEDGE QUESTION PAPER WITH ANSWERS` and the final output should only have actual questions.
"""
        # Handle PDF vs Image for question paper
        if question_paper_path.lower().endswith('.pdf'):
            # Convert PDF to images
            pdf_images = convert_from_path(question_paper_path, poppler_path=_get_poppler_path())
            question_paper_img = pdf_images[0]  # Use first page
        else:
            question_paper_img = Image.open(question_paper_path)
        # Load student answer image
        student_answer_img = Image.open(student_answer_path)
        # Create content for the model
        content = [prompt_text, question_paper_img, student_answer_img]
        response = model.generate_content(content)
        result_text = response.text.strip()
        print(f"Gemini response: {result_text}")
        # Try to parse JSON response
        import json  # NOTE(review): redundant — json is already imported at module level
        try:
            # Clean the response - sometimes Gemini adds markdown formatting
            if "```json" in result_text:
                result_text = result_text.split("```json")[1].split("```")[0].strip()
            elif "```" in result_text:
                result_text = result_text.split("```")[1].strip()
            parsed_result = json.loads(result_text)
            return parsed_result["roll_number"], parsed_result["evaluations"]
        except (json.JSONDecodeError, KeyError) as e:
            print(f"Failed to parse JSON response: {e}")
            print(f"Raw response: {result_text}")
            # Fallback - extract roll number using OCR and create default "Error" results
            roll_number = extract_roll_number(student_answer_path)
            return roll_number, [{"question_number": i+1, "status": "Error"} for i in range(len(correct_answers))]
    except Exception as e:
        print(f"Error in Gemini evaluation: {str(e)}")
        # Return error status for all questions with OCR extracted roll number
        roll_number = extract_roll_number(student_answer_path)
        return roll_number, [{"question_number": i+1, "status": "Error"} for i in range(len(correct_answers))]
def quick_match(correct_list, messy_student_list, min_score=80):
    """Fuzzy-match messy OCR'd student answers to the list of correct answers.

    Each student line has any leading "N." question number stripped, then is
    matched against correct_list with fuzzywuzzy. A correct answer's position
    in correct_list determines its question number, and each question number
    is claimed at most once.

    Returns a list of (raw_item, question_number, matched_answer, score)
    tuples for matches scoring at least min_score.
    """
    from fuzzywuzzy import process
    import re
    matched = []
    claimed_questions = set()
    for raw_item in messy_student_list:
        # Drop any leading question number such as "12." or "3"
        cleaned = re.sub(r'^\d+\.?\s*', '', str(raw_item)).strip()
        if not cleaned or cleaned == '-':
            continue
        best = process.extractOne(cleaned, correct_list)
        if best is None or best[1] < min_score:
            continue
        question_number = correct_list.index(best[0]) + 1
        if question_number in claimed_questions:
            continue
        claimed_questions.add(question_number)
        matched.append((raw_item, question_number, best[0], best[1]))
    return matched
def process_with_paddle_ocr(image_path, correct_answers):
    """
    Run PaddleOCR on an answer-sheet image and fuzzy-match the recognized
    text lines against the expected correct answers.

    Args:
        image_path: Path to the student answer-sheet image.
        correct_answers: Expected answers, ordered by question number.

    Returns:
        On success: the quick_match() result — a list of
        (raw_ocr_line, question_number, matched_answer, score) tuples.
        On failure: the legacy 3-tuple (None, [], 0), kept for backward
        compatibility with existing callers.
    """
    try:
        # Imported lazily so the app can start without PaddleOCR installed.
        from paddleocr import PaddleOCR
        print("Initializing PaddleOCR...")
        ocr = PaddleOCR(
            use_doc_orientation_classify=True,
            use_doc_unwarping=False,
            use_textline_orientation=False
        )
        print("PaddleOCR initialized.")
        print("Predicting")
        ocr_results = ocr.predict(image_path)
        print("PaddleOCR processing completed.")
        print("Correct Answers are:")
        print(correct_answers)
        # Collect recognized text from every result page/region first.
        # The previous code re-ran quick_match inside this loop while
        # rebinding the loop's iterable name, so only the final page's words
        # were ever matched; accumulating then matching once fixes that.
        all_words = []
        for res in ocr_results:
            words = res["rec_texts"]
            print(f"PaddleOCR extracted words: {words}")
            all_words.extend(words)
        matched = quick_match(correct_answers, all_words, min_score=85)
        print(f"PaddleOCR matched results: {matched}")
        return matched
    except Exception as e:
        print(f"Error in PaddleOCR processing: {str(e)}")
        return None, [], 0
# OCR Evaluation Endpoint
@app.route('/evaluate_answers', methods=['POST'])
def evaluate_answers():
    """
    Evaluate uploaded student answer sheets against the last processed
    question paper.

    Expects multipart/form-data with:
      - student_answers: one or more answer-sheet image files (required)
      - teacher_email:   owner of the evaluation (optional)
      - exam_name:       label stored with the results (optional)

    Each sheet is OCR'd with PaddleOCR, then graded by Gemini (which also
    extracts the roll number). Per-student summaries are built, the combined
    results are stored in Supabase (best effort), and the full results JSON
    is returned.
    """
    # NOTE(review): declared but never read or assigned in this function —
    # presumably left over from an earlier OCR flow; confirm before removing.
    global ocr_extracted_texts
    if 'student_answers' not in request.files:
        return jsonify({"error": "Missing student answers"}), 400
    student_answer_files = request.files.getlist('student_answers')
    # Get teacher email and exam name from the request
    teacher_email = request.form.get('teacher_email', 'unknown@example.com')
    exam_name = request.form.get('exam_name', 'Untitled Exam')  # Get exam name from form data
    # Retrieve the question paper object cached by the upload/processing endpoint
    question_paper = last_processed_question_paper_object
    if last_processed_question_paper_object is None:
        return jsonify({'error': 'Question paper not found or processed yet'}), 404
    student_answer_paths = []
    try:
        # Save student answer files temporarily. delete=False keeps the file
        # on disk after the `with` block; cleanup happens in `finally`.
        for student_answer_file in student_answer_files:
            with tempfile.NamedTemporaryFile(delete=False, suffix='.png') as temp_ans_file:
                student_answer_file.save(temp_ans_file.name)
                student_answer_paths.append(temp_ans_file.name)
        # Process each student's answer sheet
        all_students_results = []
        if question_paper.path and os.path.exists(question_paper.path):
            print(f"Starting Gemini evaluation for exam: {exam_name} with {len(student_answer_paths)} students...")
            for idx, student_answer_path in enumerate(student_answer_paths):
                print(f"Processing answer sheet {idx + 1} with PaddleOCR...")
                # First pass: PaddleOCR extraction + fuzzy matching against
                # the expected answers.
                results = process_with_paddle_ocr(
                    student_answer_path,
                    question_paper.answers
                )
                # Second pass: Gemini grades the sheet and extracts the roll
                # number, with the OCR matches passed in as extra context.
                roll_number, sheet_evaluations = gemini_evaluate_answer_sheet_with_roll(
                    question_paper.path,
                    student_answer_path,
                    question_paper.questions,
                    question_paper.answers,
                    results
                )
                # Process the results for this student
                student_results = []
                for eval_result in sheet_evaluations:
                    question_num = eval_result["question_number"]
                    # Guard against out-of-range question numbers from Gemini
                    if 1 <= question_num <= len(question_paper.questions):
                        student_results.append({
                            'question_number': question_num,
                            'question_text': question_paper.questions[question_num - 1],
                            'correct_answer': question_paper.answers[question_num - 1],
                            'status': eval_result["status"]
                        })
                # Calculate summary for this student
                correct_count = sum(1 for result in student_results if result['status'] == 'Correct')
                total_questions = len(student_results)
                score_percentage = (correct_count / total_questions) * 100 if total_questions > 0 else 0
                student_summary = {
                    'roll_number': roll_number,
                    'total_questions': len(question_paper.answers),
                    'correct_answers': correct_count,
                    'wrong_answers': sum(1 for result in student_results if result['status'] == 'Wrong'),
                    'missing_answers': sum(1 for result in student_results if result['status'] == 'Missing'),
                    'error_answers': sum(1 for result in student_results if result['status'] == 'Error'),
                    'score_percentage': round(score_percentage, 2),
                    'evaluation_results': student_results,
                    'ocr_results': {
                        'extracted_text': results,
                    }
                }
                all_students_results.append(student_summary)
            final_results = {
                'exam_name': exam_name,  # Include exam name in results
                'total_students': len(student_answer_paths),
                'students_evaluated': all_students_results
            }
            # STORE THE RESULTS IN SUPABASE WITH EXAM NAME
            try:
                supabase_handler = SupabaseHandler()
                unique_key = supabase_handler.store_evaluation_result(teacher_email, final_results, exam_name)
                if unique_key:
                    # Add the unique key to the response
                    final_results['unique_key'] = unique_key
                    final_results['storage_success'] = True
                    print(f"Results stored successfully with key: {unique_key} for exam: {exam_name}")
                else:
                    final_results['storage_success'] = False
                    final_results['storage_error'] = "Failed to store results in database"
                    print("Failed to store results in Supabase")
            except Exception as storage_error:
                # Storage failure is non-fatal: results are still returned.
                print(f"Error storing results: {str(storage_error)}")
                final_results['storage_success'] = False
                final_results['storage_error'] = str(storage_error)
            return jsonify(final_results)
        else:
            # NOTE(review): returns HTTP 200 with an error body — consider 404.
            return jsonify({
                'error': 'Question paper file not found for Gemini evaluation.'
            })
    except Exception as e:
        return jsonify({'error': str(e)}), 500
    finally:
        # Clean up temporary student answer files
        for path in student_answer_paths:
            try:
                if os.path.exists(path):
                    os.unlink(path)
            except PermissionError:
                pass  # File still locked on Windows; OS will clean up temp dir
# Get Evaluation
@app.route('/get_evaluation_result/<unique_key>', methods=['GET'])
def get_evaluation_result(unique_key):
    """Fetch a single stored evaluation result by its unique key."""
    try:
        record = SupabaseHandler().get_evaluation_result(unique_key)
        if not record:
            return jsonify({
                'error': 'Evaluation result not found'
            }), 404
        return jsonify({
            'success': True,
            'data': record
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500
# Get Teacher Evaluation
@app.route('/get_teacher_evaluations/<teacher_email>', methods=['GET'])
def get_teacher_evaluations(teacher_email):
    """Return every stored evaluation result belonging to the given teacher."""
    try:
        evaluations = SupabaseHandler().get_teacher_evaluations(teacher_email)
        payload = {
            'success': True,
            'data': evaluations,
            'total_evaluations': len(evaluations)
        }
        return jsonify(payload)
    except Exception as e:
        return jsonify({'error': str(e)}), 500
# Get OMR Answer Key
@app.route('/get_omr_answer_key', methods=['GET'])
def get_omr_answer_key():
    """Return the most recently processed OMR answer key, if one exists."""
    global last_processed_omr_key
    if last_processed_omr_key is not None:
        return jsonify({
            'success': True,
            'answer_key': last_processed_omr_key.to_dict()
        })
    return jsonify({
        'error': 'No answer key has been processed yet'
    }), 404
def omr_gemini_process(error_questions, correct_answers, image_file):
    """
    Use Gemini to assist in evaluating OMR sheets, especially for error questions.

    Args:
        error_questions: Question numbers the bubble-detection pipeline could
            not resolve (blank, multi-marked, or unreadable).
        correct_answers: Mapping of question number -> correct option letter.
        image_file: PIL image of the scanned OMR sheet.

    Returns:
        (roll_number, evaluations) where evaluations is a list of dicts with
        "question_number" and "status" ("Correct"/"Wrong"/"Missing"/"Error").
    """
    try:
        model = genai.GenerativeModel('gemini-2.5-flash')
        prompt_text = f"""
You are an OMR Assistant for an evaluvation script.
Your main purpose is to assist in the process.
Correct Answers to questions sorted by question number: {correct_answers}
Error Question numbers: {error_questions}
Your task:
- From the given image identify the student name and roll number
- if for some reason the traditional OMR Processing failed to detect some answers, those question numbers will be provided to you, you should look into those questions form the given image and correct answers.
- Only provide answer for the questions that are in the error list.
- You can ignore the rest of the question
- if Error question is empty, just extract the roll number and name
Please evaluate ALL questions and respond in this EXACT JSON format:
{{
"roll_number": "extracted_roll_number_here",
"evaluations": [
    {{"question_number": 1, "status": "Correct"}},
    {{"question_number": 2, "status": "Wrong"}},
    {{"question_number": 3, "status": "Missing"}},
    ...
]
}}
"""
        student_answer_img = image_file
        content = [prompt_text, student_answer_img]
        response = model.generate_content(content)
        result_text = response.text.strip()
        print(f"Gemini response: {result_text}")
        import json
        try:
            # Clean the response - sometimes Gemini adds markdown formatting
            if "```json" in result_text:
                result_text = result_text.split("```json")[1].split("```")[0].strip()
            elif "```" in result_text:
                result_text = result_text.split("```")[1].strip()
            parsed_result = json.loads(result_text)
            return parsed_result["roll_number"], parsed_result["evaluations"]
        except (json.JSONDecodeError, KeyError) as e:
            print(f"Failed to parse JSON response: {e}")
            print(f"Raw response: {result_text}")
            # Fallback: OCR the roll number from the fixed input image path.
            # NOTE(review): assumes the .jpg filename written by /process_omr;
            # confirm the extension matches the last upload.
            roll_number = extract_roll_number(os.path.join("OMRChecker", "inputs", "OMRImage.jpg"))
            # Report "Error" only for the unresolved questions, using their
            # real question numbers. (correct_answers is a dict keyed by
            # question number, so the old `i+1 for i in range(len(...))`
            # enumeration could mislabel questions; this also matches the
            # outer exception handler's shape.)
            return roll_number, [{"question_number": q, "status": "Error"} for q in error_questions]
    except Exception as e:
        print(f"Error in OMR Gemini processing: {str(e)}")
        return "Unknown", [{"question_number": q, "status": "Error"} for q in error_questions]
@app.route('/evaluate_omr', methods=['POST'])
def evaluate_omr():
    """
    Evaluate every processed OMR sheet against the stored answer key.

    Reads the globals populated by /process_omr (porcessed_omr_results,
    OMR_IMAGES) and the answer-key endpoint (last_processed_omr_key), grades
    each sheet, lets Gemini resolve bubbles the traditional pipeline could
    not read, stores the combined results in Supabase (best effort), and
    returns a summary JSON payload.
    """
    global last_processed_omr_key, last_processed_omr_results, porcessed_omr_results, OMR_IMAGES
    # Metadata sent by the frontend alongside the evaluation request
    teacher_email = request.form.get('teacher_email', 'unknown@example.com')
    exam_name = request.form.get('exam_name', 'Untitled Exam')  # Get exam name from form data
    if not last_processed_omr_key:
        return jsonify({
            'error': 'No answer key has been processed. Please process an answer key first.'
        }), 400
    if not last_processed_omr_results:
        return jsonify({
            'error': 'No OMR sheet has been processed. Please process an OMR sheet first.'
        }), 400
    try:
        student_datas = []
        # One iteration per processed answer sheet
        for idx, omr_data in enumerate(porcessed_omr_results):
            marked_answers = omr_data
            image_file = OMR_IMAGES[idx]
            # Evaluate only the questions that exist in the answer key
            correct_answers = last_processed_omr_key.answers
            total_questions_in_key = len(correct_answers)
            evaluation_details = []
            correct_count = 0
            wrong_count = 0
            missing_count = 0
            error_questions = []  # questions the OMR pipeline could not read
            for q_num in sorted(correct_answers.keys()):
                print(f"Evaluating Question {q_num}")
                print(f"Correct Answer: {correct_answers[q_num]} | Marked Answer: {marked_answers.get(str(q_num))}")
                correct_ans = correct_answers[q_num]
                # OMRChecker CSV columns are string-keyed ("1", "2", ...)
                marked_ans = marked_answers.get(str(q_num))
                # Blank / multi-marked / unreadable bubbles are provisionally
                # counted as Missing and queued for a Gemini second opinion.
                if marked_ans is None or marked_ans == '' or len(str(marked_ans)) > 1 or marked_ans == 'nan':
                    status = 'Missing'
                    error_questions.append(q_num)
                    missing_count += 1
                elif marked_ans.upper() == correct_ans.upper():
                    status = 'Correct'
                    correct_count += 1
                else:
                    status = 'Wrong'
                    wrong_count += 1
                evaluation_details.append({
                    'question_number': q_num,
                    'question_text': last_processed_omr_key.questions[q_num - 1] if q_num <= len(last_processed_omr_key.questions) else f"Question {q_num}",
                    'correct_answer': correct_ans,
                    'marked_answer': marked_ans if marked_ans != 'null' else None,
                    'status': status
                })
            # Ask Gemini to re-read the unresolved questions (and extract the
            # student's roll number) straight from the sheet image.
            roll_no, gemini_result = omr_gemini_process(
                error_questions,
                last_processed_omr_key.answers,
                image_file
            )
            for err_idx in error_questions:
                for gemini_eval in gemini_result:
                    if gemini_eval["question_number"] == err_idx:
                        correct_ans = last_processed_omr_key.answers[err_idx]
                        marked_ans = None  # the bubble itself was unreadable
                        status = gemini_eval["status"]
                        # Each error question was provisionally counted as
                        # Missing above; move it into the bucket Gemini chose.
                        if status == "Correct":
                            correct_count += 1
                            missing_count -= 1
                        elif status == "Wrong":
                            wrong_count += 1
                            missing_count -= 1
                        # status == "Missing": already counted in the first
                        # pass, so no change. (The previous code incremented
                        # missing_count again here, double-counting it.)
                        # Update the evaluation details
                        for eval_detail in evaluation_details:
                            if eval_detail['question_number'] == err_idx:
                                eval_detail.update({
                                    'marked_answer': marked_ans,
                                    'status': status
                                })
                                break
                        break
            # Score with optional negative marking; currently computed for
            # completeness — the response reports only percentages.
            total_score = correct_count * last_processed_omr_key.marks_per_question
            if last_processed_omr_key.negative_marking > 0:
                total_score -= wrong_count * last_processed_omr_key.negative_marking
            max_score = total_questions_in_key * last_processed_omr_key.marks_per_question
            student_summary = {
                'roll_number': roll_no,
                'total_questions': len(last_processed_omr_key.answers),
                'correct_answers': correct_count,
                'wrong_answers': wrong_count,
                'missing_answers': missing_count,
                'error_answers': len(error_questions),
                'score_percentage': correct_count / len(last_processed_omr_key.answers) * 100 if len(last_processed_omr_key.answers) > 0 else 0,
                'evaluation_results': evaluation_details,
                'ocr_results': {
                    'extracted_text': gemini_result,
                }
            }
            student_datas.append(student_summary)
        # Format the data in the required structure for Supabase
        formatted_evaluation_data = {
            'exam_name': exam_name,  # Include exam name in results
            'total_students': len(student_datas),
            'students_evaluated': student_datas
        }
        # Store results in Supabase (optional — skip if credentials not configured)
        unique_key = None
        try:
            supabase_handler = SupabaseHandler()
            unique_key = supabase_handler.store_evaluation_result(teacher_email, formatted_evaluation_data, exam_name)
        except Exception as supa_err:
            print(f"Supabase storage skipped: {supa_err}")
        # Prepare answer key info
        answer_key_info = {
            "title": getattr(last_processed_omr_key, 'title', 'Untitled'),
            "marks_per_question": last_processed_omr_key.marks_per_question,
            "negative_marking": last_processed_omr_key.negative_marking
        }
        # Response intentionally echoes only the storage key and key info;
        # the full evaluation payload lives in Supabase.
        final_result = {
            "success": True,
            "unique_key": unique_key,
            "additional_info": {
                "answer_key_info": answer_key_info
            }
        }
        return jsonify(final_result)
    except Exception as e:
        return jsonify({
            "success": False,
            "error": f"Evaluation failed: {str(e)}"
        }), 500
def process_with_gemini(evaluation_details, evaluation_summary, omr_data):
    """
    Use Gemini to independently evaluate the OMR sheet and extract student details.

    Args:
        evaluation_details: Per-question results from the automated pipeline.
            NOTE(review): not referenced in this function body — confirm intent.
        evaluation_summary: Aggregate counts from the automated pipeline.
            NOTE(review): not referenced in this function body — confirm intent.
        omr_data: Dict describing the processed sheet; the sheet image is
            located via an 'image_data' (base64) or 'filename' entry.

    Returns:
        dict with 'student_info' and 'gemini_evaluation' (plus a computed
        'summary' on success), or a fallback dict with a 'verification'
        section describing why Gemini evaluation was unavailable.
        NOTE(review): the prompt asks for "roll_no" inside student_info while
        every fallback dict uses "roll_number" — confirm which key consumers
        actually read.
    """
    global last_processed_omr_key
    try:
        model = genai.GenerativeModel('gemini-2.5-flash')
        # Prepare the questions and correct answers for Gemini.
        # NOTE(review): pairs questions by list position `i` with answers
        # sorted by key `q_num` — these only line up if question numbers are
        # contiguous from 1; confirm for sparse keys.
        questions_and_answers = ""
        for i, (q_num, correct_answer) in enumerate(sorted(last_processed_omr_key.answers.items())):
            question_text = last_processed_omr_key.questions[i] if i < len(last_processed_omr_key.questions) else f"Question {q_num}"
            questions_and_answers += f"Question {q_num}: {question_text}\nCorrect Answer: {correct_answer}\n\n"
        prompt = f"""
You are a teacher grading an OMR answer sheet.
STUDENT INFO: Extract the student's name and roll number from the image.
GRADING TASK: For each question, identify which bubble (A, B, C, or D) is filled/darkened, then compare with the correct answer.
QUESTIONS AND CORRECT ANSWERS:
{questions_and_answers}
IMPORTANT: Look carefully at each row of bubbles. A filled bubble will be darkened/shaded, while empty bubbles will be white/clear.
Respond in this EXACT JSON format:
{{
"student_info": {{
"name": "extracted student name",
"roll_no": "extracted roll number"
}},
"gemini_evaluation": [
{{"question": 1, "marked_answer": "C", "correct_answer": "C", "status": "Correct"}},
{{"question": 2, "marked_answer": "D", "correct_answer": "D", "status": "Correct"}},
// ... continue for all questions
]
}}
For status: use "Correct", "Wrong", or "Missing" only.
For marked_answer: use "A", "B", "C", "D", or null if no bubble is clearly filled.
"""
        # Get the image - we need to retrieve it from the last processed OMR.
        # Since the image is not stored directly, locate it from whatever
        # reference omr_data carries.
        if 'image_data' in omr_data:
            # Base64-encoded image bytes supplied inline
            image_data = omr_data['image_data']
            image_bytes = base64.b64decode(image_data)
            image = Image.open(io.BytesIO(image_bytes))
        elif 'filename' in omr_data:
            # Try to find the image file on disk
            try:
                # Look for the image in common locations
                possible_paths = [
                    f"Images/{omr_data['filename']}",
                    f"temp/{omr_data['filename']}",
                    omr_data['filename']
                ]
                image = None
                for path in possible_paths:
                    if os.path.exists(path):
                        image = Image.open(path)
                        break
                if image is None:
                    # If we can't find the image, return a fallback result
                    return {
                        "student_info": {
                            "name": "Image not available",
                            "roll_number": "Image not available"
                        },
                        "verification": {
                            "evaluation_correct": "unknown",
                            "confidence": "low",
                            "discrepancies": ["Original image not available for verification"],
                            "notes": "Could not verify due to missing image file"
                        },
                        "gemini_evaluation": []
                    }
            except Exception as e:
                print(f"Error loading image: {str(e)}")
                return {
                    "student_info": {
                        "name": "Error loading image",
                        "roll_number": "Error loading image"
                    },
                    "verification": {
                        "evaluation_correct": "unknown",
                        "confidence": "low",
                        "discrepancies": [f"Error loading image: {str(e)}"],
                        "notes": "Image processing failed"
                    },
                    "gemini_evaluation": []
                }
        else:
            # No image reference available at all
            return {
                "student_info": {
                    "name": "No image data",
                    "roll_number": "No image data"
                },
                "verification": {
                    "evaluation_correct": "unknown",
                    "confidence": "low",
                    "discrepancies": ["No image data available"],
                    "notes": "Cannot verify without image"
                },
                "gemini_evaluation": []
            }
        # Generate content with Gemini
        response = model.generate_content([prompt, image])
        result_text = response.text.strip()
        print(f"Gemini raw response: {result_text}")
        # Parse the JSON response
        try:
            # Clean the response - remove markdown formatting if present
            if "```json" in result_text:
                result_text = result_text.split("```json")[1].split("```")[0].strip()
            elif "```" in result_text:
                result_text = result_text.split("```")[1].strip()
            parsed_result = json.loads(result_text)
            # Derive summary counts and score from Gemini's evaluation
            if 'gemini_evaluation' in parsed_result:
                correct_count = sum(1 for item in parsed_result['gemini_evaluation'] if item.get('status') == 'Correct')
                wrong_count = sum(1 for item in parsed_result['gemini_evaluation'] if item.get('status') == 'Wrong')
                missing_count = sum(1 for item in parsed_result['gemini_evaluation'] if item.get('status') == 'Missing')
                score = (correct_count * last_processed_omr_key.marks_per_question) - (wrong_count * last_processed_omr_key.negative_marking)
                max_score = len(last_processed_omr_key.answers) * last_processed_omr_key.marks_per_question
                parsed_result['summary'] = {
                    "total_questions": len(last_processed_omr_key.answers),
                    "correct_count": correct_count,
                    "wrong_count": wrong_count,
                    "missing_count": missing_count,
                    "score": score,
                    "max_score": max_score,
                    "percentage": round((score / max_score) * 100, 2) if max_score > 0 else 0
                }
            return parsed_result
        except json.JSONDecodeError as e:
            print(f"Failed to parse Gemini JSON response: {e}")
            print(f"Raw response: {result_text}")
            # Fallback response when Gemini's output is not valid JSON
            return {
                "student_info": {
                    "name": "Parse error",
                    "roll_number": "Parse error"
                },
                "verification": {
                    "evaluation_correct": "unknown",
                    "confidence": "low",
                    "discrepancies": ["Failed to parse Gemini response"],
                    "notes": f"JSON parse error: {str(e)}"
                },
                "gemini_evaluation": [],
                "raw_response": result_text  # Include raw response for debugging
            }
    except Exception as e:
        # Catch-all: API errors, missing answer key, etc.
        return {
            "student_info": {
                "name": "Processing error",
                "roll_number": "Processing error"
            },
            "verification": {
                "evaluation_correct": "unknown",
                "confidence": "low",
                "discrepancies": [f"Gemini processing error: {str(e)}"],
                "notes": "Failed to process with Gemini"
            },
            "gemini_evaluation": []
        }
def compare_evaluations(our_evaluation, gemini_evaluation):
    """
    Compare the automated pipeline's evaluation with Gemini's independent one.

    Questions present in both evaluations are compared by status; agreements
    are counted and per-question disagreements are collected. Returns a dict
    describing the agreement rate, or a "not available" marker when Gemini
    produced no evaluation.
    """
    if not gemini_evaluation:
        return {
            "comparison_available": False,
            "reason": "Gemini evaluation not available"
        }
    # Index our evaluation by question number for O(1) lookup
    by_question = {detail['question_number']: detail for detail in our_evaluation}
    agreements = 0
    total_compared = 0
    differences = []
    for gemini_entry in gemini_evaluation:
        q_num = gemini_entry.get('question')
        if q_num not in by_question:
            continue
        total_compared += 1
        ours = by_question[q_num]
        gemini_status = gemini_entry.get('status')
        if ours['status'] == gemini_status:
            agreements += 1
            continue
        differences.append({
            "question": q_num,
            "our_evaluation": {
                "marked_answer": ours['marked_answer'],
                "status": ours['status']
            },
            "gemini_evaluation": {
                "marked_answer": gemini_entry.get('marked_answer'),
                "status": gemini_status
            }
        })
    rate = round((agreements / total_compared) * 100, 2) if total_compared > 0 else 0
    return {
        "comparison_available": True,
        "total_questions_compared": total_compared,
        "agreements": agreements,
        "differences_count": len(differences),
        "agreement_rate": rate,
        "differences": differences
    }
# /process_omr endpoint: runs OMRChecker on uploaded sheets and keeps each sheet's image in memory for later Gemini-based verification.
@app.route('/process_omr', methods=['POST'])
def process_omr_enhanced():
    """
    Enhanced OMR processing that stores image data for later Gemini processing.

    Expects multipart/form-data with an 'images' field (one or more scanned
    OMR sheets). Each sheet is written to OMRChecker's fixed input path, the
    external OMRChecker pipeline is run on it, and the resulting CSV is
    parsed into a {question_number: marked_answer} dict that is appended to
    the module-level porcessed_omr_results (note: name typo is load-bearing —
    evaluate_omr reads the same global). The PIL image for each sheet is kept
    in OMR_IMAGES for Gemini reprocessing.
    """
    global last_processed_omr_results
    global OMR_IMAGES
    global porcessed_omr_results
    # Reset per-request accumulators shared with /evaluate_omr
    OMR_IMAGES = []
    porcessed_omr_results = []
    try:
        results = []
        print("Starting OMR processing...")
        # Check if files were uploaded
        if 'images' in request.files:
            files = request.files.getlist('images')
            results = []
            for idx, file in enumerate(files):
                if file.filename == '':
                    continue
                print(f"===================================== Processing file {file.filename} =====================================")
                name, extension = os.path.splitext(file.filename)
                # Every upload is saved to the same fixed path OMRChecker scans,
                # so sheets are processed strictly one at a time.
                filename = os.path.join("OMRChecker", "inputs", "OMRImage" + extension)
                file.save(filename)
                OMR_IMAGES.append(Image.open(filename))
                # Run the external OMRChecker pipeline on the inputs folder.
                # NOTE(review): the return code of `result` is never checked —
                # a failed run would surface later as a CSV error.
                result = subprocess.run([sys.executable, os.path.join('OMRChecker', 'main.py'), '--inputDir=' + os.path.join('OMRChecker', 'inputs')])
                print("OMR Finished Processing Successfully")
                folder = os.path.join("outputs", "Results")
                csv_files = [f for f in os.listdir(folder) if f.endswith(".csv")]
                print("CSV FILES:", csv_files)
                # NOTE(review): assumes exactly one CSV is present; the
                # per-sheet delete below keeps the folder clean between runs.
                result_file = os.path.join(folder, csv_files[0])
                print("Found Result File", result_file)
                df = pd.read_csv(result_file)
                # Convert to JSON
                data_json = df.to_json(orient="records")
                parsed_json = json.loads(data_json)
                columns_dict = df.to_dict(orient="list")
                print(columns_dict)
                # Keep only the q<N> columns, keyed by question-number string
                questions_only = {k.replace("q", ""): v[0] for k, v in columns_dict.items() if k.startswith("q")}
                last_processed_omr_results = questions_only
                porcessed_omr_results.append(questions_only)
                # Delete the CSV so the next sheet's run starts clean
                if os.path.exists(result_file):
                    os.remove(result_file)
                    print(f"{result_file} deleted")
            # NOTE(review): only the last sheet's parsed CSV is returned here
            # (and this raises NameError if no file was processed) — confirm
            # whether all sheets' records should be returned instead.
            return jsonify(parsed_json)
        else:
            return jsonify({
                "success": False,
                "error": "No images provided. Use 'images' field for file uploads.",
                "results": []
            }), 400
    except Exception as e:
        return jsonify({
            "success": False,
            "error": f"Server error: {str(e)}",
            "results": []
        }), 500
@app.route('/get_question_details/<int:question_number>', methods=['GET'])
def get_question_details(question_number):
    """Look up one question's full details from the stored OMR answer key."""
    global last_processed_omr_key
    answer_key = last_processed_omr_key
    if answer_key is None:
        return jsonify({
            'error': 'No answer key has been processed yet'
        }), 404
    details = answer_key.get_question_details(question_number)
    if details is None:
        return jsonify({
            'error': f'Question number {question_number} not found'
        }), 404
    return jsonify({
        'success': True,
        'question_data': details
    })
@app.route('/debug_parsing', methods=['GET'])
def debug_parsing():
    """Debug endpoint: show how each stored OCR text parses into answers."""
    if not ocr_extracted_texts:
        return jsonify({'error': 'No OCR extracted texts available.'}), 400
    debug_results = [
        {
            'original_ocr_text': raw_text,
            'parsed_answers': improved_clean_and_parse_ocr_text(raw_text)
        }
        for raw_text in ocr_extracted_texts
    ]
    return jsonify({'debug_results': debug_results})
def extract_omr_metadata(text: str) -> tuple:
    """Extract (title, duration) from raw question-paper text.

    The title is the first of the first five lines that is fully uppercase
    and longer than 10 characters; the duration comes from a "Time: N
    minutes/mins/min" pattern anywhere in the text. Either value may be the
    empty string when not found.
    """
    title = ""
    duration = ""
    # Titles are conventionally near the top and written in capitals
    for candidate in text.split('\n')[:5]:
        stripped = candidate.strip()
        if stripped == stripped.upper() and len(stripped) > 10:
            title = stripped
            break
    duration_match = re.search(r'Time:\s*(\d+)\s*(minutes|mins|min)', text, re.IGNORECASE)
    if duration_match:
        duration = f"{duration_match.group(1)} minutes"
    return title, duration
def extract_omr_answers(text: str) -> tuple:
    """
    Parse questions, options, and answers out of raw question-paper text.

    Expected line shapes:
        1. Question text          (or "1) Question text")
        A) option text            (or "A. option text")
        Answer: B                 (or the full option text)

    Lines before the first question (a line starting with "1.") are treated
    as header and skipped. When an answer is given as full option text it is
    mapped back to its option letter.

    Returns:
        tuple: (answers, questions) where
            answers   -- dict mapping question number (int) -> answer letter/text
            questions -- list of "N. question text" strings, answered questions only
        (The previous annotation claimed ``dict``, but a 2-tuple has always
        been returned and callers unpack it accordingly.)
    """
    answers = {}
    questions = []
    question_data = []
    print("\nStarting answer extraction...")
    # Split text into non-empty, stripped lines
    lines = [line.strip() for line in text.split('\n') if line.strip()]
    # Skip header lines until we find the first question
    started = False
    current_dict = None
    for line in lines:
        print(f"Processing line: {line}")
        if not started:
            if line.startswith('1.'):
                started = True
            else:
                continue
        # New question, e.g. "3. What is ...?" or "3) What is ...?"
        question_match = re.match(r'^(\d+)[.)](.*?)$', line)
        if question_match:
            # Save previous question if exists
            if current_dict:
                question_data.append(current_dict)
            q_num = int(question_match.group(1))
            q_text = question_match.group(2).strip()
            current_dict = {
                'number': q_num,
                'question': q_text,
                'options': {},
                'answer': None
            }
            continue
        # Option line, e.g. "A) Paris" or "A. Paris"
        option_match = re.match(r'^([A-D])[).](.*?)$', line)
        if option_match and current_dict is not None:
            opt_letter = option_match.group(1)
            opt_text = option_match.group(2).strip()
            current_dict['options'][opt_letter] = opt_text
            continue
        # Answer line; captures a bare letter or the full option text
        answer_match = re.match(r'^\s*Answer[:\s]*([A-D]|.+)$', line, re.IGNORECASE)
        if answer_match and current_dict is not None:
            answer = answer_match.group(1).strip()
            # Map full option text back to its letter when possible
            for opt_letter, opt_text in current_dict['options'].items():
                if answer.lower() == opt_text.lower():
                    answer = opt_letter
                    break
            current_dict['answer'] = answer
            continue
    # Add last question
    if current_dict:
        question_data.append(current_dict)
    print("\nExtracted Question Data:")
    for q in question_data:
        print(f"\nQuestion {q['number']}:")
        print(f"Text: {q['question']}")
        print(f"Options: {q['options']}")
        print(f"Answer: {q['answer']}")
        # Only questions with a detected answer make it into the output
        if q['answer']:
            answers[q['number']] = q['answer']
            questions.append(f"{q['number']}. {q['question']}")
    print(f"\nExtracted {len(questions)} questions and {len(answers)} answers")
    print("Questions:", questions)
    print("Answers:", answers)
    return answers, questions
def debug_text_extraction(text: str):
    """Print the raw extracted text plus a per-line breakdown for OCR debugging."""
    print("=== Extracted Text ===")
    print(text)
    print("\n=== Line by Line Analysis ===")
    for raw_line in text.split('\n'):
        stripped = raw_line.strip()
        if stripped:
            print(f"Line: {stripped}")
@app.route('/process_omr_answer_key', methods=['POST'])
def process_omr_answer_key():
    """
    Process OMR answer key from either:
    1. JSON format with direct answers
    2. PDF/Image of question paper with answers marked

    For JSON format:
    {
        "answers": {
            "1": "A",
            "2": "B",
            ...
        },
        "marks_per_question": 1.0,  # optional, defaults to 1
        "negative_marking": 0.0     # optional, defaults to 0
    }

    For PDF/Image:
    multipart/form-data with 'file' field containing the question paper

    On success returns a JSON payload with the parsed key and caches it in
    the module-level ``last_processed_omr_key`` for later grading calls.
    Returns 400 for malformed input, 500 for any processing failure.
    """
    global last_processed_omr_key
    try:
        omr_key = OMRAnswerKey()
        # Branch on upload style: multipart file vs. JSON body.
        if 'file' in request.files:
            file = request.files['file']
            if file.filename == '':
                return jsonify({'error': 'No file selected'}), 400

            # Create Images directory if it doesn't exist (uploads + debug output).
            images_dir = os.path.join(app.root_path, 'Images')
            os.makedirs(images_dir, exist_ok=True)

            if file.filename.lower().endswith('.pdf'):
                # Save and process PDF
                answer_key_path = os.path.join(images_dir, "omr_answer_key.pdf")
                file.save(answer_key_path)
                omr_key.path = answer_key_path

                # Convert PDF to images and OCR each page, accumulating the text.
                all_text = ""
                try:
                    print(f"\nProcessing PDF file: {answer_key_path}")
                    images_from_pdf = convert_from_path(
                        answer_key_path,
                        poppler_path=_get_poppler_path(),
                        dpi=300  # Increase DPI for better quality
                    )
                    print(f"Converted PDF to {len(images_from_pdf)} images")
                    for idx, page_image in enumerate(images_from_pdf):
                        print(f"\nProcessing page {idx + 1}")
                        # Preprocess for OCR: grayscale + Otsu threshold yields
                        # a clean black-and-white page for Tesseract.
                        img_np = np.array(page_image)
                        gray = cv2.cvtColor(img_np, cv2.COLOR_RGB2GRAY)
                        _, threshold = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
                        # Save processed image so OCR problems can be inspected later.
                        debug_image_path = os.path.join(images_dir, f"debug_page_{idx + 1}.png")
                        cv2.imwrite(debug_image_path, threshold)
                        print(f"Saved processed image to {debug_image_path}")
                        # --oem 3 = default engine, --psm 6 = assume one uniform text block.
                        custom_config = r'--oem 3 --psm 6'
                        text = pytesseract.image_to_string(threshold, config=custom_config)
                        print(f"Extracted text length: {len(text)}")
                        all_text += text + "\n"
                    print("\nTotal extracted text length:", len(all_text))
                except Exception as e:
                    # Log and re-raise so the outer handler reports the failure.
                    print(f"Error during PDF processing: {str(e)}")
                    raise

                # Debug the extracted text
                print("\nDebugging PDF extraction:")
                debug_text_extraction(all_text)

                # Extract metadata and answers from the combined page text.
                title, duration = extract_omr_metadata(all_text)
                answers, questions = extract_omr_answers(all_text)
                print("\nExtracted answers:", answers)
                omr_key.set_metadata(title, duration)
                omr_key.set_answers(answers)
                omr_key.questions = questions
            else:
                # Process as image
                answer_key_path = os.path.join(images_dir, "omr_answer_key.png")
                file.save(answer_key_path)
                omr_key.path = answer_key_path

                # Fix: context manager closes the image file handle
                # (the original Image.open left it open).
                with Image.open(answer_key_path) as image:
                    text = pytesseract.image_to_string(image)

                # Debug the extracted text
                print("\nDebugging Image extraction:")
                debug_text_extraction(text)

                # Extract metadata and answers
                title, duration = extract_omr_metadata(text)
                answers, questions = extract_omr_answers(text)
                print("\nStructured Extraction Results:")
                print("Title:", title)
                print("Duration:", duration)
                print("\nQuestions found:", len(questions))
                print("Answers found:", len(answers))
                print("\nAnswers:", answers)
                omr_key.set_metadata(title, duration)
                omr_key.set_answers(answers)
                omr_key.questions = questions

            # Marking scheme for ANY file upload comes from the form fields.
            # Fix: set at this level so the PDF path also defines these names
            # before set_marking_scheme is called below.
            marks_per_question = float(request.form.get('marks_per_question', 1.0))
            negative_marking = float(request.form.get('negative_marking', 0.0))
        else:
            # Process JSON input
            if not request.is_json:
                return jsonify({'error': 'Request must be JSON or file upload'}), 400
            data = request.get_json()
            if 'answers' not in data:
                return jsonify({'error': 'Answer key must be provided'}), 400

            # Validate answer format: integer question numbers, answers in A-D.
            answer_key = data['answers']
            for q_num, answer in answer_key.items():
                try:
                    q_num = int(q_num)
                    if not isinstance(answer, str) or answer.upper() not in ['A', 'B', 'C', 'D']:
                        return jsonify({
                            'error': f'Invalid answer format for question {q_num}. Must be A, B, C, or D'
                        }), 400
                except ValueError:
                    return jsonify({
                        'error': f'Question numbers must be integers, got {q_num}'
                    }), 400

            # Set the answers
            omr_key.set_answers(answer_key)

            # Set metadata if provided
            title = data.get('title', '')
            duration = data.get('duration', '')
            omr_key.set_metadata(title, duration)

            # Marking scheme from the JSON body (defaults: 1 mark, no negatives).
            marks_per_question = float(data.get('marks_per_question', 1.0))
            negative_marking = float(data.get('negative_marking', 0.0))

        # Apply the marking scheme chosen by whichever branch ran.
        omr_key.set_marking_scheme(marks_per_question, negative_marking)

        # Cache globally so later grading endpoints can reuse this key.
        last_processed_omr_key = omr_key

        return jsonify({
            'success': True,
            'message': 'OMR answer key processed successfully',
            'answer_key': omr_key.to_dict()
        })
    except Exception as e:
        # Boundary handler: surface any unexpected failure as a 500.
        return jsonify({
            'error': f'Failed to process answer key: {str(e)}'
        }), 500
if __name__ == '__main__':
    # Bind on all interfaces; port and debug flag are environment-driven
    # (PORT defaults to 5000, FLASK_DEBUG must be the string "true").
    serve_port = int(os.environ.get("PORT", 5000))
    debug_enabled = os.environ.get("FLASK_DEBUG", "false").lower() == "true"
    app.run(host="0.0.0.0", port=serve_port, debug=debug_enabled)