Spaces:
Sleeping
Sleeping
Update app.py
Browse filesi hope this works
app.py
CHANGED
|
@@ -12,41 +12,61 @@ from pdf2image import convert_from_path
|
|
| 12 |
from PIL import Image, ImageDraw, ImageFont
|
| 13 |
import cv2
|
| 14 |
import numpy as np
|
|
|
|
|
|
|
| 15 |
|
| 16 |
-
# ----------
|
|
|
|
|
|
|
|
|
|
|
|
|
| 17 |
PROMPTS = {
|
| 18 |
-
"
|
| 19 |
"role": "system",
|
| 20 |
-
"content": """
|
| 21 |
-
|
| 22 |
-
|
| 23 |
-
|
| 24 |
-
|
| 25 |
-
|
| 26 |
-
|
| 27 |
-
|
| 28 |
-
|
| 29 |
-
|
| 30 |
-
|
| 31 |
-
|
| 32 |
-
|
| 33 |
-
|
| 34 |
-
|
| 35 |
-
|
| 36 |
-
|
| 37 |
-
|
| 38 |
-
|
| 39 |
-
|
| 40 |
-
|
| 41 |
-
|
| 42 |
-
|
| 43 |
-
|
| 44 |
-
|
| 45 |
-
|
| 46 |
-
|
| 47 |
-
|
| 48 |
-
|
| 49 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 50 |
},
|
| 51 |
|
| 52 |
"GRADING_PROMPT": {
|
|
@@ -85,18 +105,14 @@ At the very end, provide a summary table:
|
|
| 85 |
|-----------------|-------|--------|
|
| 86 |
| 1 | 6/7 | C |
|
| 87 |
Then show total clearly:
|
| 88 |
-
`Total: 6/7`
|
|
|
|
|
|
|
|
|
|
| 89 |
}
|
| 90 |
}
|
| 91 |
|
| 92 |
-
# --------------------
|
| 93 |
-
# The Gemini API key must be set in the environment
|
| 94 |
-
genai.configure(api_key=os.getenv("GEMINI_API_KEY"))
|
| 95 |
-
|
| 96 |
-
# Grid config for imprinting
|
| 97 |
-
GRID_ROWS, GRID_COLS = 20, 14
|
| 98 |
-
|
| 99 |
-
# ---------- HELPERS ----------
|
| 100 |
def save_as_pdf(text, filename="output.pdf"):
|
| 101 |
pdf = MarkdownPdf()
|
| 102 |
pdf.add_section(Section(text, toc=False))
|
|
@@ -104,10 +120,6 @@ def save_as_pdf(text, filename="output.pdf"):
|
|
| 104 |
return filename
|
| 105 |
|
| 106 |
def compress_pdf(input_path, output_path=None, max_size=20*1024*1024):
|
| 107 |
-
"""
|
| 108 |
-
Compress PDF only if its size is larger than max_size (default 20MB).
|
| 109 |
-
Returns path to (possibly compressed) file.
|
| 110 |
-
"""
|
| 111 |
if output_path is None:
|
| 112 |
base, ext = os.path.splitext(input_path)
|
| 113 |
output_path = f"{base}_compressed{ext}"
|
|
@@ -150,52 +162,183 @@ def create_model():
|
|
| 150 |
print("β‘ Falling back to gemini-2.5-flash model")
|
| 151 |
return genai.GenerativeModel("gemini-2.5-flash", generation_config={"temperature": 0})
|
| 152 |
|
| 153 |
-
# ---------- Extract marks per question (parse grading Markdown) ----------
|
| 154 |
def extract_marks_from_grading(grading_text):
|
| 155 |
-
"""
|
| 156 |
-
Parse the grading markdown produced by the GRADING_PROMPT and extract marks per question.
|
| 157 |
-
Returns dict: {"grading": [{"question": "1.a", "marks_awarded": ["M1","A1"]}, ...]}
|
| 158 |
-
"""
|
| 159 |
grading_json = {"grading": []}
|
| 160 |
-
|
| 161 |
-
# Split by question sections using "## Question" header
|
| 162 |
-
# We allow various header spacing, e.g. "## Question 1(a)" or "## Question 1(a)\n..."
|
| 163 |
question_blocks = re.split(r"##\s*Question\s+", grading_text)
|
| 164 |
-
for block in question_blocks[1:]:
|
| 165 |
-
# The first token up to newline is the question id line, e.g. "1(a)\n### Markscheme..."
|
| 166 |
first_line = block.strip().splitlines()[0].strip()
|
| 167 |
-
# Extract the question id - keep typical formats like 1, 1(a), 2.b, 3.d(ii)
|
| 168 |
q_id_match = re.match(r"([0-9]+(?:[a-zA-Z]|\([^\)]+\)|(?:\.[a-zA-Z0-9]+))*)", first_line)
|
| 169 |
if not q_id_match:
|
| 170 |
-
# fallback: try to extract tokens until first space
|
| 171 |
q_id = first_line.split()[0]
|
| 172 |
else:
|
| 173 |
q_id = q_id_match.group(1).strip()
|
| 174 |
-
|
| 175 |
-
# Now find all awarded marks in that block. Search the "Awarded" column entries like M1, A1, A0, R1 etc.
|
| 176 |
-
# We use a word-boundary regex to capture tokens.
|
| 177 |
awarded = re.findall(r"\b(M\d+|A\d+|R\d+|M0|A0|R0)\b", block)
|
| 178 |
-
# Deduplicate preserving order
|
| 179 |
seen = set()
|
| 180 |
awarded_unique = []
|
| 181 |
for m in awarded:
|
| 182 |
if m not in seen:
|
| 183 |
awarded_unique.append(m)
|
| 184 |
seen.add(m)
|
| 185 |
-
|
| 186 |
grading_json["grading"].append({
|
| 187 |
"question": q_id,
|
| 188 |
"marks_awarded": awarded_unique
|
| 189 |
})
|
| 190 |
-
|
| 191 |
return grading_json
|
| 192 |
|
| 193 |
-
# ----------
|
| 194 |
-
def
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 195 |
"""
|
| 196 |
-
|
| 197 |
-
Returns
|
| 198 |
"""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 199 |
prompt = f"""
|
| 200 |
You are an exam marker. Your role is to identify where each question begins on the page.
|
| 201 |
The page is divided into a {rows} x {cols} grid. Each cell has a RUNNING NUMBER label (1..{rows*cols}).
|
|
@@ -212,53 +355,35 @@ Return JSON only, like:
|
|
| 212 |
Grading JSON:
|
| 213 |
{json.dumps(grading_json, indent=2)}
|
| 214 |
"""
|
| 215 |
-
# Load image file
|
| 216 |
img = Image.open(image_path)
|
| 217 |
-
# Send both prompt and image to Gemini
|
| 218 |
response = model.generate_content([prompt, img])
|
| 219 |
raw_text = getattr(response, "text", None)
|
| 220 |
if not raw_text and getattr(response, "candidates", None):
|
| 221 |
raw_text = response.candidates[0].content.parts[0].text
|
| 222 |
|
| 223 |
print("π Gemini mapping raw output (page):")
|
| 224 |
-
print(raw_text)
|
| 225 |
-
|
| 226 |
-
# Try to extract JSON from response
|
| 227 |
-
# Commonly model will return JSON; attempt to parse the first JSON array/list block
|
| 228 |
-
json_part = None
|
| 229 |
try:
|
| 230 |
-
# naive: find first '[' and last ']' and json.loads
|
| 231 |
start = raw_text.index('[')
|
| 232 |
end = raw_text.rindex(']') + 1
|
| 233 |
json_part = raw_text[start:end]
|
| 234 |
mapping = json.loads(json_part)
|
| 235 |
return mapping
|
| 236 |
except Exception as e:
|
| 237 |
-
print("β οΈ Failed to parse mapping JSON directly:", e)
|
| 238 |
-
# try to find 'json\n{...}\n' patterns
|
| 239 |
match = re.search(r'(\[.*\])', raw_text, re.DOTALL)
|
| 240 |
if match:
|
| 241 |
try:
|
| 242 |
mapping = json.loads(match.group(1))
|
| 243 |
return mapping
|
| 244 |
-
except Exception
|
| 245 |
-
|
| 246 |
-
# fallback empty list
|
| 247 |
return []
|
| 248 |
|
| 249 |
-
# ---------- Imprinting Logic (uses mapping) ----------
|
| 250 |
def imprint_marks_using_mapping(pdf_path, grading_json, output_pdf, model, rows=GRID_ROWS, cols=GRID_COLS):
|
| 251 |
-
"""
|
| 252 |
-
Convert PDF to images, create grid-numbered images, ask Gemini for mapping per page,
|
| 253 |
-
and then annotate marks beside the mapped cells.
|
| 254 |
-
Returns path to final imprinted (and possibly compressed) PDF.
|
| 255 |
-
Prints imprint steps in console for each page/question.
|
| 256 |
-
"""
|
| 257 |
pages = convert_from_path(pdf_path, dpi=200)
|
| 258 |
annotated_page_paths = []
|
| 259 |
print(f"π Converted answer PDF to {len(pages)} page image(s) for imprinting.")
|
| 260 |
|
| 261 |
-
# create grid-numbered temporary images for sending to Gemini
|
| 262 |
temp_grid_images = []
|
| 263 |
for p_index, page in enumerate(pages):
|
| 264 |
img = page.convert("RGB")
|
|
@@ -272,7 +397,6 @@ def imprint_marks_using_mapping(pdf_path, grading_json, output_pdf, model, rows=
|
|
| 272 |
num_font = ImageFont.load_default()
|
| 273 |
|
| 274 |
cell_num = 1
|
| 275 |
-
# We only need numbers for clarity when sending to model (but we won't draw gridlines)
|
| 276 |
for r in range(rows):
|
| 277 |
for c in range(cols):
|
| 278 |
x = int(c * cell_w + cell_w / 2)
|
|
@@ -288,52 +412,40 @@ def imprint_marks_using_mapping(pdf_path, grading_json, output_pdf, model, rows=
|
|
| 288 |
img.save(temp_path, "PNG")
|
| 289 |
temp_grid_images.append(temp_path)
|
| 290 |
|
| 291 |
-
# Now for each page, ask Gemini for mapping
|
| 292 |
for p_index, grid_img_path in enumerate(temp_grid_images):
|
| 293 |
print(f"\nπ° Sending page {p_index+1} to Gemini for mapping...")
|
| 294 |
mapping = ask_gemini_for_mapping_for_page(model, grid_img_path, grading_json, rows, cols)
|
| 295 |
print(f"π Parsed mapping for page {p_index+1}: {mapping}")
|
| 296 |
|
| 297 |
-
# Prepare a clean copy of the original page to annotate (no grid numbers)
|
| 298 |
page_img = pages[p_index].convert("RGB")
|
| 299 |
img_cv = np.array(page_img)
|
| 300 |
img_cv = cv2.cvtColor(img_cv, cv2.COLOR_RGB2BGR)
|
| 301 |
h, w, _ = img_cv.shape
|
| 302 |
cell_w_px, cell_h_px = w / cols, h / rows
|
| 303 |
|
| 304 |
-
# We will maintain a set of occupied cells to prefer right/left placement heuristics
|
| 305 |
occupied = set()
|
| 306 |
|
| 307 |
-
# For each mapping entry, place the corresponding marks
|
| 308 |
for item in mapping:
|
| 309 |
qid = item.get("question")
|
| 310 |
cell_number = item.get("cell_number")
|
| 311 |
if qid is None or cell_number is None:
|
| 312 |
continue
|
| 313 |
|
| 314 |
-
# Find marks for this question from grading_json
|
| 315 |
marks_list = next((g["marks_awarded"] for g in grading_json.get("grading", []) if g["question"] == qid), [])
|
| 316 |
if not marks_list:
|
| 317 |
-
# possible the grading JSON uses slightly different formatting of q ids; try case-insensitive match
|
| 318 |
marks_list = next((g["marks_awarded"] for g in grading_json.get("grading", [])
|
| 319 |
if g["question"].lower() == qid.lower()), [])
|
| 320 |
|
| 321 |
marks_text = ",".join(marks_list) if marks_list else "?"
|
| 322 |
|
| 323 |
-
# Compute candidate cell coordinates
|
| 324 |
-
# Convert cell_number -> (row, col)
|
| 325 |
row = (cell_number - 1) // cols
|
| 326 |
col = (cell_number - 1) % cols
|
| 327 |
|
| 328 |
-
# Preference: place in cell to the right (col + 1), if within grid and not occupied.
|
| 329 |
placed = False
|
| 330 |
candidates = []
|
| 331 |
-
# Right cell
|
| 332 |
if col + 1 < cols:
|
| 333 |
candidates.append((row, col + 1))
|
| 334 |
-
# same cell (fallback)
|
| 335 |
candidates.append((row, col))
|
| 336 |
-
# left cell
|
| 337 |
if col - 1 >= 0:
|
| 338 |
candidates.append((row, col - 1))
|
| 339 |
|
|
@@ -346,93 +458,177 @@ def imprint_marks_using_mapping(pdf_path, grading_json, output_pdf, model, rows=
|
|
| 346 |
break
|
| 347 |
|
| 348 |
if chosen is None:
|
| 349 |
-
# all occupied? just pick original cell
|
| 350 |
chosen = (row, col)
|
| 351 |
|
| 352 |
-
# Convert chosen cell to pixel coordinates (approx center-right)
|
| 353 |
r_c, c_c = chosen
|
| 354 |
-
x_c = int((c_c + 1) * cell_w_px - cell_w_px * 0.1)
|
| 355 |
y_c = int((r_c + 0.5) * cell_h_px)
|
| 356 |
|
| 357 |
-
# Print the imprint step to console
|
| 358 |
print(f"Page {p_index+1} | Question {qid} -> mapped cell {cell_number} -> chosen cell ({r_c},{c_c})"
|
| 359 |
f" -> pixel coords ({x_c},{y_c}) | marks: {marks_text}")
|
| 360 |
|
| 361 |
-
# Draw the text on the image (scale font according to cell size)
|
| 362 |
font_scale = max(0.6, min(1.6, cell_h_px / 60.0))
|
| 363 |
thickness = max(1, int(font_scale * 2))
|
| 364 |
-
# Use cv2.putText (BGR)
|
| 365 |
cv2.putText(img_cv, marks_text, (x_c, y_c), cv2.FONT_HERSHEY_SIMPLEX,
|
| 366 |
font_scale, (0, 0, 255), thickness, cv2.LINE_AA)
|
| 367 |
|
| 368 |
-
# Save annotated page
|
| 369 |
annotated_path = f"annotated_page_{p_index+1}.png"
|
| 370 |
cv2.imwrite(annotated_path, img_cv)
|
| 371 |
annotated_page_paths.append(annotated_path)
|
| 372 |
print(f"π Annotated page saved: {annotated_path}")
|
| 373 |
|
| 374 |
-
# Merge annotated pages into a PDF
|
| 375 |
with open(output_pdf, "wb") as f:
|
| 376 |
f.write(img2pdf.convert(annotated_page_paths))
|
| 377 |
|
| 378 |
print(f"π Imprinted PDF saved to: {output_pdf}")
|
| 379 |
-
# Compress output PDF only if > 20MB
|
| 380 |
compressed = compress_pdf(output_pdf)
|
| 381 |
if compressed != output_pdf:
|
| 382 |
print(f"π¦ Imprinted PDF compressed: {compressed}")
|
| 383 |
return compressed
|
| 384 |
|
| 385 |
-
# ---------- Main pipeline ----------
|
| 386 |
def align_and_grade_pipeline(qp_path, ms_path, ans_path, imprint=False):
|
| 387 |
"""
|
| 388 |
-
|
| 389 |
-
|
| 390 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 391 |
"""
|
| 392 |
try:
|
| 393 |
-
# Step 0: compress only if >20MB
|
| 394 |
qp_path = compress_pdf(qp_path)
|
| 395 |
ms_path = compress_pdf(ms_path)
|
| 396 |
ans_path = compress_pdf(ans_path)
|
| 397 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 398 |
# Upload files to Gemini
|
| 399 |
print("πΌ Uploading files to Gemini...")
|
| 400 |
-
|
| 401 |
-
ms_uploaded = genai.upload_file(path=ms_path, display_name="Markscheme")
|
| 402 |
ans_uploaded = genai.upload_file(path=ans_path, display_name="Answer Sheet")
|
| 403 |
|
| 404 |
model = create_model()
|
| 405 |
|
| 406 |
-
#
|
| 407 |
-
|
| 408 |
-
|
| 409 |
-
|
| 410 |
-
|
| 411 |
-
|
| 412 |
-
|
| 413 |
-
|
| 414 |
-
|
| 415 |
-
|
| 416 |
-
|
| 417 |
-
|
| 418 |
-
|
| 419 |
-
|
| 420 |
-
|
| 421 |
-
|
| 422 |
-
|
| 423 |
-
|
| 424 |
-
|
| 425 |
-
|
| 426 |
-
|
| 427 |
-
|
| 428 |
-
|
| 429 |
-
|
| 430 |
-
|
| 431 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 432 |
grading_text = getattr(response, "text", None)
|
| 433 |
if not grading_text and getattr(response, "candidates", None):
|
| 434 |
grading_text = response.candidates[0].content.parts[0].text
|
| 435 |
-
|
| 436 |
if not grading_text:
|
| 437 |
raise RuntimeError("No grading output returned from Gemini.")
|
| 438 |
|
|
@@ -444,7 +640,7 @@ def align_and_grade_pipeline(qp_path, ms_path, ans_path, imprint=False):
|
|
| 444 |
grading_pdf_path = save_as_pdf(grading_text, f"{base_name}_graded.pdf")
|
| 445 |
print(f"π Grading PDF saved: {grading_pdf_path}")
|
| 446 |
|
| 447 |
-
#
|
| 448 |
grading_json = extract_marks_from_grading(grading_text)
|
| 449 |
print("π§ Extracted grading JSON (per-question marks):")
|
| 450 |
print(json.dumps(grading_json, indent=2))
|
|
@@ -452,20 +648,19 @@ def align_and_grade_pipeline(qp_path, ms_path, ans_path, imprint=False):
|
|
| 452 |
imprinted_pdf_path = None
|
| 453 |
if imprint:
|
| 454 |
print("β Imprint option enabled. Starting imprinting process...")
|
| 455 |
-
# Convert answer PDF to grid pages, ask Gemini for mapping per page and annotate
|
| 456 |
imprinted_pdf_path = f"{base_name}_imprinted.pdf"
|
| 457 |
imprinted_pdf_path = imprint_marks_using_mapping(ans_path, grading_json, imprinted_pdf_path, model)
|
| 458 |
print(f"β
Imprinting finished. Imprinted PDF at: {imprinted_pdf_path}")
|
| 459 |
|
| 460 |
-
return
|
| 461 |
|
| 462 |
except Exception as e:
|
| 463 |
print("β Pipeline error:", e)
|
| 464 |
return f"β Error: {e}", None, None, None
|
| 465 |
|
| 466 |
-
# ---------- GRADIO
|
| 467 |
-
with gr.Blocks(title="LeadIB AI Grading (
|
| 468 |
-
gr.Markdown("## π LeadIB AI Grading\nUpload **Question Paper**, **Markscheme**, and **Student Answer Sheet**.\nSystem
|
| 469 |
|
| 470 |
with gr.Row():
|
| 471 |
qp_file = gr.File(label="π Upload Question Paper (PDF)")
|
|
@@ -476,14 +671,13 @@ with gr.Blocks(title="LeadIB AI Grading (Alignment + Auto-Grading + Imprint)") a
|
|
| 476 |
run_button = gr.Button("π Run Alignment + Grading")
|
| 477 |
|
| 478 |
with gr.Row():
|
| 479 |
-
json_output_box = gr.Textbox(label="π Step
|
| 480 |
-
grading_output_box = gr.Textbox(label="π Step
|
| 481 |
|
| 482 |
grading_pdf_file = gr.File(label="π₯ Download Grading PDF")
|
| 483 |
imprint_pdf_file = gr.File(label="π₯ Download Imprinted PDF (Optional)")
|
| 484 |
|
| 485 |
def run_pipeline(qp_file_obj, ms_file_obj, ans_file_obj, imprint_flag):
|
| 486 |
-
# Gradio File objects have .name attribute when saved locally
|
| 487 |
qp_path = qp_file_obj.name
|
| 488 |
ms_path = ms_file_obj.name
|
| 489 |
ans_path = ans_file_obj.name
|
|
@@ -492,7 +686,7 @@ with gr.Blocks(title="LeadIB AI Grading (Alignment + Auto-Grading + Imprint)") a
|
|
| 492 |
qp_path, ms_path, ans_path, imprint=imprint_flag
|
| 493 |
)
|
| 494 |
|
| 495 |
-
# For Gradio file outputs: return
|
| 496 |
return alignment_text, grading_text, grading_pdf_path, imprinted_pdf_path
|
| 497 |
|
| 498 |
run_button.click(
|
|
|
|
| 12 |
from PIL import Image, ImageDraw, ImageFont
|
| 13 |
import cv2
|
| 14 |
import numpy as np
|
| 15 |
+
from concurrent.futures import ThreadPoolExecutor, as_completed
|
| 16 |
+
from PyPDF2 import PdfReader, PdfWriter
|
| 17 |
|
| 18 |
+
# ---------- CONFIG ----------
|
| 19 |
+
genai.configure(api_key=os.getenv("GEMINI_API_KEY"))
|
| 20 |
+
GRID_ROWS, GRID_COLS = 20, 14
|
| 21 |
+
|
| 22 |
+
# ---------- PROMPTS (updated) ----------
|
| 23 |
PROMPTS = {
|
| 24 |
+
"QP_MS_TRANSCRIBE": {
|
| 25 |
"role": "system",
|
| 26 |
+
"content": """You are a high-quality OCR/Transcription assistant.
|
| 27 |
+
|
| 28 |
+
INPUT: This file is a PDF that **first contains the Question Paper** and immediately after it **the Markscheme**.
|
| 29 |
+
TASK: Produce an exact transcription in plain text with clear separators. For every question in the Question Paper extract and output:
|
| 30 |
+
- Question ID (exact as printed, e.g., "1", "2(a)", "3.b", "4(ii)")
|
| 31 |
+
- Question text (exact wording; do not change punctuation)
|
| 32 |
+
- Total marks for the question (if printed; otherwise try to infer/leave blank)
|
| 33 |
+
|
| 34 |
+
FOR THE MARKSCHEME: Transcribe the markscheme **verbatim** exactly as it appears. Do NOT alter mark IDs, abbreviations, indentation, or descriptions. The markscheme transcription must be faithful β errors in transcription should be kept as-is rather than "corrected".
|
| 35 |
+
|
| 36 |
+
OUTPUT FORMAT:
|
| 37 |
+
- Plain text with clearly delimited blocks. Use a pattern like:
|
| 38 |
+
----
|
| 39 |
+
QUESTION BEGIN
|
| 40 |
+
ID: <id>
|
| 41 |
+
QTEXT:
|
| 42 |
+
<question text (multiline)>
|
| 43 |
+
TOTAL_MARKS: <integer or empty>
|
| 44 |
+
MARKSCHEME:
|
| 45 |
+
<verbatim markscheme lines for this question (multiline)>
|
| 46 |
+
QUESTION END
|
| 47 |
+
----
|
| 48 |
+
Repeat for every question in order. If some part is not available, leave the field empty but keep the block structure.
|
| 49 |
+
"""
|
| 50 |
+
},
|
| 51 |
+
|
| 52 |
+
"AS_TRANSCRIBE": {
|
| 53 |
+
"role": "system",
|
| 54 |
+
"content": """You are a high-quality handwritten transcription assistant.
|
| 55 |
+
|
| 56 |
+
INPUT: This PDF contains a student's handwritten answer sheet.
|
| 57 |
+
TASK: Transcribe the student's answers exactly (as text). Preserve step order and line breaks. Attempt to assign each answer to a question ID if the student has labelled it (e.g., "1", "1a", "2(b)", "3"). If the student hasn't labelled answers, segment contiguous answer blocks and attempt to infer question IDs from context β but mark inferred IDs clearly as "INFERRED: <id>".
|
| 58 |
+
|
| 59 |
+
OUTPUT FORMAT:
|
| 60 |
+
Produce plain text with clearly delimited answer blocks using the pattern:
|
| 61 |
+
----
|
| 62 |
+
ANSWER BEGIN
|
| 63 |
+
ID: <id or INFERRED:... or EMPTY>
|
| 64 |
+
ANSWER:
|
| 65 |
+
<transcribed student answer text (multiline)>
|
| 66 |
+
ANSWER END
|
| 67 |
+
----
|
| 68 |
+
Repeat for each student answer block found.
|
| 69 |
+
"""
|
| 70 |
},
|
| 71 |
|
| 72 |
"GRADING_PROMPT": {
|
|
|
|
| 105 |
|-----------------|-------|--------|
|
| 106 |
| 1 | 6/7 | C |
|
| 107 |
Then show total clearly:
|
| 108 |
+
`Total: 6/7`
|
| 109 |
+
|
| 110 |
+
NOTES: The assistant will receive a structured alignment JSON (questions list with qp, total_marks, ms verbatim, and as transcribed). Grade each question independently, using the markscheme provided in the `ms` field (verbatim) and the student's `as`. Provide full markdown output as described above.
|
| 111 |
+
"""
|
| 112 |
}
|
| 113 |
}
|
| 114 |
|
| 115 |
+
# -------------------- HELPERS (unchanged unless needed) --------------------
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 116 |
def save_as_pdf(text, filename="output.pdf"):
|
| 117 |
pdf = MarkdownPdf()
|
| 118 |
pdf.add_section(Section(text, toc=False))
|
|
|
|
| 120 |
return filename
|
| 121 |
|
| 122 |
def compress_pdf(input_path, output_path=None, max_size=20*1024*1024):
|
|
|
|
|
|
|
|
|
|
|
|
|
| 123 |
if output_path is None:
|
| 124 |
base, ext = os.path.splitext(input_path)
|
| 125 |
output_path = f"{base}_compressed{ext}"
|
|
|
|
| 162 |
print("β‘ Falling back to gemini-2.5-flash model")
|
| 163 |
return genai.GenerativeModel("gemini-2.5-flash", generation_config={"temperature": 0})
|
| 164 |
|
|
|
|
| 165 |
def extract_marks_from_grading(grading_text):
|
|
|
|
|
|
|
|
|
|
|
|
|
| 166 |
grading_json = {"grading": []}
|
| 167 |
+
# Split by question header
|
|
|
|
|
|
|
| 168 |
question_blocks = re.split(r"##\s*Question\s+", grading_text)
|
| 169 |
+
for block in question_blocks[1:]:
|
|
|
|
| 170 |
first_line = block.strip().splitlines()[0].strip()
|
|
|
|
| 171 |
q_id_match = re.match(r"([0-9]+(?:[a-zA-Z]|\([^\)]+\)|(?:\.[a-zA-Z0-9]+))*)", first_line)
|
| 172 |
if not q_id_match:
|
|
|
|
| 173 |
q_id = first_line.split()[0]
|
| 174 |
else:
|
| 175 |
q_id = q_id_match.group(1).strip()
|
|
|
|
|
|
|
|
|
|
| 176 |
awarded = re.findall(r"\b(M\d+|A\d+|R\d+|M0|A0|R0)\b", block)
|
|
|
|
| 177 |
seen = set()
|
| 178 |
awarded_unique = []
|
| 179 |
for m in awarded:
|
| 180 |
if m not in seen:
|
| 181 |
awarded_unique.append(m)
|
| 182 |
seen.add(m)
|
|
|
|
| 183 |
grading_json["grading"].append({
|
| 184 |
"question": q_id,
|
| 185 |
"marks_awarded": awarded_unique
|
| 186 |
})
|
|
|
|
| 187 |
return grading_json
|
| 188 |
|
| 189 |
+
# ---------- PDF merging helper ----------
|
| 190 |
+
def merge_pdfs(paths, output_path):
|
| 191 |
+
writer = PdfWriter()
|
| 192 |
+
for p in paths:
|
| 193 |
+
reader = PdfReader(p)
|
| 194 |
+
for page in reader.pages:
|
| 195 |
+
writer.add_page(page)
|
| 196 |
+
with open(output_path, "wb") as f:
|
| 197 |
+
writer.write(f)
|
| 198 |
+
return output_path
|
| 199 |
+
|
| 200 |
+
# ---------- Transcript parsing helpers ----------
|
| 201 |
+
def parse_qp_ms_transcript(text):
|
| 202 |
+
"""
|
| 203 |
+
Parse QP+MS transcript produced according to the QP_MS_TRANSCRIBE prompt blocks.
|
| 204 |
+
Expected block markers: QUESTION BEGIN ... QUESTION END with fields ID, QTEXT, TOTAL_MARKS, MARKSCHEME.
|
| 205 |
+
Return list of questions: {id, qp, total_marks, ms}
|
| 206 |
+
"""
|
| 207 |
+
questions = []
|
| 208 |
+
# Try to find blocks using the explicit markers we requested
|
| 209 |
+
blocks = re.findall(r"QUESTION BEGIN(.*?)QUESTION END", text, flags=re.DOTALL | re.IGNORECASE)
|
| 210 |
+
if blocks:
|
| 211 |
+
for block in blocks:
|
| 212 |
+
id_match = re.search(r"ID:\s*(.+)", block)
|
| 213 |
+
qtext_match = re.search(r"QTEXT:\s*(.*?)\n(?:TOTAL_MARKS:|MARKSCHEME:)", block, flags=re.DOTALL)
|
| 214 |
+
tm_match = re.search(r"TOTAL_MARKS:\s*(.*)", block)
|
| 215 |
+
ms_match = re.search(r"MARKSCHEME:\s*(.*)", block, flags=re.DOTALL)
|
| 216 |
+
qid = id_match.group(1).strip() if id_match else ""
|
| 217 |
+
qtext = qtext_match.group(1).strip() if qtext_match else ""
|
| 218 |
+
total_marks = tm_match.group(1).strip() if tm_match else ""
|
| 219 |
+
# try to normalize total_marks to int if possible
|
| 220 |
+
try:
|
| 221 |
+
total_marks = int(re.search(r"\d+", total_marks).group(0)) if total_marks else None
|
| 222 |
+
except Exception:
|
| 223 |
+
total_marks = None
|
| 224 |
+
ms = ms_match.group(1).strip() if ms_match else ""
|
| 225 |
+
questions.append({
|
| 226 |
+
"id": qid,
|
| 227 |
+
"qp": qtext,
|
| 228 |
+
"total_marks": total_marks,
|
| 229 |
+
"ms": ms
|
| 230 |
+
})
|
| 231 |
+
return questions
|
| 232 |
+
|
| 233 |
+
# Fallback: If model didn't follow markers, try splitting by lines that look like question headers
|
| 234 |
+
# This is conservative: find headings like "1", "1.", "1(a)" at line starts
|
| 235 |
+
parts = re.split(r"(?m)^\s*(\d+(?:\([a-zA-Z0-9]+\)|[a-zA-Z]|\.[a-zA-Z0-9]+)?)\s*[\.\):\-]\s*", text)
|
| 236 |
+
# parts list pattern: [pretext, id1, body1, id2, body2, ...]
|
| 237 |
+
if len(parts) >= 3:
|
| 238 |
+
it = iter(parts)
|
| 239 |
+
pre = next(it)
|
| 240 |
+
while True:
|
| 241 |
+
try:
|
| 242 |
+
qid = next(it).strip()
|
| 243 |
+
body = next(it)
|
| 244 |
+
except StopIteration:
|
| 245 |
+
break
|
| 246 |
+
# try to separate question text and markscheme inside body using "Markscheme" keyword
|
| 247 |
+
ms_split = re.split(r"(?i)\bmarkscheme\b|(?i)\bmark scheme\b", body, maxsplit=1)
|
| 248 |
+
if len(ms_split) == 2:
|
| 249 |
+
qtext = ms_split[0].strip(":-\n ")
|
| 250 |
+
ms = ms_split[1].strip()
|
| 251 |
+
else:
|
| 252 |
+
# try to look for "Marks" summary then rest
|
| 253 |
+
m_search = re.search(r"(?i)\bmarks[:\s]*\d+", body)
|
| 254 |
+
if m_search:
|
| 255 |
+
# take text before marks as qtext
|
| 256 |
+
qtext = body[:m_search.start()].strip()
|
| 257 |
+
ms = body[m_search.start():].strip()
|
| 258 |
+
else:
|
| 259 |
+
# fallback: put entire body into qp and ms empty
|
| 260 |
+
qtext = body.strip()
|
| 261 |
+
ms = ""
|
| 262 |
+
# try to find total marks integer
|
| 263 |
+
tm = None
|
| 264 |
+
tm_found = re.search(r"(?i)(?:total\s*marks|marks|[\/]\s*\d+|out of)\s*[:\s]*?(\d+)", body)
|
| 265 |
+
if tm_found:
|
| 266 |
+
try:
|
| 267 |
+
tm = int(tm_found.group(1))
|
| 268 |
+
except:
|
| 269 |
+
tm = None
|
| 270 |
+
questions.append({
|
| 271 |
+
"id": qid,
|
| 272 |
+
"qp": qtext,
|
| 273 |
+
"total_marks": tm,
|
| 274 |
+
"ms": ms
|
| 275 |
+
})
|
| 276 |
+
return questions
|
| 277 |
+
|
| 278 |
+
# If nothing found, return one block with raw text as fallback
|
| 279 |
+
return [{"id": "1", "qp": text.strip(), "total_marks": None, "ms": ""}]
|
| 280 |
+
|
| 281 |
+
def parse_as_transcript(text):
|
| 282 |
+
"""
|
| 283 |
+
Parse AS transcript into answer blocks. Expected markers ANSWER BEGIN ... ANSWER END.
|
| 284 |
+
Return list: {id, ans}
|
| 285 |
+
"""
|
| 286 |
+
answers = []
|
| 287 |
+
blocks = re.findall(r"ANSWER BEGIN(.*?)ANSWER END", text, flags=re.DOTALL | re.IGNORECASE)
|
| 288 |
+
if blocks:
|
| 289 |
+
for block in blocks:
|
| 290 |
+
id_match = re.search(r"ID:\s*(.+)", block)
|
| 291 |
+
ans_match = re.search(r"ANSWER:\s*(.*)", block, flags=re.DOTALL)
|
| 292 |
+
qid = id_match.group(1).strip() if id_match else ""
|
| 293 |
+
ans = ans_match.group(1).strip() if ans_match else block.strip()
|
| 294 |
+
answers.append({
|
| 295 |
+
"id": qid,
|
| 296 |
+
"as": ans
|
| 297 |
+
})
|
| 298 |
+
return answers
|
| 299 |
+
|
| 300 |
+
# Fallback: split by likely question labels in the student's transcription, e.g., "1.", "1)", "1a."
|
| 301 |
+
parts = re.split(r"(?m)^\s*(\d+(?:[a-zA-Z]|\([^\)]+\))?)\s*[\.\):\-]\s*", text)
|
| 302 |
+
if len(parts) >= 3:
|
| 303 |
+
it = iter(parts)
|
| 304 |
+
pre = next(it)
|
| 305 |
+
while True:
|
| 306 |
+
try:
|
| 307 |
+
qid = next(it).strip()
|
| 308 |
+
body = next(it)
|
| 309 |
+
except StopIteration:
|
| 310 |
+
break
|
| 311 |
+
answers.append({"id": qid, "as": body.strip()})
|
| 312 |
+
return answers
|
| 313 |
+
|
| 314 |
+
# If no structure at all, try to chunk by double newlines
|
| 315 |
+
chunks = [c.strip() for c in text.split("\n\n") if c.strip()]
|
| 316 |
+
for i, c in enumerate(chunks, start=1):
|
| 317 |
+
answers.append({"id": f"INFERRED:{i}", "as": c})
|
| 318 |
+
return answers
|
| 319 |
+
|
| 320 |
+
# ---------- Gemini call wrapper ----------
|
| 321 |
+
def gemini_generate_content(model, prompt_text, file_upload_obj=None):
|
| 322 |
"""
|
| 323 |
+
Helper: send prompt_text and optionally a single uploaded file to model.generate_content.
|
| 324 |
+
Returns the textual response (str).
|
| 325 |
"""
|
| 326 |
+
inputs = [prompt_text]
|
| 327 |
+
if file_upload_obj:
|
| 328 |
+
inputs.append(file_upload_obj)
|
| 329 |
+
response = model.generate_content(inputs)
|
| 330 |
+
# Response handling as in original script
|
| 331 |
+
raw_text = getattr(response, "text", None)
|
| 332 |
+
if not raw_text and getattr(response, "candidates", None):
|
| 333 |
+
# new-style candidate chain
|
| 334 |
+
raw_text = response.candidates[0].content.parts[0].text
|
| 335 |
+
if not raw_text:
|
| 336 |
+
# attempt to stringify response
|
| 337 |
+
raw_text = str(response)
|
| 338 |
+
return raw_text
|
| 339 |
+
|
| 340 |
+
# ---------- Imprinting and mapping helpers remain unchanged ----------
|
| 341 |
+
def ask_gemini_for_mapping_for_page(model, image_path, grading_json, rows=GRID_ROWS, cols=GRID_COLS):
|
| 342 |
prompt = f"""
|
| 343 |
You are an exam marker. Your role is to identify where each question begins on the page.
|
| 344 |
The page is divided into a {rows} x {cols} grid. Each cell has a RUNNING NUMBER label (1..{rows*cols}).
|
|
|
|
| 355 |
Grading JSON:
|
| 356 |
{json.dumps(grading_json, indent=2)}
|
| 357 |
"""
|
|
|
|
| 358 |
img = Image.open(image_path)
|
|
|
|
| 359 |
response = model.generate_content([prompt, img])
|
| 360 |
raw_text = getattr(response, "text", None)
|
| 361 |
if not raw_text and getattr(response, "candidates", None):
|
| 362 |
raw_text = response.candidates[0].content.parts[0].text
|
| 363 |
|
| 364 |
print("π Gemini mapping raw output (page):")
|
| 365 |
+
print(raw_text[:1000] + ("..." if len(raw_text) > 1000 else ""))
|
|
|
|
|
|
|
|
|
|
|
|
|
| 366 |
try:
|
|
|
|
| 367 |
start = raw_text.index('[')
|
| 368 |
end = raw_text.rindex(']') + 1
|
| 369 |
json_part = raw_text[start:end]
|
| 370 |
mapping = json.loads(json_part)
|
| 371 |
return mapping
|
| 372 |
except Exception as e:
|
|
|
|
|
|
|
| 373 |
match = re.search(r'(\[.*\])', raw_text, re.DOTALL)
|
| 374 |
if match:
|
| 375 |
try:
|
| 376 |
mapping = json.loads(match.group(1))
|
| 377 |
return mapping
|
| 378 |
+
except Exception:
|
| 379 |
+
pass
|
|
|
|
| 380 |
return []
|
| 381 |
|
|
|
|
| 382 |
def imprint_marks_using_mapping(pdf_path, grading_json, output_pdf, model, rows=GRID_ROWS, cols=GRID_COLS):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 383 |
pages = convert_from_path(pdf_path, dpi=200)
|
| 384 |
annotated_page_paths = []
|
| 385 |
print(f"π Converted answer PDF to {len(pages)} page image(s) for imprinting.")
|
| 386 |
|
|
|
|
| 387 |
temp_grid_images = []
|
| 388 |
for p_index, page in enumerate(pages):
|
| 389 |
img = page.convert("RGB")
|
|
|
|
| 397 |
num_font = ImageFont.load_default()
|
| 398 |
|
| 399 |
cell_num = 1
|
|
|
|
| 400 |
for r in range(rows):
|
| 401 |
for c in range(cols):
|
| 402 |
x = int(c * cell_w + cell_w / 2)
|
|
|
|
| 412 |
img.save(temp_path, "PNG")
|
| 413 |
temp_grid_images.append(temp_path)
|
| 414 |
|
|
|
|
| 415 |
for p_index, grid_img_path in enumerate(temp_grid_images):
|
| 416 |
print(f"\nπ° Sending page {p_index+1} to Gemini for mapping...")
|
| 417 |
mapping = ask_gemini_for_mapping_for_page(model, grid_img_path, grading_json, rows, cols)
|
| 418 |
print(f"π Parsed mapping for page {p_index+1}: {mapping}")
|
| 419 |
|
|
|
|
| 420 |
page_img = pages[p_index].convert("RGB")
|
| 421 |
img_cv = np.array(page_img)
|
| 422 |
img_cv = cv2.cvtColor(img_cv, cv2.COLOR_RGB2BGR)
|
| 423 |
h, w, _ = img_cv.shape
|
| 424 |
cell_w_px, cell_h_px = w / cols, h / rows
|
| 425 |
|
|
|
|
| 426 |
occupied = set()
|
| 427 |
|
|
|
|
| 428 |
for item in mapping:
|
| 429 |
qid = item.get("question")
|
| 430 |
cell_number = item.get("cell_number")
|
| 431 |
if qid is None or cell_number is None:
|
| 432 |
continue
|
| 433 |
|
|
|
|
| 434 |
marks_list = next((g["marks_awarded"] for g in grading_json.get("grading", []) if g["question"] == qid), [])
|
| 435 |
if not marks_list:
|
|
|
|
| 436 |
marks_list = next((g["marks_awarded"] for g in grading_json.get("grading", [])
|
| 437 |
if g["question"].lower() == qid.lower()), [])
|
| 438 |
|
| 439 |
marks_text = ",".join(marks_list) if marks_list else "?"
|
| 440 |
|
|
|
|
|
|
|
| 441 |
row = (cell_number - 1) // cols
|
| 442 |
col = (cell_number - 1) % cols
|
| 443 |
|
|
|
|
| 444 |
placed = False
|
| 445 |
candidates = []
|
|
|
|
| 446 |
if col + 1 < cols:
|
| 447 |
candidates.append((row, col + 1))
|
|
|
|
| 448 |
candidates.append((row, col))
|
|
|
|
| 449 |
if col - 1 >= 0:
|
| 450 |
candidates.append((row, col - 1))
|
| 451 |
|
|
|
|
| 458 |
break
|
| 459 |
|
| 460 |
if chosen is None:
|
|
|
|
| 461 |
chosen = (row, col)
|
| 462 |
|
|
|
|
| 463 |
r_c, c_c = chosen
|
| 464 |
+
x_c = int((c_c + 1) * cell_w_px - cell_w_px * 0.1)
|
| 465 |
y_c = int((r_c + 0.5) * cell_h_px)
|
| 466 |
|
|
|
|
| 467 |
print(f"Page {p_index+1} | Question {qid} -> mapped cell {cell_number} -> chosen cell ({r_c},{c_c})"
|
| 468 |
f" -> pixel coords ({x_c},{y_c}) | marks: {marks_text}")
|
| 469 |
|
|
|
|
| 470 |
font_scale = max(0.6, min(1.6, cell_h_px / 60.0))
|
| 471 |
thickness = max(1, int(font_scale * 2))
|
|
|
|
| 472 |
cv2.putText(img_cv, marks_text, (x_c, y_c), cv2.FONT_HERSHEY_SIMPLEX,
|
| 473 |
font_scale, (0, 0, 255), thickness, cv2.LINE_AA)
|
| 474 |
|
|
|
|
| 475 |
annotated_path = f"annotated_page_{p_index+1}.png"
|
| 476 |
cv2.imwrite(annotated_path, img_cv)
|
| 477 |
annotated_page_paths.append(annotated_path)
|
| 478 |
print(f"π Annotated page saved: {annotated_path}")
|
| 479 |
|
|
|
|
| 480 |
with open(output_pdf, "wb") as f:
|
| 481 |
f.write(img2pdf.convert(annotated_page_paths))
|
| 482 |
|
| 483 |
print(f"π Imprinted PDF saved to: {output_pdf}")
|
|
|
|
| 484 |
compressed = compress_pdf(output_pdf)
|
| 485 |
if compressed != output_pdf:
|
| 486 |
print(f"π¦ Imprinted PDF compressed: {compressed}")
|
| 487 |
return compressed
|
| 488 |
|
| 489 |
+
# ---------- Main pipeline (rewritten) ----------
|
| 490 |
def align_and_grade_pipeline(qp_path, ms_path, ans_path, imprint=False):
|
| 491 |
"""
|
| 492 |
+
New flow:
|
| 493 |
+
1) compress as needed
|
| 494 |
+
2) merge QP + MS -> merged_qpms.pdf
|
| 495 |
+
3) upload merged_qpms and ans separately
|
| 496 |
+
4) send two parallel transcription requests:
|
| 497 |
+
- merged_qpms with QP_MS_TRANSCRIBE prompt
|
| 498 |
+
- ans with AS_TRANSCRIBE prompt
|
| 499 |
+
5) parse transcripts to get per-question qp, ms, and per-answer as
|
| 500 |
+
6) align locally by question ID
|
| 501 |
+
7) send aligned structure to grading prompt
|
| 502 |
+
8) extract marks and optionally imprint
|
| 503 |
"""
|
| 504 |
try:
|
|
|
|
| 505 |
qp_path = compress_pdf(qp_path)
|
| 506 |
ms_path = compress_pdf(ms_path)
|
| 507 |
ans_path = compress_pdf(ans_path)
|
| 508 |
|
| 509 |
+
# Merge QP + MS into single PDF
|
| 510 |
+
merged_qpms_path = os.path.splitext(qp_path)[0] + "_merged_qp_ms.pdf"
|
| 511 |
+
merge_pdfs([qp_path, ms_path], merged_qpms_path)
|
| 512 |
+
print(f"π Merged QP + MS -> {merged_qpms_path}")
|
| 513 |
+
|
| 514 |
# Upload files to Gemini
|
| 515 |
print("πΌ Uploading files to Gemini...")
|
| 516 |
+
merged_uploaded = genai.upload_file(path=merged_qpms_path, display_name="QP+MS (merged)")
|
|
|
|
| 517 |
ans_uploaded = genai.upload_file(path=ans_path, display_name="Answer Sheet")
|
| 518 |
|
| 519 |
model = create_model()
|
| 520 |
|
| 521 |
+
# Prepare prompts
|
| 522 |
+
qpms_prompt = PROMPTS["QP_MS_TRANSCRIBE"]["content"]
|
| 523 |
+
as_prompt = PROMPTS["AS_TRANSCRIBE"]["content"]
|
| 524 |
+
|
| 525 |
+
# Send both requests in parallel
|
| 526 |
+
print("π‘ Sending transcription requests (QP+MS & AS) in parallel...")
|
| 527 |
+
transcripts = {}
|
| 528 |
+
with ThreadPoolExecutor(max_workers=2) as ex:
|
| 529 |
+
futures = {
|
| 530 |
+
ex.submit(gemini_generate_content, model, qpms_prompt, merged_uploaded): "qpms",
|
| 531 |
+
ex.submit(gemini_generate_content, model, as_prompt, ans_uploaded): "as"
|
| 532 |
+
}
|
| 533 |
+
for fut in as_completed(futures):
|
| 534 |
+
key = futures[fut]
|
| 535 |
+
try:
|
| 536 |
+
res_text = fut.result()
|
| 537 |
+
except Exception as e:
|
| 538 |
+
res_text = f"β Error during transcription: {e}"
|
| 539 |
+
transcripts[key] = res_text
|
| 540 |
+
print(f"β
Transcription complete for: {key} (chars: {len(res_text)})")
|
| 541 |
+
|
| 542 |
+
qpms_text = transcripts.get("qpms", "")
|
| 543 |
+
as_text = transcripts.get("as", "")
|
| 544 |
+
|
| 545 |
+
# Debug: save transcripts for review
|
| 546 |
+
with open("debug_qpms_transcript.txt", "w", encoding="utf-8") as f:
|
| 547 |
+
f.write(qpms_text)
|
| 548 |
+
with open("debug_as_transcript.txt", "w", encoding="utf-8") as f:
|
| 549 |
+
f.write(as_text)
|
| 550 |
+
|
| 551 |
+
# Parse transcripts
|
| 552 |
+
print("π§ Parsing QP+MS transcript...")
|
| 553 |
+
qpms_questions = parse_qp_ms_transcript(qpms_text)
|
| 554 |
+
print(f"Found {len(qpms_questions)} questions in QP+MS transcript.")
|
| 555 |
+
|
| 556 |
+
print("π§ Parsing Answer Sheet transcript...")
|
| 557 |
+
as_answers = parse_as_transcript(as_text)
|
| 558 |
+
print(f"Found {len(as_answers)} answer blocks in AS transcript.")
|
| 559 |
+
|
| 560 |
+
# Build alignment: map by normalized IDs
|
| 561 |
+
def normalize_id(qid):
|
| 562 |
+
if not qid:
|
| 563 |
+
return ""
|
| 564 |
+
s = qid.strip().lower()
|
| 565 |
+
s = re.sub(r"[\.\)\(:\s]+", "", s)
|
| 566 |
+
return s
|
| 567 |
+
|
| 568 |
+
answers_map = {}
|
| 569 |
+
for a in as_answers:
|
| 570 |
+
nid = normalize_id(a.get("id", ""))
|
| 571 |
+
if nid == "":
|
| 572 |
+
# if empty id, try to infer using INFERRED: or use a running fallback index
|
| 573 |
+
nid = a.get("id", "")
|
| 574 |
+
# store first matching block (if multiple blocks for same id, append)
|
| 575 |
+
if nid in answers_map:
|
| 576 |
+
answers_map[nid] += "\n\n" + a.get("as", "")
|
| 577 |
+
else:
|
| 578 |
+
answers_map[nid] = a.get("as", "")
|
| 579 |
+
|
| 580 |
+
aligned_questions = []
|
| 581 |
+
for q in qpms_questions:
|
| 582 |
+
qid = q.get("id", "")
|
| 583 |
+
nid = normalize_id(qid)
|
| 584 |
+
# try direct id match
|
| 585 |
+
student_ans = answers_map.get(nid)
|
| 586 |
+
# try alternative matches (e.g., '1a' vs '1(a)')
|
| 587 |
+
if student_ans is None:
|
| 588 |
+
for k in answers_map:
|
| 589 |
+
if k.startswith(nid) or nid.startswith(k) or (nid and nid.replace(" ", "") in k):
|
| 590 |
+
student_ans = answers_map[k]
|
| 591 |
+
break
|
| 592 |
+
# fallback: look for first answer that contains the question id as text (loose)
|
| 593 |
+
if student_ans is None:
|
| 594 |
+
for k, v in answers_map.items():
|
| 595 |
+
if qid and qid.lower() in k:
|
| 596 |
+
student_ans = v
|
| 597 |
+
break
|
| 598 |
+
|
| 599 |
+
aligned_questions.append({
|
| 600 |
+
"id": qid,
|
| 601 |
+
"qp": q.get("qp", ""),
|
| 602 |
+
"total_marks": q.get("total_marks"),
|
| 603 |
+
"ms": q.get("ms", ""), # verbatim markscheme block
|
| 604 |
+
"as": student_ans if student_ans is not None else ""
|
| 605 |
+
})
|
| 606 |
+
|
| 607 |
+
# If any answer blocks left unmatched, optionally append them as INFERRED entries
|
| 608 |
+
matched_ids = set([normalize_id(q["id"]) for q in aligned_questions])
|
| 609 |
+
for k, v in answers_map.items():
|
| 610 |
+
if k not in matched_ids:
|
| 611 |
+
aligned_questions.append({
|
| 612 |
+
"id": k,
|
| 613 |
+
"qp": "",
|
| 614 |
+
"total_marks": None,
|
| 615 |
+
"ms": "",
|
| 616 |
+
"as": v
|
| 617 |
+
})
|
| 618 |
+
|
| 619 |
+
# Build alignment JSON text to send to grading model
|
| 620 |
+
alignment_payload = {"questions": aligned_questions}
|
| 621 |
+
alignment_json_text = json.dumps(alignment_payload, indent=2, ensure_ascii=False)
|
| 622 |
+
print("π¦ Built alignment JSON (truncated):")
|
| 623 |
+
print(alignment_json_text[:1000] + ("..." if len(alignment_json_text) > 1000 else ""))
|
| 624 |
+
|
| 625 |
+
# Step: grading
|
| 626 |
+
print("2οΈβ£ Sending grading prompt to Gemini...")
|
| 627 |
+
# We send both the system grading prompt and the alignment JSON as content
|
| 628 |
+
response = model.generate_content([PROMPTS["GRADING_PROMPT"]["content"], alignment_json_text])
|
| 629 |
grading_text = getattr(response, "text", None)
|
| 630 |
if not grading_text and getattr(response, "candidates", None):
|
| 631 |
grading_text = response.candidates[0].content.parts[0].text
|
|
|
|
| 632 |
if not grading_text:
|
| 633 |
raise RuntimeError("No grading output returned from Gemini.")
|
| 634 |
|
|
|
|
| 640 |
grading_pdf_path = save_as_pdf(grading_text, f"{base_name}_graded.pdf")
|
| 641 |
print(f"π Grading PDF saved: {grading_pdf_path}")
|
| 642 |
|
| 643 |
+
# Extract marks for imprinting
|
| 644 |
grading_json = extract_marks_from_grading(grading_text)
|
| 645 |
print("π§ Extracted grading JSON (per-question marks):")
|
| 646 |
print(json.dumps(grading_json, indent=2))
|
|
|
|
| 648 |
imprinted_pdf_path = None
|
| 649 |
if imprint:
|
| 650 |
print("β Imprint option enabled. Starting imprinting process...")
|
|
|
|
| 651 |
imprinted_pdf_path = f"{base_name}_imprinted.pdf"
|
| 652 |
imprinted_pdf_path = imprint_marks_using_mapping(ans_path, grading_json, imprinted_pdf_path, model)
|
| 653 |
print(f"β
Imprinting finished. Imprinted PDF at: {imprinted_pdf_path}")
|
| 654 |
|
| 655 |
+
return alignment_json_text, grading_text, grading_pdf_path, imprinted_pdf_path
|
| 656 |
|
| 657 |
except Exception as e:
|
| 658 |
print("β Pipeline error:", e)
|
| 659 |
return f"β Error: {e}", None, None, None
|
| 660 |
|
| 661 |
+
# ---------------- GRADIO UI (adapted) ----------------
|
| 662 |
+
with gr.Blocks(title="LeadIB AI Grading (New Flow: Parallel Transcription + Align + Grade)") as demo:
|
| 663 |
+
gr.Markdown("## π LeadIB AI Grading β Updated Flow\nUpload **Question Paper**, **Markscheme**, and **Student Answer Sheet**.\nSystem: merge QP+MS -> transcribe QP+MS and AS in parallel -> align locally -> grade -> (optional) imprint marks.")
|
| 664 |
|
| 665 |
with gr.Row():
|
| 666 |
qp_file = gr.File(label="π Upload Question Paper (PDF)")
|
|
|
|
| 671 |
run_button = gr.Button("π Run Alignment + Grading")
|
| 672 |
|
| 673 |
with gr.Row():
|
| 674 |
+
json_output_box = gr.Textbox(label="π Step: Alignment (JSON)", lines=20)
|
| 675 |
+
grading_output_box = gr.Textbox(label="π Step: Grading (Markdown)", lines=20)
|
| 676 |
|
| 677 |
grading_pdf_file = gr.File(label="π₯ Download Grading PDF")
|
| 678 |
imprint_pdf_file = gr.File(label="π₯ Download Imprinted PDF (Optional)")
|
| 679 |
|
| 680 |
def run_pipeline(qp_file_obj, ms_file_obj, ans_file_obj, imprint_flag):
|
|
|
|
| 681 |
qp_path = qp_file_obj.name
|
| 682 |
ms_path = ms_file_obj.name
|
| 683 |
ans_path = ans_file_obj.name
|
|
|
|
| 686 |
qp_path, ms_path, ans_path, imprint=imprint_flag
|
| 687 |
)
|
| 688 |
|
| 689 |
+
# For Gradio file outputs: return text/paths
|
| 690 |
return alignment_text, grading_text, grading_pdf_path, imprinted_pdf_path
|
| 691 |
|
| 692 |
run_button.click(
|