Spaces:
Sleeping
Sleeping
Update app.py
Browse files
app.py
CHANGED
|
@@ -63,7 +63,7 @@ Answer 1.i:
|
|
| 63 |
5. Apply FT where appropriate.
|
| 64 |
6. Use proper notation: M1A0, A1, etc.
|
| 65 |
7. Any lost mark: use red `<span style="color:red">M0</span>` and make Reason red.
|
| 66 |
-
---
|
| 67 |
## Output Format
|
| 68 |
Produce two sections per question/sub-question, following this structure:
|
| 69 |
## Question <id>
|
|
@@ -152,26 +152,24 @@ def gemini_generate_content(model, prompt_text, file_upload_obj=None, image_obj=
|
|
| 152 |
# ---------------- PARSERS ----------------
|
| 153 |
def extract_question_ids_from_qpms(text):
|
| 154 |
"""
|
| 155 |
-
|
| 156 |
-
|
| 157 |
"""
|
| 158 |
ids = []
|
| 159 |
-
# first
|
| 160 |
for m in re.finditer(r"(?im)^\s*Question\s*:\s*([0-9]+(?:[a-zA-Z0-9\.\(\)]+)*)\b", text):
|
| 161 |
qid = m.group(1).strip()
|
| 162 |
ids.append(qid)
|
| 163 |
-
#
|
|
|
|
|
|
|
|
|
|
|
|
|
| 164 |
if not ids:
|
| 165 |
-
|
| 166 |
-
|
| 167 |
-
ids.append(qid)
|
| 168 |
-
return ids if ids else ["NA"]
|
| 169 |
|
| 170 |
def build_as_prompt_with_expected_ids(expected_ids):
|
| 171 |
-
"""
|
| 172 |
-
Build the AS transcription prompt; also useful to produce an ids_block string
|
| 173 |
-
that can be passed to the imprint mapping prompt.
|
| 174 |
-
"""
|
| 175 |
ids_block = "{\n" + "\n".join(expected_ids) + "\n}" if expected_ids else "{NA}"
|
| 176 |
prompt = f"""You are a high-quality handwritten transcription assistant.
|
| 177 |
INPUT: This PDF contains a student's handwritten answer sheet.
|
|
@@ -186,15 +184,22 @@ Question <id>
|
|
| 186 |
AS:
|
| 187 |
<transcribed answer or placeholder>
|
| 188 |
"""
|
| 189 |
-
return prompt
|
| 190 |
|
| 191 |
def extract_marks_from_grading_exact(grading_text):
|
|
|
|
|
|
|
|
|
|
|
|
|
| 192 |
grading_json = {"grading": []}
|
|
|
|
| 193 |
question_blocks = re.split(r"##\s*Question\s+", grading_text)
|
| 194 |
for block in question_blocks[1:]:
|
|
|
|
| 195 |
first_line = block.strip().splitlines()[0].strip() if block.strip().splitlines() else ""
|
| 196 |
q_id_match = re.match(r"([0-9]+(?:[a-zA-Z]|\([^\)]+\)|(?:\.[a-zA-Z0-9]+))*)", first_line)
|
| 197 |
q_id = q_id_match.group(1).strip() if q_id_match else first_line.split()[0] if first_line else ""
|
|
|
|
| 198 |
awarded = re.findall(r"\b(M\d+|A\d+|R\d+|M0|A0|R0)\b", block)
|
| 199 |
grading_json["grading"].append({"question": q_id, "marks_awarded": awarded})
|
| 200 |
return grading_json
|
|
@@ -202,39 +207,34 @@ def extract_marks_from_grading_exact(grading_text):
|
|
| 202 |
# ---------------- IMPRINT ----------------
|
| 203 |
def ask_gemini_for_mapping_for_page_v2(model, image_path, grading_json, question_scheme, ids_block, rows=GRID_ROWS, cols=GRID_COLS):
|
| 204 |
"""
|
| 205 |
-
Ask Gemini to map question IDs (
|
| 206 |
-
|
| 207 |
-
|
| 208 |
-
|
| 209 |
-
Also instruct the LLM about mislabelled subparts: e.g., if it sees 'ii)' above
|
| 210 |
-
'Q4.i' without a number, it may belong to Q3.ii.
|
| 211 |
"""
|
| 212 |
prompt = f"""
|
| 213 |
-
You are an exam marker. Identify where each question
|
| 214 |
The page has {rows}x{cols} grid (cells 1..{rows*cols}).
|
| 215 |
-
|
| 216 |
{ids_block}
|
| 217 |
|
| 218 |
-
Question scheme (authoritative
|
| 219 |
{question_scheme}
|
| 220 |
|
| 221 |
-
Grading JSON
|
| 222 |
{json.dumps(grading_json, indent=2)}
|
| 223 |
|
| 224 |
-
|
| 225 |
-
- Only return
|
| 226 |
-
-
|
| 227 |
-
-
|
| 228 |
-
-
|
| 229 |
-
-
|
| 230 |
-
|
| 231 |
-
|
| 232 |
-
|
| 233 |
-
|
| 234 |
-
]
|
| 235 |
-
- If no instances of an expected question appear on this page, return an empty list: [].
|
| 236 |
"""
|
| 237 |
-
#
|
| 238 |
img = Image.open(image_path)
|
| 239 |
response = model.generate_content([prompt, img])
|
| 240 |
raw_text = getattr(response, "text", None)
|
|
@@ -242,39 +242,47 @@ Instructions (IMPORTANT):
|
|
| 242 |
raw_text = response.candidates[0].content.parts[0].text
|
| 243 |
if not raw_text:
|
| 244 |
raw_text = str(response)
|
| 245 |
-
#
|
| 246 |
try:
|
| 247 |
start = raw_text.index('[')
|
| 248 |
end = raw_text.rindex(']') + 1
|
| 249 |
return json.loads(raw_text[start:end])
|
| 250 |
except Exception:
|
| 251 |
-
#
|
| 252 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 253 |
|
| 254 |
-
def imprint_marks_using_mapping_v2(pdf_path, grading_json, output_pdf, question_scheme, model,
|
| 255 |
"""
|
| 256 |
-
Imprint marks onto
|
| 257 |
-
|
| 258 |
-
-
|
| 259 |
-
|
| 260 |
-
-
|
| 261 |
-
-
|
| 262 |
-
- Writes final imprinted PDF using img2pdf with original page dimensions.
|
| 263 |
"""
|
|
|
|
| 264 |
reader = PdfReader(pdf_path)
|
| 265 |
-
#
|
| 266 |
-
|
| 267 |
-
|
| 268 |
-
|
| 269 |
-
|
| 270 |
-
|
| 271 |
-
|
| 272 |
-
|
| 273 |
-
|
|
|
|
|
|
|
|
|
|
| 274 |
annotated_page_paths = []
|
| 275 |
temp_grid_images = []
|
| 276 |
|
| 277 |
-
#
|
| 278 |
for p_index, page_img in enumerate(pages):
|
| 279 |
img = page_img.convert("RGB")
|
| 280 |
draw = ImageDraw.Draw(img)
|
|
@@ -283,89 +291,123 @@ def imprint_marks_using_mapping_v2(pdf_path, grading_json, output_pdf, question_
|
|
| 283 |
except:
|
| 284 |
font = ImageFont.load_default()
|
| 285 |
|
| 286 |
-
|
| 287 |
-
|
|
|
|
|
|
|
| 288 |
cell_num = 1
|
| 289 |
-
for r in range(
|
| 290 |
-
for c in range(
|
| 291 |
-
# center of cell
|
| 292 |
x = int(c * cell_w + cell_w / 2)
|
| 293 |
y = int(r * cell_h + cell_h / 2)
|
| 294 |
bbox = draw.textbbox((0,0), str(cell_num), font=font)
|
| 295 |
draw.text((x - (bbox[2]-bbox[0])/2, y - (bbox[3]-bbox[1])/2), str(cell_num), fill="black", font=font)
|
| 296 |
-
cell_num +=1
|
| 297 |
grid_path = f"page_{p_index+1}_grid.png"
|
| 298 |
img.save(grid_path, "PNG")
|
| 299 |
temp_grid_images.append(grid_path)
|
|
|
|
| 300 |
|
| 301 |
-
#
|
| 302 |
-
|
| 303 |
-
|
| 304 |
-
# Ask model to map each page (parallel)
|
| 305 |
mappings_per_page = {}
|
| 306 |
-
with ThreadPoolExecutor(max_workers=min(8,len(temp_grid_images))) as ex:
|
| 307 |
futures = {ex.submit(ask_gemini_for_mapping_for_page_v2, model, img_path, grading_json, question_scheme, ids_block, rows, cols): idx
|
| 308 |
-
for idx,img_path in enumerate(temp_grid_images)}
|
| 309 |
for fut in as_completed(futures):
|
| 310 |
idx = futures[fut]
|
| 311 |
try:
|
| 312 |
-
|
| 313 |
-
|
|
|
|
|
|
|
| 314 |
mappings_per_page[idx] = []
|
|
|
|
| 315 |
|
| 316 |
-
#
|
|
|
|
| 317 |
for p_index, page_img in enumerate(pages):
|
| 318 |
img_cv = np.array(page_img.convert("RGB"))
|
| 319 |
img_cv = cv2.cvtColor(img_cv, cv2.COLOR_RGB2BGR)
|
| 320 |
h, w, _ = img_cv.shape
|
| 321 |
-
cell_w_px, cell_h_px = w/cols, h/rows
|
| 322 |
mapping = mappings_per_page.get(p_index, [])
|
| 323 |
occupied = set()
|
| 324 |
for item in mapping:
|
| 325 |
qid = item.get("question")
|
| 326 |
cell_number = item.get("cell_number")
|
| 327 |
-
if qid is None or cell_number is None:
|
| 328 |
-
|
| 329 |
-
marks_list = next((g["marks_awarded"] for g in grading_json.get("grading", []) if g["question"]==qid), [])
|
| 330 |
marks_text = ",".join(marks_list) if marks_list else "?"
|
| 331 |
-
#
|
| 332 |
-
row = (cell_number-1)//cols
|
| 333 |
-
col = (cell_number-1)%cols
|
| 334 |
-
#
|
| 335 |
candidates = []
|
| 336 |
-
if col+1<cols: candidates.append((row,col+1))
|
| 337 |
-
candidates.append((row,col))
|
| 338 |
-
if col-1>=0: candidates.append((row,col-1))
|
| 339 |
-
chosen = next(((r,c) for r,c in candidates if (r*cols+c+1) not in occupied), (row,col))
|
| 340 |
-
occupied.add(chosen[0]*cols+chosen[1]+1)
|
| 341 |
-
x_c = int((chosen[1]+0.5)*cell_w_px)
|
| 342 |
-
y_c = int((chosen[0]+0.5)*cell_h_px)
|
| 343 |
-
|
| 344 |
-
|
| 345 |
-
|
| 346 |
-
|
| 347 |
-
text_size = cv2.getTextSize(marks_text, cv2.FONT_HERSHEY_SIMPLEX, font_scale, thickness)[0]
|
| 348 |
-
x_draw = max(0, min(w - text_size[0], x_c - text_size[0]//2))
|
| 349 |
-
y_draw = max(text_size[1], min(h - 1, y_c + text_size[1]//2))
|
| 350 |
-
cv2.putText(img_cv, marks_text, (x_draw,y_draw), cv2.FONT_HERSHEY_SIMPLEX, font_scale, (0,0,255), thickness)
|
| 351 |
-
|
| 352 |
-
# optional: if mapping includes "note", draw a small 'i' icon nearby
|
| 353 |
-
if item.get("note"):
|
| 354 |
-
note_text = "i"
|
| 355 |
-
ns = cv2.getTextSize(note_text, cv2.FONT_HERSHEY_SIMPLEX, font_scale*0.8, max(1,int(thickness/2)))[0]
|
| 356 |
-
nx = max(0, x_draw + text_size[0] + 4)
|
| 357 |
-
ny = max(ns[1], y_draw)
|
| 358 |
-
cv2.putText(img_cv, note_text, (nx, ny), cv2.FONT_HERSHEY_SIMPLEX, font_scale*0.8, (0,0,0), max(1,int(thickness/2)))
|
| 359 |
-
|
| 360 |
annotated_path = f"annotated_page_{p_index+1}.png"
|
| 361 |
cv2.imwrite(annotated_path, img_cv)
|
| 362 |
annotated_page_paths.append(annotated_path)
|
| 363 |
-
|
| 364 |
-
|
| 365 |
-
with
|
| 366 |
-
|
| 367 |
-
|
| 368 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 369 |
|
| 370 |
# ---------------- PIPELINE ----------------
|
| 371 |
def align_and_grade_pipeline(qp_path, ms_path, ans_path, imprint=False):
|
|
@@ -373,7 +415,7 @@ def align_and_grade_pipeline(qp_path, ms_path, ans_path, imprint=False):
|
|
| 373 |
ms_path = compress_pdf(ms_path)
|
| 374 |
ans_path = compress_pdf(ans_path)
|
| 375 |
|
| 376 |
-
merged_qpms_path = os.path.splitext(qp_path)[0]+"_merged_qp_ms.pdf"
|
| 377 |
merge_pdfs([qp_path, ms_path], merged_qpms_path)
|
| 378 |
|
| 379 |
merged_uploaded = genai.upload_file(path=merged_qpms_path, display_name="QP+MS (merged)")
|
|
@@ -382,33 +424,42 @@ def align_and_grade_pipeline(qp_path, ms_path, ans_path, imprint=False):
|
|
| 382 |
model = create_model()
|
| 383 |
|
| 384 |
qpms_prompt = PROMPTS["QP_MS_TRANSCRIPTION"]["content"]
|
|
|
|
| 385 |
qpms_text = gemini_generate_content(model, qpms_prompt, file_upload_obj=merged_uploaded)
|
|
|
|
| 386 |
|
| 387 |
-
# extract question ids (no deduplication)
|
| 388 |
extracted_ids = extract_question_ids_from_qpms(qpms_text)
|
|
|
|
| 389 |
|
| 390 |
-
|
| 391 |
-
|
| 392 |
as_text = gemini_generate_content(model, as_prompt, file_upload_obj=ans_uploaded)
|
|
|
|
| 393 |
|
| 394 |
grading_input = (
|
| 395 |
-
"=== QP+MS TRANSCRIPT BEGIN ===\n"+qpms_text+
|
| 396 |
-
"\n=== QP+MS TRANSCRIPT END ===\n\n"+
|
| 397 |
-
"=== ANSWER SHEET TRANSCRIPT BEGIN ===\n"+as_text+
|
| 398 |
"\n=== ANSWER SHEET TRANSCRIPT END ===\n"
|
| 399 |
)
|
| 400 |
grading_prompt_system = PROMPTS["GRADING_PROMPT"]["content"]
|
| 401 |
-
|
|
|
|
|
|
|
| 402 |
|
| 403 |
-
grading_pdf_path = save_as_pdf(grading_text, os.path.splitext(os.path.basename(ans_path))[0]+"_graded.pdf")
|
| 404 |
grading_json = extract_marks_from_grading_exact(grading_text)
|
|
|
|
| 405 |
|
| 406 |
imprinted_pdf_path = None
|
| 407 |
if imprint:
|
| 408 |
question_scheme = qpms_text
|
| 409 |
-
imprinted_pdf_path = os.path.splitext(os.path.basename(ans_path))[0]+"_imprinted.pdf"
|
| 410 |
-
#
|
| 411 |
-
|
|
|
|
|
|
|
|
|
|
| 412 |
|
| 413 |
return qpms_text, as_text, grading_text, grading_pdf_path, imprinted_pdf_path
|
| 414 |
|
|
|
|
| 63 |
5. Apply FT where appropriate.
|
| 64 |
6. Use proper notation: M1A0, A1, etc.
|
| 65 |
7. Any lost mark: use red `<span style="color:red">M0</span>` and make Reason red.
|
| 66 |
+
---
|
| 67 |
## Output Format
|
| 68 |
Produce two sections per question/sub-question, following this structure:
|
| 69 |
## Question <id>
|
|
|
|
| 152 |
# ---------------- PARSERS ----------------
|
| 153 |
def extract_question_ids_from_qpms(text):
    """
    Extract question IDs from a QP+MS transcript.

    Explicit "Question: <id>" lines are used when the transcript has any.
    Only when there are none does the parser fall back to lines that start
    with numbering such as "1." or "2)". Running the fallback unconditionally
    would append unrelated numbered lines after the real IDs.

    NOTE: IDs are NOT deduplicated -- repeated occurrences are kept, in the
    order they are matched.

    Args:
        text: Transcript text to scan. None or "" is treated as "no IDs".

    Returns:
        List of ID strings, or ["NA"] when nothing is found.
    """
    text = text or ""

    # An ID is leading digits followed by any run of letters, digits, dots or
    # parentheses. A single character class is used (rather than a repeated
    # group of the same class) so a failed match backtracks linearly instead
    # of exponentially. The trailing \b makes the captured ID end on an
    # alphanumeric character.
    ids = [
        m.group(1).strip()
        for m in re.finditer(r"(?im)^\s*Question\s*:\s*([0-9]+[a-zA-Z0-9.()]*)\b", text)
    ]

    # fallback: lines starting with numbering like "1." or "2)" etc.
    if not ids:
        ids = [
            m.group(1).strip()
            for m in re.finditer(r"(?m)^\s*([0-9]+[a-zA-Z0-9.()]*)\s*[\.\):\-]\s", text)
        ]

    # If nothing found, record "NA" once
    return ids if ids else ["NA"]
|
|
|
|
|
|
|
| 171 |
|
| 172 |
def build_as_prompt_with_expected_ids(expected_ids):
|
|
|
|
|
|
|
|
|
|
|
|
|
| 173 |
ids_block = "{\n" + "\n".join(expected_ids) + "\n}" if expected_ids else "{NA}"
|
| 174 |
prompt = f"""You are a high-quality handwritten transcription assistant.
|
| 175 |
INPUT: This PDF contains a student's handwritten answer sheet.
|
|
|
|
| 184 |
AS:
|
| 185 |
<transcribed answer or placeholder>
|
| 186 |
"""
|
| 187 |
+
return prompt
|
| 188 |
|
| 189 |
def extract_marks_from_grading_exact(grading_text):
    """
    Extract the mark tokens found in each question block of a grading text.

    The text is split on "## Question " headings; anything before the first
    heading is ignored. For every block the question ID is read from the
    block's first line and all mark tokens (M<n>, A<n>, R<n>) are collected
    in order of appearance. Duplicates are kept (no deduplication).

    Concatenated notation such as "M1A0" is split into its individual tokens
    ("M1", "A0"). A plain word-boundary match on single tokens would miss
    these, because there is no boundary between "1" and "A".

    Args:
        grading_text: Grading output text. None or "" yields an empty list.

    Returns:
        {"grading": [{"question": <id>, "marks_awarded": [<token>, ...]}, ...]}
    """
    grading_json = {"grading": []}
    # split by question blocks by heading "## Question "
    sections = re.split(r"##\s*Question\s+", grading_text or "")
    for section in sections[1:]:
        # try to get the ID from the first line; fall back to its first
        # whitespace-separated word, or "" for an empty block
        section_lines = section.strip().splitlines()
        first_line = section_lines[0].strip() if section_lines else ""
        id_match = re.match(r"([0-9]+(?:[a-zA-Z]|\([^\)]+\)|(?:\.[a-zA-Z0-9]+))*)", first_line)
        if id_match:
            q_id = id_match.group(1).strip()
        elif first_line:
            q_id = first_line.split()[0]
        else:
            q_id = ""

        # A "run" is one or more mark tokens written back to back and bounded
        # by word boundaries (e.g. "A1" or "M1A0"); each run is then split
        # into its tokens so order and duplicates are preserved.
        awarded = []
        for run in re.findall(r"\b((?:[MAR]\d+)+)\b", section):
            awarded.extend(re.findall(r"[MAR]\d+", run))

        grading_json["grading"].append({"question": q_id, "marks_awarded": awarded})
    return grading_json
|
|
|
|
| 207 |
# ---------------- IMPRINT ----------------
|
| 208 |
def _parse_mapping_response(raw_text):
    """Best-effort extraction of a list of mapping dicts from raw model output."""
    # First choice: the outermost [...] span parsed as one JSON array.
    try:
        start = raw_text.index('[')
        end = raw_text.rindex(']') + 1
        parsed = json.loads(raw_text[start:end])
    except ValueError:
        # str.index/rindex and json decoding errors are all ValueError.
        parsed = None
    if isinstance(parsed, list):
        # Callers use item.get(...), so keep only JSON objects.
        return [item for item in parsed if isinstance(item, dict)]

    # Fallback: pick up individual objects like {"question":"1.a","cell_number":15}
    # and skip any fragment that is not valid JSON instead of discarding all.
    mappings = []
    for fragment in re.findall(r'\{[^}]*\}', raw_text):
        try:
            candidate = json.loads(fragment)
        except ValueError:
            continue
        if isinstance(candidate, dict):
            mappings.append(candidate)
    return mappings


def ask_gemini_for_mapping_for_page_v2(model, image_path, grading_json, question_scheme, ids_block, rows=GRID_ROWS, cols=GRID_COLS):
    """
    Ask Gemini to map expected question IDs (ids_block) to grid cells on a single page image.

    The prompt passes the expected IDs block, the question scheme text and the
    grading JSON, and instructs the model to interpret mislabelled steps
    (e.g., "ii)" above Q4 -> the previous question's subpart).

    Args:
        model: Object exposing generate_content([prompt, image]).
        image_path: Path of the page image (with grid numbers) to send.
        grading_json: JSON-serialisable grading data embedded in the prompt.
        question_scheme: QP+MS text embedded in the prompt.
        ids_block: Text block listing the expected question IDs.
        rows: Number of grid rows described to the model.
        cols: Number of grid columns described to the model.

    Returns:
        List of dicts parsed from the model reply (the prompt asks for
        {"question": ..., "cell_number": ...} entries); [] when nothing
        parseable is found.
    """
    # NOTE: the braces of the JSON example at the end of the prompt are doubled.
    # Inside an f-string a single {...} is a replacement field, so an unescaped
    # example raises ValueError (invalid format specifier) while the prompt is
    # being built, before any request is sent.
    prompt = f"""
You are an exam marker. Identify where each question begins on this page.
The page has {rows}x{cols} grid (cells 1..{rows*cols}).
These are the QUESTIONS YOU MUST SPOT (expected IDs):
{ids_block}

Question scheme (authoritative full QP+MS text):
{question_scheme}

Grading JSON:
{json.dumps(grading_json, indent=2)}

Important instructions:
- Only return cell numbers for the expected question IDs listed above.
- If you detect a fragment like "ii)" above a later question heading (for example: you find "Q4" on the page and above it you see "ii)"), interpret that fragment as belonging to the previous question (e.g., "Q3.ii"). In other words: if a subpart appears spatially above a heading for the next question, reassign it to the previous question's appropriate subpart.
- Return the earliest cell number where the student's first written step (the start of that answer) appears.
- Prefer a blank cell immediately to the RIGHT of detected starting cell for placing marks; if not available, prefer LEFT.
- Avoid placing marks inside another question's answer area where possible.
- Only include questions that actually appear on this page.

Return JSON only in the format:
[{{"question":"1.a","cell_number":15}}, ...]
"""
    # Attach image plus prompt to Gemini; the context manager closes the image
    # file once the request has been made.
    with Image.open(image_path) as img:
        response = model.generate_content([prompt, img])

    raw_text = getattr(response, "text", None)
    if not raw_text:
        # Fall back to the first candidate's first part when the quick
        # accessor yields nothing; tolerate responses without that structure.
        try:
            raw_text = response.candidates[0].content.parts[0].text
        except (AttributeError, IndexError, TypeError):
            raw_text = None
    if not raw_text:
        raw_text = str(response)

    # attempt to parse JSON array from model output
    return _parse_mapping_response(raw_text)
|
| 258 |
|
| 259 |
+
def imprint_marks_using_mapping_v2(pdf_path, grading_json, output_pdf, question_scheme, model, ids_block, rows=GRID_ROWS, cols=GRID_COLS):
|
| 260 |
"""
|
| 261 |
+
Imprint marks onto a PDF using mapping returned by Gemini.
|
| 262 |
+
Key changes:
|
| 263 |
+
- Use the PDF's original mediabox (width_pt, height_pt) and render pages at 72 DPI,
|
| 264 |
+
so that 1 pixel == 1 point and no scaling occurs.
|
| 265 |
+
- Create annotated images at native page size and recreate PDF using those exact dimensions.
|
| 266 |
+
- Print progress/log steps.
|
|
|
|
| 267 |
"""
|
| 268 |
+
print("[IMPRINT] Reading PDF and preparing page sizes...")
|
| 269 |
reader = PdfReader(pdf_path)
|
| 270 |
+
# Use first page mediabox as canonical (works if pages share same size). For multi-size PDFs,
|
| 271 |
+
# we will read each page size when processing that page.
|
| 272 |
+
pages_info = []
|
| 273 |
+
for p_index, p in enumerate(reader.pages):
|
| 274 |
+
width_pt = float(p.mediabox.width)
|
| 275 |
+
height_pt = float(p.mediabox.height)
|
| 276 |
+
pages_info.append({"index": p_index, "width_pt": width_pt, "height_pt": height_pt})
|
| 277 |
+
|
| 278 |
+
# Render pages at 72 DPI so pixel dimensions == points (1 pt = 1 px).
|
| 279 |
+
# This avoids any rescaling.
|
| 280 |
+
print("[IMPRINT] Converting PDF pages to images at 72 DPI (1 px == 1 point)...")
|
| 281 |
+
pages = convert_from_path(pdf_path, dpi=72)
|
| 282 |
annotated_page_paths = []
|
| 283 |
temp_grid_images = []
|
| 284 |
|
| 285 |
+
# Create grid overlays (for debugging/visual confirmation) and save images used for mapping
|
| 286 |
for p_index, page_img in enumerate(pages):
|
| 287 |
img = page_img.convert("RGB")
|
| 288 |
draw = ImageDraw.Draw(img)
|
|
|
|
| 291 |
except:
|
| 292 |
font = ImageFont.load_default()
|
| 293 |
|
| 294 |
+
cols_local = cols
|
| 295 |
+
rows_local = rows
|
| 296 |
+
cell_w = img.width / cols_local
|
| 297 |
+
cell_h = img.height / rows_local
|
| 298 |
cell_num = 1
|
| 299 |
+
for r in range(rows_local):
|
| 300 |
+
for c in range(cols_local):
|
|
|
|
| 301 |
x = int(c * cell_w + cell_w / 2)
|
| 302 |
y = int(r * cell_h + cell_h / 2)
|
| 303 |
bbox = draw.textbbox((0,0), str(cell_num), font=font)
|
| 304 |
draw.text((x - (bbox[2]-bbox[0])/2, y - (bbox[3]-bbox[1])/2), str(cell_num), fill="black", font=font)
|
| 305 |
+
cell_num += 1
|
| 306 |
grid_path = f"page_{p_index+1}_grid.png"
|
| 307 |
img.save(grid_path, "PNG")
|
| 308 |
temp_grid_images.append(grid_path)
|
| 309 |
+
print(f"[IMPRINT] Grid image saved: {grid_path} (pixels: {img.width}x{img.height})")
|
| 310 |
|
| 311 |
+
# Ask Gemini (concurrently) to map question starts to cells
|
| 312 |
+
print("[IMPRINT] Sending grid images to Gemini to obtain cell mappings...")
|
|
|
|
|
|
|
| 313 |
mappings_per_page = {}
|
| 314 |
+
with ThreadPoolExecutor(max_workers=min(8, len(temp_grid_images))) as ex:
|
| 315 |
futures = {ex.submit(ask_gemini_for_mapping_for_page_v2, model, img_path, grading_json, question_scheme, ids_block, rows, cols): idx
|
| 316 |
+
for idx, img_path in enumerate(temp_grid_images)}
|
| 317 |
for fut in as_completed(futures):
|
| 318 |
idx = futures[fut]
|
| 319 |
try:
|
| 320 |
+
mapping_result = fut.result()
|
| 321 |
+
mappings_per_page[idx] = mapping_result
|
| 322 |
+
print(f"[IMPRINT] Mapping received for page {idx+1}: {mapping_result}")
|
| 323 |
+
except Exception as e:
|
| 324 |
mappings_per_page[idx] = []
|
| 325 |
+
print(f"[IMPRINT] Mapping failed for page {idx+1}: {e}")
|
| 326 |
|
| 327 |
+
# Now annotate pages with marks text using the mapping results
|
| 328 |
+
print("[IMPRINT] Annotating pages with marks...")
|
| 329 |
for p_index, page_img in enumerate(pages):
|
| 330 |
img_cv = np.array(page_img.convert("RGB"))
|
| 331 |
img_cv = cv2.cvtColor(img_cv, cv2.COLOR_RGB2BGR)
|
| 332 |
h, w, _ = img_cv.shape
|
| 333 |
+
cell_w_px, cell_h_px = w / cols, h / rows
|
| 334 |
mapping = mappings_per_page.get(p_index, [])
|
| 335 |
occupied = set()
|
| 336 |
for item in mapping:
|
| 337 |
qid = item.get("question")
|
| 338 |
cell_number = item.get("cell_number")
|
| 339 |
+
if qid is None or cell_number is None:
|
| 340 |
+
continue
|
| 341 |
+
marks_list = next((g["marks_awarded"] for g in grading_json.get("grading", []) if g["question"] == qid), [])
|
| 342 |
marks_text = ",".join(marks_list) if marks_list else "?"
|
| 343 |
+
# compute canonical cell row/col
|
| 344 |
+
row = (cell_number - 1) // cols
|
| 345 |
+
col = (cell_number - 1) % cols
|
| 346 |
+
# candidate placements (prefer right, then same, then left)
|
| 347 |
candidates = []
|
| 348 |
+
if col + 1 < cols: candidates.append((row, col + 1))
|
| 349 |
+
candidates.append((row, col))
|
| 350 |
+
if col - 1 >= 0: candidates.append((row, col - 1))
|
| 351 |
+
chosen = next(((r, c) for r, c in candidates if (r * cols + c + 1) not in occupied), (row, col))
|
| 352 |
+
occupied.add(chosen[0] * cols + chosen[1] + 1)
|
| 353 |
+
x_c = int((chosen[1] + 0.5) * cell_w_px)
|
| 354 |
+
y_c = int((chosen[0] + 0.5) * cell_h_px)
|
| 355 |
+
font_scale = max(0.6, min(1.6, cell_h_px / 60))
|
| 356 |
+
thickness = max(1, int(font_scale * 2))
|
| 357 |
+
cv2.putText(img_cv, marks_text, (x_c, y_c), cv2.FONT_HERSHEY_SIMPLEX, font_scale, (0, 0, 255), thickness)
|
| 358 |
+
print(f"[IMPRINT] Placed marks '{marks_text}' for '{qid}' at page {p_index+1} cell {cell_number} -> pixel ({x_c},{y_c})")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 359 |
annotated_path = f"annotated_page_{p_index+1}.png"
|
| 360 |
cv2.imwrite(annotated_path, img_cv)
|
| 361 |
annotated_page_paths.append(annotated_path)
|
| 362 |
+
print(f"[IMPRINT] Annotated page saved: {annotated_path}")
|
| 363 |
+
|
| 364 |
+
# Recreate PDF using img2pdf with the original page dimensions (points).
|
| 365 |
+
# Since we rendered at 72 DPI, pixels == points and using layout_fun with (width_pt, height_pt) will preserve size.
|
| 366 |
+
print("[IMPRINT] Recreating PDF from annotated pages with original page sizes...")
|
| 367 |
+
layout_sizes = []
|
| 368 |
+
for p_info in pages_info:
|
| 369 |
+
layout_sizes.append((p_info["width_pt"], p_info["height_pt"]))
|
| 370 |
+
# If every page has same mediabox, img2pdf.get_layout_fun can be given that size; otherwise fallback to a per-image function.
|
| 371 |
+
try:
|
| 372 |
+
# We will use the mediabox of the first page for layout function if single size, else create per-image layout
|
| 373 |
+
unique_sizes = { (p["width_pt"], p["height_pt"]) for p in pages_info }
|
| 374 |
+
if len(unique_sizes) == 1:
|
| 375 |
+
w_pt, h_pt = pages_info[0]["width_pt"], pages_info[0]["height_pt"]
|
| 376 |
+
with open(output_pdf, "wb") as f:
|
| 377 |
+
f.write(img2pdf.convert(annotated_page_paths, layout_fun=img2pdf.get_layout_fun((w_pt, h_pt))))
|
| 378 |
+
else:
|
| 379 |
+
# per-page layout: build a custom layout function for each image based on index
|
| 380 |
+
# img2pdf allows layout_fun that takes (img_width_px, img_height_px, px_density) but easier approach:
|
| 381 |
+
# create PDF by converting each annotated PNG individually to single-page PDF with proper size and then merge
|
| 382 |
+
per_page_pdfs = []
|
| 383 |
+
for idx, ann_path in enumerate(annotated_page_paths):
|
| 384 |
+
w_pt = pages_info[idx]["width_pt"]
|
| 385 |
+
h_pt = pages_info[idx]["height_pt"]
|
| 386 |
+
single_pdf = f"single_{idx+1}.pdf"
|
| 387 |
+
with open(single_pdf, "wb") as f:
|
| 388 |
+
f.write(img2pdf.convert(ann_path, layout_fun=img2pdf.get_layout_fun((w_pt, h_pt))))
|
| 389 |
+
per_page_pdfs.append(single_pdf)
|
| 390 |
+
# merge them
|
| 391 |
+
merge_pdfs(per_page_pdfs, output_pdf)
|
| 392 |
+
# cleanup single_page temp pdfs
|
| 393 |
+
for p in per_page_pdfs:
|
| 394 |
+
try:
|
| 395 |
+
os.remove(p)
|
| 396 |
+
except:
|
| 397 |
+
pass
|
| 398 |
+
except Exception as e:
|
| 399 |
+
print(f"[IMPRINT] Failed to create imprinted PDF with original sizes: {e}")
|
| 400 |
+
# fallback: create naive pdf (may be resized)
|
| 401 |
+
with open(output_pdf, "wb") as f:
|
| 402 |
+
f.write(img2pdf.convert(annotated_page_paths))
|
| 403 |
+
print(f"[IMPRINT] Imprinted PDF created: {output_pdf}")
|
| 404 |
+
|
| 405 |
+
# Optionally compress result
|
| 406 |
+
compressed = compress_pdf(output_pdf)
|
| 407 |
+
if compressed != output_pdf:
|
| 408 |
+
print(f"[IMPRINT] Compressed imprinted PDF saved as: {compressed}")
|
| 409 |
+
return compressed
|
| 410 |
+
return output_pdf
|
| 411 |
|
| 412 |
# ---------------- PIPELINE ----------------
|
| 413 |
def align_and_grade_pipeline(qp_path, ms_path, ans_path, imprint=False):
|
|
|
|
| 415 |
ms_path = compress_pdf(ms_path)
|
| 416 |
ans_path = compress_pdf(ans_path)
|
| 417 |
|
| 418 |
+
merged_qpms_path = os.path.splitext(qp_path)[0] + "_merged_qp_ms.pdf"
|
| 419 |
merge_pdfs([qp_path, ms_path], merged_qpms_path)
|
| 420 |
|
| 421 |
merged_uploaded = genai.upload_file(path=merged_qpms_path, display_name="QP+MS (merged)")
|
|
|
|
| 424 |
model = create_model()
|
| 425 |
|
| 426 |
qpms_prompt = PROMPTS["QP_MS_TRANSCRIPTION"]["content"]
|
| 427 |
+
print("[STEP] Requesting QP+MS transcription from Gemini...")
|
| 428 |
qpms_text = gemini_generate_content(model, qpms_prompt, file_upload_obj=merged_uploaded)
|
| 429 |
+
print("[STEP] QP+MS transcription received.")
|
| 430 |
|
|
|
|
| 431 |
extracted_ids = extract_question_ids_from_qpms(qpms_text)
|
| 432 |
+
print(f"[STEP] Extracted question IDs (in order, duplicates preserved): {extracted_ids}")
|
| 433 |
|
| 434 |
+
as_prompt = build_as_prompt_with_expected_ids(extracted_ids)
|
| 435 |
+
print("[STEP] Requesting AS transcription from Gemini (using expected IDs block)...")
|
| 436 |
as_text = gemini_generate_content(model, as_prompt, file_upload_obj=ans_uploaded)
|
| 437 |
+
print("[STEP] AS transcription received.")
|
| 438 |
|
| 439 |
grading_input = (
|
| 440 |
+
"=== QP+MS TRANSCRIPT BEGIN ===\n" + qpms_text +
|
| 441 |
+
"\n=== QP+MS TRANSCRIPT END ===\n\n" +
|
| 442 |
+
"=== ANSWER SHEET TRANSCRIPT BEGIN ===\n" + as_text +
|
| 443 |
"\n=== ANSWER SHEET TRANSCRIPT END ===\n"
|
| 444 |
)
|
| 445 |
grading_prompt_system = PROMPTS["GRADING_PROMPT"]["content"]
|
| 446 |
+
print("[STEP] Sending transcripts to Gemini for grading...")
|
| 447 |
+
grading_text = gemini_generate_content(model, grading_prompt_system + "\n\nPlease grade the following transcripts:\n" + grading_input)
|
| 448 |
+
print("[STEP] Grading received from Gemini.")
|
| 449 |
|
| 450 |
+
grading_pdf_path = save_as_pdf(grading_text, os.path.splitext(os.path.basename(ans_path))[0] + "_graded.pdf")
|
| 451 |
grading_json = extract_marks_from_grading_exact(grading_text)
|
| 452 |
+
print(f"[STEP] Extracted grading JSON (duplicates preserved): {json.dumps(grading_json, indent=2)}")
|
| 453 |
|
| 454 |
imprinted_pdf_path = None
|
| 455 |
if imprint:
|
| 456 |
question_scheme = qpms_text
|
| 457 |
+
imprinted_pdf_path = os.path.splitext(os.path.basename(ans_path))[0] + "_imprinted.pdf"
|
| 458 |
+
# Build ids_block to pass to ask_gemini_for_mapping_for_page_v2
|
| 459 |
+
ids_block = "{\n" + "\n".join(extracted_ids) + "\n}"
|
| 460 |
+
print("[IMPRINT] Starting imprinting with ids_block and question scheme...")
|
| 461 |
+
imprinted_pdf_path = imprint_marks_using_mapping_v2(ans_path, grading_json, imprinted_pdf_path, question_scheme, model, ids_block)
|
| 462 |
+
print(f"[IMPRINT] Completed imprinting. File: {imprinted_pdf_path}")
|
| 463 |
|
| 464 |
return qpms_text, as_text, grading_text, grading_pdf_path, imprinted_pdf_path
|
| 465 |
|