Spaces:
Sleeping
Sleeping
Update app.py
Browse files
app.py
CHANGED
|
@@ -14,7 +14,7 @@ import cv2
|
|
| 14 |
import numpy as np
|
| 15 |
from concurrent.futures import ThreadPoolExecutor, as_completed
|
| 16 |
from PyPDF2 import PdfReader, PdfWriter
|
| 17 |
-
|
| 18 |
# ---------------- CONFIG ----------------
|
| 19 |
genai.configure(api_key=os.getenv("GEMINI_API_KEY"))
|
| 20 |
GRID_ROWS, GRID_COLS = 20, 14
|
|
@@ -57,7 +57,6 @@ Answer 2 :
|
|
| 57 |
}
|
| 58 |
,
|
| 59 |
|
| 60 |
-
# GRADING_PROMPT unchanged except we will print steps around calling it
|
| 61 |
"GRADING_PROMPT": {
|
| 62 |
"role": "system",
|
| 63 |
"content": """Developer: You are an official examiner. Apply the following grading rules precisely.
|
|
@@ -82,12 +81,12 @@ Answer 2 :
|
|
| 82 |
Produce two sections per question/sub-question, following this structure:
|
| 83 |
## Question <id>
|
| 84 |
### Markscheme vs Student Answer
|
| 85 |
-
| Mark ID | Markscheme Expectation | Student
|
| 86 |
|---------|------------------------|--------------------|---------|
|
| 87 |
| M1_1 | Recognise GP | "r=0.9" | M1 |
|
| 88 |
β‘οΈ **Total: X/Y**
|
| 89 |
---
|
| 90 |
-
### Examiner
|
| 91 |
At the very end, provide a summary table:
|
| 92 |
| Question Number | Marks | Remark |
|
| 93 |
|-----------------|-------|--------|
|
|
@@ -110,18 +109,6 @@ def save_as_pdf(text, filename="output.pdf"):
|
|
| 110 |
pdf.save(filename)
|
| 111 |
return filename
|
| 112 |
|
| 113 |
-
# --- Gemini standard upload (no RAG required) ---
|
| 114 |
-
def upload_to_gemini(path, display_name=None):
|
| 115 |
-
import google.generativeai as genai
|
| 116 |
-
with open(path, "rb") as f:
|
| 117 |
-
uploaded_file = genai.upload_file(
|
| 118 |
-
file=f, # <- file object
|
| 119 |
-
mime_type="application/pdf", # <- set MIME type
|
| 120 |
-
display_name=display_name or os.path.basename(path)
|
| 121 |
-
)
|
| 122 |
-
return uploaded_file
|
| 123 |
-
|
| 124 |
-
|
| 125 |
def compress_pdf(input_path, output_path=None, max_size=20*1024*1024):
|
| 126 |
if output_path is None:
|
| 127 |
base, ext = os.path.splitext(input_path)
|
|
@@ -162,21 +149,37 @@ def create_model():
|
|
| 162 |
Create the Gemini model and print which model is selected.
|
| 163 |
"""
|
| 164 |
try:
|
| 165 |
-
print("β‘ Attempting to use gemini-2.
|
| 166 |
-
model = genai.GenerativeModel("gemini-2.
|
| 167 |
-
print("β
Selected model: gemini-2.
|
| 168 |
return model
|
| 169 |
except Exception as e:
|
| 170 |
-
print("β οΈ Could not use gemini-2.
|
| 171 |
try:
|
| 172 |
-
print("β‘ Falling back to gemini-
|
| 173 |
-
model = genai.GenerativeModel("gemini-
|
| 174 |
-
print("β
Selected model: gemini-
|
| 175 |
return model
|
| 176 |
except Exception as e:
|
| 177 |
print("β Failed to create any Gemini model:", e)
|
| 178 |
raise
|
| 179 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 180 |
def merge_pdfs(paths, output_path):
|
| 181 |
writer = PdfWriter()
|
| 182 |
for p in paths:
|
|
@@ -196,19 +199,14 @@ def gemini_generate_content(model, prompt_text, file_upload_obj=None, image_obj=
|
|
| 196 |
if file_upload_obj:
|
| 197 |
inputs.append(file_upload_obj)
|
| 198 |
if image_obj:
|
| 199 |
-
# Handle both single images and lists of images
|
| 200 |
if isinstance(image_obj, list):
|
| 201 |
-
# Convert image paths to PIL Image objects
|
| 202 |
for img_path in image_obj:
|
| 203 |
if isinstance(img_path, str):
|
| 204 |
-
# It's a file path, load as PIL Image
|
| 205 |
pil_img = Image.open(img_path)
|
| 206 |
inputs.append(pil_img)
|
| 207 |
else:
|
| 208 |
-
# It's already an image object
|
| 209 |
inputs.append(img_path)
|
| 210 |
else:
|
| 211 |
-
# Single image
|
| 212 |
if isinstance(image_obj, str):
|
| 213 |
pil_img = Image.open(image_obj)
|
| 214 |
inputs.append(pil_img)
|
|
@@ -226,22 +224,17 @@ def gemini_generate_content(model, prompt_text, file_upload_obj=None, image_obj=
|
|
| 226 |
|
| 227 |
# ---------------- PARSERS ----------------
|
| 228 |
def extract_question_ids_from_qpms(text: str):
|
| 229 |
-
"""Extract question IDs from QP+MS transcript.
|
| 230 |
-
Two-step approach: explicit 'Question X' lines, then fallback numbered lists.
|
| 231 |
-
Robust to hidden whitespace and simple unicode spaces."""
|
| 232 |
print("π Extracting question IDs from QP+MS transcript using regex...")
|
| 233 |
|
| 234 |
-
# Normalize spaces/tabs/non-breaking spaces
|
| 235 |
clean_text = text.replace("\u00A0", " ").replace("\t", " ")
|
| 236 |
|
| 237 |
-
# Step 1: Look for explicit "Question X" lines
|
| 238 |
primary_matches = re.findall(r"^\s*Question\s*[:\s]\s*([\dA-Za-z.()]+)", clean_text, re.MULTILINE)
|
| 239 |
if primary_matches:
|
| 240 |
print(f"β
Extracted {len(primary_matches)} question IDs from explicit 'Question X' lines.")
|
| 241 |
print("IDs:", primary_matches)
|
| 242 |
return primary_matches
|
| 243 |
|
| 244 |
-
# Step 2: Fallback β numbered/sub-question lists
|
| 245 |
fallback_matches = re.findall(r"^\s*(\d+(?:[.)]|\([a-zA-Z0-9]+\))?[a-zA-Z0-9]*)", clean_text, re.MULTILINE)
|
| 246 |
if fallback_matches:
|
| 247 |
print(f"β
Extracted {len(fallback_matches)} question IDs (fallback numbered lists).")
|
|
@@ -250,12 +243,9 @@ def extract_question_ids_from_qpms(text: str):
|
|
| 250 |
print("β οΈ No question IDs extracted; will send NA placeholder.")
|
| 251 |
return fallback_matches
|
| 252 |
|
| 253 |
-
# Update AS prompt builder to include graph detection
|
| 254 |
-
|
| 255 |
def build_as_prompt_with_expected_ids(expected_ids, qpms_text=None):
|
| 256 |
"""
|
| 257 |
Construct the AS transcription prompt injecting the expected IDs block and graph detection instructions.
|
| 258 |
-
If qpms_text is provided, instruct the LLM to refer to it for ambiguous handwriting.
|
| 259 |
"""
|
| 260 |
if not expected_ids:
|
| 261 |
ids_block = "{NA}"
|
|
@@ -288,8 +278,6 @@ AS:
|
|
| 288 |
==== GRAPH FOUND ANSWERS ====\nGraph found in:\n- Answer <number> β Page <number>\n(one per line)\n==== END GRAPH FOUND ===="""
|
| 289 |
return prompt
|
| 290 |
|
| 291 |
-
# Robust parsing functions for graph detection
|
| 292 |
-
|
| 293 |
def extract_graph_questions_from_ms(text: str):
|
| 294 |
"""Extract graph questions and page numbers from MS transcript."""
|
| 295 |
clean_text = text.replace("\u00A0", " ").replace("\t", " ")
|
|
@@ -325,9 +313,7 @@ def extract_graph_answers_from_as(text: str):
|
|
| 325 |
|
| 326 |
def extract_marks_from_grading(grading_text):
|
| 327 |
"""
|
| 328 |
-
Parse the grading markdown
|
| 329 |
-
Returns dict: {"grading": [{"question": "1.a", "marks_awarded": ["M1","A1"]}, ...]}
|
| 330 |
-
Preserves all marks in order, including duplicates.
|
| 331 |
"""
|
| 332 |
print("π Extracting awarded marks from grading output...")
|
| 333 |
grading_json = {"grading": []}
|
|
@@ -353,7 +339,6 @@ def extract_marks_from_grading(grading_text):
|
|
| 353 |
def ask_gemini_for_mapping_batch(model, image_paths, grading_json, expected_ids=None, rows=GRID_ROWS, cols=GRID_COLS):
|
| 354 |
"""
|
| 355 |
Send multiple page images together to Gemini for batch mapping processing.
|
| 356 |
-
More efficient than sending one by one.
|
| 357 |
"""
|
| 358 |
ids_block = "{NA}"
|
| 359 |
if expected_ids:
|
|
@@ -375,7 +360,6 @@ Return JSON only, like:
|
|
| 375 |
Grading JSON:
|
| 376 |
{json.dumps(grading_json, indent=2)}"""
|
| 377 |
|
| 378 |
-
# Load all images
|
| 379 |
images = [Image.open(p) for p in image_paths]
|
| 380 |
|
| 381 |
print(f"π‘ Sending batch mapping request for {len(image_paths)} pages to Gemini...")
|
|
@@ -391,7 +375,6 @@ Grading JSON:
|
|
| 391 |
print("π Gemini raw batch output:")
|
| 392 |
print(raw_text)
|
| 393 |
|
| 394 |
-
# Try to extract JSON from response
|
| 395 |
try:
|
| 396 |
match = re.search(r'(\[.*\])', raw_text, re.DOTALL)
|
| 397 |
if match:
|
|
@@ -408,14 +391,13 @@ Grading JSON:
|
|
| 408 |
def imprint_marks_using_mapping(pdf_path, grading_json, output_pdf, model, expected_ids=None, rows=GRID_ROWS, cols=GRID_COLS):
|
| 409 |
"""
|
| 410 |
Convert PDF to images, create grid-numbered images for batch sending to Gemini,
|
| 411 |
-
then annotate and produce imprinted PDF
|
| 412 |
"""
|
| 413 |
print("π Converting answer PDF to images for imprinting...")
|
| 414 |
pages = convert_from_path(pdf_path, dpi=200)
|
| 415 |
annotated_page_paths = []
|
| 416 |
temp_grid_images = []
|
| 417 |
|
| 418 |
-
# Create grid images for Gemini
|
| 419 |
for p_index, page in enumerate(pages):
|
| 420 |
img = page.convert("RGB")
|
| 421 |
w, h = img.size
|
|
@@ -444,9 +426,8 @@ def imprint_marks_using_mapping(pdf_path, grading_json, output_pdf, model, expec
|
|
| 444 |
temp_grid_images.append(temp_path)
|
| 445 |
print("π° Created grid image:", temp_path)
|
| 446 |
|
| 447 |
-
# Send pages in batches to Gemini for mapping
|
| 448 |
print("π‘ Sending page images to Gemini in batches for mapping...")
|
| 449 |
-
batch_size = 10
|
| 450 |
all_mappings = []
|
| 451 |
|
| 452 |
for start in range(0, len(temp_grid_images), batch_size):
|
|
@@ -455,7 +436,6 @@ def imprint_marks_using_mapping(pdf_path, grading_json, output_pdf, model, expec
|
|
| 455 |
all_mappings.extend(batch_mapping)
|
| 456 |
print(f"β
Processed batch {start//batch_size + 1}: pages {start+1}-{start+len(batch_paths)}")
|
| 457 |
|
| 458 |
-
# Annotate original pages according to returned mappings
|
| 459 |
print("π Annotating pages with marks...")
|
| 460 |
for p_index, page in enumerate(pages):
|
| 461 |
page_num = p_index + 1
|
|
@@ -465,7 +445,6 @@ def imprint_marks_using_mapping(pdf_path, grading_json, output_pdf, model, expec
|
|
| 465 |
h, w, _ = img_cv.shape
|
| 466 |
cell_w_px, cell_h_px = w / cols, h / rows
|
| 467 |
|
| 468 |
-
# Filter mappings for this page
|
| 469 |
page_mappings = [m for m in all_mappings if m.get("page") == page_num]
|
| 470 |
|
| 471 |
for item in page_mappings:
|
|
@@ -484,11 +463,9 @@ def imprint_marks_using_mapping(pdf_path, grading_json, output_pdf, model, expec
|
|
| 484 |
row = (cell_number - 1) // cols
|
| 485 |
col = (cell_number - 1) % cols
|
| 486 |
|
| 487 |
-
# Position marks to the right of the answer, with fallback to left
|
| 488 |
x_c = int((col + 1) * cell_w_px - cell_w_px / 4)
|
| 489 |
y_c = int((row + 0.5) * cell_h_px)
|
| 490 |
|
| 491 |
-
# Use larger, more visible font
|
| 492 |
font_scale = max(1.0, min(2.0, cell_h_px / 40.0))
|
| 493 |
thickness = max(2, int(font_scale * 2))
|
| 494 |
cv2.putText(img_cv, marks_text, (x_c, y_c), cv2.FONT_HERSHEY_SIMPLEX,
|
|
@@ -500,7 +477,6 @@ def imprint_marks_using_mapping(pdf_path, grading_json, output_pdf, model, expec
|
|
| 500 |
annotated_page_paths.append(annotated_path)
|
| 501 |
print("β
Annotated page saved:", annotated_path)
|
| 502 |
|
| 503 |
-
# Merge annotated pages into final PDF
|
| 504 |
print("π Merging annotated pages into final PDF...")
|
| 505 |
with open(output_pdf, "wb") as f:
|
| 506 |
f.write(img2pdf.convert(annotated_page_paths))
|
|
@@ -509,21 +485,14 @@ def imprint_marks_using_mapping(pdf_path, grading_json, output_pdf, model, expec
|
|
| 509 |
print("π Imprinted PDF saved to:", compressed)
|
| 510 |
return compressed
|
| 511 |
|
| 512 |
-
# ---------------- GRAPH DETECTION HELPERS ----------------
|
| 513 |
-
# These functions are now robustly handled by the new_code, so they are no longer needed here.
|
| 514 |
-
|
| 515 |
-
# ---------------- GRAPH PAGE EXTRACTION HELPER ----------------
|
| 516 |
def extract_pdf_pages_as_images(pdf_path, page_numbers, prefix):
|
| 517 |
"""
|
| 518 |
Extracts unique pages (1-based) from a PDF as images, saves as PNG, returns list of file paths.
|
| 519 |
-
Prints to console when extracting each page.
|
| 520 |
"""
|
| 521 |
unique_pages = sorted(set(page_numbers))
|
| 522 |
images = convert_from_path(pdf_path, dpi=200, first_page=min(unique_pages), last_page=max(unique_pages))
|
| 523 |
out_paths = []
|
| 524 |
for idx, page_num in enumerate(unique_pages):
|
| 525 |
-
# pdf2image returns images in order, but if not contiguous, we need to map
|
| 526 |
-
# So, get the image for this page (1-based)
|
| 527 |
img_idx = page_num - min(unique_pages)
|
| 528 |
img = images[img_idx]
|
| 529 |
out_path = f"{prefix}_page_{page_num}.png"
|
|
@@ -532,37 +501,28 @@ def extract_pdf_pages_as_images(pdf_path, page_numbers, prefix):
|
|
| 532 |
out_paths.append(out_path)
|
| 533 |
return out_paths
|
| 534 |
|
| 535 |
-
# ---------------- PIPELINE
|
| 536 |
def align_and_grade_pipeline(qp_path, ms_path, ans_path, imprint=False):
|
| 537 |
"""
|
| 538 |
-
Final pipeline
|
| 539 |
-
Now includes Graph-Aware Grading logic.
|
| 540 |
"""
|
| 541 |
try:
|
| 542 |
print("π Starting pipeline...")
|
| 543 |
-
# Step 0: compress as needed
|
| 544 |
qp_path = compress_pdf(qp_path)
|
| 545 |
ms_path = compress_pdf(ms_path)
|
| 546 |
ans_path = compress_pdf(ans_path)
|
| 547 |
|
| 548 |
-
# Merge QP + MS
|
| 549 |
merged_qpms_path = os.path.splitext(qp_path)[0] + "_merged_qp_ms.pdf"
|
| 550 |
merge_pdfs([qp_path, ms_path], merged_qpms_path)
|
| 551 |
print("π Merged QP + MS ->", merged_qpms_path)
|
| 552 |
|
| 553 |
-
# Upload files to Gemini
|
| 554 |
print("πΌ Uploading files to Gemini...")
|
| 555 |
-
|
| 556 |
merged_uploaded = upload_to_gemini(merged_qpms_path, "QP+MS (merged)")
|
| 557 |
ans_uploaded = upload_to_gemini(ans_path, "Answer Sheet")
|
| 558 |
-
|
| 559 |
-
|
| 560 |
print("β
Upload complete.")
|
| 561 |
|
| 562 |
-
# Create model and print which selected
|
| 563 |
model = create_model()
|
| 564 |
|
| 565 |
-
# Step 1.i: QP+MS transcription (first)
|
| 566 |
print("1.i) Transcribing QP+MS (questions first, then full markscheme, with graph detection)...")
|
| 567 |
qpms_prompt = PROMPTS["QP_MS_TRANSCRIPTION"]["content"] + "\nAt the end, also list all questions in the markscheme where a graph is expected, in the format:\nGraph expected in:\n- Question <number> β Page <number>\n(One per line, after ==== MARKSCHEME END ====)"
|
| 568 |
qpms_text = gemini_generate_content(model, qpms_prompt, file_upload_obj=merged_uploaded)
|
|
@@ -570,7 +530,6 @@ def align_and_grade_pipeline(qp_path, ms_path, ans_path, imprint=False):
|
|
| 570 |
with open("debug_qpms_transcript.txt", "w", encoding="utf-8") as f:
|
| 571 |
f.write(qpms_text)
|
| 572 |
|
| 573 |
-
# Step 1.i.a: Extract graph-expected questions from MS
|
| 574 |
ms_graph_mapping = extract_graph_questions_from_ms(qpms_text)
|
| 575 |
print("πΌοΈ Graph-expected questions in MS:", ms_graph_mapping)
|
| 576 |
ms_graph_pages = list(ms_graph_mapping.values())
|
|
@@ -578,12 +537,10 @@ def align_and_grade_pipeline(qp_path, ms_path, ans_path, imprint=False):
|
|
| 578 |
if ms_graph_pages:
|
| 579 |
ms_graph_images = extract_pdf_pages_as_images(merged_qpms_path, ms_graph_pages, prefix="qpms_graph")
|
| 580 |
|
| 581 |
-
# Step 2: extract serial numbers (question IDs) using regex from qpms_text
|
| 582 |
extracted_ids = extract_question_ids_from_qpms(qpms_text)
|
| 583 |
if not extracted_ids:
|
| 584 |
extracted_ids = ["NA"]
|
| 585 |
|
| 586 |
-
# Step 1.ii: Build AS prompt injecting extracted IDs and transcribe AS
|
| 587 |
print("1.ii) Building AS transcription prompt with expected question IDs and graph detection, sending to Gemini...")
|
| 588 |
as_prompt = build_as_prompt_with_expected_ids(extracted_ids, qpms_text) + "\nAt the end, also list all answers where a graph is found, in the format:\nGraph found in:\n- Answer <number> β Page <number>\n(One per line, after all answers)"
|
| 589 |
as_text = gemini_generate_content(model, as_prompt, file_upload_obj=ans_uploaded)
|
|
@@ -591,7 +548,6 @@ def align_and_grade_pipeline(qp_path, ms_path, ans_path, imprint=False):
|
|
| 591 |
with open("debug_as_transcript.txt", "w", encoding="utf-8") as f:
|
| 592 |
f.write(as_text)
|
| 593 |
|
| 594 |
-
# Step 2.a: Extract graph-attempted answers from AS
|
| 595 |
as_graph_mapping = extract_graph_answers_from_as(as_text)
|
| 596 |
print("πΌοΈ Graph-attempted answers in AS:", as_graph_mapping)
|
| 597 |
as_graph_pages = list(as_graph_mapping.values())
|
|
@@ -599,9 +555,6 @@ def align_and_grade_pipeline(qp_path, ms_path, ans_path, imprint=False):
|
|
| 599 |
if as_graph_pages:
|
| 600 |
as_graph_images = extract_pdf_pages_as_images(ans_path, as_graph_pages, prefix="as_graph")
|
| 601 |
|
| 602 |
-
# Step 3: (No graph bundle matching, just collect images)
|
| 603 |
-
|
| 604 |
-
# Step 4: Grading - send both transcripts to grading model, inject graph image info
|
| 605 |
print("2) Preparing grading input and sending to Gemini for grading...")
|
| 606 |
grading_input = (
|
| 607 |
"=== QP+MS TRANSCRIPT BEGIN ===\n"
|
|
@@ -611,24 +564,20 @@ def align_and_grade_pipeline(qp_path, ms_path, ans_path, imprint=False):
|
|
| 611 |
+ as_text
|
| 612 |
+ "\n=== ANSWER SHEET TRANSCRIPT END ===\n"
|
| 613 |
)
|
| 614 |
-
# Inject graph image note
|
| 615 |
if ms_graph_images or as_graph_images:
|
| 616 |
-
graph_note = "\n\n---\nSome questions require graphs. I
|
| 617 |
grading_input += graph_note
|
| 618 |
grading_prompt_system = PROMPTS["GRADING_PROMPT"]["content"]
|
| 619 |
-
# Pass images as additional input to gemini_generate_content
|
| 620 |
grading_images = ms_graph_images + as_graph_images
|
| 621 |
grading_text = gemini_generate_content(model, grading_prompt_system + "\n\nPlease grade the following transcripts:\n" + grading_input, image_obj=grading_images if grading_images else None)
|
| 622 |
print("π§Ύ Grading output received. Saving debug file: debug_grading.md")
|
| 623 |
with open("debug_grading.md", "w", encoding="utf-8") as f:
|
| 624 |
f.write(grading_text)
|
| 625 |
|
| 626 |
-
# Save grading PDF
|
| 627 |
base_name = os.path.splitext(os.path.basename(ans_path))[0]
|
| 628 |
grading_pdf_path = save_as_pdf(grading_text, f"{base_name}_graded.pdf")
|
| 629 |
print("π Grading PDF saved:", grading_pdf_path)
|
| 630 |
|
| 631 |
-
# Step 4: Extract marks for imprinting
|
| 632 |
grading_json = extract_marks_from_grading(grading_text)
|
| 633 |
with open("debug_grading_json.json", "w", encoding="utf-8") as f:
|
| 634 |
json.dump(grading_json, f, indent=2, ensure_ascii=False)
|
|
@@ -646,11 +595,13 @@ def align_and_grade_pipeline(qp_path, ms_path, ans_path, imprint=False):
|
|
| 646 |
|
| 647 |
except Exception as e:
|
| 648 |
print("β Pipeline error:", e)
|
|
|
|
|
|
|
| 649 |
return f"β Error: {e}", None, None, None, None
|
| 650 |
|
| 651 |
# ---------------- GRADIO UI ----------------
|
| 652 |
-
with gr.Blocks(title="
|
| 653 |
-
gr.Markdown("## π
|
| 654 |
|
| 655 |
with gr.Row():
|
| 656 |
qp_file = gr.File(label="π Upload Question Paper (PDF)")
|
|
|
|
| 14 |
import numpy as np
|
| 15 |
from concurrent.futures import ThreadPoolExecutor, as_completed
|
| 16 |
from PyPDF2 import PdfReader, PdfWriter
|
| 17 |
+
|
| 18 |
# ---------------- CONFIG ----------------
|
| 19 |
genai.configure(api_key=os.getenv("GEMINI_API_KEY"))
|
| 20 |
GRID_ROWS, GRID_COLS = 20, 14
|
|
|
|
| 57 |
}
|
| 58 |
,
|
| 59 |
|
|
|
|
| 60 |
"GRADING_PROMPT": {
|
| 61 |
"role": "system",
|
| 62 |
"content": """Developer: You are an official examiner. Apply the following grading rules precisely.
|
|
|
|
| 81 |
Produce two sections per question/sub-question, following this structure:
|
| 82 |
## Question <id>
|
| 83 |
### Markscheme vs Student Answer
|
| 84 |
+
| Mark ID | Markscheme Expectation | Student's Response | Awarded |
|
| 85 |
|---------|------------------------|--------------------|---------|
|
| 86 |
| M1_1 | Recognise GP | "r=0.9" | M1 |
|
| 87 |
β‘οΈ **Total: X/Y**
|
| 88 |
---
|
| 89 |
+
### Examiner's Report
|
| 90 |
At the very end, provide a summary table:
|
| 91 |
| Question Number | Marks | Remark |
|
| 92 |
|-----------------|-------|--------|
|
|
|
|
| 109 |
pdf.save(filename)
|
| 110 |
return filename
|
| 111 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 112 |
def compress_pdf(input_path, output_path=None, max_size=20*1024*1024):
|
| 113 |
if output_path is None:
|
| 114 |
base, ext = os.path.splitext(input_path)
|
|
|
|
| 149 |
Create the Gemini model and print which model is selected.
|
| 150 |
"""
|
| 151 |
try:
|
| 152 |
+
print("β‘ Attempting to use gemini-2.0-flash-exp model")
|
| 153 |
+
model = genai.GenerativeModel("gemini-2.0-flash-exp", generation_config={"temperature": 0})
|
| 154 |
+
print("β
Selected model: gemini-2.0-flash-exp")
|
| 155 |
return model
|
| 156 |
except Exception as e:
|
| 157 |
+
print("β οΈ Could not use gemini-2.0-flash-exp:", e)
|
| 158 |
try:
|
| 159 |
+
print("β‘ Falling back to gemini-1.5-flash model")
|
| 160 |
+
model = genai.GenerativeModel("gemini-1.5-flash", generation_config={"temperature": 0})
|
| 161 |
+
print("β
Selected model: gemini-1.5-flash")
|
| 162 |
return model
|
| 163 |
except Exception as e:
|
| 164 |
print("β Failed to create any Gemini model:", e)
|
| 165 |
raise
|
| 166 |
|
| 167 |
+
def upload_to_gemini(path, display_name=None):
|
| 168 |
+
"""
|
| 169 |
+
Upload a file to Gemini using the standard File API (no RAG).
|
| 170 |
+
"""
|
| 171 |
+
print(f"π€ Uploading {path} to Gemini...")
|
| 172 |
+
try:
|
| 173 |
+
uploaded_file = genai.upload_file(
|
| 174 |
+
path=path,
|
| 175 |
+
display_name=display_name or os.path.basename(path)
|
| 176 |
+
)
|
| 177 |
+
print(f"β
Uploaded: {uploaded_file.display_name} (URI: {uploaded_file.uri})")
|
| 178 |
+
return uploaded_file
|
| 179 |
+
except Exception as e:
|
| 180 |
+
print(f"β Upload failed for {path}: {e}")
|
| 181 |
+
raise
|
| 182 |
+
|
| 183 |
def merge_pdfs(paths, output_path):
|
| 184 |
writer = PdfWriter()
|
| 185 |
for p in paths:
|
|
|
|
| 199 |
if file_upload_obj:
|
| 200 |
inputs.append(file_upload_obj)
|
| 201 |
if image_obj:
|
|
|
|
| 202 |
if isinstance(image_obj, list):
|
|
|
|
| 203 |
for img_path in image_obj:
|
| 204 |
if isinstance(img_path, str):
|
|
|
|
| 205 |
pil_img = Image.open(img_path)
|
| 206 |
inputs.append(pil_img)
|
| 207 |
else:
|
|
|
|
| 208 |
inputs.append(img_path)
|
| 209 |
else:
|
|
|
|
| 210 |
if isinstance(image_obj, str):
|
| 211 |
pil_img = Image.open(image_obj)
|
| 212 |
inputs.append(pil_img)
|
|
|
|
| 224 |
|
| 225 |
# ---------------- PARSERS ----------------
|
| 226 |
def extract_question_ids_from_qpms(text: str):
|
| 227 |
+
"""Extract question IDs from QP+MS transcript."""
|
|
|
|
|
|
|
| 228 |
print("π Extracting question IDs from QP+MS transcript using regex...")
|
| 229 |
|
|
|
|
| 230 |
clean_text = text.replace("\u00A0", " ").replace("\t", " ")
|
| 231 |
|
|
|
|
| 232 |
primary_matches = re.findall(r"^\s*Question\s*[:\s]\s*([\dA-Za-z.()]+)", clean_text, re.MULTILINE)
|
| 233 |
if primary_matches:
|
| 234 |
print(f"β
Extracted {len(primary_matches)} question IDs from explicit 'Question X' lines.")
|
| 235 |
print("IDs:", primary_matches)
|
| 236 |
return primary_matches
|
| 237 |
|
|
|
|
| 238 |
fallback_matches = re.findall(r"^\s*(\d+(?:[.)]|\([a-zA-Z0-9]+\))?[a-zA-Z0-9]*)", clean_text, re.MULTILINE)
|
| 239 |
if fallback_matches:
|
| 240 |
print(f"β
Extracted {len(fallback_matches)} question IDs (fallback numbered lists).")
|
|
|
|
| 243 |
print("β οΈ No question IDs extracted; will send NA placeholder.")
|
| 244 |
return fallback_matches
|
| 245 |
|
|
|
|
|
|
|
| 246 |
def build_as_prompt_with_expected_ids(expected_ids, qpms_text=None):
|
| 247 |
"""
|
| 248 |
Construct the AS transcription prompt injecting the expected IDs block and graph detection instructions.
|
|
|
|
| 249 |
"""
|
| 250 |
if not expected_ids:
|
| 251 |
ids_block = "{NA}"
|
|
|
|
| 278 |
==== GRAPH FOUND ANSWERS ====\nGraph found in:\n- Answer <number> β Page <number>\n(one per line)\n==== END GRAPH FOUND ===="""
|
| 279 |
return prompt
|
| 280 |
|
|
|
|
|
|
|
| 281 |
def extract_graph_questions_from_ms(text: str):
|
| 282 |
"""Extract graph questions and page numbers from MS transcript."""
|
| 283 |
clean_text = text.replace("\u00A0", " ").replace("\t", " ")
|
|
|
|
| 313 |
|
| 314 |
def extract_marks_from_grading(grading_text):
|
| 315 |
"""
|
| 316 |
+
Parse the grading markdown and extract marks per question.
|
|
|
|
|
|
|
| 317 |
"""
|
| 318 |
print("π Extracting awarded marks from grading output...")
|
| 319 |
grading_json = {"grading": []}
|
|
|
|
| 339 |
def ask_gemini_for_mapping_batch(model, image_paths, grading_json, expected_ids=None, rows=GRID_ROWS, cols=GRID_COLS):
|
| 340 |
"""
|
| 341 |
Send multiple page images together to Gemini for batch mapping processing.
|
|
|
|
| 342 |
"""
|
| 343 |
ids_block = "{NA}"
|
| 344 |
if expected_ids:
|
|
|
|
| 360 |
Grading JSON:
|
| 361 |
{json.dumps(grading_json, indent=2)}"""
|
| 362 |
|
|
|
|
| 363 |
images = [Image.open(p) for p in image_paths]
|
| 364 |
|
| 365 |
print(f"π‘ Sending batch mapping request for {len(image_paths)} pages to Gemini...")
|
|
|
|
| 375 |
print("π Gemini raw batch output:")
|
| 376 |
print(raw_text)
|
| 377 |
|
|
|
|
| 378 |
try:
|
| 379 |
match = re.search(r'(\[.*\])', raw_text, re.DOTALL)
|
| 380 |
if match:
|
|
|
|
| 391 |
def imprint_marks_using_mapping(pdf_path, grading_json, output_pdf, model, expected_ids=None, rows=GRID_ROWS, cols=GRID_COLS):
|
| 392 |
"""
|
| 393 |
Convert PDF to images, create grid-numbered images for batch sending to Gemini,
|
| 394 |
+
then annotate and produce imprinted PDF.
|
| 395 |
"""
|
| 396 |
print("π Converting answer PDF to images for imprinting...")
|
| 397 |
pages = convert_from_path(pdf_path, dpi=200)
|
| 398 |
annotated_page_paths = []
|
| 399 |
temp_grid_images = []
|
| 400 |
|
|
|
|
| 401 |
for p_index, page in enumerate(pages):
|
| 402 |
img = page.convert("RGB")
|
| 403 |
w, h = img.size
|
|
|
|
| 426 |
temp_grid_images.append(temp_path)
|
| 427 |
print("π° Created grid image:", temp_path)
|
| 428 |
|
|
|
|
| 429 |
print("π‘ Sending page images to Gemini in batches for mapping...")
|
| 430 |
+
batch_size = 10
|
| 431 |
all_mappings = []
|
| 432 |
|
| 433 |
for start in range(0, len(temp_grid_images), batch_size):
|
|
|
|
| 436 |
all_mappings.extend(batch_mapping)
|
| 437 |
print(f"β
Processed batch {start//batch_size + 1}: pages {start+1}-{start+len(batch_paths)}")
|
| 438 |
|
|
|
|
| 439 |
print("π Annotating pages with marks...")
|
| 440 |
for p_index, page in enumerate(pages):
|
| 441 |
page_num = p_index + 1
|
|
|
|
| 445 |
h, w, _ = img_cv.shape
|
| 446 |
cell_w_px, cell_h_px = w / cols, h / rows
|
| 447 |
|
|
|
|
| 448 |
page_mappings = [m for m in all_mappings if m.get("page") == page_num]
|
| 449 |
|
| 450 |
for item in page_mappings:
|
|
|
|
| 463 |
row = (cell_number - 1) // cols
|
| 464 |
col = (cell_number - 1) % cols
|
| 465 |
|
|
|
|
| 466 |
x_c = int((col + 1) * cell_w_px - cell_w_px / 4)
|
| 467 |
y_c = int((row + 0.5) * cell_h_px)
|
| 468 |
|
|
|
|
| 469 |
font_scale = max(1.0, min(2.0, cell_h_px / 40.0))
|
| 470 |
thickness = max(2, int(font_scale * 2))
|
| 471 |
cv2.putText(img_cv, marks_text, (x_c, y_c), cv2.FONT_HERSHEY_SIMPLEX,
|
|
|
|
| 477 |
annotated_page_paths.append(annotated_path)
|
| 478 |
print("β
Annotated page saved:", annotated_path)
|
| 479 |
|
|
|
|
| 480 |
print("π Merging annotated pages into final PDF...")
|
| 481 |
with open(output_pdf, "wb") as f:
|
| 482 |
f.write(img2pdf.convert(annotated_page_paths))
|
|
|
|
| 485 |
print("π Imprinted PDF saved to:", compressed)
|
| 486 |
return compressed
|
| 487 |
|
|
|
|
|
|
|
|
|
|
|
|
|
| 488 |
def extract_pdf_pages_as_images(pdf_path, page_numbers, prefix):
|
| 489 |
"""
|
| 490 |
Extracts unique pages (1-based) from a PDF as images, saves as PNG, returns list of file paths.
|
|
|
|
| 491 |
"""
|
| 492 |
unique_pages = sorted(set(page_numbers))
|
| 493 |
images = convert_from_path(pdf_path, dpi=200, first_page=min(unique_pages), last_page=max(unique_pages))
|
| 494 |
out_paths = []
|
| 495 |
for idx, page_num in enumerate(unique_pages):
|
|
|
|
|
|
|
| 496 |
img_idx = page_num - min(unique_pages)
|
| 497 |
img = images[img_idx]
|
| 498 |
out_path = f"{prefix}_page_{page_num}.png"
|
|
|
|
| 501 |
out_paths.append(out_path)
|
| 502 |
return out_paths
|
| 503 |
|
| 504 |
+
# ---------------- PIPELINE ----------------
|
| 505 |
def align_and_grade_pipeline(qp_path, ms_path, ans_path, imprint=False):
|
| 506 |
"""
|
| 507 |
+
Final pipeline with graph-aware grading logic.
|
|
|
|
| 508 |
"""
|
| 509 |
try:
|
| 510 |
print("π Starting pipeline...")
|
|
|
|
| 511 |
qp_path = compress_pdf(qp_path)
|
| 512 |
ms_path = compress_pdf(ms_path)
|
| 513 |
ans_path = compress_pdf(ans_path)
|
| 514 |
|
|
|
|
| 515 |
merged_qpms_path = os.path.splitext(qp_path)[0] + "_merged_qp_ms.pdf"
|
| 516 |
merge_pdfs([qp_path, ms_path], merged_qpms_path)
|
| 517 |
print("π Merged QP + MS ->", merged_qpms_path)
|
| 518 |
|
|
|
|
| 519 |
print("πΌ Uploading files to Gemini...")
|
|
|
|
| 520 |
merged_uploaded = upload_to_gemini(merged_qpms_path, "QP+MS (merged)")
|
| 521 |
ans_uploaded = upload_to_gemini(ans_path, "Answer Sheet")
|
|
|
|
|
|
|
| 522 |
print("β
Upload complete.")
|
| 523 |
|
|
|
|
| 524 |
model = create_model()
|
| 525 |
|
|
|
|
| 526 |
print("1.i) Transcribing QP+MS (questions first, then full markscheme, with graph detection)...")
|
| 527 |
qpms_prompt = PROMPTS["QP_MS_TRANSCRIPTION"]["content"] + "\nAt the end, also list all questions in the markscheme where a graph is expected, in the format:\nGraph expected in:\n- Question <number> β Page <number>\n(One per line, after ==== MARKSCHEME END ====)"
|
| 528 |
qpms_text = gemini_generate_content(model, qpms_prompt, file_upload_obj=merged_uploaded)
|
|
|
|
| 530 |
with open("debug_qpms_transcript.txt", "w", encoding="utf-8") as f:
|
| 531 |
f.write(qpms_text)
|
| 532 |
|
|
|
|
| 533 |
ms_graph_mapping = extract_graph_questions_from_ms(qpms_text)
|
| 534 |
print("πΌοΈ Graph-expected questions in MS:", ms_graph_mapping)
|
| 535 |
ms_graph_pages = list(ms_graph_mapping.values())
|
|
|
|
| 537 |
if ms_graph_pages:
|
| 538 |
ms_graph_images = extract_pdf_pages_as_images(merged_qpms_path, ms_graph_pages, prefix="qpms_graph")
|
| 539 |
|
|
|
|
| 540 |
extracted_ids = extract_question_ids_from_qpms(qpms_text)
|
| 541 |
if not extracted_ids:
|
| 542 |
extracted_ids = ["NA"]
|
| 543 |
|
|
|
|
| 544 |
print("1.ii) Building AS transcription prompt with expected question IDs and graph detection, sending to Gemini...")
|
| 545 |
as_prompt = build_as_prompt_with_expected_ids(extracted_ids, qpms_text) + "\nAt the end, also list all answers where a graph is found, in the format:\nGraph found in:\n- Answer <number> β Page <number>\n(One per line, after all answers)"
|
| 546 |
as_text = gemini_generate_content(model, as_prompt, file_upload_obj=ans_uploaded)
|
|
|
|
| 548 |
with open("debug_as_transcript.txt", "w", encoding="utf-8") as f:
|
| 549 |
f.write(as_text)
|
| 550 |
|
|
|
|
| 551 |
as_graph_mapping = extract_graph_answers_from_as(as_text)
|
| 552 |
print("πΌοΈ Graph-attempted answers in AS:", as_graph_mapping)
|
| 553 |
as_graph_pages = list(as_graph_mapping.values())
|
|
|
|
| 555 |
if as_graph_pages:
|
| 556 |
as_graph_images = extract_pdf_pages_as_images(ans_path, as_graph_pages, prefix="as_graph")
|
| 557 |
|
|
|
|
|
|
|
|
|
|
| 558 |
print("2) Preparing grading input and sending to Gemini for grading...")
|
| 559 |
grading_input = (
|
| 560 |
"=== QP+MS TRANSCRIPT BEGIN ===\n"
|
|
|
|
| 564 |
+ as_text
|
| 565 |
+ "\n=== ANSWER SHEET TRANSCRIPT END ===\n"
|
| 566 |
)
|
|
|
|
| 567 |
if ms_graph_images or as_graph_images:
|
| 568 |
+
graph_note = "\n\n---\nSome questions require graphs. I've attached the relevant graph pages from QP+MS and from the Answer Sheet. Use them as visual context when grading.\n---\n"
|
| 569 |
grading_input += graph_note
|
| 570 |
grading_prompt_system = PROMPTS["GRADING_PROMPT"]["content"]
|
|
|
|
| 571 |
grading_images = ms_graph_images + as_graph_images
|
| 572 |
grading_text = gemini_generate_content(model, grading_prompt_system + "\n\nPlease grade the following transcripts:\n" + grading_input, image_obj=grading_images if grading_images else None)
|
| 573 |
print("π§Ύ Grading output received. Saving debug file: debug_grading.md")
|
| 574 |
with open("debug_grading.md", "w", encoding="utf-8") as f:
|
| 575 |
f.write(grading_text)
|
| 576 |
|
|
|
|
| 577 |
base_name = os.path.splitext(os.path.basename(ans_path))[0]
|
| 578 |
grading_pdf_path = save_as_pdf(grading_text, f"{base_name}_graded.pdf")
|
| 579 |
print("π Grading PDF saved:", grading_pdf_path)
|
| 580 |
|
|
|
|
| 581 |
grading_json = extract_marks_from_grading(grading_text)
|
| 582 |
with open("debug_grading_json.json", "w", encoding="utf-8") as f:
|
| 583 |
json.dump(grading_json, f, indent=2, ensure_ascii=False)
|
|
|
|
| 595 |
|
| 596 |
except Exception as e:
|
| 597 |
print("β Pipeline error:", e)
|
| 598 |
+
import traceback
|
| 599 |
+
traceback.print_exc()
|
| 600 |
return f"β Error: {e}", None, None, None, None
|
| 601 |
|
| 602 |
# ---------------- GRADIO UI ----------------
|
| 603 |
+
with gr.Blocks(title="AI Grading (Final Flow)") as demo:
|
| 604 |
+
gr.Markdown("## π AI Grading β Final Flow")
|
| 605 |
|
| 606 |
with gr.Row():
|
| 607 |
qp_file = gr.File(label="π Upload Question Paper (PDF)")
|