Spaces:
Sleeping
Sleeping
Update app.py
Browse files
app.py
CHANGED
|
@@ -15,60 +15,50 @@ import numpy as np
|
|
| 15 |
from concurrent.futures import ThreadPoolExecutor, as_completed
|
| 16 |
from PyPDF2 import PdfReader, PdfWriter
|
| 17 |
|
| 18 |
-
# ---------- CONFIG ----------
|
| 19 |
genai.configure(api_key=os.getenv("GEMINI_API_KEY"))
|
| 20 |
GRID_ROWS, GRID_COLS = 20, 14
|
| 21 |
|
| 22 |
-
# ---------- PROMPTS
|
| 23 |
PROMPTS = {
|
| 24 |
-
"
|
| 25 |
"role": "system",
|
| 26 |
"content": """You are a high-quality OCR/Transcription assistant.
|
| 27 |
|
| 28 |
-
INPUT: This file is a PDF that
|
| 29 |
-
TASK: Produce an exact transcription in plain text with clear separators.
|
| 30 |
-
- Question ID (exact as printed, e.g., "1", "2(a)", "3.b", "4(ii)")
|
| 31 |
-
- Question text (exact wording; do not change punctuation)
|
| 32 |
-
- Total marks for the question (if printed; otherwise try to infer/leave blank)
|
| 33 |
|
| 34 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 35 |
|
| 36 |
-
|
| 37 |
-
|
| 38 |
-
|
| 39 |
-
QUESTION BEGIN
|
| 40 |
-
ID: <id>
|
| 41 |
-
QTEXT:
|
| 42 |
-
<question text (multiline)>
|
| 43 |
-
TOTAL_MARKS: <integer or empty>
|
| 44 |
-
MARKSCHEME:
|
| 45 |
-
<verbatim markscheme lines for this question (multiline)>
|
| 46 |
-
QUESTION END
|
| 47 |
-
----
|
| 48 |
-
Repeat for every question in order. If some part is not available, leave the field empty but keep the block structure.
|
| 49 |
-
"""
|
| 50 |
-
},
|
| 51 |
|
| 52 |
-
|
| 53 |
-
|
| 54 |
-
"content": """You are a high-quality handwritten transcription assistant.
|
| 55 |
|
| 56 |
-
|
| 57 |
-
|
|
|
|
| 58 |
|
| 59 |
-
|
| 60 |
-
|
| 61 |
-
|
| 62 |
-
|
| 63 |
-
|
| 64 |
-
|
| 65 |
-
<
|
| 66 |
-
|
| 67 |
-
----
|
| 68 |
-
|
| 69 |
"""
|
| 70 |
},
|
| 71 |
-
|
| 72 |
"GRADING_PROMPT": {
|
| 73 |
"role": "system",
|
| 74 |
"content": """Developer: You are an official examiner. Apply the following grading rules precisely.
|
|
@@ -90,29 +80,35 @@ Repeat for each student answer block found.
|
|
| 90 |
7. Any lost mark: use red `<span style="color:red">M0</span>` and make Reason red.
|
| 91 |
---
|
| 92 |
## Output Format
|
| 93 |
-
Produce two sections per question/sub-question:
|
| 94 |
-
|
| 95 |
-
## Question
|
| 96 |
### Markscheme vs Student Answer
|
| 97 |
| Mark ID | Markscheme Expectation | Studentβs Response | Awarded |
|
| 98 |
|---------|------------------------|--------------------|---------|
|
| 99 |
| M1_1 | Recognise GP | "r=0.9" | M1 |
|
| 100 |
-
β‘οΈ **Total:
|
|
|
|
| 101 |
---
|
|
|
|
| 102 |
### Examinerβs Report
|
| 103 |
At the very end, provide a summary table:
|
| 104 |
| Question Number | Marks | Remark |
|
| 105 |
|-----------------|-------|--------|
|
| 106 |
-
| 1 |
|
| 107 |
-
|
| 108 |
-
|
|
|
|
| 109 |
|
| 110 |
-
NOTES:
|
|
|
|
|
|
|
|
|
|
| 111 |
"""
|
| 112 |
}
|
| 113 |
}
|
| 114 |
|
| 115 |
-
# ----------------
|
| 116 |
def save_as_pdf(text, filename="output.pdf"):
|
| 117 |
pdf = MarkdownPdf()
|
| 118 |
pdf.add_section(Section(text, toc=False))
|
|
@@ -130,10 +126,8 @@ def compress_pdf(input_path, output_path=None, max_size=20*1024*1024):
|
|
| 130 |
return input_path
|
| 131 |
|
| 132 |
if size <= max_size:
|
| 133 |
-
print(f"βΉοΈ Not compressing {input_path} ({size/1024/1024:.2f} MB <= {max_size/1024/1024} MB)")
|
| 134 |
return input_path
|
| 135 |
|
| 136 |
-
print(f"π Compressing {input_path} ({size/1024/1024:.2f} MB) -> {output_path}")
|
| 137 |
try:
|
| 138 |
gs_cmd = [
|
| 139 |
"gs", "-sDEVICE=pdfwrite",
|
|
@@ -144,27 +138,118 @@ def compress_pdf(input_path, output_path=None, max_size=20*1024*1024):
|
|
| 144 |
]
|
| 145 |
subprocess.run(gs_cmd, check=True)
|
| 146 |
new_size = os.path.getsize(output_path)
|
| 147 |
-
print(f"β
Compression done. New size: {new_size/1024/1024:.2f} MB")
|
| 148 |
if new_size <= max_size:
|
| 149 |
return output_path
|
| 150 |
else:
|
| 151 |
-
print("β οΈ Compressed file still larger than threshold; returning original")
|
| 152 |
return input_path
|
| 153 |
-
except Exception
|
| 154 |
-
print("β Compression error:", e)
|
| 155 |
return input_path
|
| 156 |
|
| 157 |
def create_model():
|
| 158 |
try:
|
| 159 |
-
print("β‘ Using gemini-2.5-pro model")
|
| 160 |
return genai.GenerativeModel("gemini-2.5-pro", generation_config={"temperature": 0})
|
| 161 |
except Exception:
|
| 162 |
-
print("β‘ Falling back to gemini-2.5-flash model")
|
| 163 |
return genai.GenerativeModel("gemini-2.5-flash", generation_config={"temperature": 0})
|
| 164 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 165 |
def extract_marks_from_grading(grading_text):
|
|
|
|
|
|
|
|
|
|
|
|
|
| 166 |
grading_json = {"grading": []}
|
| 167 |
-
|
|
|
|
| 168 |
question_blocks = re.split(r"##\s*Question\s+", grading_text)
|
| 169 |
for block in question_blocks[1:]:
|
| 170 |
first_line = block.strip().splitlines()[0].strip()
|
|
@@ -186,159 +271,11 @@ def extract_marks_from_grading(grading_text):
|
|
| 186 |
})
|
| 187 |
return grading_json
|
| 188 |
|
| 189 |
-
# ----------
|
| 190 |
-
def
|
| 191 |
-
writer = PdfWriter()
|
| 192 |
-
for p in paths:
|
| 193 |
-
reader = PdfReader(p)
|
| 194 |
-
for page in reader.pages:
|
| 195 |
-
writer.add_page(page)
|
| 196 |
-
with open(output_path, "wb") as f:
|
| 197 |
-
writer.write(f)
|
| 198 |
-
return output_path
|
| 199 |
-
|
| 200 |
-
# ---------- Transcript parsing helpers ----------
|
| 201 |
-
def parse_qp_ms_transcript(text):
|
| 202 |
-
"""
|
| 203 |
-
Parse QP+MS transcript produced according to the QP_MS_TRANSCRIBE prompt blocks.
|
| 204 |
-
Expected block markers: QUESTION BEGIN ... QUESTION END with fields ID, QTEXT, TOTAL_MARKS, MARKSCHEME.
|
| 205 |
-
Return list of questions: {id, qp, total_marks, ms}
|
| 206 |
-
"""
|
| 207 |
-
questions = []
|
| 208 |
-
# Try to find blocks using the explicit markers we requested
|
| 209 |
-
blocks = re.findall(r"QUESTION BEGIN(.*?)QUESTION END", text, flags=re.DOTALL | re.IGNORECASE)
|
| 210 |
-
if blocks:
|
| 211 |
-
for block in blocks:
|
| 212 |
-
id_match = re.search(r"ID:\s*(.+)", block)
|
| 213 |
-
qtext_match = re.search(r"QTEXT:\s*(.*?)\n(?:TOTAL_MARKS:|MARKSCHEME:)", block, flags=re.DOTALL)
|
| 214 |
-
tm_match = re.search(r"TOTAL_MARKS:\s*(.*)", block)
|
| 215 |
-
ms_match = re.search(r"MARKSCHEME:\s*(.*)", block, flags=re.DOTALL)
|
| 216 |
-
qid = id_match.group(1).strip() if id_match else ""
|
| 217 |
-
qtext = qtext_match.group(1).strip() if qtext_match else ""
|
| 218 |
-
total_marks = tm_match.group(1).strip() if tm_match else ""
|
| 219 |
-
# try to normalize total_marks to int if possible
|
| 220 |
-
try:
|
| 221 |
-
total_marks = int(re.search(r"\d+", total_marks).group(0)) if total_marks else None
|
| 222 |
-
except Exception:
|
| 223 |
-
total_marks = None
|
| 224 |
-
ms = ms_match.group(1).strip() if ms_match else ""
|
| 225 |
-
questions.append({
|
| 226 |
-
"id": qid,
|
| 227 |
-
"qp": qtext,
|
| 228 |
-
"total_marks": total_marks,
|
| 229 |
-
"ms": ms
|
| 230 |
-
})
|
| 231 |
-
return questions
|
| 232 |
-
|
| 233 |
-
# Fallback: If model didn't follow markers, try splitting by lines that look like question headers
|
| 234 |
-
# This is conservative: find headings like "1", "1.", "1(a)" at line starts
|
| 235 |
-
parts = re.split(r"(?m)^\s*(\d+(?:\([a-zA-Z0-9]+\)|[a-zA-Z]|\.[a-zA-Z0-9]+)?)\s*[\.\):\-]\s*", text)
|
| 236 |
-
# parts list pattern: [pretext, id1, body1, id2, body2, ...]
|
| 237 |
-
if len(parts) >= 3:
|
| 238 |
-
it = iter(parts)
|
| 239 |
-
pre = next(it)
|
| 240 |
-
while True:
|
| 241 |
-
try:
|
| 242 |
-
qid = next(it).strip()
|
| 243 |
-
body = next(it)
|
| 244 |
-
except StopIteration:
|
| 245 |
-
break
|
| 246 |
-
# try to separate question text and markscheme inside body using "Markscheme" keyword
|
| 247 |
-
ms_split = re.split(r"(?i)\bmarkscheme\b|(?i)\bmark scheme\b", body, maxsplit=1)
|
| 248 |
-
if len(ms_split) == 2:
|
| 249 |
-
qtext = ms_split[0].strip(":-\n ")
|
| 250 |
-
ms = ms_split[1].strip()
|
| 251 |
-
else:
|
| 252 |
-
# try to look for "Marks" summary then rest
|
| 253 |
-
m_search = re.search(r"(?i)\bmarks[:\s]*\d+", body)
|
| 254 |
-
if m_search:
|
| 255 |
-
# take text before marks as qtext
|
| 256 |
-
qtext = body[:m_search.start()].strip()
|
| 257 |
-
ms = body[m_search.start():].strip()
|
| 258 |
-
else:
|
| 259 |
-
# fallback: put entire body into qp and ms empty
|
| 260 |
-
qtext = body.strip()
|
| 261 |
-
ms = ""
|
| 262 |
-
# try to find total marks integer
|
| 263 |
-
tm = None
|
| 264 |
-
tm_found = re.search(r"(?i)(?:total\s*marks|marks|[\/]\s*\d+|out of)\s*[:\s]*?(\d+)", body)
|
| 265 |
-
if tm_found:
|
| 266 |
-
try:
|
| 267 |
-
tm = int(tm_found.group(1))
|
| 268 |
-
except:
|
| 269 |
-
tm = None
|
| 270 |
-
questions.append({
|
| 271 |
-
"id": qid,
|
| 272 |
-
"qp": qtext,
|
| 273 |
-
"total_marks": tm,
|
| 274 |
-
"ms": ms
|
| 275 |
-
})
|
| 276 |
-
return questions
|
| 277 |
-
|
| 278 |
-
# If nothing found, return one block with raw text as fallback
|
| 279 |
-
return [{"id": "1", "qp": text.strip(), "total_marks": None, "ms": ""}]
|
| 280 |
-
|
| 281 |
-
def parse_as_transcript(text):
|
| 282 |
-
"""
|
| 283 |
-
Parse AS transcript into answer blocks. Expected markers ANSWER BEGIN ... ANSWER END.
|
| 284 |
-
Return list: {id, ans}
|
| 285 |
-
"""
|
| 286 |
-
answers = []
|
| 287 |
-
blocks = re.findall(r"ANSWER BEGIN(.*?)ANSWER END", text, flags=re.DOTALL | re.IGNORECASE)
|
| 288 |
-
if blocks:
|
| 289 |
-
for block in blocks:
|
| 290 |
-
id_match = re.search(r"ID:\s*(.+)", block)
|
| 291 |
-
ans_match = re.search(r"ANSWER:\s*(.*)", block, flags=re.DOTALL)
|
| 292 |
-
qid = id_match.group(1).strip() if id_match else ""
|
| 293 |
-
ans = ans_match.group(1).strip() if ans_match else block.strip()
|
| 294 |
-
answers.append({
|
| 295 |
-
"id": qid,
|
| 296 |
-
"as": ans
|
| 297 |
-
})
|
| 298 |
-
return answers
|
| 299 |
-
|
| 300 |
-
# Fallback: split by likely question labels in the student's transcription, e.g., "1.", "1)", "1a."
|
| 301 |
-
parts = re.split(r"(?m)^\s*(\d+(?:[a-zA-Z]|\([^\)]+\))?)\s*[\.\):\-]\s*", text)
|
| 302 |
-
if len(parts) >= 3:
|
| 303 |
-
it = iter(parts)
|
| 304 |
-
pre = next(it)
|
| 305 |
-
while True:
|
| 306 |
-
try:
|
| 307 |
-
qid = next(it).strip()
|
| 308 |
-
body = next(it)
|
| 309 |
-
except StopIteration:
|
| 310 |
-
break
|
| 311 |
-
answers.append({"id": qid, "as": body.strip()})
|
| 312 |
-
return answers
|
| 313 |
-
|
| 314 |
-
# If no structure at all, try to chunk by double newlines
|
| 315 |
-
chunks = [c.strip() for c in text.split("\n\n") if c.strip()]
|
| 316 |
-
for i, c in enumerate(chunks, start=1):
|
| 317 |
-
answers.append({"id": f"INFERRED:{i}", "as": c})
|
| 318 |
-
return answers
|
| 319 |
-
|
| 320 |
-
# ---------- Gemini call wrapper ----------
|
| 321 |
-
def gemini_generate_content(model, prompt_text, file_upload_obj=None):
|
| 322 |
"""
|
| 323 |
-
|
| 324 |
-
Returns the textual response (str).
|
| 325 |
"""
|
| 326 |
-
inputs = [prompt_text]
|
| 327 |
-
if file_upload_obj:
|
| 328 |
-
inputs.append(file_upload_obj)
|
| 329 |
-
response = model.generate_content(inputs)
|
| 330 |
-
# Response handling as in original script
|
| 331 |
-
raw_text = getattr(response, "text", None)
|
| 332 |
-
if not raw_text and getattr(response, "candidates", None):
|
| 333 |
-
# new-style candidate chain
|
| 334 |
-
raw_text = response.candidates[0].content.parts[0].text
|
| 335 |
-
if not raw_text:
|
| 336 |
-
# attempt to stringify response
|
| 337 |
-
raw_text = str(response)
|
| 338 |
-
return raw_text
|
| 339 |
-
|
| 340 |
-
# ---------- Imprinting and mapping helpers remain unchanged ----------
|
| 341 |
-
def ask_gemini_for_mapping_for_page(model, image_path, grading_json, rows=GRID_ROWS, cols=GRID_COLS):
|
| 342 |
prompt = f"""
|
| 343 |
You are an exam marker. Your role is to identify where each question begins on the page.
|
| 344 |
The page is divided into a {rows} x {cols} grid. Each cell has a RUNNING NUMBER label (1..{rows*cols}).
|
|
@@ -360,16 +297,13 @@ Grading JSON:
|
|
| 360 |
raw_text = getattr(response, "text", None)
|
| 361 |
if not raw_text and getattr(response, "candidates", None):
|
| 362 |
raw_text = response.candidates[0].content.parts[0].text
|
| 363 |
-
|
| 364 |
-
print("π Gemini mapping raw output (page):")
|
| 365 |
-
print(raw_text[:1000] + ("..." if len(raw_text) > 1000 else ""))
|
| 366 |
try:
|
| 367 |
start = raw_text.index('[')
|
| 368 |
end = raw_text.rindex(']') + 1
|
| 369 |
json_part = raw_text[start:end]
|
| 370 |
mapping = json.loads(json_part)
|
| 371 |
return mapping
|
| 372 |
-
except Exception
|
| 373 |
match = re.search(r'(\[.*\])', raw_text, re.DOTALL)
|
| 374 |
if match:
|
| 375 |
try:
|
|
@@ -380,11 +314,15 @@ Grading JSON:
|
|
| 380 |
return []
|
| 381 |
|
| 382 |
def imprint_marks_using_mapping(pdf_path, grading_json, output_pdf, model, rows=GRID_ROWS, cols=GRID_COLS):
|
|
|
|
|
|
|
|
|
|
|
|
|
| 383 |
pages = convert_from_path(pdf_path, dpi=200)
|
| 384 |
annotated_page_paths = []
|
| 385 |
-
print(f"π Converted answer PDF to {len(pages)} page image(s) for imprinting.")
|
| 386 |
-
|
| 387 |
temp_grid_images = []
|
|
|
|
|
|
|
| 388 |
for p_index, page in enumerate(pages):
|
| 389 |
img = page.convert("RGB")
|
| 390 |
w, h = img.size
|
|
@@ -412,19 +350,30 @@ def imprint_marks_using_mapping(pdf_path, grading_json, output_pdf, model, rows=
|
|
| 412 |
img.save(temp_path, "PNG")
|
| 413 |
temp_grid_images.append(temp_path)
|
| 414 |
|
| 415 |
-
|
| 416 |
-
|
| 417 |
-
|
| 418 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 419 |
|
| 420 |
-
|
|
|
|
|
|
|
| 421 |
img_cv = np.array(page_img)
|
| 422 |
img_cv = cv2.cvtColor(img_cv, cv2.COLOR_RGB2BGR)
|
| 423 |
h, w, _ = img_cv.shape
|
| 424 |
cell_w_px, cell_h_px = w / cols, h / rows
|
| 425 |
|
|
|
|
| 426 |
occupied = set()
|
| 427 |
-
|
| 428 |
for item in mapping:
|
| 429 |
qid = item.get("question")
|
| 430 |
cell_number = item.get("cell_number")
|
|
@@ -434,14 +383,13 @@ def imprint_marks_using_mapping(pdf_path, grading_json, output_pdf, model, rows=
|
|
| 434 |
marks_list = next((g["marks_awarded"] for g in grading_json.get("grading", []) if g["question"] == qid), [])
|
| 435 |
if not marks_list:
|
| 436 |
marks_list = next((g["marks_awarded"] for g in grading_json.get("grading", [])
|
| 437 |
-
if g["question"].lower() == qid.lower()), [])
|
| 438 |
|
| 439 |
marks_text = ",".join(marks_list) if marks_list else "?"
|
| 440 |
|
| 441 |
row = (cell_number - 1) // cols
|
| 442 |
col = (cell_number - 1) % cols
|
| 443 |
|
| 444 |
-
placed = False
|
| 445 |
candidates = []
|
| 446 |
if col + 1 < cols:
|
| 447 |
candidates.append((row, col + 1))
|
|
@@ -456,7 +404,6 @@ def imprint_marks_using_mapping(pdf_path, grading_json, output_pdf, model, rows=
|
|
| 456 |
chosen = (r_c, c_c)
|
| 457 |
occupied.add(cell_id)
|
| 458 |
break
|
| 459 |
-
|
| 460 |
if chosen is None:
|
| 461 |
chosen = (row, col)
|
| 462 |
|
|
@@ -464,9 +411,6 @@ def imprint_marks_using_mapping(pdf_path, grading_json, output_pdf, model, rows=
|
|
| 464 |
x_c = int((c_c + 1) * cell_w_px - cell_w_px * 0.1)
|
| 465 |
y_c = int((r_c + 0.5) * cell_h_px)
|
| 466 |
|
| 467 |
-
print(f"Page {p_index+1} | Question {qid} -> mapped cell {cell_number} -> chosen cell ({r_c},{c_c})"
|
| 468 |
-
f" -> pixel coords ({x_c},{y_c}) | marks: {marks_text}")
|
| 469 |
-
|
| 470 |
font_scale = max(0.6, min(1.6, cell_h_px / 60.0))
|
| 471 |
thickness = max(1, int(font_scale * 2))
|
| 472 |
cv2.putText(img_cv, marks_text, (x_c, y_c), cv2.FONT_HERSHEY_SIMPLEX,
|
|
@@ -475,192 +419,108 @@ def imprint_marks_using_mapping(pdf_path, grading_json, output_pdf, model, rows=
|
|
| 475 |
annotated_path = f"annotated_page_{p_index+1}.png"
|
| 476 |
cv2.imwrite(annotated_path, img_cv)
|
| 477 |
annotated_page_paths.append(annotated_path)
|
| 478 |
-
print(f"π Annotated page saved: {annotated_path}")
|
| 479 |
|
| 480 |
with open(output_pdf, "wb") as f:
|
| 481 |
f.write(img2pdf.convert(annotated_page_paths))
|
| 482 |
|
| 483 |
-
print(f"π Imprinted PDF saved to: {output_pdf}")
|
| 484 |
compressed = compress_pdf(output_pdf)
|
| 485 |
-
if compressed != output_pdf:
|
| 486 |
-
print(f"π¦ Imprinted PDF compressed: {compressed}")
|
| 487 |
return compressed
|
| 488 |
|
| 489 |
-
# ----------
|
| 490 |
def align_and_grade_pipeline(qp_path, ms_path, ans_path, imprint=False):
|
| 491 |
"""
|
| 492 |
-
|
| 493 |
-
1) compress
|
| 494 |
2) merge QP + MS -> merged_qpms.pdf
|
| 495 |
-
3) upload merged_qpms
|
| 496 |
-
4)
|
| 497 |
-
|
| 498 |
-
|
| 499 |
-
|
| 500 |
-
|
| 501 |
-
|
| 502 |
-
8) extract marks and optionally imprint
|
| 503 |
"""
|
| 504 |
try:
|
|
|
|
| 505 |
qp_path = compress_pdf(qp_path)
|
| 506 |
ms_path = compress_pdf(ms_path)
|
| 507 |
ans_path = compress_pdf(ans_path)
|
| 508 |
|
| 509 |
-
# Merge QP + MS
|
| 510 |
merged_qpms_path = os.path.splitext(qp_path)[0] + "_merged_qp_ms.pdf"
|
| 511 |
merge_pdfs([qp_path, ms_path], merged_qpms_path)
|
| 512 |
-
print(f"π Merged QP + MS -> {merged_qpms_path}")
|
| 513 |
|
| 514 |
-
# Upload files
|
| 515 |
-
print("πΌ Uploading files to Gemini...")
|
| 516 |
merged_uploaded = genai.upload_file(path=merged_qpms_path, display_name="QP+MS (merged)")
|
| 517 |
ans_uploaded = genai.upload_file(path=ans_path, display_name="Answer Sheet")
|
| 518 |
|
| 519 |
model = create_model()
|
| 520 |
|
| 521 |
-
#
|
| 522 |
-
qpms_prompt = PROMPTS["
|
| 523 |
-
|
| 524 |
-
|
| 525 |
-
# Send both requests in parallel
|
| 526 |
-
print("π‘ Sending transcription requests (QP+MS & AS) in parallel...")
|
| 527 |
-
transcripts = {}
|
| 528 |
-
with ThreadPoolExecutor(max_workers=2) as ex:
|
| 529 |
-
futures = {
|
| 530 |
-
ex.submit(gemini_generate_content, model, qpms_prompt, merged_uploaded): "qpms",
|
| 531 |
-
ex.submit(gemini_generate_content, model, as_prompt, ans_uploaded): "as"
|
| 532 |
-
}
|
| 533 |
-
for fut in as_completed(futures):
|
| 534 |
-
key = futures[fut]
|
| 535 |
-
try:
|
| 536 |
-
res_text = fut.result()
|
| 537 |
-
except Exception as e:
|
| 538 |
-
res_text = f"β Error during transcription: {e}"
|
| 539 |
-
transcripts[key] = res_text
|
| 540 |
-
print(f"β
Transcription complete for: {key} (chars: {len(res_text)})")
|
| 541 |
-
|
| 542 |
-
qpms_text = transcripts.get("qpms", "")
|
| 543 |
-
as_text = transcripts.get("as", "")
|
| 544 |
-
|
| 545 |
-
# Debug: save transcripts for review
|
| 546 |
with open("debug_qpms_transcript.txt", "w", encoding="utf-8") as f:
|
| 547 |
f.write(qpms_text)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 548 |
with open("debug_as_transcript.txt", "w", encoding="utf-8") as f:
|
| 549 |
f.write(as_text)
|
| 550 |
|
| 551 |
-
#
|
| 552 |
-
|
| 553 |
-
|
| 554 |
-
|
| 555 |
-
|
| 556 |
-
|
| 557 |
-
|
| 558 |
-
|
| 559 |
-
|
| 560 |
-
|
| 561 |
-
|
| 562 |
-
|
| 563 |
-
|
| 564 |
-
|
| 565 |
-
|
| 566 |
-
return s
|
| 567 |
-
|
| 568 |
-
answers_map = {}
|
| 569 |
-
for a in as_answers:
|
| 570 |
-
nid = normalize_id(a.get("id", ""))
|
| 571 |
-
if nid == "":
|
| 572 |
-
# if empty id, try to infer using INFERRED: or use a running fallback index
|
| 573 |
-
nid = a.get("id", "")
|
| 574 |
-
# store first matching block (if multiple blocks for same id, append)
|
| 575 |
-
if nid in answers_map:
|
| 576 |
-
answers_map[nid] += "\n\n" + a.get("as", "")
|
| 577 |
-
else:
|
| 578 |
-
answers_map[nid] = a.get("as", "")
|
| 579 |
-
|
| 580 |
-
aligned_questions = []
|
| 581 |
-
for q in qpms_questions:
|
| 582 |
-
qid = q.get("id", "")
|
| 583 |
-
nid = normalize_id(qid)
|
| 584 |
-
# try direct id match
|
| 585 |
-
student_ans = answers_map.get(nid)
|
| 586 |
-
# try alternative matches (e.g., '1a' vs '1(a)')
|
| 587 |
-
if student_ans is None:
|
| 588 |
-
for k in answers_map:
|
| 589 |
-
if k.startswith(nid) or nid.startswith(k) or (nid and nid.replace(" ", "") in k):
|
| 590 |
-
student_ans = answers_map[k]
|
| 591 |
-
break
|
| 592 |
-
# fallback: look for first answer that contains the question id as text (loose)
|
| 593 |
-
if student_ans is None:
|
| 594 |
-
for k, v in answers_map.items():
|
| 595 |
-
if qid and qid.lower() in k:
|
| 596 |
-
student_ans = v
|
| 597 |
-
break
|
| 598 |
-
|
| 599 |
-
aligned_questions.append({
|
| 600 |
-
"id": qid,
|
| 601 |
-
"qp": q.get("qp", ""),
|
| 602 |
-
"total_marks": q.get("total_marks"),
|
| 603 |
-
"ms": q.get("ms", ""), # verbatim markscheme block
|
| 604 |
-
"as": student_ans if student_ans is not None else ""
|
| 605 |
-
})
|
| 606 |
-
|
| 607 |
-
# If any answer blocks left unmatched, optionally append them as INFERRED entries
|
| 608 |
-
matched_ids = set([normalize_id(q["id"]) for q in aligned_questions])
|
| 609 |
-
for k, v in answers_map.items():
|
| 610 |
-
if k not in matched_ids:
|
| 611 |
-
aligned_questions.append({
|
| 612 |
-
"id": k,
|
| 613 |
-
"qp": "",
|
| 614 |
-
"total_marks": None,
|
| 615 |
-
"ms": "",
|
| 616 |
-
"as": v
|
| 617 |
-
})
|
| 618 |
-
|
| 619 |
-
# Build alignment JSON text to send to grading model
|
| 620 |
-
alignment_payload = {"questions": aligned_questions}
|
| 621 |
-
alignment_json_text = json.dumps(alignment_payload, indent=2, ensure_ascii=False)
|
| 622 |
-
print("π¦ Built alignment JSON (truncated):")
|
| 623 |
-
print(alignment_json_text[:1000] + ("..." if len(alignment_json_text) > 1000 else ""))
|
| 624 |
-
|
| 625 |
-
# Step: grading
|
| 626 |
-
print("2οΈβ£ Sending grading prompt to Gemini...")
|
| 627 |
-
# We send both the system grading prompt and the alignment JSON as content
|
| 628 |
-
response = model.generate_content([PROMPTS["GRADING_PROMPT"]["content"], alignment_json_text])
|
| 629 |
grading_text = getattr(response, "text", None)
|
| 630 |
if not grading_text and getattr(response, "candidates", None):
|
| 631 |
grading_text = response.candidates[0].content.parts[0].text
|
| 632 |
if not grading_text:
|
| 633 |
raise RuntimeError("No grading output returned from Gemini.")
|
| 634 |
|
| 635 |
-
print("β
Grading Markdown received (truncated preview):")
|
| 636 |
-
print((grading_text[:1000] + '...') if len(grading_text) > 1000 else grading_text)
|
| 637 |
-
|
| 638 |
# Save grading PDF
|
| 639 |
base_name = os.path.splitext(os.path.basename(ans_path))[0]
|
| 640 |
grading_pdf_path = save_as_pdf(grading_text, f"{base_name}_graded.pdf")
|
| 641 |
-
print(f"π Grading PDF saved: {grading_pdf_path}")
|
| 642 |
|
| 643 |
-
# Extract marks for imprinting
|
| 644 |
grading_json = extract_marks_from_grading(grading_text)
|
| 645 |
-
|
| 646 |
-
|
| 647 |
|
| 648 |
imprinted_pdf_path = None
|
| 649 |
if imprint:
|
| 650 |
-
|
| 651 |
imprinted_pdf_path = f"{base_name}_imprinted.pdf"
|
| 652 |
imprinted_pdf_path = imprint_marks_using_mapping(ans_path, grading_json, imprinted_pdf_path, model)
|
| 653 |
-
print(f"β
Imprinting finished. Imprinted PDF at: {imprinted_pdf_path}")
|
| 654 |
|
| 655 |
-
return
|
| 656 |
|
| 657 |
except Exception as e:
|
| 658 |
-
|
| 659 |
-
return f"β Error: {e}", None, None, None
|
| 660 |
|
| 661 |
-
# ---------------- GRADIO UI
|
| 662 |
-
with gr.Blocks(title="LeadIB AI Grading (
|
| 663 |
-
gr.Markdown("## π LeadIB AI Grading β
|
| 664 |
|
| 665 |
with gr.Row():
|
| 666 |
qp_file = gr.File(label="π Upload Question Paper (PDF)")
|
|
@@ -668,12 +528,13 @@ with gr.Blocks(title="LeadIB AI Grading (New Flow: Parallel Transcription + Alig
|
|
| 668 |
ans_file = gr.File(label="π Upload Student Answer Sheet (PDF)")
|
| 669 |
|
| 670 |
imprint_toggle = gr.Checkbox(label="β Imprint Marks on Student Answer Sheet", value=False)
|
| 671 |
-
run_button = gr.Button("π Run
|
| 672 |
|
| 673 |
with gr.Row():
|
| 674 |
-
|
| 675 |
-
|
| 676 |
|
|
|
|
| 677 |
grading_pdf_file = gr.File(label="π₯ Download Grading PDF")
|
| 678 |
imprint_pdf_file = gr.File(label="π₯ Download Imprinted PDF (Optional)")
|
| 679 |
|
|
@@ -682,17 +543,16 @@ with gr.Blocks(title="LeadIB AI Grading (New Flow: Parallel Transcription + Alig
|
|
| 682 |
ms_path = ms_file_obj.name
|
| 683 |
ans_path = ans_file_obj.name
|
| 684 |
|
| 685 |
-
|
| 686 |
qp_path, ms_path, ans_path, imprint=imprint_flag
|
| 687 |
)
|
| 688 |
|
| 689 |
-
|
| 690 |
-
return alignment_text, grading_text, grading_pdf_path, imprinted_pdf_path
|
| 691 |
|
| 692 |
run_button.click(
|
| 693 |
fn=run_pipeline,
|
| 694 |
inputs=[qp_file, ms_file, ans_file, imprint_toggle],
|
| 695 |
-
outputs=[
|
| 696 |
)
|
| 697 |
|
| 698 |
if __name__ == "__main__":
|
|
|
|
| 15 |
from concurrent.futures import ThreadPoolExecutor, as_completed
|
| 16 |
from PyPDF2 import PdfReader, PdfWriter
|
| 17 |
|
| 18 |
+
# ---------------- CONFIG ----------------
|
| 19 |
genai.configure(api_key=os.getenv("GEMINI_API_KEY"))
|
| 20 |
GRID_ROWS, GRID_COLS = 20, 14
|
| 21 |
|
| 22 |
+
# ---------------- PROMPTS ----------------
|
| 23 |
PROMPTS = {
|
| 24 |
+
"QP_MS_TRANSCRIPTION": {
|
| 25 |
"role": "system",
|
| 26 |
"content": """You are a high-quality OCR/Transcription assistant.
|
| 27 |
|
| 28 |
+
INPUT: This file is a PDF that first contains the Question Paper and immediately after it the Markscheme.
|
| 29 |
+
TASK: Produce an exact transcription in plain text with clear separators.
|
|
|
|
|
|
|
|
|
|
| 30 |
|
| 31 |
+
total marks of paper
|
| 32 |
+
question
|
| 33 |
+
total marks of that question
|
| 34 |
+
.
|
| 35 |
+
(continue this for all question )
|
| 36 |
+
mark scheme ( exact for each question)
|
| 37 |
|
| 38 |
+
M :Marks awarded for attempting to use a correct Method.
|
| 39 |
+
A : Marks awarded for an Answer or for Accuracy; often dependent on preceding M marks.
|
| 40 |
+
R :Marks awarded for clear Reasoning.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 41 |
|
| 42 |
+
REPRESENT THESE ABOVE NOTATION IN MS CLEARLY , EG : M1 , A1 ,M2 ...
|
| 43 |
+
( ignore THESE N1 , N2 , N3 )
|
|
|
|
| 44 |
|
| 45 |
+
-----------------------
|
| 46 |
+
OUTPUT FORMAT (use this exact block-style for each question; preserve formatting exactly):
|
| 47 |
+
Paper Total Marks: <number>
|
| 48 |
|
| 49 |
+
Question <id>
|
| 50 |
+
Total Marks: <number>
|
| 51 |
+
QP:
|
| 52 |
+
<transcribed question text>
|
| 53 |
+
|
| 54 |
+
MS:
|
| 55 |
+
<exact verbatim markscheme lines for this question>
|
| 56 |
+
|
| 57 |
+
--QUESTION-END--
|
| 58 |
+
(repeat for all questions in order)
|
| 59 |
"""
|
| 60 |
},
|
| 61 |
+
# AS_TRANSCRIPTION will be dynamically constructed (in code) after extracting IDs from QP+MS result
|
| 62 |
"GRADING_PROMPT": {
|
| 63 |
"role": "system",
|
| 64 |
"content": """Developer: You are an official examiner. Apply the following grading rules precisely.
|
|
|
|
| 80 |
7. Any lost mark: use red `<span style="color:red">M0</span>` and make Reason red.
|
| 81 |
---
|
| 82 |
## Output Format
|
| 83 |
+
Produce two sections per question/sub-question, following this structure:
|
| 84 |
+
|
| 85 |
+
## Question <id>
|
| 86 |
### Markscheme vs Student Answer
|
| 87 |
| Mark ID | Markscheme Expectation | Studentβs Response | Awarded |
|
| 88 |
|---------|------------------------|--------------------|---------|
|
| 89 |
| M1_1 | Recognise GP | "r=0.9" | M1 |
|
| 90 |
+
β‘οΈ **Total: X/Y**
|
| 91 |
+
|
| 92 |
---
|
| 93 |
+
|
| 94 |
### Examinerβs Report
|
| 95 |
At the very end, provide a summary table:
|
| 96 |
| Question Number | Marks | Remark |
|
| 97 |
|-----------------|-------|--------|
|
| 98 |
+
| 1 | X/Y | <remark> |
|
| 99 |
+
|
| 100 |
+
Then show total clearly as a final line:
|
| 101 |
+
`Total: <obtained_marks>/<max_marks>`
|
| 102 |
|
| 103 |
+
NOTES:
|
| 104 |
+
- The assistant will receive two transcripts (QP+MS transcription & AS transcription) in plain text. Use the QP+MS transcript as the authoritative source of question wording, total marks, and verbatim markscheme entries (M/A/R mark IDs).
|
| 105 |
+
- Match student answers to question IDs and grade according to the provided verbatim markscheme.
|
| 106 |
+
- Produce full markdown as above. Ensure mark IDs used in the grading are present and consistent with the markscheme.
|
| 107 |
"""
|
| 108 |
}
|
| 109 |
}
|
| 110 |
|
| 111 |
+
# ---------------- HELPERS ----------------
|
| 112 |
def save_as_pdf(text, filename="output.pdf"):
|
| 113 |
pdf = MarkdownPdf()
|
| 114 |
pdf.add_section(Section(text, toc=False))
|
|
|
|
| 126 |
return input_path
|
| 127 |
|
| 128 |
if size <= max_size:
|
|
|
|
| 129 |
return input_path
|
| 130 |
|
|
|
|
| 131 |
try:
|
| 132 |
gs_cmd = [
|
| 133 |
"gs", "-sDEVICE=pdfwrite",
|
|
|
|
| 138 |
]
|
| 139 |
subprocess.run(gs_cmd, check=True)
|
| 140 |
new_size = os.path.getsize(output_path)
|
|
|
|
| 141 |
if new_size <= max_size:
|
| 142 |
return output_path
|
| 143 |
else:
|
|
|
|
| 144 |
return input_path
|
| 145 |
+
except Exception:
|
|
|
|
| 146 |
return input_path
|
| 147 |
|
| 148 |
def create_model():
|
| 149 |
try:
|
|
|
|
| 150 |
return genai.GenerativeModel("gemini-2.5-pro", generation_config={"temperature": 0})
|
| 151 |
except Exception:
|
|
|
|
| 152 |
return genai.GenerativeModel("gemini-2.5-flash", generation_config={"temperature": 0})
|
| 153 |
|
| 154 |
+
def merge_pdfs(paths, output_path):
|
| 155 |
+
writer = PdfWriter()
|
| 156 |
+
for p in paths:
|
| 157 |
+
reader = PdfReader(p)
|
| 158 |
+
for page in reader.pages:
|
| 159 |
+
writer.add_page(page)
|
| 160 |
+
with open(output_path, "wb") as f:
|
| 161 |
+
writer.write(f)
|
| 162 |
+
return output_path
|
| 163 |
+
|
| 164 |
+
def gemini_generate_content(model, prompt_text, file_upload_obj=None, image_obj=None):
|
| 165 |
+
"""
|
| 166 |
+
Send prompt_text and optionally an uploaded file (or an image object) to the model.
|
| 167 |
+
Returns textual response.
|
| 168 |
+
"""
|
| 169 |
+
inputs = [prompt_text]
|
| 170 |
+
if file_upload_obj:
|
| 171 |
+
inputs.append(file_upload_obj)
|
| 172 |
+
if image_obj:
|
| 173 |
+
inputs.append(image_obj)
|
| 174 |
+
response = model.generate_content(inputs)
|
| 175 |
+
raw_text = getattr(response, "text", None)
|
| 176 |
+
if not raw_text and getattr(response, "candidates", None):
|
| 177 |
+
raw_text = response.candidates[0].content.parts[0].text
|
| 178 |
+
if not raw_text:
|
| 179 |
+
raw_text = str(response)
|
| 180 |
+
return raw_text
|
| 181 |
+
|
| 182 |
+
# ---------------- PARSERS ----------------
|
| 183 |
+
def extract_question_ids_from_qpms(text):
|
| 184 |
+
"""
|
| 185 |
+
Extract question IDs from QP+MS transcript output.
|
| 186 |
+
We expect QP_MS_TRANSCRIPTION to contain lines like: "Question <id>"
|
| 187 |
+
Return a list of unique IDs in order of appearance.
|
| 188 |
+
"""
|
| 189 |
+
ids = []
|
| 190 |
+
# Primary: lines starting with 'Question <id>'
|
| 191 |
+
for m in re.finditer(r"(?im)^\s*Question\s+([0-9]+(?:[.\-a-zA-Z()]+(?:\.[a-zA-Z0-9()]+)*)?)\b", text):
|
| 192 |
+
qid = m.group(1).strip()
|
| 193 |
+
if qid not in ids:
|
| 194 |
+
ids.append(qid)
|
| 195 |
+
# Secondary: if none found, look for explicit markers like "Question <id>" with colon/line
|
| 196 |
+
if not ids:
|
| 197 |
+
for m in re.finditer(r"(?im)Question\s*[:\-]?\s*([0-9]+(?:[.\-a-zA-Z()]+)*)", text):
|
| 198 |
+
qid = m.group(1).strip()
|
| 199 |
+
if qid not in ids:
|
| 200 |
+
ids.append(qid)
|
| 201 |
+
# Tertiary fallback: scan for typical serial patterns in the document
|
| 202 |
+
if not ids:
|
| 203 |
+
# match patterns like 1, 1.a, 3.a.i, 2(b), etc., where they appear at line starts
|
| 204 |
+
for m in re.finditer(r"(?m)^\s*([0-9]+(?:(?:\.[a-zA-Z0-9]+)+|(?:\([a-zA-Z0-9]+\))+|[a-zA-Z])*)\s*[\.\):\-]", text):
|
| 205 |
+
qid = m.group(1).strip()
|
| 206 |
+
if qid not in ids:
|
| 207 |
+
ids.append(qid)
|
| 208 |
+
return ids
|
| 209 |
+
|
| 210 |
+
def build_as_prompt_with_expected_ids(expected_ids):
|
| 211 |
+
"""
|
| 212 |
+
Construct the AS transcription prompt injecting the expected IDs block (as {regex} slot).
|
| 213 |
+
The expected_ids is a list; we format them per user instruction inside braces.
|
| 214 |
+
"""
|
| 215 |
+
if not expected_ids:
|
| 216 |
+
ids_block = "{NA}"
|
| 217 |
+
else:
|
| 218 |
+
# Format exactly as user provided: curly brace block with each ID on its own line
|
| 219 |
+
ids_block = "{\n" + "\n".join(expected_ids) + "\n}"
|
| 220 |
+
prompt = f"""You are a high-quality handwritten transcription assistant.
|
| 221 |
+
|
| 222 |
+
INPUT: This PDF contains a student's handwritten answer sheet.
|
| 223 |
+
TASK: Transcribe the student's answers exactly (as text). Preserve step order and line breaks. Attempt to assign each answer to a question ID if the student has labelled it (e.g., "1", "1a", "2(b)", "3"). If the student hasn't labelled answers, segment contiguous answer blocks and attempt to infer question IDs from context β but mark inferred IDs clearly as "INFERRED: <id>"
|
| 224 |
+
|
| 225 |
+
Enclose all mathematical expressions in Markdown fenced code blocks (``` triple backticks).
|
| 226 |
+
|
| 227 |
+
If a diagram/graph is omitted, write [Graph omitted].
|
| 228 |
+
Unreadable parts: [illegible].
|
| 229 |
+
Unanswered: [No response].
|
| 230 |
+
|
| 231 |
+
Do NOT recreate diagrams.
|
| 232 |
+
|
| 233 |
+
Ensure consistency and determinism in formatting so subsequent models can grade directly from this aligned format.
|
| 234 |
+
|
| 235 |
+
Expected questions (if missing, write NA):
|
| 236 |
+
{ids_block}
|
| 237 |
+
-----------------------
|
| 238 |
+
OUTPUT FORMAT:
|
| 239 |
+
Question <id>
|
| 240 |
+
AS:
|
| 241 |
+
<transcribed answer or placeholder>
|
| 242 |
+
"""
|
| 243 |
+
return prompt
|
| 244 |
+
|
| 245 |
def extract_marks_from_grading(grading_text):
|
| 246 |
+
"""
|
| 247 |
+
Parse the grading markdown produced by the GRADING_PROMPT and extract marks per question.
|
| 248 |
+
Returns dict: {"grading": [{"question": "1.a", "marks_awarded": ["M1","A1"]}, ...]}
|
| 249 |
+
"""
|
| 250 |
grading_json = {"grading": []}
|
| 251 |
+
|
| 252 |
+
# Split by question sections using "## Question" header
|
| 253 |
question_blocks = re.split(r"##\s*Question\s+", grading_text)
|
| 254 |
for block in question_blocks[1:]:
|
| 255 |
first_line = block.strip().splitlines()[0].strip()
|
|
|
|
| 271 |
})
|
| 272 |
return grading_json
|
| 273 |
|
| 274 |
+
# ---------------- MAPPING/IMPRINT HELPERS ----------------
|
| 275 |
+
def ask_gemini_for_mapping_for_page(model, image_path, grading_json, rows=GRID_ROWS, cols=GRID_COLS):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 276 |
"""
|
| 277 |
+
Send a single page image along with the grading_json; LLM should return JSON mapping.
|
|
|
|
| 278 |
"""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 279 |
prompt = f"""
|
| 280 |
You are an exam marker. Your role is to identify where each question begins on the page.
|
| 281 |
The page is divided into a {rows} x {cols} grid. Each cell has a RUNNING NUMBER label (1..{rows*cols}).
|
|
|
|
| 297 |
raw_text = getattr(response, "text", None)
|
| 298 |
if not raw_text and getattr(response, "candidates", None):
|
| 299 |
raw_text = response.candidates[0].content.parts[0].text
|
|
|
|
|
|
|
|
|
|
| 300 |
try:
|
| 301 |
start = raw_text.index('[')
|
| 302 |
end = raw_text.rindex(']') + 1
|
| 303 |
json_part = raw_text[start:end]
|
| 304 |
mapping = json.loads(json_part)
|
| 305 |
return mapping
|
| 306 |
+
except Exception:
|
| 307 |
match = re.search(r'(\[.*\])', raw_text, re.DOTALL)
|
| 308 |
if match:
|
| 309 |
try:
|
|
|
|
| 314 |
return []
|
| 315 |
|
| 316 |
def imprint_marks_using_mapping(pdf_path, grading_json, output_pdf, model, rows=GRID_ROWS, cols=GRID_COLS):
|
| 317 |
+
"""
|
| 318 |
+
Convert PDF to images, create grid-numbered images for sending to Gemini,
|
| 319 |
+
send all page images in parallel to Gemini for mapping, then annotate and produce imprinted PDF.
|
| 320 |
+
"""
|
| 321 |
pages = convert_from_path(pdf_path, dpi=200)
|
| 322 |
annotated_page_paths = []
|
|
|
|
|
|
|
| 323 |
temp_grid_images = []
|
| 324 |
+
|
| 325 |
+
# Create grid-numbered images for mapping prompt
|
| 326 |
for p_index, page in enumerate(pages):
|
| 327 |
img = page.convert("RGB")
|
| 328 |
w, h = img.size
|
|
|
|
| 350 |
img.save(temp_path, "PNG")
|
| 351 |
temp_grid_images.append(temp_path)
|
| 352 |
|
| 353 |
+
# Send all grid images in parallel to Gemini to get mappings
|
| 354 |
+
mappings_per_page = {}
|
| 355 |
+
model_local = model # for thread scope
|
| 356 |
+
with ThreadPoolExecutor(max_workers=min(8, len(temp_grid_images))) as ex:
|
| 357 |
+
futures = {ex.submit(ask_gemini_for_mapping_for_page, model_local, img_path, grading_json, rows, cols): idx
|
| 358 |
+
for idx, img_path in enumerate(temp_grid_images)}
|
| 359 |
+
for fut in as_completed(futures):
|
| 360 |
+
idx = futures[fut]
|
| 361 |
+
try:
|
| 362 |
+
mapping = fut.result()
|
| 363 |
+
except Exception:
|
| 364 |
+
mapping = []
|
| 365 |
+
mappings_per_page[idx] = mapping
|
| 366 |
|
| 367 |
+
# Annotate original pages according to returned mappings
|
| 368 |
+
for p_index, page in enumerate(pages):
|
| 369 |
+
page_img = page.convert("RGB")
|
| 370 |
img_cv = np.array(page_img)
|
| 371 |
img_cv = cv2.cvtColor(img_cv, cv2.COLOR_RGB2BGR)
|
| 372 |
h, w, _ = img_cv.shape
|
| 373 |
cell_w_px, cell_h_px = w / cols, h / rows
|
| 374 |
|
| 375 |
+
mapping = mappings_per_page.get(p_index, [])
|
| 376 |
occupied = set()
|
|
|
|
| 377 |
for item in mapping:
|
| 378 |
qid = item.get("question")
|
| 379 |
cell_number = item.get("cell_number")
|
|
|
|
| 383 |
marks_list = next((g["marks_awarded"] for g in grading_json.get("grading", []) if g["question"] == qid), [])
|
| 384 |
if not marks_list:
|
| 385 |
marks_list = next((g["marks_awarded"] for g in grading_json.get("grading", [])
|
| 386 |
+
if g["question"].lower() == (qid or "").lower()), [])
|
| 387 |
|
| 388 |
marks_text = ",".join(marks_list) if marks_list else "?"
|
| 389 |
|
| 390 |
row = (cell_number - 1) // cols
|
| 391 |
col = (cell_number - 1) % cols
|
| 392 |
|
|
|
|
| 393 |
candidates = []
|
| 394 |
if col + 1 < cols:
|
| 395 |
candidates.append((row, col + 1))
|
|
|
|
| 404 |
chosen = (r_c, c_c)
|
| 405 |
occupied.add(cell_id)
|
| 406 |
break
|
|
|
|
| 407 |
if chosen is None:
|
| 408 |
chosen = (row, col)
|
| 409 |
|
|
|
|
| 411 |
x_c = int((c_c + 1) * cell_w_px - cell_w_px * 0.1)
|
| 412 |
y_c = int((r_c + 0.5) * cell_h_px)
|
| 413 |
|
|
|
|
|
|
|
|
|
|
| 414 |
font_scale = max(0.6, min(1.6, cell_h_px / 60.0))
|
| 415 |
thickness = max(1, int(font_scale * 2))
|
| 416 |
cv2.putText(img_cv, marks_text, (x_c, y_c), cv2.FONT_HERSHEY_SIMPLEX,
|
|
|
|
| 419 |
annotated_path = f"annotated_page_{p_index+1}.png"
|
| 420 |
cv2.imwrite(annotated_path, img_cv)
|
| 421 |
annotated_page_paths.append(annotated_path)
|
|
|
|
| 422 |
|
| 423 |
with open(output_pdf, "wb") as f:
|
| 424 |
f.write(img2pdf.convert(annotated_page_paths))
|
| 425 |
|
|
|
|
| 426 |
compressed = compress_pdf(output_pdf)
|
|
|
|
|
|
|
| 427 |
return compressed
|
| 428 |
|
| 429 |
+
# ---------------- MAIN PIPELINE ----------------
|
| 430 |
def align_and_grade_pipeline(qp_path, ms_path, ans_path, imprint=False):
|
| 431 |
"""
|
| 432 |
+
Flow:
|
| 433 |
+
1) compress files if needed
|
| 434 |
2) merge QP + MS -> merged_qpms.pdf
|
| 435 |
+
3) upload merged_qpms to Gemini, request transcription (QP+MS)
|
| 436 |
+
4) extract question IDs via regex from QP+MS result
|
| 437 |
+
5) build AS transcription prompt injecting expected IDs block
|
| 438 |
+
6) send AS transcription request (using injected expected IDs)
|
| 439 |
+
7) send both transcripts to grading prompt -> get grading markdown
|
| 440 |
+
8) extract marks for imprinting
|
| 441 |
+
9) optional imprint: convert pages, send page images in parallel to LLM for mapping, annotate and produce imprinted PDF
|
|
|
|
| 442 |
"""
|
| 443 |
try:
|
| 444 |
+
# Step 0: compress
|
| 445 |
qp_path = compress_pdf(qp_path)
|
| 446 |
ms_path = compress_pdf(ms_path)
|
| 447 |
ans_path = compress_pdf(ans_path)
|
| 448 |
|
| 449 |
+
# Merge QP + MS
|
| 450 |
merged_qpms_path = os.path.splitext(qp_path)[0] + "_merged_qp_ms.pdf"
|
| 451 |
merge_pdfs([qp_path, ms_path], merged_qpms_path)
|
|
|
|
| 452 |
|
| 453 |
+
# Upload files
|
|
|
|
| 454 |
merged_uploaded = genai.upload_file(path=merged_qpms_path, display_name="QP+MS (merged)")
|
| 455 |
ans_uploaded = genai.upload_file(path=ans_path, display_name="Answer Sheet")
|
| 456 |
|
| 457 |
model = create_model()
|
| 458 |
|
| 459 |
+
# Step 1: QP+MS transcription (first)
|
| 460 |
+
qpms_prompt = PROMPTS["QP_MS_TRANSCRIPTION"]["content"]
|
| 461 |
+
qpms_text = gemini_generate_content(model, qpms_prompt, file_upload_obj=merged_uploaded)
|
| 462 |
+
# save debug
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 463 |
with open("debug_qpms_transcript.txt", "w", encoding="utf-8") as f:
|
| 464 |
f.write(qpms_text)
|
| 465 |
+
|
| 466 |
+
# Step 2: extract serial numbers (question IDs) using regex from qpms_text
|
| 467 |
+
extracted_ids = extract_question_ids_from_qpms(qpms_text)
|
| 468 |
+
# if empty, we still provide a default list placeholder so AS model writes NA for missing ones
|
| 469 |
+
if not extracted_ids:
|
| 470 |
+
extracted_ids = ["NA"]
|
| 471 |
+
|
| 472 |
+
# Step 3: Build AS prompt injecting extracted IDs
|
| 473 |
+
as_prompt = build_as_prompt_with_expected_ids(extracted_ids)
|
| 474 |
+
|
| 475 |
+
# Step 4: AS transcription (after injecting IDs)
|
| 476 |
+
as_text = gemini_generate_content(model, as_prompt, file_upload_obj=ans_uploaded)
|
| 477 |
with open("debug_as_transcript.txt", "w", encoding="utf-8") as f:
|
| 478 |
f.write(as_text)
|
| 479 |
|
| 480 |
+
# Step 5: Grading - send both transcripts to grading model
|
| 481 |
+
# Build payload by concatenating transcripts with clear separators
|
| 482 |
+
grading_input = (
|
| 483 |
+
"=== QP+MS TRANSCRIPT BEGIN ===\n"
|
| 484 |
+
+ qpms_text
|
| 485 |
+
+ "\n=== QP+MS TRANSCRIPT END ===\n\n"
|
| 486 |
+
+ "=== ANSWER SHEET TRANSCRIPT BEGIN ===\n"
|
| 487 |
+
+ as_text
|
| 488 |
+
+ "\n=== ANSWER SHEET TRANSCRIPT END ===\n"
|
| 489 |
+
)
|
| 490 |
+
grading_prompt_system = PROMPTS["GRADING_PROMPT"]["content"]
|
| 491 |
+
grading_text = gemini_generate_content(model, grading_prompt_system, file_upload_obj=None, image_obj=None)
|
| 492 |
+
# The above call returns the system-only content if used incorrectly; instead we must pass both system prompt and content to generate_content
|
| 493 |
+
# Re-call properly:
|
| 494 |
+
response = model.generate_content([grading_prompt_system, grading_input])
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 495 |
grading_text = getattr(response, "text", None)
|
| 496 |
if not grading_text and getattr(response, "candidates", None):
|
| 497 |
grading_text = response.candidates[0].content.parts[0].text
|
| 498 |
if not grading_text:
|
| 499 |
raise RuntimeError("No grading output returned from Gemini.")
|
| 500 |
|
|
|
|
|
|
|
|
|
|
| 501 |
# Save grading PDF
|
| 502 |
base_name = os.path.splitext(os.path.basename(ans_path))[0]
|
| 503 |
grading_pdf_path = save_as_pdf(grading_text, f"{base_name}_graded.pdf")
|
|
|
|
| 504 |
|
| 505 |
+
# Step 6: Extract marks for imprinting
|
| 506 |
grading_json = extract_marks_from_grading(grading_text)
|
| 507 |
+
with open("debug_grading_json.json", "w", encoding="utf-8") as f:
|
| 508 |
+
json.dump(grading_json, f, indent=2, ensure_ascii=False)
|
| 509 |
|
| 510 |
imprinted_pdf_path = None
|
| 511 |
if imprint:
|
| 512 |
+
# Step 7: Imprinting - send all page images in parallel to LLM for mapping and annotate
|
| 513 |
imprinted_pdf_path = f"{base_name}_imprinted.pdf"
|
| 514 |
imprinted_pdf_path = imprint_marks_using_mapping(ans_path, grading_json, imprinted_pdf_path, model)
|
|
|
|
| 515 |
|
| 516 |
+
return qpms_text, as_text, grading_text, grading_pdf_path, imprinted_pdf_path
|
| 517 |
|
| 518 |
except Exception as e:
|
| 519 |
+
return f"β Error: {e}", None, None, None, None
|
|
|
|
| 520 |
|
| 521 |
+
# ---------------- GRADIO UI ----------------
|
| 522 |
+
with gr.Blocks(title="LeadIB AI Grading (Updated Flow: QP+MS -> IDs -> AS -> Grade -> Imprint)") as demo:
|
| 523 |
+
gr.Markdown("## π LeadIB AI Grading β Final Flow\nUpload **Question Paper**, **Markscheme**, and **Student Answer Sheet**.\nFlow: merge QP+MS -> transcribe (QP+MS) -> extract IDs -> transcribe AS with expected IDs -> grade -> (optional) imprint.")
|
| 524 |
|
| 525 |
with gr.Row():
|
| 526 |
qp_file = gr.File(label="π Upload Question Paper (PDF)")
|
|
|
|
| 528 |
ans_file = gr.File(label="π Upload Student Answer Sheet (PDF)")
|
| 529 |
|
| 530 |
imprint_toggle = gr.Checkbox(label="β Imprint Marks on Student Answer Sheet", value=False)
|
| 531 |
+
run_button = gr.Button("π Run Pipeline")
|
| 532 |
|
| 533 |
with gr.Row():
|
| 534 |
+
qpms_box = gr.Textbox(label="π QP+MS Transcript", lines=12)
|
| 535 |
+
as_box = gr.Textbox(label="π AS Transcript", lines=12)
|
| 536 |
|
| 537 |
+
grading_output_box = gr.Textbox(label="π§Ύ Grading (Markdown)", lines=20)
|
| 538 |
grading_pdf_file = gr.File(label="π₯ Download Grading PDF")
|
| 539 |
imprint_pdf_file = gr.File(label="π₯ Download Imprinted PDF (Optional)")
|
| 540 |
|
|
|
|
| 543 |
ms_path = ms_file_obj.name
|
| 544 |
ans_path = ans_file_obj.name
|
| 545 |
|
| 546 |
+
qpms_text, as_text, grading_text, grading_pdf_path, imprinted_pdf_path = align_and_grade_pipeline(
|
| 547 |
qp_path, ms_path, ans_path, imprint=imprint_flag
|
| 548 |
)
|
| 549 |
|
| 550 |
+
return qpms_text or "", as_text or "", grading_text or "", grading_pdf_path, imprinted_pdf_path
|
|
|
|
| 551 |
|
| 552 |
run_button.click(
|
| 553 |
fn=run_pipeline,
|
| 554 |
inputs=[qp_file, ms_file, ans_file, imprint_toggle],
|
| 555 |
+
outputs=[qpms_box, as_box, grading_output_box, grading_pdf_file, imprint_pdf_file]
|
| 556 |
)
|
| 557 |
|
| 558 |
if __name__ == "__main__":
|