Spaces:
Sleeping
Sleeping
Update app.py
Browse files
app.py
CHANGED
|
@@ -21,44 +21,54 @@ GRID_ROWS, GRID_COLS = 20, 14
|
|
| 21 |
|
| 22 |
# ---------------- PROMPTS ----------------
|
| 23 |
PROMPTS = {
|
|
|
|
| 24 |
"QP_MS_TRANSCRIPTION": {
|
| 25 |
"role": "system",
|
| 26 |
"content": """You are a high-quality OCR/Transcription assistant.
|
| 27 |
|
| 28 |
-
INPUT: This file is a PDF that first contains the Question Paper and
|
| 29 |
TASK: Produce an exact transcription in plain text with clear separators.
|
| 30 |
|
| 31 |
-
|
| 32 |
-
question
|
| 33 |
-
|
| 34 |
-
|
| 35 |
-
|
| 36 |
-
mark scheme ( exact for each question)
|
| 37 |
|
| 38 |
-
|
| 39 |
-
|
| 40 |
-
|
|
|
|
| 41 |
|
| 42 |
-
|
| 43 |
-
( ignore THESE N1 , N2 , N3 )
|
| 44 |
|
| 45 |
-
|
| 46 |
-
|
| 47 |
-
|
|
|
|
|
|
|
| 48 |
|
| 49 |
-
|
| 50 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 51 |
QP:
|
| 52 |
-
<
|
|
|
|
| 53 |
|
| 54 |
-
|
| 55 |
-
|
| 56 |
|
| 57 |
-
|
| 58 |
-
|
|
|
|
| 59 |
"""
|
| 60 |
},
|
| 61 |
-
|
|
|
|
| 62 |
"GRADING_PROMPT": {
|
| 63 |
"role": "system",
|
| 64 |
"content": """Developer: You are an official examiner. Apply the following grading rules precisely.
|
|
@@ -101,7 +111,7 @@ Then show total clearly as a final line:
|
|
| 101 |
`Total: <obtained_marks>/<max_marks>`
|
| 102 |
|
| 103 |
NOTES:
|
| 104 |
-
- The assistant will receive two transcripts (QP+MS transcription
|
| 105 |
- Match student answers to question IDs and grade according to the provided verbatim markscheme.
|
| 106 |
- Produce full markdown as above. Ensure mark IDs used in the grading are present and consistent with the markscheme.
|
| 107 |
"""
|
|
@@ -126,8 +136,10 @@ def compress_pdf(input_path, output_path=None, max_size=20*1024*1024):
|
|
| 126 |
return input_path
|
| 127 |
|
| 128 |
if size <= max_size:
|
|
|
|
| 129 |
return input_path
|
| 130 |
|
|
|
|
| 131 |
try:
|
| 132 |
gs_cmd = [
|
| 133 |
"gs", "-sDEVICE=pdfwrite",
|
|
@@ -138,18 +150,35 @@ def compress_pdf(input_path, output_path=None, max_size=20*1024*1024):
|
|
| 138 |
]
|
| 139 |
subprocess.run(gs_cmd, check=True)
|
| 140 |
new_size = os.path.getsize(output_path)
|
|
|
|
| 141 |
if new_size <= max_size:
|
| 142 |
return output_path
|
| 143 |
else:
|
|
|
|
| 144 |
return input_path
|
| 145 |
-
except Exception:
|
|
|
|
| 146 |
return input_path
|
| 147 |
|
| 148 |
def create_model():
|
|
|
|
|
|
|
|
|
|
| 149 |
try:
|
| 150 |
-
|
| 151 |
-
|
| 152 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 153 |
|
| 154 |
def merge_pdfs(paths, output_path):
|
| 155 |
writer = PdfWriter()
|
|
@@ -164,58 +193,60 @@ def merge_pdfs(paths, output_path):
|
|
| 164 |
def gemini_generate_content(model, prompt_text, file_upload_obj=None, image_obj=None):
|
| 165 |
"""
|
| 166 |
Send prompt_text and optionally an uploaded file (or an image object) to the model.
|
| 167 |
-
Returns textual response.
|
| 168 |
"""
|
| 169 |
inputs = [prompt_text]
|
| 170 |
if file_upload_obj:
|
| 171 |
inputs.append(file_upload_obj)
|
| 172 |
if image_obj:
|
| 173 |
inputs.append(image_obj)
|
|
|
|
| 174 |
response = model.generate_content(inputs)
|
| 175 |
raw_text = getattr(response, "text", None)
|
| 176 |
if not raw_text and getattr(response, "candidates", None):
|
| 177 |
raw_text = response.candidates[0].content.parts[0].text
|
| 178 |
-
if
|
| 179 |
raw_text = str(response)
|
|
|
|
| 180 |
return raw_text
|
| 181 |
|
| 182 |
# ---------------- PARSERS ----------------
|
| 183 |
def extract_question_ids_from_qpms(text):
|
| 184 |
"""
|
| 185 |
Extract question IDs from QP+MS transcript output.
|
| 186 |
-
We expect
|
| 187 |
Return a list of unique IDs in order of appearance.
|
| 188 |
"""
|
|
|
|
| 189 |
ids = []
|
| 190 |
-
|
| 191 |
-
for m in re.finditer(r"(?im)^\s*Question\s+([0-9]+(?:[.\-a-zA-Z()]+(?:\.[a-zA-Z0-9()]+)*)?)\b", text):
|
| 192 |
qid = m.group(1).strip()
|
| 193 |
if qid not in ids:
|
| 194 |
ids.append(qid)
|
| 195 |
-
|
| 196 |
-
|
| 197 |
-
|
| 198 |
-
|
| 199 |
-
|
| 200 |
-
|
| 201 |
-
|
| 202 |
-
|
| 203 |
-
|
| 204 |
-
|
| 205 |
-
|
| 206 |
-
|
| 207 |
-
|
|
|
|
|
|
|
| 208 |
return ids
|
| 209 |
|
| 210 |
def build_as_prompt_with_expected_ids(expected_ids):
|
| 211 |
"""
|
| 212 |
-
Construct the AS transcription prompt injecting the expected IDs block
|
| 213 |
-
The expected_ids is a list; we format them per user instruction inside braces.
|
| 214 |
"""
|
| 215 |
if not expected_ids:
|
| 216 |
ids_block = "{NA}"
|
| 217 |
else:
|
| 218 |
-
# Format exactly as user provided: curly brace block with each ID on its own line
|
| 219 |
ids_block = "{\n" + "\n".join(expected_ids) + "\n}"
|
| 220 |
prompt = f"""You are a high-quality handwritten transcription assistant.
|
| 221 |
|
|
@@ -247,15 +278,15 @@ def extract_marks_from_grading(grading_text):
|
|
| 247 |
Parse the grading markdown produced by the GRADING_PROMPT and extract marks per question.
|
| 248 |
Returns dict: {"grading": [{"question": "1.a", "marks_awarded": ["M1","A1"]}, ...]}
|
| 249 |
"""
|
|
|
|
| 250 |
grading_json = {"grading": []}
|
| 251 |
|
| 252 |
-
# Split by question sections using "## Question" header
|
| 253 |
question_blocks = re.split(r"##\s*Question\s+", grading_text)
|
| 254 |
for block in question_blocks[1:]:
|
| 255 |
-
first_line = block.strip().splitlines()[0].strip()
|
| 256 |
q_id_match = re.match(r"([0-9]+(?:[a-zA-Z]|\([^\)]+\)|(?:\.[a-zA-Z0-9]+))*)", first_line)
|
| 257 |
if not q_id_match:
|
| 258 |
-
q_id = first_line.split()[0]
|
| 259 |
else:
|
| 260 |
q_id = q_id_match.group(1).strip()
|
| 261 |
awarded = re.findall(r"\b(M\d+|A\d+|R\d+|M0|A0|R0)\b", block)
|
|
@@ -269,6 +300,8 @@ def extract_marks_from_grading(grading_text):
|
|
| 269 |
"question": q_id,
|
| 270 |
"marks_awarded": awarded_unique
|
| 271 |
})
|
|
|
|
|
|
|
| 272 |
return grading_json
|
| 273 |
|
| 274 |
# ---------------- MAPPING/IMPRINT HELPERS ----------------
|
|
@@ -292,25 +325,32 @@ Return JSON only, like:
|
|
| 292 |
Grading JSON:
|
| 293 |
{json.dumps(grading_json, indent=2)}
|
| 294 |
"""
|
|
|
|
| 295 |
img = Image.open(image_path)
|
| 296 |
response = model.generate_content([prompt, img])
|
| 297 |
raw_text = getattr(response, "text", None)
|
| 298 |
if not raw_text and getattr(response, "candidates", None):
|
| 299 |
raw_text = response.candidates[0].content.parts[0].text
|
|
|
|
|
|
|
|
|
|
| 300 |
try:
|
| 301 |
start = raw_text.index('[')
|
| 302 |
end = raw_text.rindex(']') + 1
|
| 303 |
json_part = raw_text[start:end]
|
| 304 |
mapping = json.loads(json_part)
|
|
|
|
| 305 |
return mapping
|
| 306 |
except Exception:
|
| 307 |
match = re.search(r'(\[.*\])', raw_text, re.DOTALL)
|
| 308 |
if match:
|
| 309 |
try:
|
| 310 |
mapping = json.loads(match.group(1))
|
|
|
|
| 311 |
return mapping
|
| 312 |
except Exception:
|
| 313 |
pass
|
|
|
|
| 314 |
return []
|
| 315 |
|
| 316 |
def imprint_marks_using_mapping(pdf_path, grading_json, output_pdf, model, rows=GRID_ROWS, cols=GRID_COLS):
|
|
@@ -318,11 +358,11 @@ def imprint_marks_using_mapping(pdf_path, grading_json, output_pdf, model, rows=
|
|
| 318 |
Convert PDF to images, create grid-numbered images for sending to Gemini,
|
| 319 |
send all page images in parallel to Gemini for mapping, then annotate and produce imprinted PDF.
|
| 320 |
"""
|
|
|
|
| 321 |
pages = convert_from_path(pdf_path, dpi=200)
|
| 322 |
annotated_page_paths = []
|
| 323 |
temp_grid_images = []
|
| 324 |
|
| 325 |
-
# Create grid-numbered images for mapping prompt
|
| 326 |
for p_index, page in enumerate(pages):
|
| 327 |
img = page.convert("RGB")
|
| 328 |
w, h = img.size
|
|
@@ -349,10 +389,12 @@ def imprint_marks_using_mapping(pdf_path, grading_json, output_pdf, model, rows=
|
|
| 349 |
temp_path = f"page_{p_index+1}_grid.png"
|
| 350 |
img.save(temp_path, "PNG")
|
| 351 |
temp_grid_images.append(temp_path)
|
|
|
|
| 352 |
|
| 353 |
# Send all grid images in parallel to Gemini to get mappings
|
|
|
|
| 354 |
mappings_per_page = {}
|
| 355 |
-
model_local = model
|
| 356 |
with ThreadPoolExecutor(max_workers=min(8, len(temp_grid_images))) as ex:
|
| 357 |
futures = {ex.submit(ask_gemini_for_mapping_for_page, model_local, img_path, grading_json, rows, cols): idx
|
| 358 |
for idx, img_path in enumerate(temp_grid_images)}
|
|
@@ -360,11 +402,13 @@ def imprint_marks_using_mapping(pdf_path, grading_json, output_pdf, model, rows=
|
|
| 360 |
idx = futures[fut]
|
| 361 |
try:
|
| 362 |
mapping = fut.result()
|
| 363 |
-
except Exception:
|
|
|
|
| 364 |
mapping = []
|
| 365 |
mappings_per_page[idx] = mapping
|
| 366 |
|
| 367 |
# Annotate original pages according to returned mappings
|
|
|
|
| 368 |
for p_index, page in enumerate(pages):
|
| 369 |
page_img = page.convert("RGB")
|
| 370 |
img_cv = np.array(page_img)
|
|
@@ -419,29 +463,23 @@ def imprint_marks_using_mapping(pdf_path, grading_json, output_pdf, model, rows=
|
|
| 419 |
annotated_path = f"annotated_page_{p_index+1}.png"
|
| 420 |
cv2.imwrite(annotated_path, img_cv)
|
| 421 |
annotated_page_paths.append(annotated_path)
|
|
|
|
| 422 |
|
| 423 |
with open(output_pdf, "wb") as f:
|
| 424 |
f.write(img2pdf.convert(annotated_page_paths))
|
| 425 |
|
| 426 |
compressed = compress_pdf(output_pdf)
|
|
|
|
| 427 |
return compressed
|
| 428 |
|
| 429 |
# ---------------- MAIN PIPELINE ----------------
|
| 430 |
def align_and_grade_pipeline(qp_path, ms_path, ans_path, imprint=False):
|
| 431 |
"""
|
| 432 |
-
|
| 433 |
-
1) compress files if needed
|
| 434 |
-
2) merge QP + MS -> merged_qpms.pdf
|
| 435 |
-
3) upload merged_qpms to Gemini, request transcription (QP+MS)
|
| 436 |
-
4) extract question IDs via regex from QP+MS result
|
| 437 |
-
5) build AS transcription prompt injecting expected IDs block
|
| 438 |
-
6) send AS transcription request (using injected expected IDs)
|
| 439 |
-
7) send both transcripts to grading prompt -> get grading markdown
|
| 440 |
-
8) extract marks for imprinting
|
| 441 |
-
9) optional imprint: convert pages, send page images in parallel to LLM for mapping, annotate and produce imprinted PDF
|
| 442 |
"""
|
| 443 |
try:
|
| 444 |
-
|
|
|
|
| 445 |
qp_path = compress_pdf(qp_path)
|
| 446 |
ms_path = compress_pdf(ms_path)
|
| 447 |
ans_path = compress_pdf(ans_path)
|
|
@@ -449,36 +487,40 @@ def align_and_grade_pipeline(qp_path, ms_path, ans_path, imprint=False):
|
|
| 449 |
# Merge QP + MS
|
| 450 |
merged_qpms_path = os.path.splitext(qp_path)[0] + "_merged_qp_ms.pdf"
|
| 451 |
merge_pdfs([qp_path, ms_path], merged_qpms_path)
|
|
|
|
| 452 |
|
| 453 |
-
# Upload files
|
|
|
|
| 454 |
merged_uploaded = genai.upload_file(path=merged_qpms_path, display_name="QP+MS (merged)")
|
| 455 |
ans_uploaded = genai.upload_file(path=ans_path, display_name="Answer Sheet")
|
|
|
|
| 456 |
|
|
|
|
| 457 |
model = create_model()
|
| 458 |
|
| 459 |
-
# Step 1: QP+MS transcription (first)
|
|
|
|
| 460 |
qpms_prompt = PROMPTS["QP_MS_TRANSCRIPTION"]["content"]
|
| 461 |
qpms_text = gemini_generate_content(model, qpms_prompt, file_upload_obj=merged_uploaded)
|
| 462 |
-
|
| 463 |
with open("debug_qpms_transcript.txt", "w", encoding="utf-8") as f:
|
| 464 |
f.write(qpms_text)
|
| 465 |
|
| 466 |
# Step 2: extract serial numbers (question IDs) using regex from qpms_text
|
| 467 |
extracted_ids = extract_question_ids_from_qpms(qpms_text)
|
| 468 |
-
# if empty, we still provide a default list placeholder so AS model writes NA for missing ones
|
| 469 |
if not extracted_ids:
|
| 470 |
extracted_ids = ["NA"]
|
| 471 |
|
| 472 |
-
# Step
|
|
|
|
| 473 |
as_prompt = build_as_prompt_with_expected_ids(extracted_ids)
|
| 474 |
-
|
| 475 |
-
# Step 4: AS transcription (after injecting IDs)
|
| 476 |
as_text = gemini_generate_content(model, as_prompt, file_upload_obj=ans_uploaded)
|
|
|
|
| 477 |
with open("debug_as_transcript.txt", "w", encoding="utf-8") as f:
|
| 478 |
f.write(as_text)
|
| 479 |
|
| 480 |
-
# Step
|
| 481 |
-
|
| 482 |
grading_input = (
|
| 483 |
"=== QP+MS TRANSCRIPT BEGIN ===\n"
|
| 484 |
+ qpms_text
|
|
@@ -488,39 +530,39 @@ def align_and_grade_pipeline(qp_path, ms_path, ans_path, imprint=False):
|
|
| 488 |
+ "\n=== ANSWER SHEET TRANSCRIPT END ===\n"
|
| 489 |
)
|
| 490 |
grading_prompt_system = PROMPTS["GRADING_PROMPT"]["content"]
|
| 491 |
-
grading_text = gemini_generate_content(model, grading_prompt_system
|
| 492 |
-
|
| 493 |
-
|
| 494 |
-
|
| 495 |
-
grading_text = getattr(response, "text", None)
|
| 496 |
-
if not grading_text and getattr(response, "candidates", None):
|
| 497 |
-
grading_text = response.candidates[0].content.parts[0].text
|
| 498 |
-
if not grading_text:
|
| 499 |
-
raise RuntimeError("No grading output returned from Gemini.")
|
| 500 |
|
| 501 |
# Save grading PDF
|
| 502 |
base_name = os.path.splitext(os.path.basename(ans_path))[0]
|
| 503 |
grading_pdf_path = save_as_pdf(grading_text, f"{base_name}_graded.pdf")
|
|
|
|
| 504 |
|
| 505 |
-
# Step
|
| 506 |
grading_json = extract_marks_from_grading(grading_text)
|
| 507 |
with open("debug_grading_json.json", "w", encoding="utf-8") as f:
|
| 508 |
json.dump(grading_json, f, indent=2, ensure_ascii=False)
|
|
|
|
| 509 |
|
| 510 |
imprinted_pdf_path = None
|
| 511 |
if imprint:
|
| 512 |
-
|
| 513 |
imprinted_pdf_path = f"{base_name}_imprinted.pdf"
|
| 514 |
imprinted_pdf_path = imprint_marks_using_mapping(ans_path, grading_json, imprinted_pdf_path, model)
|
|
|
|
| 515 |
|
|
|
|
| 516 |
return qpms_text, as_text, grading_text, grading_pdf_path, imprinted_pdf_path
|
| 517 |
|
| 518 |
except Exception as e:
|
|
|
|
| 519 |
return f"β Error: {e}", None, None, None, None
|
| 520 |
|
| 521 |
# ---------------- GRADIO UI ----------------
|
| 522 |
-
with gr.Blocks(title="LeadIB AI Grading (
|
| 523 |
-
gr.Markdown("## π LeadIB AI Grading β Final Flow\nUpload **Question Paper**, **Markscheme**, and **Student Answer Sheet**.\nFlow: merge QP+MS -> transcribe
|
| 524 |
|
| 525 |
with gr.Row():
|
| 526 |
qp_file = gr.File(label="π Upload Question Paper (PDF)")
|
|
@@ -534,26 +576,4 @@ with gr.Blocks(title="LeadIB AI Grading (Updated Flow: QP+MS -> IDs -> AS -> Gra
|
|
| 534 |
qpms_box = gr.Textbox(label="π QP+MS Transcript", lines=12)
|
| 535 |
as_box = gr.Textbox(label="π AS Transcript", lines=12)
|
| 536 |
|
| 537 |
-
grading_output_box = gr.Textbox(label="π§Ύ Grading (
|
| 538 |
-
grading_pdf_file = gr.File(label="π₯ Download Grading PDF")
|
| 539 |
-
imprint_pdf_file = gr.File(label="π₯ Download Imprinted PDF (Optional)")
|
| 540 |
-
|
| 541 |
-
def run_pipeline(qp_file_obj, ms_file_obj, ans_file_obj, imprint_flag):
|
| 542 |
-
qp_path = qp_file_obj.name
|
| 543 |
-
ms_path = ms_file_obj.name
|
| 544 |
-
ans_path = ans_file_obj.name
|
| 545 |
-
|
| 546 |
-
qpms_text, as_text, grading_text, grading_pdf_path, imprinted_pdf_path = align_and_grade_pipeline(
|
| 547 |
-
qp_path, ms_path, ans_path, imprint=imprint_flag
|
| 548 |
-
)
|
| 549 |
-
|
| 550 |
-
return qpms_text or "", as_text or "", grading_text or "", grading_pdf_path, imprinted_pdf_path
|
| 551 |
-
|
| 552 |
-
run_button.click(
|
| 553 |
-
fn=run_pipeline,
|
| 554 |
-
inputs=[qp_file, ms_file, ans_file, imprint_toggle],
|
| 555 |
-
outputs=[qpms_box, as_box, grading_output_box, grading_pdf_file, imprint_pdf_file]
|
| 556 |
-
)
|
| 557 |
-
|
| 558 |
-
if __name__ == "__main__":
|
| 559 |
-
demo.launch()
|
|
|
|
| 21 |
|
| 22 |
# ---------------- PROMPTS ----------------
|
| 23 |
PROMPTS = {
|
| 24 |
+
# Updated QP+MS transcription prompt:
|
| 25 |
"QP_MS_TRANSCRIPTION": {
|
| 26 |
"role": "system",
|
| 27 |
"content": """You are a high-quality OCR/Transcription assistant.
|
| 28 |
|
| 29 |
+
INPUT: This file is a scanned/printed PDF that first contains the Question Paper and then, after all questions, the Markscheme.
|
| 30 |
TASK: Produce an exact transcription in plain text with clear separators.
|
| 31 |
|
| 32 |
+
IMPORTANT: Output **ALL QUESTIONS FIRST** (in the same order they appear in the PDF).
|
| 33 |
+
For each question, output:
|
| 34 |
+
- Question ID (exact as printed, e.g., "1", "2(a)", "3.b", "4(ii)")
|
| 35 |
+
- Question text (exact wording; do not change punctuation)
|
| 36 |
+
- Total marks for that question (exact number if printed; if not printed leave blank)
|
|
|
|
| 37 |
|
| 38 |
+
After you have outputted **all questions** (and their total marks), output the **entire markscheme block** exactly as it appears in the PDF. In the markscheme section, ensure notation is explicit and clear: represent M, A, R notation **in brackets** after each mark item where applicable. For example:
|
| 39 |
+
[M1] Description...
|
| 40 |
+
[A1] Description...
|
| 41 |
+
[R1] Description...
|
| 42 |
|
| 43 |
+
Also include at the top a single line stating the total marks of the paper (if present in the paper).
|
|
|
|
| 44 |
|
| 45 |
+
KEY REQUIREMENTS:
|
| 46 |
+
- Do NOT interleave question and markscheme. First: questions + totals. Second: markscheme (verbatim, preserving mark IDs/formatting).
|
| 47 |
+
- Transcribe the markscheme verbatim; do NOT correct or reformat content (only ensure M/A/R are shown in brackets if present).
|
| 48 |
+
- Represent M, A, R marks explicitly and consistently (e.g., M1, A2, R1). If mark IDs are missing, transcribe as-is.
|
| 49 |
+
- Ignore any N1, N2, N3 notations (do not use them).
|
| 50 |
|
| 51 |
+
OUTPUT FORMAT (use these exact markers to make parsing straightforward):
|
| 52 |
+
==== PAPER TOTAL MARKS ====
|
| 53 |
+
<integer or blank>
|
| 54 |
+
|
| 55 |
+
==== QUESTIONS BEGIN ====
|
| 56 |
+
Question: <id>
|
| 57 |
+
Total Marks: <integer or blank>
|
| 58 |
QP:
|
| 59 |
+
<question text (multiline)>
|
| 60 |
+
--QUESTION-END--
|
| 61 |
|
| 62 |
+
(repeat the Question block for all questions, in order)
|
| 63 |
+
==== QUESTIONS END ====
|
| 64 |
|
| 65 |
+
==== MARKSCHEME BEGIN ====
|
| 66 |
+
<verbatim markscheme text exactly as in PDF; include mark IDs and use brackets for M/A/R notations where they appear>
|
| 67 |
+
==== MARKSCHEME END ====
|
| 68 |
"""
|
| 69 |
},
|
| 70 |
+
|
| 71 |
+
# GRADING_PROMPT unchanged except we will print steps around calling it
|
| 72 |
"GRADING_PROMPT": {
|
| 73 |
"role": "system",
|
| 74 |
"content": """Developer: You are an official examiner. Apply the following grading rules precisely.
|
|
|
|
| 111 |
`Total: <obtained_marks>/<max_marks>`
|
| 112 |
|
| 113 |
NOTES:
|
| 114 |
+
- The assistant will receive two transcripts: (1) QP+MS transcription (questions then markscheme) and (2) AS transcription (student answers). Use the QP+MS transcript as the authoritative source of question wording, total marks, and verbatim markscheme entries (M/A/R mark IDs).
|
| 115 |
- Match student answers to question IDs and grade according to the provided verbatim markscheme.
|
| 116 |
- Produce full markdown as above. Ensure mark IDs used in the grading are present and consistent with the markscheme.
|
| 117 |
"""
|
|
|
|
| 136 |
return input_path
|
| 137 |
|
| 138 |
if size <= max_size:
|
| 139 |
+
print(f"βΉοΈ Not compressing {input_path} ({size/1024/1024:.2f} MB <= {max_size/1024/1024} MB)")
|
| 140 |
return input_path
|
| 141 |
|
| 142 |
+
print(f"π Compressing {input_path} ({size/1024/1024:.2f} MB) -> {output_path}")
|
| 143 |
try:
|
| 144 |
gs_cmd = [
|
| 145 |
"gs", "-sDEVICE=pdfwrite",
|
|
|
|
| 150 |
]
|
| 151 |
subprocess.run(gs_cmd, check=True)
|
| 152 |
new_size = os.path.getsize(output_path)
|
| 153 |
+
print(f"β
Compression done. New size: {new_size/1024/1024:.2f} MB")
|
| 154 |
if new_size <= max_size:
|
| 155 |
return output_path
|
| 156 |
else:
|
| 157 |
+
print("β οΈ Compressed file still larger than threshold; returning original")
|
| 158 |
return input_path
|
| 159 |
+
except Exception as e:
|
| 160 |
+
print("β Compression error:", e)
|
| 161 |
return input_path
|
| 162 |
|
| 163 |
def create_model():
|
| 164 |
+
"""
|
| 165 |
+
Create the Gemini model and print which model is selected.
|
| 166 |
+
"""
|
| 167 |
try:
|
| 168 |
+
print("β‘ Attempting to use gemini-2.5-pro model")
|
| 169 |
+
model = genai.GenerativeModel("gemini-2.5-pro", generation_config={"temperature": 0})
|
| 170 |
+
print("β
Selected model: gemini-2.5-pro")
|
| 171 |
+
return model
|
| 172 |
+
except Exception as e:
|
| 173 |
+
print("β οΈ Could not use gemini-2.5-pro:", e)
|
| 174 |
+
try:
|
| 175 |
+
print("β‘ Falling back to gemini-2.5-flash model")
|
| 176 |
+
model = genai.GenerativeModel("gemini-2.5-flash", generation_config={"temperature": 0})
|
| 177 |
+
print("β
Selected model: gemini-2.5-flash")
|
| 178 |
+
return model
|
| 179 |
+
except Exception as e:
|
| 180 |
+
print("β Failed to create any Gemini model:", e)
|
| 181 |
+
raise
|
| 182 |
|
| 183 |
def merge_pdfs(paths, output_path):
|
| 184 |
writer = PdfWriter()
|
|
|
|
| 193 |
def gemini_generate_content(model, prompt_text, file_upload_obj=None, image_obj=None):
|
| 194 |
"""
|
| 195 |
Send prompt_text and optionally an uploaded file (or an image object) to the model.
|
| 196 |
+
Returns textual response and prints progress.
|
| 197 |
"""
|
| 198 |
inputs = [prompt_text]
|
| 199 |
if file_upload_obj:
|
| 200 |
inputs.append(file_upload_obj)
|
| 201 |
if image_obj:
|
| 202 |
inputs.append(image_obj)
|
| 203 |
+
print("π‘ Sending request to Gemini (prompt length:", len(prompt_text), "chars )")
|
| 204 |
response = model.generate_content(inputs)
|
| 205 |
raw_text = getattr(response, "text", None)
|
| 206 |
if not raw_text and getattr(response, "candidates", None):
|
| 207 |
raw_text = response.candidates[0].content.parts[0].text
|
| 208 |
+
if raw_text is None:
|
| 209 |
raw_text = str(response)
|
| 210 |
+
print("π₯ Received response (chars):", len(raw_text))
|
| 211 |
return raw_text
|
| 212 |
|
| 213 |
# ---------------- PARSERS ----------------
|
| 214 |
def extract_question_ids_from_qpms(text):
|
| 215 |
"""
|
| 216 |
Extract question IDs from QP+MS transcript output.
|
| 217 |
+
We expect the QP+MS prompt to produce lines like 'Question: <id>'
|
| 218 |
Return a list of unique IDs in order of appearance.
|
| 219 |
"""
|
| 220 |
+
print("π Extracting question IDs from QP+MS transcript using regex...")
|
| 221 |
ids = []
|
| 222 |
+
for m in re.finditer(r"(?im)^\s*Question\s*:\s*([0-9]+(?:(?:\.[a-zA-Z0-9]+)+|(?:\([a-zA-Z0-9]+\))+|[a-zA-Z])*)\b", text):
|
|
|
|
| 223 |
qid = m.group(1).strip()
|
| 224 |
if qid not in ids:
|
| 225 |
ids.append(qid)
|
| 226 |
+
if ids:
|
| 227 |
+
print(f"β
Extracted {len(ids)} question IDs.")
|
| 228 |
+
print("IDs:", ids)
|
| 229 |
+
return ids
|
| 230 |
+
|
| 231 |
+
# fallback scans
|
| 232 |
+
for m in re.finditer(r"(?m)^\s*([0-9]+(?:(?:\.[a-zA-Z0-9]+)+|(?:\([a-zA-Z0-9]+\))+|[a-zA-Z])*)\s*[\.\):\-]\s", text):
|
| 233 |
+
qid = m.group(1).strip()
|
| 234 |
+
if qid not in ids:
|
| 235 |
+
ids.append(qid)
|
| 236 |
+
if ids:
|
| 237 |
+
print(f"β
Extracted {len(ids)} question IDs (fallback heuristic).")
|
| 238 |
+
print("IDs:", ids)
|
| 239 |
+
else:
|
| 240 |
+
print("β οΈ No question IDs extracted; will send NA placeholder.")
|
| 241 |
return ids
|
| 242 |
|
| 243 |
def build_as_prompt_with_expected_ids(expected_ids):
|
| 244 |
"""
|
| 245 |
+
Construct the AS transcription prompt injecting the expected IDs block.
|
|
|
|
| 246 |
"""
|
| 247 |
if not expected_ids:
|
| 248 |
ids_block = "{NA}"
|
| 249 |
else:
|
|
|
|
| 250 |
ids_block = "{\n" + "\n".join(expected_ids) + "\n}"
|
| 251 |
prompt = f"""You are a high-quality handwritten transcription assistant.
|
| 252 |
|
|
|
|
| 278 |
Parse the grading markdown produced by the GRADING_PROMPT and extract marks per question.
|
| 279 |
Returns dict: {"grading": [{"question": "1.a", "marks_awarded": ["M1","A1"]}, ...]}
|
| 280 |
"""
|
| 281 |
+
print("π Extracting awarded marks from grading output...")
|
| 282 |
grading_json = {"grading": []}
|
| 283 |
|
|
|
|
| 284 |
question_blocks = re.split(r"##\s*Question\s+", grading_text)
|
| 285 |
for block in question_blocks[1:]:
|
| 286 |
+
first_line = block.strip().splitlines()[0].strip() if block.strip().splitlines() else ""
|
| 287 |
q_id_match = re.match(r"([0-9]+(?:[a-zA-Z]|\([^\)]+\)|(?:\.[a-zA-Z0-9]+))*)", first_line)
|
| 288 |
if not q_id_match:
|
| 289 |
+
q_id = first_line.split()[0] if first_line else ""
|
| 290 |
else:
|
| 291 |
q_id = q_id_match.group(1).strip()
|
| 292 |
awarded = re.findall(r"\b(M\d+|A\d+|R\d+|M0|A0|R0)\b", block)
|
|
|
|
| 300 |
"question": q_id,
|
| 301 |
"marks_awarded": awarded_unique
|
| 302 |
})
|
| 303 |
+
print("β
Extracted grading marks for", len(grading_json["grading"]), "question blocks.")
|
| 304 |
+
print(json.dumps(grading_json, indent=2))
|
| 305 |
return grading_json
|
| 306 |
|
| 307 |
# ---------------- MAPPING/IMPRINT HELPERS ----------------
|
|
|
|
| 325 |
Grading JSON:
|
| 326 |
{json.dumps(grading_json, indent=2)}
|
| 327 |
"""
|
| 328 |
+
print(f"π‘ Sending mapping request for image {image_path} to Gemini...")
|
| 329 |
img = Image.open(image_path)
|
| 330 |
response = model.generate_content([prompt, img])
|
| 331 |
raw_text = getattr(response, "text", None)
|
| 332 |
if not raw_text and getattr(response, "candidates", None):
|
| 333 |
raw_text = response.candidates[0].content.parts[0].text
|
| 334 |
+
if not raw_text:
|
| 335 |
+
raw_text = str(response)
|
| 336 |
+
print("π₯ Mapping response (chars):", len(raw_text))
|
| 337 |
try:
|
| 338 |
start = raw_text.index('[')
|
| 339 |
end = raw_text.rindex(']') + 1
|
| 340 |
json_part = raw_text[start:end]
|
| 341 |
mapping = json.loads(json_part)
|
| 342 |
+
print("β
Parsed mapping JSON for", image_path, "| entries:", len(mapping))
|
| 343 |
return mapping
|
| 344 |
except Exception:
|
| 345 |
match = re.search(r'(\[.*\])', raw_text, re.DOTALL)
|
| 346 |
if match:
|
| 347 |
try:
|
| 348 |
mapping = json.loads(match.group(1))
|
| 349 |
+
print("β
Parsed mapping JSON (alt) for", image_path, "| entries:", len(mapping))
|
| 350 |
return mapping
|
| 351 |
except Exception:
|
| 352 |
pass
|
| 353 |
+
print("β οΈ Failed to parse mapping JSON for", image_path)
|
| 354 |
return []
|
| 355 |
|
| 356 |
def imprint_marks_using_mapping(pdf_path, grading_json, output_pdf, model, rows=GRID_ROWS, cols=GRID_COLS):
|
|
|
|
| 358 |
Convert PDF to images, create grid-numbered images for sending to Gemini,
|
| 359 |
send all page images in parallel to Gemini for mapping, then annotate and produce imprinted PDF.
|
| 360 |
"""
|
| 361 |
+
print("π Converting answer PDF to images for imprinting...")
|
| 362 |
pages = convert_from_path(pdf_path, dpi=200)
|
| 363 |
annotated_page_paths = []
|
| 364 |
temp_grid_images = []
|
| 365 |
|
|
|
|
| 366 |
for p_index, page in enumerate(pages):
|
| 367 |
img = page.convert("RGB")
|
| 368 |
w, h = img.size
|
|
|
|
| 389 |
temp_path = f"page_{p_index+1}_grid.png"
|
| 390 |
img.save(temp_path, "PNG")
|
| 391 |
temp_grid_images.append(temp_path)
|
| 392 |
+
print("π° Created grid image:", temp_path)
|
| 393 |
|
| 394 |
# Send all grid images in parallel to Gemini to get mappings
|
| 395 |
+
print("π‘ Sending all page images to Gemini in parallel for mapping...")
|
| 396 |
mappings_per_page = {}
|
| 397 |
+
model_local = model
|
| 398 |
with ThreadPoolExecutor(max_workers=min(8, len(temp_grid_images))) as ex:
|
| 399 |
futures = {ex.submit(ask_gemini_for_mapping_for_page, model_local, img_path, grading_json, rows, cols): idx
|
| 400 |
for idx, img_path in enumerate(temp_grid_images)}
|
|
|
|
| 402 |
idx = futures[fut]
|
| 403 |
try:
|
| 404 |
mapping = fut.result()
|
| 405 |
+
except Exception as e:
|
| 406 |
+
print("β οΈ Mapping request failed for page", idx, e)
|
| 407 |
mapping = []
|
| 408 |
mappings_per_page[idx] = mapping
|
| 409 |
|
| 410 |
# Annotate original pages according to returned mappings
|
| 411 |
+
print("π Annotating pages with marks...")
|
| 412 |
for p_index, page in enumerate(pages):
|
| 413 |
page_img = page.convert("RGB")
|
| 414 |
img_cv = np.array(page_img)
|
|
|
|
| 463 |
annotated_path = f"annotated_page_{p_index+1}.png"
|
| 464 |
cv2.imwrite(annotated_path, img_cv)
|
| 465 |
annotated_page_paths.append(annotated_path)
|
| 466 |
+
print("β
Annotated page saved:", annotated_path)
|
| 467 |
|
| 468 |
with open(output_pdf, "wb") as f:
|
| 469 |
f.write(img2pdf.convert(annotated_page_paths))
|
| 470 |
|
| 471 |
compressed = compress_pdf(output_pdf)
|
| 472 |
+
print("π Imprinted PDF saved to:", compressed)
|
| 473 |
return compressed
|
| 474 |
|
| 475 |
# ---------------- MAIN PIPELINE ----------------
|
| 476 |
def align_and_grade_pipeline(qp_path, ms_path, ans_path, imprint=False):
|
| 477 |
"""
|
| 478 |
+
Final pipeline implementing requested flow and verbose console logging.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 479 |
"""
|
| 480 |
try:
|
| 481 |
+
print("π Starting pipeline...")
|
| 482 |
+
# Step 0: compress as needed
|
| 483 |
qp_path = compress_pdf(qp_path)
|
| 484 |
ms_path = compress_pdf(ms_path)
|
| 485 |
ans_path = compress_pdf(ans_path)
|
|
|
|
| 487 |
# Merge QP + MS
|
| 488 |
merged_qpms_path = os.path.splitext(qp_path)[0] + "_merged_qp_ms.pdf"
|
| 489 |
merge_pdfs([qp_path, ms_path], merged_qpms_path)
|
| 490 |
+
print("π Merged QP + MS ->", merged_qpms_path)
|
| 491 |
|
| 492 |
+
# Upload files to Gemini
|
| 493 |
+
print("πΌ Uploading files to Gemini...")
|
| 494 |
merged_uploaded = genai.upload_file(path=merged_qpms_path, display_name="QP+MS (merged)")
|
| 495 |
ans_uploaded = genai.upload_file(path=ans_path, display_name="Answer Sheet")
|
| 496 |
+
print("β
Upload complete.")
|
| 497 |
|
| 498 |
+
# Create model and print which selected
|
| 499 |
model = create_model()
|
| 500 |
|
| 501 |
+
# Step 1.i: QP+MS transcription (first)
|
| 502 |
+
print("1.i) Transcribing QP+MS (questions first, then full markscheme)...")
|
| 503 |
qpms_prompt = PROMPTS["QP_MS_TRANSCRIPTION"]["content"]
|
| 504 |
qpms_text = gemini_generate_content(model, qpms_prompt, file_upload_obj=merged_uploaded)
|
| 505 |
+
print("π QP+MS transcription received. Saving debug file: debug_qpms_transcript.txt")
|
| 506 |
with open("debug_qpms_transcript.txt", "w", encoding="utf-8") as f:
|
| 507 |
f.write(qpms_text)
|
| 508 |
|
| 509 |
# Step 2: extract serial numbers (question IDs) using regex from qpms_text
|
| 510 |
extracted_ids = extract_question_ids_from_qpms(qpms_text)
|
|
|
|
| 511 |
if not extracted_ids:
|
| 512 |
extracted_ids = ["NA"]
|
| 513 |
|
| 514 |
+
# Step 1.ii: Build AS prompt injecting extracted IDs and transcribe AS
|
| 515 |
+
print("1.ii) Building AS transcription prompt with expected question IDs and sending to Gemini...")
|
| 516 |
as_prompt = build_as_prompt_with_expected_ids(extracted_ids)
|
|
|
|
|
|
|
| 517 |
as_text = gemini_generate_content(model, as_prompt, file_upload_obj=ans_uploaded)
|
| 518 |
+
print("π AS transcription received. Saving debug file: debug_as_transcript.txt")
|
| 519 |
with open("debug_as_transcript.txt", "w", encoding="utf-8") as f:
|
| 520 |
f.write(as_text)
|
| 521 |
|
| 522 |
+
# Step 3: Grading - send both transcripts to grading model
|
| 523 |
+
print("2) Preparing grading input and sending to Gemini for grading...")
|
| 524 |
grading_input = (
|
| 525 |
"=== QP+MS TRANSCRIPT BEGIN ===\n"
|
| 526 |
+ qpms_text
|
|
|
|
| 530 |
+ "\n=== ANSWER SHEET TRANSCRIPT END ===\n"
|
| 531 |
)
|
| 532 |
grading_prompt_system = PROMPTS["GRADING_PROMPT"]["content"]
|
| 533 |
+
grading_text = gemini_generate_content(model, grading_prompt_system + "\n\nPlease grade the following transcripts:\n" + grading_input)
|
| 534 |
+
print("π§Ύ Grading output received. Saving debug file: debug_grading.md")
|
| 535 |
+
with open("debug_grading.md", "w", encoding="utf-8") as f:
|
| 536 |
+
f.write(grading_text)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 537 |
|
| 538 |
# Save grading PDF
|
| 539 |
base_name = os.path.splitext(os.path.basename(ans_path))[0]
|
| 540 |
grading_pdf_path = save_as_pdf(grading_text, f"{base_name}_graded.pdf")
|
| 541 |
+
print("π Grading PDF saved:", grading_pdf_path)
|
| 542 |
|
| 543 |
+
# Step 4: Extract marks for imprinting
|
| 544 |
grading_json = extract_marks_from_grading(grading_text)
|
| 545 |
with open("debug_grading_json.json", "w", encoding="utf-8") as f:
|
| 546 |
json.dump(grading_json, f, indent=2, ensure_ascii=False)
|
| 547 |
+
print("π§ Grading marks extraction complete.")
|
| 548 |
|
| 549 |
imprinted_pdf_path = None
|
| 550 |
if imprint:
|
| 551 |
+
print("β Imprint option enabled. Starting imprinting process (parallel mapping requests)...")
|
| 552 |
imprinted_pdf_path = f"{base_name}_imprinted.pdf"
|
| 553 |
imprinted_pdf_path = imprint_marks_using_mapping(ans_path, grading_json, imprinted_pdf_path, model)
|
| 554 |
+
print("β
Imprinting finished. Imprinted PDF at:", imprinted_pdf_path)
|
| 555 |
|
| 556 |
+
print("π Pipeline finished successfully.")
|
| 557 |
return qpms_text, as_text, grading_text, grading_pdf_path, imprinted_pdf_path
|
| 558 |
|
| 559 |
except Exception as e:
|
| 560 |
+
print("β Pipeline error:", e)
|
| 561 |
return f"β Error: {e}", None, None, None, None
|
| 562 |
|
| 563 |
# ---------------- GRADIO UI ----------------
|
| 564 |
+
with gr.Blocks(title="LeadIB AI Grading (Final Flow β Verbose)") as demo:
|
| 565 |
+
gr.Markdown("## π LeadIB AI Grading β Final Flow\nUpload **Question Paper**, **Markscheme**, and **Student Answer Sheet**.\nFlow: merge QP+MS -> transcribe QP+MS (questions first, full markscheme) -> extract IDs -> transcribe AS with expected IDs -> grade -> (optional) imprint. Console prints show progress.")
|
| 566 |
|
| 567 |
with gr.Row():
|
| 568 |
qp_file = gr.File(label="π Upload Question Paper (PDF)")
|
|
|
|
| 576 |
qpms_box = gr.Textbox(label="π QP+MS Transcript", lines=12)
|
| 577 |
as_box = gr.Textbox(label="π AS Transcript", lines=12)
|
| 578 |
|
| 579 |
+
grading_output_box = gr.Textbox(label="π§Ύ Grading (Ma
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|