atz21 committed on
Commit
c962bfa
·
verified ·
1 Parent(s): e598b6e

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +203 -352
app.py CHANGED
@@ -2,18 +2,16 @@ import os
2
  import re
3
  import json
4
  import subprocess
5
- import tempfile
6
- import time
7
  import img2pdf
8
  import gradio as gr
9
  import google.generativeai as genai
10
- from markdown_pdf import MarkdownPdf, Section
11
  from pdf2image import convert_from_path
12
  from PIL import Image, ImageDraw, ImageFont
13
  import cv2
14
  import numpy as np
15
  from concurrent.futures import ThreadPoolExecutor, as_completed
16
  from PyPDF2 import PdfReader, PdfWriter
 
17
 
18
  # ---------------- CONFIG ----------------
19
  genai.configure(api_key=os.getenv("GEMINI_API_KEY"))
@@ -21,53 +19,31 @@ GRID_ROWS, GRID_COLS = 20, 14
21
 
22
  # ---------------- PROMPTS ----------------
23
  PROMPTS = {
24
- "QP_MS_TRANSCRIPTION" : {
25
- "role": "system",
26
- "content": """You are a high-quality OCR/Transcription assistant.
27
-
28
  INPUT: This file is a PDF that first contains the Question Paper and immediately after it the Markscheme.
29
-
30
  TASK:
31
  1. Transcribe EXACTLY all the questions FIRST (with their total marks).
32
  2. After ALL questions, transcribe the Markscheme exactly, preserving M/A/R notation in brackets.
33
  3. Always number the questions sequentially (Question 1, Question 2, Question 3, …) **in the order they appear in the PDF**, even if the PDF shows a different number or leaves it blank. Do NOT skip or leave Question: blank.
34
-
35
  FORMAT:
36
  ==== PAPER TOTAL MARKS ====
37
  <total marks>
38
-
39
  ==== QUESTIONS BEGIN ====
40
  Question 1.i
41
  Total Marks: <number>
42
  QP: <question text>
43
  --QUESTION-END--
44
-
45
- Question 1.ii
46
- Total Marks: <number>
47
- QP: <question text>
48
- --QUESTION-END--
49
-
50
- (repeat for all questions in order of appearance)
51
-
52
  ==== QUESTIONS END ====
53
-
54
  ==== MARKSCHEME BEGIN ====
55
  Answer 1.i:
56
  <exact MS for Q1.i with notations M1, A1, R1 etc>
57
-
58
- Answer 1.ii:
59
- <exact MS for Q1.ii with notations>
60
-
61
- Answer 2 :
62
- <exact MS for Q2 with notations>
63
-
64
  (repeat for all answers)
65
  ==== MARKSCHEME END ====
66
  """
67
- }
68
- ,
69
-
70
- # GRADING_PROMPT unchanged except we will print steps around calling it
71
  "GRADING_PROMPT": {
72
  "role": "system",
73
  "content": """Developer: You are an official examiner. Apply the following grading rules precisely.
@@ -87,32 +63,25 @@ Answer 2 :
87
  5. Apply FT where appropriate.
88
  6. Use proper notation: M1A0, A1, etc.
89
  7. Any lost mark: use red `<span style="color:red">M0</span>` and make Reason red.
90
- ---
91
  ## Output Format
92
  Produce two sections per question/sub-question, following this structure:
93
-
94
  ## Question <id>
95
  ### Markscheme vs Student Answer
96
  | Mark ID | Markscheme Expectation | Student’s Response | Awarded |
97
  |---------|------------------------|--------------------|---------|
98
  | M1_1 | Recognise GP | "r=0.9" | M1 |
99
  ➑️ **Total: X/Y**
100
-
101
  ---
102
-
103
  ### Examiner’s Report
104
  At the very end, provide a summary table:
105
  | Question Number | Marks | Remark |
106
  |-----------------|-------|--------|
107
  | 1 | X/Y | <remark> |
108
-
109
  Then show total clearly as a final line:
110
  `Total: <obtained_marks>/<max_marks>`
111
-
112
  NOTES:
113
- - The assistant will receive two transcripts: (1) QP+MS transcription (questions then markscheme) and (2) AS transcription (student answers). Use the QP+MS transcript as the authoritative source of question wording, total marks, and verbatim markscheme entries (M/A/R mark IDs).
114
- - Match student answers to question IDs and grade according to the provided verbatim markscheme.
115
- - Produce full markdown as above. Ensure mark IDs used in the grading are present and consistent with the markscheme.
116
  """
117
  }
118
  }
@@ -128,17 +97,12 @@ def compress_pdf(input_path, output_path=None, max_size=20*1024*1024):
128
  if output_path is None:
129
  base, ext = os.path.splitext(input_path)
130
  output_path = f"{base}_compressed{ext}"
131
-
132
  try:
133
  size = os.path.getsize(input_path)
134
  except Exception:
135
  return input_path
136
-
137
  if size <= max_size:
138
- print(f"ℹ️ Not compressing {input_path} ({size/1024/1024:.2f} MB <= {max_size/1024/1024} MB)")
139
  return input_path
140
-
141
- print(f"πŸ”Ž Compressing {input_path} ({size/1024/1024:.2f} MB) -> {output_path}")
142
  try:
143
  gs_cmd = [
144
  "gs", "-sDEVICE=pdfwrite",
@@ -149,35 +113,17 @@ def compress_pdf(input_path, output_path=None, max_size=20*1024*1024):
149
  ]
150
  subprocess.run(gs_cmd, check=True)
151
  new_size = os.path.getsize(output_path)
152
- print(f"βœ… Compression done. New size: {new_size/1024/1024:.2f} MB")
153
  if new_size <= max_size:
154
  return output_path
155
- else:
156
- print("⚠️ Compressed file still larger than threshold; returning original")
157
- return input_path
158
- except Exception as e:
159
- print("❌ Compression error:", e)
160
  return input_path
161
 
162
  def create_model():
163
- """
164
- Create the Gemini model and print which model is selected.
165
- """
166
  try:
167
- print("⚑ Attempting to use gemini-2.5-pro model")
168
- model = genai.GenerativeModel("gemini-2.5-pro", generation_config={"temperature": 0})
169
- print("βœ… Selected model: gemini-2.5-pro")
170
- return model
171
- except Exception as e:
172
- print("⚠️ Could not use gemini-2.5-pro:", e)
173
- try:
174
- print("⚑ Falling back to gemini-2.5-flash model")
175
- model = genai.GenerativeModel("gemini-2.5-flash", generation_config={"temperature": 0})
176
- print("βœ… Selected model: gemini-2.5-flash")
177
- return model
178
- except Exception as e:
179
- print("❌ Failed to create any Gemini model:", e)
180
- raise
181
 
182
  def merge_pdfs(paths, output_path):
183
  writer = PdfWriter()
@@ -190,79 +136,49 @@ def merge_pdfs(paths, output_path):
190
  return output_path
191
 
192
  def gemini_generate_content(model, prompt_text, file_upload_obj=None, image_obj=None):
193
- """
194
- Send prompt_text and optionally an uploaded file (or an image object) to the model.
195
- Returns textual response and prints progress.
196
- """
197
  inputs = [prompt_text]
198
  if file_upload_obj:
199
  inputs.append(file_upload_obj)
200
  if image_obj:
201
  inputs.append(image_obj)
202
- print("πŸ“‘ Sending request to Gemini (prompt length:", len(prompt_text), "chars )")
203
  response = model.generate_content(inputs)
204
  raw_text = getattr(response, "text", None)
205
  if not raw_text and getattr(response, "candidates", None):
206
  raw_text = response.candidates[0].content.parts[0].text
207
  if raw_text is None:
208
  raw_text = str(response)
209
- print("πŸ“₯ Received response (chars):", len(raw_text))
210
  return raw_text
211
 
212
  # ---------------- PARSERS ----------------
213
  def extract_question_ids_from_qpms(text):
214
  """
215
- Extract question IDs from QP+MS transcript output.
216
- We expect the QP+MS prompt to produce lines like 'Question: <id>'
217
- Return a list of unique IDs in order of appearance.
218
  """
219
- print("πŸ”Ž Extracting question IDs from QP+MS transcript using regex...")
220
  ids = []
221
- for m in re.finditer(r"(?im)^\s*Question\s*:\s*([0-9]+(?:(?:\.[a-zA-Z0-9]+)+|(?:\([a-zA-Z0-9]+\))+|[a-zA-Z])*)\b", text):
222
- qid = m.group(1).strip()
223
- if qid not in ids:
224
- ids.append(qid)
225
- if ids:
226
- print(f"βœ… Extracted {len(ids)} question IDs.")
227
- print("IDs:", ids)
228
- return ids
229
-
230
- # fallback scans
231
- for m in re.finditer(r"(?m)^\s*([0-9]+(?:(?:\.[a-zA-Z0-9]+)+|(?:\([a-zA-Z0-9]+\))+|[a-zA-Z])*)\s*[\.\):\-]\s", text):
232
  qid = m.group(1).strip()
233
- if qid not in ids:
 
 
 
 
234
  ids.append(qid)
235
- if ids:
236
- print(f"βœ… Extracted {len(ids)} question IDs (fallback heuristic).")
237
- print("IDs:", ids)
238
- else:
239
- print("⚠️ No question IDs extracted; will send NA placeholder.")
240
- return ids
241
 
242
  def build_as_prompt_with_expected_ids(expected_ids):
243
  """
244
- Construct the AS transcription prompt injecting the expected IDs block.
 
245
  """
246
- if not expected_ids:
247
- ids_block = "{NA}"
248
- else:
249
- ids_block = "{\n" + "\n".join(expected_ids) + "\n}"
250
  prompt = f"""You are a high-quality handwritten transcription assistant.
251
-
252
  INPUT: This PDF contains a student's handwritten answer sheet.
253
- TASK: Transcribe the student's answers exactly (as text). Preserve step order and line breaks. Attempt to assign each answer to a question ID if the student has labelled it (e.g., "1", "1a", "2(b)", "3"). If the student hasn't labelled answers, segment contiguous answer blocks and attempt to infer question IDs from context β€” but mark inferred IDs clearly as "INFERRED: <id>"
254
-
255
- Enclose all mathematical expressions in Markdown fenced code blocks (``` triple backticks).
256
-
257
- If a diagram/graph is omitted, write [Graph omitted].
258
- Unreadable parts: [illegible].
259
- Unanswered: [No response].
260
-
261
- Do NOT recreate diagrams.
262
-
263
- Ensure consistency and determinism in formatting so subsequent models can grade directly from this aligned format.
264
-
265
- Expected questions (if missing, write NA):
266
  {ids_block}
267
  -----------------------
268
  OUTPUT FORMAT:
@@ -270,61 +186,55 @@ Question <id>
270
  AS:
271
  <transcribed answer or placeholder>
272
  """
273
- return prompt
274
 
275
- def extract_marks_from_grading(grading_text):
276
- """
277
- Parse the grading markdown produced by the GRADING_PROMPT and extract marks per question.
278
- Returns dict: {"grading": [{"question": "1.a", "marks_awarded": ["M1","A1"]}, ...]}
279
- """
280
- print("πŸ”Ž Extracting awarded marks from grading output...")
281
  grading_json = {"grading": []}
282
-
283
  question_blocks = re.split(r"##\s*Question\s+", grading_text)
284
  for block in question_blocks[1:]:
285
  first_line = block.strip().splitlines()[0].strip() if block.strip().splitlines() else ""
286
  q_id_match = re.match(r"([0-9]+(?:[a-zA-Z]|\([^\)]+\)|(?:\.[a-zA-Z0-9]+))*)", first_line)
287
- if not q_id_match:
288
- q_id = first_line.split()[0] if first_line else ""
289
- else:
290
- q_id = q_id_match.group(1).strip()
291
  awarded = re.findall(r"\b(M\d+|A\d+|R\d+|M0|A0|R0)\b", block)
292
- seen = set()
293
- awarded_unique = []
294
- for m in awarded:
295
- if m not in seen:
296
- awarded_unique.append(m)
297
- seen.add(m)
298
- grading_json["grading"].append({
299
- "question": q_id,
300
- "marks_awarded": awarded_unique
301
- })
302
- print("βœ… Extracted grading marks for", len(grading_json["grading"]), "question blocks.")
303
- print(json.dumps(grading_json, indent=2))
304
  return grading_json
305
 
306
- # ---------------- MAPPING/IMPRINT HELPERS ----------------
307
- def ask_gemini_for_mapping_for_page(model, image_path, grading_json, rows=GRID_ROWS, cols=GRID_COLS):
308
  """
309
- Send a single page image along with the grading_json; LLM should return JSON mapping.
 
 
 
 
 
310
  """
311
  prompt = f"""
312
- You are an exam marker. Your role is to identify where each question begins on the page.
313
- The page is divided into a {rows} x {cols} grid. Each cell has a RUNNING NUMBER label (1..{rows*cols}).
314
- For each question in the grading JSON, return the cell NUMBER where the FIRST STEP of that question begins.
315
-
316
- IMPORTANT RULES:
317
- - Do not place marks inside another question's answer area.
318
- - Prefer placing the marks in a BLANK cell immediately to the RIGHT of the answer step. If no blank cell is available to the right, then place in a blank cell to the LEFT.
319
- - Never place marks above or below the answer.
320
- - If a question starts on a previous page, you may omit it for this page.
321
- Return JSON only, like:
322
- [{{"question": "1.a", "cell_number": 15}}, ...]
323
-
324
- Grading JSON:
325
  {json.dumps(grading_json, indent=2)}
 
 
 
 
 
 
 
 
 
 
 
 
 
326
  """
327
- print(f"πŸ“‘ Sending mapping request for image {image_path} to Gemini...")
328
  img = Image.open(image_path)
329
  response = model.generate_content([prompt, img])
330
  raw_text = getattr(response, "text", None)
@@ -332,250 +242,193 @@ Grading JSON:
332
  raw_text = response.candidates[0].content.parts[0].text
333
  if not raw_text:
334
  raw_text = str(response)
335
- print("πŸ“₯ Mapping response (chars):", len(raw_text))
336
  try:
337
  start = raw_text.index('[')
338
  end = raw_text.rindex(']') + 1
339
- json_part = raw_text[start:end]
340
- mapping = json.loads(json_part)
341
- print("βœ… Parsed mapping JSON for", image_path, "| entries:", len(mapping))
342
- return mapping
343
  except Exception:
344
- match = re.search(r'(\[.*\])', raw_text, re.DOTALL)
345
- if match:
346
- try:
347
- mapping = json.loads(match.group(1))
348
- print("βœ… Parsed mapping JSON (alt) for", image_path, "| entries:", len(mapping))
349
- return mapping
350
- except Exception:
351
- pass
352
- print("⚠️ Failed to parse mapping JSON for", image_path)
353
  return []
354
 
355
- def imprint_marks_using_mapping(pdf_path, grading_json, output_pdf, model, rows=GRID_ROWS, cols=GRID_COLS):
356
  """
357
- Convert PDF to images, create grid-numbered images for sending to Gemini,
358
- send all page images in parallel to Gemini for mapping, then annotate and produce imprinted PDF.
 
 
 
 
 
359
  """
360
- print("πŸ“„ Converting answer PDF to images for imprinting...")
361
- pages = convert_from_path(pdf_path, dpi=200)
 
 
 
 
 
 
 
 
362
  annotated_page_paths = []
363
  temp_grid_images = []
364
 
365
- for p_index, page in enumerate(pages):
366
- img = page.convert("RGB")
367
- w, h = img.size
368
- cell_w, cell_h = w / cols, h / rows
369
-
370
  draw = ImageDraw.Draw(img)
371
  try:
372
- num_font = ImageFont.truetype("arial.ttf", 16)
373
- except Exception:
374
- num_font = ImageFont.load_default()
375
 
 
 
376
  cell_num = 1
377
  for r in range(rows):
378
  for c in range(cols):
 
379
  x = int(c * cell_w + cell_w / 2)
380
  y = int(r * cell_h + cell_h / 2)
381
- text = str(cell_num)
382
- bbox = draw.textbbox((0, 0), text, font=num_font)
383
- tw = bbox[2] - bbox[0]
384
- th = bbox[3] - bbox[1]
385
- draw.text((x - tw/2, y - th/2), text, fill="black", font=num_font)
386
- cell_num += 1
387
-
388
- temp_path = f"page_{p_index+1}_grid.png"
389
- img.save(temp_path, "PNG")
390
- temp_grid_images.append(temp_path)
391
- print("πŸ›° Created grid image:", temp_path)
392
-
393
- # Send all grid images in parallel to Gemini to get mappings
394
- print("πŸ“‘ Sending all page images to Gemini in parallel for mapping...")
395
  mappings_per_page = {}
396
- model_local = model
397
- with ThreadPoolExecutor(max_workers=min(8, len(temp_grid_images))) as ex:
398
- futures = {ex.submit(ask_gemini_for_mapping_for_page, model_local, img_path, grading_json, rows, cols): idx
399
- for idx, img_path in enumerate(temp_grid_images)}
400
  for fut in as_completed(futures):
401
  idx = futures[fut]
402
  try:
403
- mapping = fut.result()
404
- except Exception as e:
405
- print("⚠️ Mapping request failed for page", idx, e)
406
- mapping = []
407
- mappings_per_page[idx] = mapping
408
-
409
- # Annotate original pages according to returned mappings
410
- print("πŸ–Š Annotating pages with marks...")
411
- for p_index, page in enumerate(pages):
412
- page_img = page.convert("RGB")
413
- img_cv = np.array(page_img)
414
  img_cv = cv2.cvtColor(img_cv, cv2.COLOR_RGB2BGR)
415
  h, w, _ = img_cv.shape
416
- cell_w_px, cell_h_px = w / cols, h / rows
417
-
418
  mapping = mappings_per_page.get(p_index, [])
419
  occupied = set()
420
  for item in mapping:
421
  qid = item.get("question")
422
  cell_number = item.get("cell_number")
423
- if qid is None or cell_number is None:
424
- continue
425
-
426
- marks_list = next((g["marks_awarded"] for g in grading_json.get("grading", []) if g["question"] == qid), [])
427
- if not marks_list:
428
- marks_list = next((g["marks_awarded"] for g in grading_json.get("grading", [])
429
- if g["question"].lower() == (qid or "").lower()), [])
430
-
431
  marks_text = ",".join(marks_list) if marks_list else "?"
432
-
433
- row = (cell_number - 1) // cols
434
- col = (cell_number - 1) % cols
435
-
436
  candidates = []
437
- if col + 1 < cols:
438
- candidates.append((row, col + 1))
439
- candidates.append((row, col))
440
- if col - 1 >= 0:
441
- candidates.append((row, col - 1))
442
-
443
- chosen = None
444
- for (r_c, c_c) in candidates:
445
- cell_id = r_c * cols + c_c + 1
446
- if cell_id not in occupied:
447
- chosen = (r_c, c_c)
448
- occupied.add(cell_id)
449
- break
450
- if chosen is None:
451
- chosen = (row, col)
452
-
453
- r_c, c_c = chosen
454
- x_c = int((c_c + 1) * cell_w_px - cell_w_px * 0.1)
455
- y_c = int((r_c + 0.5) * cell_h_px)
456
-
457
- font_scale = max(0.6, min(1.6, cell_h_px / 60.0))
458
- thickness = max(1, int(font_scale * 2))
459
- cv2.putText(img_cv, marks_text, (x_c, y_c), cv2.FONT_HERSHEY_SIMPLEX,
460
- font_scale, (0, 0, 255), thickness, cv2.LINE_AA)
461
 
462
  annotated_path = f"annotated_page_{p_index+1}.png"
463
  cv2.imwrite(annotated_path, img_cv)
464
  annotated_page_paths.append(annotated_path)
465
- print("βœ… Annotated page saved:", annotated_path)
466
 
467
- with open(output_pdf, "wb") as f:
468
- f.write(img2pdf.convert(annotated_page_paths))
 
469
 
470
- compressed = compress_pdf(output_pdf)
471
- print("πŸ“‘ Imprinted PDF saved to:", compressed)
472
- return compressed
473
 
474
- # ---------------- MAIN PIPELINE ----------------
475
  def align_and_grade_pipeline(qp_path, ms_path, ans_path, imprint=False):
476
- """
477
- Final pipeline implementing requested flow and verbose console logging.
478
- """
479
- try:
480
- print("πŸ” Starting pipeline...")
481
- # Step 0: compress as needed
482
- qp_path = compress_pdf(qp_path)
483
- ms_path = compress_pdf(ms_path)
484
- ans_path = compress_pdf(ans_path)
485
-
486
- # Merge QP + MS
487
- merged_qpms_path = os.path.splitext(qp_path)[0] + "_merged_qp_ms.pdf"
488
- merge_pdfs([qp_path, ms_path], merged_qpms_path)
489
- print("πŸ“Ž Merged QP + MS ->", merged_qpms_path)
490
-
491
- # Upload files to Gemini
492
- print("πŸ”Ό Uploading files to Gemini...")
493
- merged_uploaded = genai.upload_file(path=merged_qpms_path, display_name="QP+MS (merged)")
494
- ans_uploaded = genai.upload_file(path=ans_path, display_name="Answer Sheet")
495
- print("βœ… Upload complete.")
496
-
497
- # Create model and print which selected
498
- model = create_model()
499
-
500
- # Step 1.i: QP+MS transcription (first)
501
- print("1.i) Transcribing QP+MS (questions first, then full markscheme)...")
502
- qpms_prompt = PROMPTS["QP_MS_TRANSCRIPTION"]["content"]
503
- qpms_text = gemini_generate_content(model, qpms_prompt, file_upload_obj=merged_uploaded)
504
- print("πŸ“„ QP+MS transcription received. Saving debug file: debug_qpms_transcript.txt")
505
- with open("debug_qpms_transcript.txt", "w", encoding="utf-8") as f:
506
- f.write(qpms_text)
507
-
508
- # Step 2: extract serial numbers (question IDs) using regex from qpms_text
509
- extracted_ids = extract_question_ids_from_qpms(qpms_text)
510
- if not extracted_ids:
511
- extracted_ids = ["NA"]
512
-
513
- # Step 1.ii: Build AS prompt injecting extracted IDs and transcribe AS
514
- print("1.ii) Building AS transcription prompt with expected question IDs and sending to Gemini...")
515
- as_prompt = build_as_prompt_with_expected_ids(extracted_ids)
516
- as_text = gemini_generate_content(model, as_prompt, file_upload_obj=ans_uploaded)
517
- print("πŸ“ AS transcription received. Saving debug file: debug_as_transcript.txt")
518
- with open("debug_as_transcript.txt", "w", encoding="utf-8") as f:
519
- f.write(as_text)
520
-
521
- # Step 3: Grading - send both transcripts to grading model
522
- print("2) Preparing grading input and sending to Gemini for grading...")
523
- grading_input = (
524
- "=== QP+MS TRANSCRIPT BEGIN ===\n"
525
- + qpms_text
526
- + "\n=== QP+MS TRANSCRIPT END ===\n\n"
527
- + "=== ANSWER SHEET TRANSCRIPT BEGIN ===\n"
528
- + as_text
529
- + "\n=== ANSWER SHEET TRANSCRIPT END ===\n"
530
- )
531
- grading_prompt_system = PROMPTS["GRADING_PROMPT"]["content"]
532
- grading_text = gemini_generate_content(model, grading_prompt_system + "\n\nPlease grade the following transcripts:\n" + grading_input)
533
- print("🧾 Grading output received. Saving debug file: debug_grading.md")
534
- with open("debug_grading.md", "w", encoding="utf-8") as f:
535
- f.write(grading_text)
536
-
537
- # Save grading PDF
538
- base_name = os.path.splitext(os.path.basename(ans_path))[0]
539
- grading_pdf_path = save_as_pdf(grading_text, f"{base_name}_graded.pdf")
540
- print("πŸ“„ Grading PDF saved:", grading_pdf_path)
541
-
542
- # Step 4: Extract marks for imprinting
543
- grading_json = extract_marks_from_grading(grading_text)
544
- with open("debug_grading_json.json", "w", encoding="utf-8") as f:
545
- json.dump(grading_json, f, indent=2, ensure_ascii=False)
546
- print("πŸ”§ Grading marks extraction complete.")
547
-
548
- imprinted_pdf_path = None
549
- if imprint:
550
- print("✍ Imprint option enabled. Starting imprinting process (parallel mapping requests)...")
551
- imprinted_pdf_path = f"{base_name}_imprinted.pdf"
552
- imprinted_pdf_path = imprint_marks_using_mapping(ans_path, grading_json, imprinted_pdf_path, model)
553
- print("βœ… Imprinting finished. Imprinted PDF at:", imprinted_pdf_path)
554
-
555
- print("🏁 Pipeline finished successfully.")
556
- return qpms_text, as_text, grading_text, grading_pdf_path, imprinted_pdf_path
557
-
558
- except Exception as e:
559
- print("❌ Pipeline error:", e)
560
- return f"❌ Error: {e}", None, None, None, None
561
-
562
- # ---------------- GRADIO UI ----------------
563
- with gr.Blocks(title="LeadIB AI Grading (Final Flow β€” Verbose)") as demo:
564
- gr.Markdown("## πŸ“˜ LeadIB AI Grading β€” Final Flow\nUpload **Question Paper**, **Markscheme**, and **Student Answer Sheet**.\nFlow: merge QP+MS -> transcribe QP+MS (questions first, full markscheme) -> extract IDs -> transcribe AS with expected IDs -> grade -> (optional) imprint. Console prints show progress.")
565
 
566
  with gr.Row():
567
- qp_file = gr.File(label="πŸ“„ Upload Question Paper (PDF)")
568
- ms_file = gr.File(label="πŸ“„ Upload Markscheme (PDF)")
569
- ans_file = gr.File(label="πŸ“ Upload Student Answer Sheet (PDF)")
570
 
571
- imprint_toggle = gr.Checkbox(label="✍ Imprint Marks on Student Answer Sheet", value=False)
572
  run_button = gr.Button("πŸš€ Run Pipeline")
573
 
574
  with gr.Row():
575
  qpms_box = gr.Textbox(label="πŸ“‘ QP+MS Transcript", lines=12)
576
  as_box = gr.Textbox(label="πŸ“ AS Transcript", lines=12)
577
 
578
- grading_output_box = gr.Textbox(label="🧾 Grading (Markdown)", lines=20)
579
  grading_pdf_file = gr.File(label="πŸ“₯ Download Grading PDF")
580
  imprint_pdf_file = gr.File(label="πŸ“₯ Download Imprinted PDF (Optional)")
581
 
@@ -583,11 +436,9 @@ with gr.Blocks(title="LeadIB AI Grading (Final Flow β€” Verbose)") as demo:
583
  qp_path = qp_file_obj.name
584
  ms_path = ms_file_obj.name
585
  ans_path = ans_file_obj.name
586
-
587
  qpms_text, as_text, grading_text, grading_pdf_path, imprinted_pdf_path = align_and_grade_pipeline(
588
  qp_path, ms_path, ans_path, imprint=imprint_flag
589
  )
590
-
591
  return qpms_text or "", as_text or "", grading_text or "", grading_pdf_path, imprinted_pdf_path
592
 
593
  run_button.click(
 
2
  import re
3
  import json
4
  import subprocess
 
 
5
  import img2pdf
6
  import gradio as gr
7
  import google.generativeai as genai
 
8
  from pdf2image import convert_from_path
9
  from PIL import Image, ImageDraw, ImageFont
10
  import cv2
11
  import numpy as np
12
  from concurrent.futures import ThreadPoolExecutor, as_completed
13
  from PyPDF2 import PdfReader, PdfWriter
14
+ from markdown_pdf import MarkdownPdf, Section
15
 
16
  # ---------------- CONFIG ----------------
17
  genai.configure(api_key=os.getenv("GEMINI_API_KEY"))
 
19
 
20
  # ---------------- PROMPTS ----------------
21
  PROMPTS = {
22
+ "QP_MS_TRANSCRIPTION": {
23
+ "role": "system",
24
+ "content": """You are a high-quality OCR/Transcription assistant.
 
25
  INPUT: This file is a PDF that first contains the Question Paper and immediately after it the Markscheme.
 
26
  TASK:
27
  1. Transcribe EXACTLY all the questions FIRST (with their total marks).
28
  2. After ALL questions, transcribe the Markscheme exactly, preserving M/A/R notation in brackets.
29
  3. Always number the questions sequentially (Question 1, Question 2, Question 3, …) **in the order they appear in the PDF**, even if the PDF shows a different number or leaves it blank. Do NOT skip or leave Question: blank.
 
30
  FORMAT:
31
  ==== PAPER TOTAL MARKS ====
32
  <total marks>
 
33
  ==== QUESTIONS BEGIN ====
34
  Question 1.i
35
  Total Marks: <number>
36
  QP: <question text>
37
  --QUESTION-END--
38
+ (repeat for all questions)
 
 
 
 
 
 
 
39
  ==== QUESTIONS END ====
 
40
  ==== MARKSCHEME BEGIN ====
41
  Answer 1.i:
42
  <exact MS for Q1.i with notations M1, A1, R1 etc>
 
 
 
 
 
 
 
43
  (repeat for all answers)
44
  ==== MARKSCHEME END ====
45
  """
46
+ },
 
 
 
47
  "GRADING_PROMPT": {
48
  "role": "system",
49
  "content": """Developer: You are an official examiner. Apply the following grading rules precisely.
 
63
  5. Apply FT where appropriate.
64
  6. Use proper notation: M1A0, A1, etc.
65
  7. Any lost mark: use red `<span style="color:red">M0</span>` and make Reason red.
66
+ ---
67
  ## Output Format
68
  Produce two sections per question/sub-question, following this structure:
 
69
  ## Question <id>
70
  ### Markscheme vs Student Answer
71
  | Mark ID | Markscheme Expectation | Student’s Response | Awarded |
72
  |---------|------------------------|--------------------|---------|
73
  | M1_1 | Recognise GP | "r=0.9" | M1 |
74
  ➑️ **Total: X/Y**
 
75
  ---
 
76
  ### Examiner’s Report
77
  At the very end, provide a summary table:
78
  | Question Number | Marks | Remark |
79
  |-----------------|-------|--------|
80
  | 1 | X/Y | <remark> |
 
81
  Then show total clearly as a final line:
82
  `Total: <obtained_marks>/<max_marks>`
 
83
  NOTES:
84
+ - The assistant will receive two transcripts: (1) QP+MS transcription (questions then markscheme) and (2) AS transcription (student answers). Use the QP+MS transcript as the authoritative source.
 
 
85
  """
86
  }
87
  }
 
97
  if output_path is None:
98
  base, ext = os.path.splitext(input_path)
99
  output_path = f"{base}_compressed{ext}"
 
100
  try:
101
  size = os.path.getsize(input_path)
102
  except Exception:
103
  return input_path
 
104
  if size <= max_size:
 
105
  return input_path
 
 
106
  try:
107
  gs_cmd = [
108
  "gs", "-sDEVICE=pdfwrite",
 
113
  ]
114
  subprocess.run(gs_cmd, check=True)
115
  new_size = os.path.getsize(output_path)
 
116
  if new_size <= max_size:
117
  return output_path
118
+ return input_path
119
+ except Exception:
 
 
 
120
  return input_path
121
 
122
def create_model():
    """Build the Gemini model used by the pipeline.

    ``gemini-2.5-pro`` is tried first; if constructing it raises, the
    function falls back to ``gemini-2.5-flash``. Both are created with
    temperature 0. An exception raised while building the fallback model
    propagates to the caller.
    """
    def _build(model_name):
        # A fresh config dict per call, exactly as the two literal calls did.
        return genai.GenerativeModel(model_name, generation_config={"temperature": 0})

    try:
        model = _build("gemini-2.5-pro")
    except Exception:
        model = _build("gemini-2.5-flash")
    return model
 
 
 
 
 
 
 
 
 
 
 
127
 
128
  def merge_pdfs(paths, output_path):
129
  writer = PdfWriter()
 
136
  return output_path
137
 
138
def gemini_generate_content(model, prompt_text, file_upload_obj=None, image_obj=None):
    """Send a prompt (plus optional attachments) to the model and return its text.

    Args:
        model: Object exposing ``generate_content(inputs)``.
        prompt_text: Prompt string; always the first element of ``inputs``.
        file_upload_obj: Optional uploaded-file handle appended after the prompt.
        image_obj: Optional image appended after the file handle.

    Returns:
        The response text. If the ``text`` accessor is empty or unavailable,
        the text of the first part of the first candidate is used; if nothing
        could be read at all, ``str(response)`` is returned.
    """
    inputs = [prompt_text]
    if file_upload_obj:
        inputs.append(file_upload_obj)
    if image_obj:
        inputs.append(image_obj)

    response = model.generate_content(inputs)

    # The google-generativeai ``text`` accessor raises ValueError when the
    # response carries no usable text part; getattr's default only covers
    # AttributeError, so catch it explicitly to keep the fallback reachable.
    try:
        raw_text = getattr(response, "text", None)
    except ValueError:
        raw_text = None

    if not raw_text and getattr(response, "candidates", None):
        try:
            raw_text = response.candidates[0].content.parts[0].text
        except (AttributeError, IndexError):
            # Candidate without content/parts: keep whatever we already have.
            pass

    if raw_text is None:
        raw_text = str(response)
    return raw_text
151
 
152
  # ---------------- PARSERS ----------------
153
def extract_question_ids_from_qpms(text):
    """Return every question ID found in a QP+MS transcript, in order.

    IDs are collected exactly in the sequence they appear and are NOT
    de-duplicated.

    Two passes are made:
      1. Lines of the form ``Question <id>`` or ``Question: <id>`` (the colon
         is optional because the transcription format emits ``Question 1.i``).
      2. Only if pass 1 finds nothing: lines that merely start with a
         numbering token followed by ``.``, ``)``, ``:`` or ``-``.

    Args:
        text: Transcript text produced by the QP+MS transcription step.

    Returns:
        List of ID strings such as ``["1.i", "1.ii", "2(b)"]``, or ``["NA"]``
        when no ID could be found (including empty input).
    """
    if not text:
        return ["NA"]

    # A digit run followed by any mix of letters, digits, dots and parentheses.
    # A single character class is used instead of a nested quantifier to avoid
    # pathological backtracking.
    id_token = r"[0-9]+[a-zA-Z0-9.()]*"

    # No trailing \b here: a word boundary cannot follow ")" at end of line,
    # which used to truncate "2(b)" to "2(b".
    labelled = re.compile(rf"(?im)^\s*Question\s*:?\s*({id_token})")
    # Drop a sentence-style trailing dot ("Question 1." -> "1").
    ids = [m.group(1).rstrip(".") for m in labelled.finditer(text)]

    if not ids:
        numbered = re.compile(rf"(?m)^\s*({id_token})\s*[.):\-]\s")
        ids = [m.group(1).strip() for m in numbered.finditer(text)]

    return ids if ids else ["NA"]
 
 
 
 
 
169
 
170
  def build_as_prompt_with_expected_ids(expected_ids):
171
  """
172
+ Build the AS transcription prompt; also useful to produce an ids_block string
173
+ that can be passed to the imprint mapping prompt.
174
  """
175
+ ids_block = "{\n" + "\n".join(expected_ids) + "\n}" if expected_ids else "{NA}"
 
 
 
176
  prompt = f"""You are a high-quality handwritten transcription assistant.
 
177
  INPUT: This PDF contains a student's handwritten answer sheet.
178
+ TASK: Transcribe the student's answers exactly (as text), preserving step order and line breaks.
179
+ Attempt to assign each answer to a question ID if student labelled it; else mark as INFERRED.
180
+ Enclose math in ``` blocks, diagrams as [Graph omitted], unreadable as [illegible].
181
+ Expected questions:
 
 
 
 
 
 
 
 
 
182
  {ids_block}
183
  -----------------------
184
  OUTPUT FORMAT:
 
186
  AS:
187
  <transcribed answer or placeholder>
188
  """
189
+ return prompt, ids_block
190
 
191
def extract_marks_from_grading_exact(grading_text):
    """Parse the grading markdown into per-question lists of awarded marks.

    The text is split on ``## Question <id>`` headings. For each section the
    question ID is read from the heading line and every mark token
    (``M<n>``, ``A<n>``, ``R<n>``) in the section is collected in order,
    duplicates included. Combined notation such as ``M1A0`` is split into
    its individual marks (``M1``, ``A0``).

    Args:
        grading_text: Markdown produced by the grading step.

    Returns:
        ``{"grading": [{"question": <id>, "marks_awarded": [...]}, ...]}``;
        the list is empty when the text has no question headings.
    """
    grading_json = {"grading": []}
    if not grading_text:
        return grading_json

    q_id_pattern = re.compile(r"([0-9]+(?:[a-zA-Z]|\([^\)]+\)|(?:\.[a-zA-Z0-9]+))*)")
    # One or more adjacent mark tokens standing alone as a word. Lookarounds
    # instead of \b...\b so "M1A0" is seen as a run, while mark IDs such as
    # "M1_1" (followed by "_") are still ignored.
    mark_run = re.compile(r"(?<!\w)((?:[MAR]\d+)+)(?!\w)")
    single_mark = re.compile(r"[MAR]\d+")

    # Element 0 is whatever precedes the first heading; skip it.
    for block in re.split(r"##\s*Question\s+", grading_text)[1:]:
        lines = block.strip().splitlines()
        first_line = lines[0].strip() if lines else ""

        q_id_match = q_id_pattern.match(first_line)
        if q_id_match:
            q_id = q_id_match.group(1).strip()
        elif first_line:
            # Non-numeric heading: fall back to its first word.
            q_id = first_line.split()[0]
        else:
            q_id = ""

        awarded = [
            mark
            for run in mark_run.findall(block)
            for mark in single_mark.findall(run)
        ]
        grading_json["grading"].append({"question": q_id, "marks_awarded": awarded})

    return grading_json
201
 
202
+ # ---------------- IMPRINT ----------------
203
def ask_gemini_for_mapping_for_page_v2(model, image_path, grading_json, question_scheme, ids_block, rows=GRID_ROWS, cols=GRID_COLS):
    """Ask Gemini where each expected question starts on one page image.

    The prompt lists the expected question IDs (``ids_block``), the question
    scheme and the grading JSON, and asks for a JSON array of
    ``{"question": "<id>", "cell_number": N}`` objects (with an optional
    ``"note"`` when the model inferred an unlabelled subpart).

    Args:
        model: Object exposing ``generate_content(inputs)``.
        image_path: Path of the page image sent alongside the prompt.
        grading_json: Marks summary; serialised into the prompt with ``json.dumps``.
        question_scheme: Transcription excerpt inserted into the prompt.
        ids_block: Text block of expected question IDs inserted into the prompt.
        rows: Number of grid rows described to the model.
        cols: Number of grid columns described to the model.

    Returns:
        List of mapping dicts parsed from the reply, or ``[]`` if no JSON
        array could be parsed.
    """
    # Literal braces in the JSON example are doubled: inside an f-string a
    # single "{" starts a replacement field and would raise at runtime.
    prompt = f"""
You are an exam marker. Identify where each question listed in the ids block begins on this page.
The page has {rows}x{cols} grid (cells 1..{rows*cols}).
QUESTION IDS (expected) you must look for:
{ids_block}

Question scheme (authoritative transcription excerpt):
{question_scheme}

Grading JSON (marks awarded summary):
{json.dumps(grading_json, indent=2)}

Instructions (IMPORTANT):
- Only return questions from the provided IDs block above.
- For each question you find on this page, return the single grid cell number where the first step of that question begins.
- If you see a subpart like `ii)` with no leading question number directly above or below another labelled subpart, try to infer which question it belongs to (example: if you find `Q4.i` and above it you see `ii)` alone, it may be `Q3.ii` — if you infer like this explain your reasoning briefly in the JSON entry's optional "note" field).
- Avoid placing marks inside another question's area; prefer an adjacent blank cell to the RIGHT if possible, else LEFT.
- Return JSON only, exactly like:
[
{{"question":"1.a","cell_number":15}},
{{"question":"3.ii","cell_number":23, "note":"inferred from unlabeled ii above Q4.i"}},
...
]
- If no instances of an expected question appear on this page, return an empty list: [].
"""
    # Keep the image open only for the duration of the request so the file
    # handle is released afterwards.
    with Image.open(image_path) as img:
        response = model.generate_content([prompt, img])

    # The ``text`` accessor can raise ValueError when the reply has no usable
    # text part; treat that like an empty reply and try the candidates.
    try:
        raw_text = getattr(response, "text", None)
    except ValueError:
        raw_text = None
    if not raw_text and getattr(response, "candidates", None):
        try:
            raw_text = response.candidates[0].content.parts[0].text
        except (AttributeError, IndexError):
            raw_text = None
    if not raw_text:
        raw_text = str(response)

    # Pull the outermost [...] span out of the reply and parse it. Both
    # str.index/rindex and json.loads signal failure with ValueError.
    try:
        start = raw_text.index("[")
        end = raw_text.rindex("]") + 1
        mapping = json.loads(raw_text[start:end])
    except ValueError:
        return []

    # Callers use item.get(...), so drop anything that is not a JSON object.
    if not isinstance(mapping, list):
        return []
    return [entry for entry in mapping if isinstance(entry, dict)]
253
 
254
def imprint_marks_using_mapping_v2(pdf_path, grading_json, output_pdf, question_scheme, model, expected_ids, rows=GRID_ROWS, cols=GRID_COLS):
    """
    Imprint awarded marks onto the student answer PDF, keeping the page size.

    Steps:
      1. Read the first page's mediabox (in points) and use it for every page.
      2. Rasterise the PDF with convert_from_path(..., size=(width_pt, height_pt)).
      3. Draw cell numbers on a copy of each page and ask the model, one page
         per worker thread, which cell each expected question starts in.
      4. Write each question's marks in red on the un-gridded page, preferring
         the cell to the right of the reported one, then the cell itself,
         then the cell to the left.
      5. Rebuild a PDF with img2pdf at the original page dimensions.

    Args:
        pdf_path: path of the answer-sheet PDF.
        grading_json: dict whose "grading" list holds
            {"question": ..., "marks_awarded": [...]} entries.
        output_pdf: path the imprinted PDF is written to.
        question_scheme: transcription text forwarded to the mapping prompt.
        model: model object forwarded to ask_gemini_for_mapping_for_page_v2.
        expected_ids: question IDs to look for; joined into the ids block.
        rows, cols: grid dimensions used for cell numbering.

    Returns:
        Whatever compress_pdf(output_pdf) returns (presumably the path of the
        compressed PDF - confirm against compress_pdf).
    """
    # Local import: the module's top-level import list may not include tempfile.
    import tempfile

    reader = PdfReader(pdf_path)
    # Use the first page size as canonical for all pages (could be extended to per-page).
    page0 = reader.pages[0]
    width_pt = float(page0.mediabox.width)   # points
    height_pt = float(page0.mediabox.height)

    # Render at exactly the page size in points so image pixels line up with PDF points.
    # pdf2image's size argument expects a tuple of ints.
    pages = convert_from_path(pdf_path, size=(int(width_pt), int(height_pt)))

    # Build ids_block from expected_ids.
    ids_block = "{\n" + "\n".join(expected_ids) + "\n}" if expected_ids else "{NA}"
    total_cells = rows * cols

    # Intermediate PNGs live in a private directory: fixed names in the working
    # directory would collide between concurrent runs and were never removed.
    with tempfile.TemporaryDirectory(prefix="imprint_") as work_dir:
        annotated_page_paths = []
        temp_grid_images = []

        # Draw grid numbers on a copy of every page (this copy is what the model sees).
        for p_index, page_img in enumerate(pages):
            img = page_img.convert("RGB")
            draw = ImageDraw.Draw(img)
            try:
                font = ImageFont.truetype("arial.ttf", 16)
            except (OSError, ImportError):
                # Font file or FreeType support unavailable: use PIL's built-in font.
                font = ImageFont.load_default()

            cell_w = img.width / cols
            cell_h = img.height / rows
            cell_num = 1
            for r in range(rows):
                for c in range(cols):
                    # Centre the label inside its cell.
                    x = int(c * cell_w + cell_w / 2)
                    y = int(r * cell_h + cell_h / 2)
                    label = str(cell_num)
                    bbox = draw.textbbox((0, 0), label, font=font)
                    draw.text(
                        (x - (bbox[2] - bbox[0]) / 2, y - (bbox[3] - bbox[1]) / 2),
                        label,
                        fill="black",
                        font=font,
                    )
                    cell_num += 1
            grid_path = os.path.join(work_dir, f"page_{p_index+1}_grid.png")
            img.save(grid_path, "PNG")
            temp_grid_images.append(grid_path)

        # Ask the model to map each page (in parallel). ThreadPoolExecutor
        # rejects max_workers=0, so skip the pool when there is nothing to map.
        mappings_per_page = {}
        if temp_grid_images:
            with ThreadPoolExecutor(max_workers=min(8, len(temp_grid_images))) as ex:
                futures = {
                    ex.submit(
                        ask_gemini_for_mapping_for_page_v2,
                        model, img_path, grading_json, question_scheme, ids_block, rows, cols,
                    ): idx
                    for idx, img_path in enumerate(temp_grid_images)
                }
                for fut in as_completed(futures):
                    idx = futures[fut]
                    try:
                        mappings_per_page[idx] = fut.result()
                    except Exception as exc:
                        # Best effort: a failed page is left un-annotated, but the
                        # failure is reported rather than silently discarded.
                        print(f"[imprint] mapping failed for page {idx + 1}: {exc}")
                        mappings_per_page[idx] = []

        # Annotate the original (un-gridded) pages; no rescaling.
        for p_index, page_img in enumerate(pages):
            img_cv = cv2.cvtColor(np.array(page_img.convert("RGB")), cv2.COLOR_RGB2BGR)
            h, w, _ = img_cv.shape
            cell_w_px, cell_h_px = w / cols, h / rows
            # Text metrics depend only on the page's cell height, so compute once per page.
            font_scale = max(0.6, min(1.6, cell_h_px / 60))
            thickness = max(1, int(font_scale * 2))
            note_scale = font_scale * 0.8
            note_thickness = max(1, int(thickness / 2))

            occupied = set()
            for item in mappings_per_page.get(p_index) or []:
                if not isinstance(item, dict):
                    continue
                qid = item.get("question")
                cell_number = item.get("cell_number")
                if qid is None or cell_number is None:
                    continue
                # The cell number is model output: coerce it and ignore values
                # that are not a valid cell of this grid.
                try:
                    cell_number = int(cell_number)
                except (TypeError, ValueError):
                    continue
                if not 1 <= cell_number <= total_cells:
                    continue

                # Find marks for that question in grading_json (exact string match).
                marks_list = next(
                    (g["marks_awarded"] for g in grading_json.get("grading", []) if g["question"] == qid),
                    [],
                )
                marks_text = ",".join(marks_list) if marks_list else "?"

                # Map cell_number -> zero-based row/col.
                row, col = divmod(cell_number - 1, cols)
                # Preference order: right neighbour, the cell itself, left neighbour.
                candidates = []
                if col + 1 < cols:
                    candidates.append((row, col + 1))
                candidates.append((row, col))
                if col - 1 >= 0:
                    candidates.append((row, col - 1))
                # If all candidates are taken, fall back to the reported cell.
                chosen = next(
                    ((r, c) for r, c in candidates if (r * cols + c + 1) not in occupied),
                    (row, col),
                )
                occupied.add(chosen[0] * cols + chosen[1] + 1)

                x_c = int((chosen[1] + 0.5) * cell_w_px)
                y_c = int((chosen[0] + 0.5) * cell_h_px)
                # Clamp so the text stays inside the image (OpenCV uses BGR; (0,0,255) is red).
                text_size = cv2.getTextSize(marks_text, cv2.FONT_HERSHEY_SIMPLEX, font_scale, thickness)[0]
                x_draw = max(0, min(w - text_size[0], x_c - text_size[0] // 2))
                y_draw = max(text_size[1], min(h - 1, y_c + text_size[1] // 2))
                cv2.putText(img_cv, marks_text, (x_draw, y_draw), cv2.FONT_HERSHEY_SIMPLEX, font_scale, (0, 0, 255), thickness)

                # Optional: if the mapping carries a "note", draw a small 'i' marker after the marks.
                if item.get("note"):
                    note_text = "i"
                    ns = cv2.getTextSize(note_text, cv2.FONT_HERSHEY_SIMPLEX, note_scale, note_thickness)[0]
                    nx = max(0, x_draw + text_size[0] + 4)
                    ny = max(ns[1], y_draw)
                    cv2.putText(img_cv, note_text, (nx, ny), cv2.FONT_HERSHEY_SIMPLEX, note_scale, (0, 0, 0), note_thickness)

            annotated_path = os.path.join(work_dir, f"annotated_page_{p_index+1}.png")
            cv2.imwrite(annotated_path, img_cv)
            annotated_page_paths.append(annotated_path)

        # Recreate the PDF with the original page dimensions (width_pt, height_pt).
        # This must happen before the temporary directory is removed.
        with open(output_pdf, "wb") as f:
            f.write(img2pdf.convert(annotated_page_paths, layout_fun=img2pdf.get_layout_fun((width_pt, height_pt))))

    return compress_pdf(output_pdf)
 
 
369
 
370
+ # ---------------- PIPELINE ----------------
371
def align_and_grade_pipeline(qp_path, ms_path, ans_path, imprint=False):
    """
    Run the full grading flow for one question paper, markscheme and answer sheet.

    The three PDFs are compressed, the question paper and markscheme are merged
    and uploaded, and the model is called three times: to transcribe QP+MS, to
    transcribe the answer sheet against the extracted question IDs, and to grade
    the two transcripts. The grading text is saved as a PDF and, when `imprint`
    is true, the awarded marks are also imprinted onto the answer sheet.

    Returns:
        (qpms_text, as_text, grading_text, grading_pdf_path, imprinted_pdf_path);
        imprinted_pdf_path is None when `imprint` is false.
    """
    # Compress all three inputs, in the same order as they were passed in.
    qp_path, ms_path, ans_path = [compress_pdf(path) for path in (qp_path, ms_path, ans_path)]

    # Merge question paper + markscheme into a single PDF next to the question paper.
    qp_stem = os.path.splitext(qp_path)[0]
    merged_qpms_path = qp_stem + "_merged_qp_ms.pdf"
    merge_pdfs([qp_path, ms_path], merged_qpms_path)

    merged_uploaded = genai.upload_file(path=merged_qpms_path, display_name="QP+MS (merged)")
    ans_uploaded = genai.upload_file(path=ans_path, display_name="Answer Sheet")

    model = create_model()

    # Pass 1: transcribe the merged question paper + markscheme.
    qpms_text = gemini_generate_content(
        model,
        PROMPTS["QP_MS_TRANSCRIPTION"]["content"],
        file_upload_obj=merged_uploaded,
    )

    # Question IDs are taken as extracted (no deduplication).
    extracted_ids = extract_question_ids_from_qpms(qpms_text)

    # Pass 2: transcribe the answer sheet using a prompt built from those IDs.
    as_prompt, _ids_block = build_as_prompt_with_expected_ids(extracted_ids)
    as_text = gemini_generate_content(model, as_prompt, file_upload_obj=ans_uploaded)

    # Pass 3: grade, giving the model both transcripts between explicit delimiters.
    grading_input = "".join([
        "=== QP+MS TRANSCRIPT BEGIN ===\n",
        qpms_text,
        "\n=== QP+MS TRANSCRIPT END ===\n\n",
        "=== ANSWER SHEET TRANSCRIPT BEGIN ===\n",
        as_text,
        "\n=== ANSWER SHEET TRANSCRIPT END ===\n",
    ])
    grading_request = (
        PROMPTS["GRADING_PROMPT"]["content"]
        + "\n\nPlease grade the following transcripts:\n"
        + grading_input
    )
    grading_text = gemini_generate_content(model, grading_request)

    # Output files are named after the (compressed) answer sheet's base name.
    ans_stem = os.path.splitext(os.path.basename(ans_path))[0]
    grading_pdf_path = save_as_pdf(grading_text, ans_stem + "_graded.pdf")
    grading_json = extract_marks_from_grading_exact(grading_text)

    if not imprint:
        imprinted_pdf_path = None
    else:
        # The extracted IDs are handed over so the imprint step can build its own ids block.
        imprinted_pdf_path = imprint_marks_using_mapping_v2(
            ans_path,
            grading_json,
            ans_stem + "_imprinted.pdf",
            qpms_text,
            model,
            expected_ids=extracted_ids,
        )

    return qpms_text, as_text, grading_text, grading_pdf_path, imprinted_pdf_path
414
+
415
+ # ---------------- GRADIO ----------------
416
+ with gr.Blocks(title="LeadIB AI Grading (Updated Imprint)") as demo:
417
+ gr.Markdown("## πŸ“˜ LeadIB AI Grading β€” Updated Imprint Pipeline\nUpload QP, Markscheme, and Student Answer Sheet.")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
418
 
419
  with gr.Row():
420
+ qp_file = gr.File(label="πŸ“„ Question Paper (PDF)")
421
+ ms_file = gr.File(label="πŸ“„ Markscheme (PDF)")
422
+ ans_file = gr.File(label="πŸ“ Student Answer Sheet (PDF)")
423
 
424
+ imprint_toggle = gr.Checkbox(label="✍ Imprint Marks", value=False)
425
  run_button = gr.Button("πŸš€ Run Pipeline")
426
 
427
  with gr.Row():
428
  qpms_box = gr.Textbox(label="πŸ“‘ QP+MS Transcript", lines=12)
429
  as_box = gr.Textbox(label="πŸ“ AS Transcript", lines=12)
430
 
431
+ grading_output_box = gr.Textbox(label="🧾 Grading Markdown", lines=20)
432
  grading_pdf_file = gr.File(label="πŸ“₯ Download Grading PDF")
433
  imprint_pdf_file = gr.File(label="πŸ“₯ Download Imprinted PDF (Optional)")
434
 
 
436
  qp_path = qp_file_obj.name
437
  ms_path = ms_file_obj.name
438
  ans_path = ans_file_obj.name
 
439
  qpms_text, as_text, grading_text, grading_pdf_path, imprinted_pdf_path = align_and_grade_pipeline(
440
  qp_path, ms_path, ans_path, imprint=imprint_flag
441
  )
 
442
  return qpms_text or "", as_text or "", grading_text or "", grading_pdf_path, imprinted_pdf_path
443
 
444
  run_button.click(