Spaces:
Sleeping
Sleeping
Update app.py
Browse files
app.py
CHANGED
|
@@ -14,11 +14,11 @@ import cv2
|
|
| 14 |
import numpy as np
|
| 15 |
from concurrent.futures import ThreadPoolExecutor, as_completed
|
| 16 |
from PyPDF2 import PdfReader, PdfWriter
|
| 17 |
-
|
| 18 |
# ---------------- CONFIG ----------------
|
| 19 |
genai.configure(api_key=os.getenv("GEMINI_API_KEY"))
|
| 20 |
GRID_ROWS, GRID_COLS = 20, 14
|
| 21 |
-
|
| 22 |
# ---------------- PROMPTS ----------------
|
| 23 |
PROMPTS = {
|
| 24 |
"QP_MS_TRANSCRIPTION" : {
|
|
@@ -55,7 +55,8 @@ Answer 2 :
|
|
| 55 |
"""
|
| 56 |
}
|
| 57 |
,
|
| 58 |
-
|
|
|
|
| 59 |
"GRADING_PROMPT": {
|
| 60 |
"role": "system",
|
| 61 |
"content": """Developer: You are an official examiner. Apply the following grading rules precisely.
|
|
@@ -74,7 +75,7 @@ Answer 2 :
|
|
| 74 |
4. Accept valid equivalent forms unless otherwise specified.
|
| 75 |
5. Apply FT where appropriate.
|
| 76 |
6. Use proper notation: M1A0, A1, etc.
|
| 77 |
-
7. Any lost mark: use red `<span style="color:red">M0</span>` and make Reason red.
|
| 78 |
---
|
| 79 |
## Output Format
|
| 80 |
Produce two sections per question/sub-question, following this structure:
|
|
@@ -99,28 +100,28 @@ NOTES:
|
|
| 99 |
"""
|
| 100 |
}
|
| 101 |
}
|
| 102 |
-
|
| 103 |
# ---------------- HELPERS ----------------
|
| 104 |
def save_as_pdf(text, filename="output.pdf"):
|
| 105 |
pdf = MarkdownPdf()
|
| 106 |
pdf.add_section(Section(text, toc=False))
|
| 107 |
pdf.save(filename)
|
| 108 |
return filename
|
| 109 |
-
|
| 110 |
def compress_pdf(input_path, output_path=None, max_size=20*1024*1024):
|
| 111 |
if output_path is None:
|
| 112 |
base, ext = os.path.splitext(input_path)
|
| 113 |
output_path = f"{base}_compressed{ext}"
|
| 114 |
-
|
| 115 |
try:
|
| 116 |
size = os.path.getsize(input_path)
|
| 117 |
except Exception:
|
| 118 |
return input_path
|
| 119 |
-
|
| 120 |
if size <= max_size:
|
| 121 |
print(f"βΉοΈ Not compressing {input_path} ({size/1024/1024:.2f} MB <= {max_size/1024/1024} MB)")
|
| 122 |
return input_path
|
| 123 |
-
|
| 124 |
print(f"π Compressing {input_path} ({size/1024/1024:.2f} MB) -> {output_path}")
|
| 125 |
try:
|
| 126 |
gs_cmd = [
|
|
@@ -141,8 +142,11 @@ def compress_pdf(input_path, output_path=None, max_size=20*1024*1024):
|
|
| 141 |
except Exception as e:
|
| 142 |
print("β Compression error:", e)
|
| 143 |
return input_path
|
| 144 |
-
|
| 145 |
def create_model():
|
|
|
|
|
|
|
|
|
|
| 146 |
try:
|
| 147 |
print("β‘ Attempting to use gemini-2.5-pro model")
|
| 148 |
model = genai.GenerativeModel("gemini-2.5-pro", generation_config={"temperature": 0})
|
|
@@ -158,7 +162,7 @@ def create_model():
|
|
| 158 |
except Exception as e:
|
| 159 |
print("β Failed to create any Gemini model:", e)
|
| 160 |
raise
|
| 161 |
-
|
| 162 |
def merge_pdfs(paths, output_path):
|
| 163 |
writer = PdfWriter()
|
| 164 |
for p in paths:
|
|
@@ -168,8 +172,12 @@ def merge_pdfs(paths, output_path):
|
|
| 168 |
with open(output_path, "wb") as f:
|
| 169 |
writer.write(f)
|
| 170 |
return output_path
|
| 171 |
-
|
| 172 |
def gemini_generate_content(model, prompt_text, file_upload_obj=None, image_obj=None):
|
|
|
|
|
|
|
|
|
|
|
|
|
| 173 |
inputs = [prompt_text]
|
| 174 |
if file_upload_obj:
|
| 175 |
inputs.append(file_upload_obj)
|
|
@@ -184,9 +192,14 @@ def gemini_generate_content(model, prompt_text, file_upload_obj=None, image_obj=
|
|
| 184 |
raw_text = str(response)
|
| 185 |
print("π₯ Received response (chars):", len(raw_text))
|
| 186 |
return raw_text
|
| 187 |
-
|
| 188 |
# ---------------- PARSERS ----------------
|
| 189 |
def extract_question_ids_from_qpms(text):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 190 |
print("π Extracting question IDs from QP+MS transcript using regex...")
|
| 191 |
ids = []
|
| 192 |
for m in re.finditer(r"(?im)^\s*Question\s*:\s*([0-9]+(?:(?:\.[a-zA-Z0-9]+)+|(?:\([a-zA-Z0-9]+\))+|[a-zA-Z])*)\b", text):
|
|
@@ -196,7 +209,8 @@ def extract_question_ids_from_qpms(text):
|
|
| 196 |
print(f"β
Extracted {len(ids)} question IDs.")
|
| 197 |
print("IDs:", ids)
|
| 198 |
return ids
|
| 199 |
-
|
|
|
|
| 200 |
for m in re.finditer(r"(?m)^\s*([0-9]+(?:(?:\.[a-zA-Z0-9]+)+|(?:\([a-zA-Z0-9]+\))+|[a-zA-Z])*)\s*[\.\):\-]\s", text):
|
| 201 |
qid = m.group(1).strip()
|
| 202 |
ids.append(qid)
|
|
@@ -206,8 +220,11 @@ def extract_question_ids_from_qpms(text):
|
|
| 206 |
else:
|
| 207 |
print("β οΈ No question IDs extracted; will send NA placeholder.")
|
| 208 |
return ids
|
| 209 |
-
|
| 210 |
def build_as_prompt_with_expected_ids(expected_ids):
|
|
|
|
|
|
|
|
|
|
| 211 |
if not expected_ids:
|
| 212 |
ids_block = "{NA}"
|
| 213 |
else:
|
|
@@ -230,20 +247,25 @@ AS:
|
|
| 230 |
<transcribed answer or placeholder>
|
| 231 |
"""
|
| 232 |
return prompt
|
| 233 |
-
|
| 234 |
def extract_marks_from_grading(grading_text):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 235 |
print("π Extracting awarded marks from grading output...")
|
| 236 |
grading_json = {"grading": []}
|
|
|
|
| 237 |
question_blocks = re.split(r"##\s*Question\s+", grading_text)
|
| 238 |
for block in question_blocks[1:]:
|
| 239 |
first_line = block.strip().splitlines()[0].strip() if block.strip().splitlines() else ""
|
| 240 |
-
q_id_match = re.match(r"([0-9]+(?:[a-zA-Z]|\([^
|
| 241 |
if not q_id_match:
|
| 242 |
q_id = first_line.split()[0] if first_line else ""
|
| 243 |
else:
|
| 244 |
q_id = q_id_match.group(1).strip()
|
| 245 |
awarded = re.findall(r"\b(M\d+|A\d+|R\d+|M0|A0|R0)\b", block)
|
| 246 |
-
# π΄ Change 1: DO NOT deduplicate, keep all marks in sequence
|
| 247 |
grading_json["grading"].append({
|
| 248 |
"question": q_id,
|
| 249 |
"marks_awarded": awarded
|
|
@@ -251,34 +273,31 @@ def extract_marks_from_grading(grading_text):
|
|
| 251 |
print("β
Extracted grading marks for", len(grading_json["grading"]), "question blocks.")
|
| 252 |
print(json.dumps(grading_json, indent=2))
|
| 253 |
return grading_json
|
| 254 |
-
|
| 255 |
# ---------------- MAPPING/IMPRINT HELPERS ----------------
|
| 256 |
-
def ask_gemini_for_mapping_for_page(model, image_path, grading_json, rows=GRID_ROWS, cols=GRID_COLS
|
| 257 |
-
|
| 258 |
-
|
| 259 |
-
|
|
|
|
|
|
|
| 260 |
ids_block = "{\n" + "\n".join(expected_ids) + "\n}"
|
| 261 |
-
|
| 262 |
-
|
| 263 |
-
|
| 264 |
-
|
| 265 |
-
|
| 266 |
-
The only questions you should spot are listed here:
|
| 267 |
-
{ids_block}
|
| 268 |
|
| 269 |
-
|
| 270 |
-
|
| 271 |
-
|
| 272 |
-
- Do not place marks inside another question's answer area.
|
| 273 |
-
- Prefer placing the marks in a BLANK cell immediately to the RIGHT of the answer step. If no blank cell is available to the right, then place in a blank cell to the LEFT.
|
| 274 |
-
- Never place marks above or below the answer.
|
| 275 |
-
- If
|
| 276 |
-
|
| 277 |
-
|
| 278 |
-
|
| 279 |
-
|
| 280 |
-
Grading JSON:
|
| 281 |
-
{json.dumps(grading_json, indent=2)}
|
| 282 |
"""
|
| 283 |
print(f"π‘ Sending mapping request for image {image_path} to Gemini...")
|
| 284 |
img = Image.open(image_path)
|
|
@@ -307,115 +326,250 @@ Grading JSON:
|
|
| 307 |
pass
|
| 308 |
print("β οΈ Failed to parse mapping JSON for", image_path)
|
| 309 |
return []
|
| 310 |
-
# ---------------- IMPRINTING ----------------
|
| 311 |
-
def imprint_marks_using_mapping(image_path, mapping, output_path, rows=GRID_ROWS, cols=GRID_COLS):
|
| 312 |
-
print(f"ποΈ Imprinting marks on {image_path} -> {output_path}")
|
| 313 |
-
img = cv2.imread(image_path)
|
| 314 |
-
h, w, _ = img.shape
|
| 315 |
-
cell_h, cell_w = h // rows, w // cols
|
| 316 |
-
|
| 317 |
-
for entry in mapping:
|
| 318 |
-
try:
|
| 319 |
-
q = entry["question"]
|
| 320 |
-
cell_num = int(entry["cell_number"])
|
| 321 |
-
awarded = entry.get("marks_awarded", [])
|
| 322 |
-
row = (cell_num - 1) // cols
|
| 323 |
-
col = (cell_num - 1) % cols
|
| 324 |
-
x = col * cell_w + 5
|
| 325 |
-
y = row * cell_h + 20
|
| 326 |
-
mark_text = f"{q}: {' '.join(awarded)}"
|
| 327 |
-
cv2.putText(img, mark_text, (x, y),
|
| 328 |
-
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
|
| 329 |
-
except Exception as e:
|
| 330 |
-
print("β οΈ Imprint error for entry:", entry, "|", e)
|
| 331 |
-
|
| 332 |
-
cv2.imwrite(output_path, img)
|
| 333 |
-
return output_path
|
| 334 |
-
|
| 335 |
-
|
| 336 |
-
# ---------------- MAIN PIPELINE ----------------
|
| 337 |
-
def align_and_grade_pipeline(qp_ms_pdf, as_pdf):
|
| 338 |
-
model = create_model()
|
| 339 |
-
|
| 340 |
-
# Step 1: Transcribe QP + MS
|
| 341 |
-
print("π Transcribing QP+MS PDF...")
|
| 342 |
-
qpms_text = gemini_generate_content(model, PROMPTS["QP_MS_TRANSCRIPTION"]["content"], file_upload_obj=qp_ms_pdf)
|
| 343 |
-
|
| 344 |
-
# Step 2: Extract IDs
|
| 345 |
-
expected_ids = extract_question_ids_from_qpms(qpms_text)
|
| 346 |
-
|
| 347 |
-
# Step 3: Transcribe AS
|
| 348 |
-
print("π Transcribing Answer Sheet PDF...")
|
| 349 |
-
as_prompt = build_as_prompt_with_expected_ids(expected_ids)
|
| 350 |
-
as_text = gemini_generate_content(model, as_prompt, file_upload_obj=as_pdf)
|
| 351 |
-
|
| 352 |
-
# Step 4: Grade
|
| 353 |
-
grading_prompt = PROMPTS["GRADING_PROMPT"]["content"] + "\n\n" + \
|
| 354 |
-
"QP+MS Transcript:\n" + qpms_text + "\n\nAS Transcript:\n" + as_text
|
| 355 |
-
grading_text = gemini_generate_content(model, grading_prompt)
|
| 356 |
-
|
| 357 |
-
# Step 5: Extract marks JSON
|
| 358 |
-
grading_json = extract_marks_from_grading(grading_text)
|
| 359 |
-
|
| 360 |
-
# Step 6: Convert AS to images
|
| 361 |
-
images = convert_from_path(as_pdf, dpi=200)
|
| 362 |
-
temp_dir = tempfile.mkdtemp()
|
| 363 |
-
image_paths = []
|
| 364 |
-
for i, img in enumerate(images):
|
| 365 |
-
img_path = os.path.join(temp_dir, f"page_{i+1}.png")
|
| 366 |
-
img.save(img_path, "PNG")
|
| 367 |
-
image_paths.append(img_path)
|
| 368 |
-
|
| 369 |
-
# Step 7: Mapping for each page
|
| 370 |
-
mappings = []
|
| 371 |
-
for img_path in image_paths:
|
| 372 |
-
mapping = ask_gemini_for_mapping_for_page(model, img_path, grading_json,
|
| 373 |
-
rows=GRID_ROWS, cols=GRID_COLS,
|
| 374 |
-
expected_ids=expected_ids)
|
| 375 |
-
# Merge awarded marks into mapping
|
| 376 |
-
for entry in mapping:
|
| 377 |
-
for g in grading_json["grading"]:
|
| 378 |
-
if g["question"] == entry["question"]:
|
| 379 |
-
entry["marks_awarded"] = g["marks_awarded"]
|
| 380 |
-
mappings.append((img_path, mapping))
|
| 381 |
-
|
| 382 |
-
# Step 8: Imprint marks
|
| 383 |
-
imprinted_paths = []
|
| 384 |
-
for img_path, mapping in mappings:
|
| 385 |
-
out_path = img_path.replace(".png", "_imprinted.png")
|
| 386 |
-
imprint_marks_using_mapping(img_path, mapping, out_path)
|
| 387 |
-
imprinted_paths.append(out_path)
|
| 388 |
-
|
| 389 |
-
# Step 9: Convert to PDF
|
| 390 |
-
output_pdf = os.path.join(temp_dir, "final_output.pdf")
|
| 391 |
-
with open(output_pdf, "wb") as f:
|
| 392 |
-
f.write(img2pdf.convert(imprinted_paths))
|
| 393 |
-
|
| 394 |
-
compressed_pdf = compress_pdf(output_pdf)
|
| 395 |
-
return grading_text, compressed_pdf
|
| 396 |
-
|
| 397 |
-
|
| 398 |
-
# ---------------- GRADIO UI ----------------
|
| 399 |
-
def run_gradio():
|
| 400 |
-
with gr.Blocks() as demo:
|
| 401 |
-
gr.Markdown("# π Automated Exam Grader (QP + MS + AS)")
|
| 402 |
-
|
| 403 |
-
with gr.Row():
|
| 404 |
-
qpms_file = gr.File(label="Upload Question Paper + Markscheme PDF", file_types=[".pdf"])
|
| 405 |
-
as_file = gr.File(label="Upload Student Answer Sheet PDF", file_types=[".pdf"])
|
| 406 |
-
|
| 407 |
-
run_btn = gr.Button("Run Alignment + Grading")
|
| 408 |
-
grading_output = gr.Textbox(label="Grading Report (Markdown)", lines=20)
|
| 409 |
-
final_pdf = gr.File(label="Download Final Imprinted PDF")
|
| 410 |
-
|
| 411 |
-
def process(qpms_pdf, as_pdf):
|
| 412 |
-
grading_text, pdf_path = align_and_grade_pipeline(qpms_pdf, as_pdf)
|
| 413 |
-
return grading_text, pdf_path
|
| 414 |
-
|
| 415 |
-
run_btn.click(process, inputs=[qpms_file, as_file], outputs=[grading_output, final_pdf])
|
| 416 |
|
| 417 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 418 |
|
|
|
|
|
|
|
|
|
|
|
|
|
| 419 |
|
| 420 |
-
if
|
| 421 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 14 |
import numpy as np
|
| 15 |
from concurrent.futures import ThreadPoolExecutor, as_completed
|
| 16 |
from PyPDF2 import PdfReader, PdfWriter
|
| 17 |
+
|
| 18 |
# ---------------- CONFIG ----------------
|
| 19 |
genai.configure(api_key=os.getenv("GEMINI_API_KEY"))
|
| 20 |
GRID_ROWS, GRID_COLS = 20, 14
|
| 21 |
+
|
| 22 |
# ---------------- PROMPTS ----------------
|
| 23 |
PROMPTS = {
|
| 24 |
"QP_MS_TRANSCRIPTION" : {
|
|
|
|
| 55 |
"""
|
| 56 |
}
|
| 57 |
,
|
| 58 |
+
|
| 59 |
+
# GRADING_PROMPT unchanged except we will print steps around calling it
|
| 60 |
"GRADING_PROMPT": {
|
| 61 |
"role": "system",
|
| 62 |
"content": """Developer: You are an official examiner. Apply the following grading rules precisely.
|
|
|
|
| 75 |
4. Accept valid equivalent forms unless otherwise specified.
|
| 76 |
5. Apply FT where appropriate.
|
| 77 |
6. Use proper notation: M1A0, A1, etc.
|
| 78 |
+
7. Any lost mark: use red `<span style=\"color:red\">M0</span>` and make Reason red.
|
| 79 |
---
|
| 80 |
## Output Format
|
| 81 |
Produce two sections per question/sub-question, following this structure:
|
|
|
|
| 100 |
"""
|
| 101 |
}
|
| 102 |
}
|
| 103 |
+
|
| 104 |
# ---------------- HELPERS ----------------
|
| 105 |
def save_as_pdf(text, filename="output.pdf"):
|
| 106 |
pdf = MarkdownPdf()
|
| 107 |
pdf.add_section(Section(text, toc=False))
|
| 108 |
pdf.save(filename)
|
| 109 |
return filename
|
| 110 |
+
|
| 111 |
def compress_pdf(input_path, output_path=None, max_size=20*1024*1024):
|
| 112 |
if output_path is None:
|
| 113 |
base, ext = os.path.splitext(input_path)
|
| 114 |
output_path = f"{base}_compressed{ext}"
|
| 115 |
+
|
| 116 |
try:
|
| 117 |
size = os.path.getsize(input_path)
|
| 118 |
except Exception:
|
| 119 |
return input_path
|
| 120 |
+
|
| 121 |
if size <= max_size:
|
| 122 |
print(f"βΉοΈ Not compressing {input_path} ({size/1024/1024:.2f} MB <= {max_size/1024/1024} MB)")
|
| 123 |
return input_path
|
| 124 |
+
|
| 125 |
print(f"π Compressing {input_path} ({size/1024/1024:.2f} MB) -> {output_path}")
|
| 126 |
try:
|
| 127 |
gs_cmd = [
|
|
|
|
| 142 |
except Exception as e:
|
| 143 |
print("β Compression error:", e)
|
| 144 |
return input_path
|
| 145 |
+
|
| 146 |
def create_model():
|
| 147 |
+
"""
|
| 148 |
+
Create the Gemini model and print which model is selected.
|
| 149 |
+
"""
|
| 150 |
try:
|
| 151 |
print("β‘ Attempting to use gemini-2.5-pro model")
|
| 152 |
model = genai.GenerativeModel("gemini-2.5-pro", generation_config={"temperature": 0})
|
|
|
|
| 162 |
except Exception as e:
|
| 163 |
print("β Failed to create any Gemini model:", e)
|
| 164 |
raise
|
| 165 |
+
|
| 166 |
def merge_pdfs(paths, output_path):
|
| 167 |
writer = PdfWriter()
|
| 168 |
for p in paths:
|
|
|
|
| 172 |
with open(output_path, "wb") as f:
|
| 173 |
writer.write(f)
|
| 174 |
return output_path
|
| 175 |
+
|
| 176 |
def gemini_generate_content(model, prompt_text, file_upload_obj=None, image_obj=None):
|
| 177 |
+
"""
|
| 178 |
+
Send prompt_text and optionally an uploaded file (or an image object) to the model.
|
| 179 |
+
Returns textual response and prints progress.
|
| 180 |
+
"""
|
| 181 |
inputs = [prompt_text]
|
| 182 |
if file_upload_obj:
|
| 183 |
inputs.append(file_upload_obj)
|
|
|
|
| 192 |
raw_text = str(response)
|
| 193 |
print("π₯ Received response (chars):", len(raw_text))
|
| 194 |
return raw_text
|
| 195 |
+
|
| 196 |
# ---------------- PARSERS ----------------
|
| 197 |
def extract_question_ids_from_qpms(text):
|
| 198 |
+
"""
|
| 199 |
+
Extract question IDs from QP+MS transcript output.
|
| 200 |
+
We expect the QP+MS prompt to produce lines like 'Question: <id>'
|
| 201 |
+
Return a list of IDs in order of appearance, including duplicates.
|
| 202 |
+
"""
|
| 203 |
print("π Extracting question IDs from QP+MS transcript using regex...")
|
| 204 |
ids = []
|
| 205 |
for m in re.finditer(r"(?im)^\s*Question\s*:\s*([0-9]+(?:(?:\.[a-zA-Z0-9]+)+|(?:\([a-zA-Z0-9]+\))+|[a-zA-Z])*)\b", text):
|
|
|
|
| 209 |
print(f"β
Extracted {len(ids)} question IDs.")
|
| 210 |
print("IDs:", ids)
|
| 211 |
return ids
|
| 212 |
+
|
| 213 |
+
# fallback scans
|
| 214 |
for m in re.finditer(r"(?m)^\s*([0-9]+(?:(?:\.[a-zA-Z0-9]+)+|(?:\([a-zA-Z0-9]+\))+|[a-zA-Z])*)\s*[\.\):\-]\s", text):
|
| 215 |
qid = m.group(1).strip()
|
| 216 |
ids.append(qid)
|
|
|
|
| 220 |
else:
|
| 221 |
print("β οΈ No question IDs extracted; will send NA placeholder.")
|
| 222 |
return ids
|
| 223 |
+
|
| 224 |
def build_as_prompt_with_expected_ids(expected_ids):
|
| 225 |
+
"""
|
| 226 |
+
Construct the AS transcription prompt injecting the expected IDs block.
|
| 227 |
+
"""
|
| 228 |
if not expected_ids:
|
| 229 |
ids_block = "{NA}"
|
| 230 |
else:
|
|
|
|
| 247 |
<transcribed answer or placeholder>
|
| 248 |
"""
|
| 249 |
return prompt
|
| 250 |
+
|
| 251 |
def extract_marks_from_grading(grading_text):
|
| 252 |
+
"""
|
| 253 |
+
Parse the grading markdown produced by the GRADING_PROMPT and extract marks per question.
|
| 254 |
+
Returns dict: {"grading": [{"question": "1.a", "marks_awarded": ["M1","A1"]}, ...]}
|
| 255 |
+
Preserves all marks in order, including duplicates.
|
| 256 |
+
"""
|
| 257 |
print("π Extracting awarded marks from grading output...")
|
| 258 |
grading_json = {"grading": []}
|
| 259 |
+
|
| 260 |
question_blocks = re.split(r"##\s*Question\s+", grading_text)
|
| 261 |
for block in question_blocks[1:]:
|
| 262 |
first_line = block.strip().splitlines()[0].strip() if block.strip().splitlines() else ""
|
| 263 |
+
q_id_match = re.match(r"([0-9]+(?:[a-zA-Z]|\([^)]+\)|(?:\.[a-zA-Z0-9]+))*)", first_line)
|
| 264 |
if not q_id_match:
|
| 265 |
q_id = first_line.split()[0] if first_line else ""
|
| 266 |
else:
|
| 267 |
q_id = q_id_match.group(1).strip()
|
| 268 |
awarded = re.findall(r"\b(M\d+|A\d+|R\d+|M0|A0|R0)\b", block)
|
|
|
|
| 269 |
grading_json["grading"].append({
|
| 270 |
"question": q_id,
|
| 271 |
"marks_awarded": awarded
|
|
|
|
| 273 |
print("β
Extracted grading marks for", len(grading_json["grading"]), "question blocks.")
|
| 274 |
print(json.dumps(grading_json, indent=2))
|
| 275 |
return grading_json
|
| 276 |
+
|
| 277 |
# ---------------- MAPPING/IMPRINT HELPERS ----------------
|
| 278 |
+
def ask_gemini_for_mapping_for_page(model, image_path, grading_json, expected_ids=None, rows=GRID_ROWS, cols=GRID_COLS):
|
| 279 |
+
"""
|
| 280 |
+
Send a single page image along with the grading_json and expected_ids; LLM should return JSON mapping.
|
| 281 |
+
"""
|
| 282 |
+
ids_block = "{NA}"
|
| 283 |
+
if expected_ids:
|
| 284 |
ids_block = "{\n" + "\n".join(expected_ids) + "\n}"
|
| 285 |
+
prompt = f"""
|
| 286 |
+
You are an exam marker. Your role is to identify where each question begins on the page.
|
| 287 |
+
The page is divided into a {rows} x {cols} grid. Each cell has a RUNNING NUMBER label (1..{rows*cols}).
|
| 288 |
+
For each question in the grading JSON, return the cell NUMBER where the FIRST STEP of that question begins.
|
|
|
|
|
|
|
|
|
|
| 289 |
|
| 290 |
+
IMPORTANT: Only spot and return cell numbers for the following question IDs (one per line):
|
| 291 |
+
{ids_block}
|
| 292 |
+
If you see a sub-question (e.g., ii) above a main question (e.g., Q4), infer it belongs to the previous question (e.g., Q3.ii).
|
| 293 |
+
- Do not place marks inside another question's answer area.
|
| 294 |
+
- Prefer placing the marks in a BLANK cell immediately to the RIGHT of the answer step. If no blank cell is available to the right, then place in a blank cell to the LEFT.
|
| 295 |
+
- Never place marks above or below the answer.
|
| 296 |
+
- If a question starts on a previous page, you may omit it for this page.
|
| 297 |
+
Return JSON only, like:
|
| 298 |
+
[{{"question": "1.a", "cell_number": 15}}, ...]
|
| 299 |
+
Grading JSON:
|
| 300 |
+
{json.dumps(grading_json, indent=2)}
|
|
|
|
|
|
|
| 301 |
"""
|
| 302 |
print(f"π‘ Sending mapping request for image {image_path} to Gemini...")
|
| 303 |
img = Image.open(image_path)
|
|
|
|
| 326 |
pass
|
| 327 |
print("β οΈ Failed to parse mapping JSON for", image_path)
|
| 328 |
return []
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 329 |
|
| 330 |
+
def imprint_marks_using_mapping(pdf_path, grading_json, output_pdf, model, expected_ids=None, rows=GRID_ROWS, cols=GRID_COLS):
|
| 331 |
+
"""
|
| 332 |
+
Convert PDF to images, create grid-numbered images for sending to Gemini,
|
| 333 |
+
send all page images in parallel to Gemini for mapping, then annotate and produce imprinted PDF.
|
| 334 |
+
"""
|
| 335 |
+
print("π Converting answer PDF to images for imprinting...")
|
| 336 |
+
pages = convert_from_path(pdf_path, dpi=200)
|
| 337 |
+
annotated_page_paths = []
|
| 338 |
+
temp_grid_images = []
|
| 339 |
+
|
| 340 |
+
for p_index, page in enumerate(pages):
|
| 341 |
+
img = page.convert("RGB")
|
| 342 |
+
w, h = img.size
|
| 343 |
+
cell_w, cell_h = w / cols, h / rows
|
| 344 |
+
|
| 345 |
+
draw = ImageDraw.Draw(img)
|
| 346 |
+
try:
|
| 347 |
+
num_font = ImageFont.truetype("arial.ttf", 16)
|
| 348 |
+
except Exception:
|
| 349 |
+
num_font = ImageFont.load_default()
|
| 350 |
+
|
| 351 |
+
cell_num = 1
|
| 352 |
+
for r in range(rows):
|
| 353 |
+
for c in range(cols):
|
| 354 |
+
x = int(c * cell_w + cell_w / 2)
|
| 355 |
+
y = int(r * cell_h + cell_h / 2)
|
| 356 |
+
text = str(cell_num)
|
| 357 |
+
bbox = draw.textbbox((0, 0), text, font=num_font)
|
| 358 |
+
tw = bbox[2] - bbox[0]
|
| 359 |
+
th = bbox[3] - bbox[1]
|
| 360 |
+
draw.text((x - tw/2, y - th/2), text, fill="black", font=num_font)
|
| 361 |
+
cell_num += 1
|
| 362 |
+
|
| 363 |
+
temp_path = f"page_{p_index+1}_grid.png"
|
| 364 |
+
img.save(temp_path, "PNG")
|
| 365 |
+
temp_grid_images.append(temp_path)
|
| 366 |
+
print("π° Created grid image:", temp_path)
|
| 367 |
+
|
| 368 |
+
# Send all grid images in parallel to Gemini to get mappings
|
| 369 |
+
print("π‘ Sending all page images to Gemini in parallel for mapping...")
|
| 370 |
+
mappings_per_page = {}
|
| 371 |
+
model_local = model
|
| 372 |
+
with ThreadPoolExecutor(max_workers=min(8, len(temp_grid_images))) as ex:
|
| 373 |
+
futures = {ex.submit(ask_gemini_for_mapping_for_page, model_local, img_path, grading_json, expected_ids, rows, cols): idx
|
| 374 |
+
for idx, img_path in enumerate(temp_grid_images)}
|
| 375 |
+
for fut in as_completed(futures):
|
| 376 |
+
idx = futures[fut]
|
| 377 |
+
try:
|
| 378 |
+
mapping = fut.result()
|
| 379 |
+
except Exception as e:
|
| 380 |
+
print("β οΈ Mapping request failed for page", idx, e)
|
| 381 |
+
mapping = []
|
| 382 |
+
mappings_per_page[idx] = mapping
|
| 383 |
+
|
| 384 |
+
# Annotate original pages according to returned mappings
|
| 385 |
+
print("π Annotating pages with marks...")
|
| 386 |
+
for p_index, page in enumerate(pages):
|
| 387 |
+
page_img = page.convert("RGB")
|
| 388 |
+
img_cv = np.array(page_img)
|
| 389 |
+
img_cv = cv2.cvtColor(img_cv, cv2.COLOR_RGB2BGR)
|
| 390 |
+
h, w, _ = img_cv.shape
|
| 391 |
+
cell_w_px, cell_h_px = w / cols, h / rows
|
| 392 |
+
|
| 393 |
+
mapping = mappings_per_page.get(p_index, [])
|
| 394 |
+
occupied = set()
|
| 395 |
+
for item in mapping:
|
| 396 |
+
qid = item.get("question")
|
| 397 |
+
cell_number = item.get("cell_number")
|
| 398 |
+
if qid is None or cell_number is None:
|
| 399 |
+
continue
|
| 400 |
|
| 401 |
+
marks_list = next((g["marks_awarded"] for g in grading_json.get("grading", []) if g["question"] == qid), [])
|
| 402 |
+
if not marks_list:
|
| 403 |
+
marks_list = next((g["marks_awarded"] for g in grading_json.get("grading", [])
|
| 404 |
+
if g["question"].lower() == (qid or "").lower()), [])
|
| 405 |
|
| 406 |
+
marks_text = ",".join(marks_list) if marks_list else "?"
|
| 407 |
+
|
| 408 |
+
row = (cell_number - 1) // cols
|
| 409 |
+
col = (cell_number - 1) % cols
|
| 410 |
+
|
| 411 |
+
candidates = []
|
| 412 |
+
if col + 1 < cols:
|
| 413 |
+
candidates.append((row, col + 1))
|
| 414 |
+
candidates.append((row, col))
|
| 415 |
+
if col - 1 >= 0:
|
| 416 |
+
candidates.append((row, col - 1))
|
| 417 |
+
|
| 418 |
+
chosen = None
|
| 419 |
+
for (r_c, c_c) in candidates:
|
| 420 |
+
cell_id = r_c * cols + c_c + 1
|
| 421 |
+
if cell_id not in occupied:
|
| 422 |
+
chosen = (r_c, c_c)
|
| 423 |
+
occupied.add(cell_id)
|
| 424 |
+
break
|
| 425 |
+
if chosen is None:
|
| 426 |
+
chosen = (row, col)
|
| 427 |
+
|
| 428 |
+
r_c, c_c = chosen
|
| 429 |
+
x_c = int((c_c + 1) * cell_w_px - cell_w_px * 0.1)
|
| 430 |
+
y_c = int((r_c + 0.5) * cell_h_px)
|
| 431 |
+
|
| 432 |
+
font_scale = max(0.6, min(1.6, cell_h_px / 60.0))
|
| 433 |
+
thickness = max(1, int(font_scale * 2))
|
| 434 |
+
cv2.putText(img_cv, marks_text, (x_c, y_c), cv2.FONT_HERSHEY_SIMPLEX,
|
| 435 |
+
font_scale, (0, 0, 255), thickness, cv2.LINE_AA)
|
| 436 |
+
|
| 437 |
+
annotated_path = f"annotated_page_{p_index+1}.png"
|
| 438 |
+
cv2.imwrite(annotated_path, img_cv)
|
| 439 |
+
annotated_page_paths.append(annotated_path)
|
| 440 |
+
print("β
Annotated page saved:", annotated_path)
|
| 441 |
+
|
| 442 |
+
with open(output_pdf, "wb") as f:
|
| 443 |
+
f.write(img2pdf.convert(annotated_page_paths))
|
| 444 |
+
|
| 445 |
+
compressed = compress_pdf(output_pdf)
|
| 446 |
+
print("π Imprinted PDF saved to:", compressed)
|
| 447 |
+
return compressed
|
| 448 |
+
|
| 449 |
+
# ---------------- MAIN PIPELINE ----------------
|
| 450 |
+
def align_and_grade_pipeline(qp_path, ms_path, ans_path, imprint=False):
|
| 451 |
+
"""
|
| 452 |
+
Final pipeline implementing requested flow and verbose console logging.
|
| 453 |
+
"""
|
| 454 |
+
try:
|
| 455 |
+
print("π Starting pipeline...")
|
| 456 |
+
# Step 0: compress as needed
|
| 457 |
+
qp_path = compress_pdf(qp_path)
|
| 458 |
+
ms_path = compress_pdf(ms_path)
|
| 459 |
+
ans_path = compress_pdf(ans_path)
|
| 460 |
+
|
| 461 |
+
# Merge QP + MS
|
| 462 |
+
merged_qpms_path = os.path.splitext(qp_path)[0] + "_merged_qp_ms.pdf"
|
| 463 |
+
merge_pdfs([qp_path, ms_path], merged_qpms_path)
|
| 464 |
+
print("π Merged QP + MS ->", merged_qpms_path)
|
| 465 |
+
|
| 466 |
+
# Upload files to Gemini
|
| 467 |
+
print("πΌ Uploading files to Gemini...")
|
| 468 |
+
merged_uploaded = genai.upload_file(path=merged_qpms_path, display_name="QP+MS (merged)")
|
| 469 |
+
ans_uploaded = genai.upload_file(path=ans_path, display_name="Answer Sheet")
|
| 470 |
+
print("β
Upload complete.")
|
| 471 |
+
|
| 472 |
+
# Create model and print which selected
|
| 473 |
+
model = create_model()
|
| 474 |
+
|
| 475 |
+
# Step 1.i: QP+MS transcription (first)
|
| 476 |
+
print("1.i) Transcribing QP+MS (questions first, then full markscheme)...")
|
| 477 |
+
qpms_prompt = PROMPTS["QP_MS_TRANSCRIPTION"]["content"]
|
| 478 |
+
qpms_text = gemini_generate_content(model, qpms_prompt, file_upload_obj=merged_uploaded)
|
| 479 |
+
print("π QP+MS transcription received. Saving debug file: debug_qpms_transcript.txt")
|
| 480 |
+
with open("debug_qpms_transcript.txt", "w", encoding="utf-8") as f:
|
| 481 |
+
f.write(qpms_text)
|
| 482 |
+
|
| 483 |
+
# Step 2: extract serial numbers (question IDs) using regex from qpms_text
|
| 484 |
+
extracted_ids = extract_question_ids_from_qpms(qpms_text)
|
| 485 |
+
if not extracted_ids:
|
| 486 |
+
extracted_ids = ["NA"]
|
| 487 |
+
|
| 488 |
+
# Step 1.ii: Build AS prompt injecting extracted IDs and transcribe AS
|
| 489 |
+
print("1.ii) Building AS transcription prompt with expected question IDs and sending to Gemini...")
|
| 490 |
+
as_prompt = build_as_prompt_with_expected_ids(extracted_ids)
|
| 491 |
+
as_text = gemini_generate_content(model, as_prompt, file_upload_obj=ans_uploaded)
|
| 492 |
+
print("π AS transcription received. Saving debug file: debug_as_transcript.txt")
|
| 493 |
+
with open("debug_as_transcript.txt", "w", encoding="utf-8") as f:
|
| 494 |
+
f.write(as_text)
|
| 495 |
+
|
| 496 |
+
# Step 3: Grading - send both transcripts to grading model
|
| 497 |
+
print("2) Preparing grading input and sending to Gemini for grading...")
|
| 498 |
+
grading_input = (
|
| 499 |
+
"=== QP+MS TRANSCRIPT BEGIN ===\n"
|
| 500 |
+
+ qpms_text
|
| 501 |
+
+ "\n=== QP+MS TRANSCRIPT END ===\n\n"
|
| 502 |
+
+ "=== ANSWER SHEET TRANSCRIPT BEGIN ===\n"
|
| 503 |
+
+ as_text
|
| 504 |
+
+ "\n=== ANSWER SHEET TRANSCRIPT END ===\n"
|
| 505 |
+
)
|
| 506 |
+
grading_prompt_system = PROMPTS["GRADING_PROMPT"]["content"]
|
| 507 |
+
grading_text = gemini_generate_content(model, grading_prompt_system + "\n\nPlease grade the following transcripts:\n" + grading_input)
|
| 508 |
+
print("π§Ύ Grading output received. Saving debug file: debug_grading.md")
|
| 509 |
+
with open("debug_grading.md", "w", encoding="utf-8") as f:
|
| 510 |
+
f.write(grading_text)
|
| 511 |
+
|
| 512 |
+
# Save grading PDF
|
| 513 |
+
base_name = os.path.splitext(os.path.basename(ans_path))[0]
|
| 514 |
+
grading_pdf_path = save_as_pdf(grading_text, f"{base_name}_graded.pdf")
|
| 515 |
+
print("π Grading PDF saved:", grading_pdf_path)
|
| 516 |
+
|
| 517 |
+
# Step 4: Extract marks for imprinting
|
| 518 |
+
grading_json = extract_marks_from_grading(grading_text)
|
| 519 |
+
with open("debug_grading_json.json", "w", encoding="utf-8") as f:
|
| 520 |
+
json.dump(grading_json, f, indent=2, ensure_ascii=False)
|
| 521 |
+
print("π§ Grading marks extraction complete.")
|
| 522 |
+
|
| 523 |
+
imprinted_pdf_path = None
|
| 524 |
+
if imprint:
|
| 525 |
+
print("β Imprint option enabled. Starting imprinting process (parallel mapping requests)...")
|
| 526 |
+
imprinted_pdf_path = f"{base_name}_imprinted.pdf"
|
| 527 |
+
imprinted_pdf_path = imprint_marks_using_mapping(ans_path, grading_json, imprinted_pdf_path, model, extracted_ids)
|
| 528 |
+
print("β
Imprinting finished. Imprinted PDF at:", imprinted_pdf_path)
|
| 529 |
+
|
| 530 |
+
print("π Pipeline finished successfully.")
|
| 531 |
+
return qpms_text, as_text, grading_text, grading_pdf_path, imprinted_pdf_path
|
| 532 |
+
|
| 533 |
+
except Exception as e:
|
| 534 |
+
print("β Pipeline error:", e)
|
| 535 |
+
return f"β Error: {e}", None, None, None, None
|
| 536 |
+
|
| 537 |
+
# ---------------- GRADIO UI ----------------
|
| 538 |
+
with gr.Blocks(title="LeadIB AI Grading (Final Flow β Verbose)") as demo:
|
| 539 |
+
gr.Markdown("## π LeadIB AI Grading β Final Flow\nUpload **Question Paper**, **Markscheme**, and **Student Answer Sheet**.\nFlow: merge QP+MS -> transcribe QP+MS (questions first, full markscheme) -> extract IDs -> transcribe AS with expected IDs -> grade -> (optional) imprint. Console prints show progress.")
|
| 540 |
+
|
| 541 |
+
with gr.Row():
|
| 542 |
+
qp_file = gr.File(label="π Upload Question Paper (PDF)")
|
| 543 |
+
ms_file = gr.File(label="π Upload Markscheme (PDF)")
|
| 544 |
+
ans_file = gr.File(label="π Upload Student Answer Sheet (PDF)")
|
| 545 |
+
|
| 546 |
+
imprint_toggle = gr.Checkbox(label="β Imprint Marks on Student Answer Sheet", value=False)
|
| 547 |
+
run_button = gr.Button("π Run Pipeline")
|
| 548 |
+
|
| 549 |
+
with gr.Row():
|
| 550 |
+
qpms_box = gr.Textbox(label="π QP+MS Transcript", lines=12)
|
| 551 |
+
as_box = gr.Textbox(label="π AS Transcript", lines=12)
|
| 552 |
+
|
| 553 |
+
grading_output_box = gr.Textbox(label="π§Ύ Grading (Markdown)", lines=20)
|
| 554 |
+
grading_pdf_file = gr.File(label="π₯ Download Grading PDF")
|
| 555 |
+
imprint_pdf_file = gr.File(label="π₯ Download Imprinted PDF (Optional)")
|
| 556 |
+
|
| 557 |
+
def run_pipeline(qp_file_obj, ms_file_obj, ans_file_obj, imprint_flag):
|
| 558 |
+
qp_path = qp_file_obj.name
|
| 559 |
+
ms_path = ms_file_obj.name
|
| 560 |
+
ans_path = ans_file_obj.name
|
| 561 |
+
|
| 562 |
+
qpms_text, as_text, grading_text, grading_pdf_path, imprinted_pdf_path = align_and_grade_pipeline(
|
| 563 |
+
qp_path, ms_path, ans_path, imprint=imprint_flag
|
| 564 |
+
)
|
| 565 |
+
|
| 566 |
+
return qpms_text or "", as_text or "", grading_text or "", grading_pdf_path, imprinted_pdf_path
|
| 567 |
+
|
| 568 |
+
run_button.click(
|
| 569 |
+
fn=run_pipeline,
|
| 570 |
+
inputs=[qp_file, ms_file, ans_file, imprint_toggle],
|
| 571 |
+
outputs=[qpms_box, as_box, grading_output_box, grading_pdf_file, imprint_pdf_file]
|
| 572 |
+
)
|
| 573 |
+
|
| 574 |
+
if __name__ == "__main__":
|
| 575 |
+
demo.launch()
|