Spaces:
Sleeping
Sleeping
Update app.py
Browse files
app.py
CHANGED
|
@@ -151,23 +151,13 @@ def gemini_generate_content(model, prompt_text, file_upload_obj=None, image_obj=
|
|
| 151 |
|
| 152 |
# ---------------- PARSERS ----------------
|
| 153 |
def extract_question_ids_from_qpms(text):
|
| 154 |
-
"""
|
| 155 |
-
Extract question IDs in the order they appear.
|
| 156 |
-
NOTE: do NOT deduplicate — keep multiple occurrences as they are in the transcript.
|
| 157 |
-
"""
|
| 158 |
ids = []
|
| 159 |
-
# first attempt: explicit "Question :" lines
|
| 160 |
for m in re.finditer(r"(?im)^\s*Question\s*:\s*([0-9]+(?:[a-zA-Z0-9\.\(\)]+)*)\b", text):
|
| 161 |
-
|
| 162 |
-
ids.append(qid)
|
| 163 |
-
# fallback: lines starting with numbering like "1." or "2)" etc.
|
| 164 |
-
for m in re.finditer(r"(?m)^\s*([0-9]+(?:[a-zA-Z0-9\.\(\)]+)*)\s*[\.\):\-]\s", text):
|
| 165 |
-
qid = m.group(1).strip()
|
| 166 |
-
ids.append(qid)
|
| 167 |
-
# If nothing found, record "NA" once
|
| 168 |
if not ids:
|
| 169 |
-
|
| 170 |
-
|
|
|
|
| 171 |
|
| 172 |
def build_as_prompt_with_expected_ids(expected_ids):
|
| 173 |
ids_block = "{\n" + "\n".join(expected_ids) + "\n}" if expected_ids else "{NA}"
|
|
@@ -187,54 +177,41 @@ AS:
|
|
| 187 |
return prompt
|
| 188 |
|
| 189 |
def extract_marks_from_grading_exact(grading_text):
|
| 190 |
-
"""
|
| 191 |
-
Extract grading marks in the order they appear and keep duplicates.
|
| 192 |
-
Output JSON with grading list preserving sequence (no deduplication).
|
| 193 |
-
"""
|
| 194 |
grading_json = {"grading": []}
|
| 195 |
-
# split by question blocks by heading "## Question "
|
| 196 |
question_blocks = re.split(r"##\s*Question\s+", grading_text)
|
| 197 |
for block in question_blocks[1:]:
|
| 198 |
-
# try to get the ID from the first line (robust)
|
| 199 |
first_line = block.strip().splitlines()[0].strip() if block.strip().splitlines() else ""
|
| 200 |
q_id_match = re.match(r"([0-9]+(?:[a-zA-Z]|\([^\)]+\)|(?:\.[a-zA-Z0-9]+))*)", first_line)
|
| 201 |
q_id = q_id_match.group(1).strip() if q_id_match else first_line.split()[0] if first_line else ""
|
| 202 |
-
# find all mark tokens in order and preserve duplicates
|
| 203 |
awarded = re.findall(r"\b(M\d+|A\d+|R\d+|M0|A0|R0)\b", block)
|
| 204 |
grading_json["grading"].append({"question": q_id, "marks_awarded": awarded})
|
| 205 |
return grading_json
|
| 206 |
|
| 207 |
# ---------------- IMPRINT ----------------
|
| 208 |
-
def ask_gemini_for_mapping_for_page_v2(model, image_path, grading_json, question_scheme,
|
| 209 |
-
"""
|
| 210 |
-
Ask Gemini to map expected question IDs (ids_block) to grid cells on a single page image.
|
| 211 |
-
The prompt explicitly passes the expected IDs block and instructs the model to interpret
|
| 212 |
-
mislabelled steps (e.g., ii) above Q4 -> interpret as previous question's subpart).
|
| 213 |
-
"""
|
| 214 |
prompt = f"""
|
| 215 |
You are an exam marker. Identify where each question begins on this page.
|
| 216 |
The page has {rows}x{cols} grid (cells 1..{rows*cols}).
|
| 217 |
-
|
| 218 |
-
{ids_block}
|
| 219 |
-
|
| 220 |
-
Question scheme (authoritative full QP+MS text):
|
| 221 |
{question_scheme}
|
| 222 |
|
|
|
|
|
|
|
|
|
|
| 223 |
Grading JSON:
|
| 224 |
{json.dumps(grading_json, indent=2)}
|
| 225 |
|
| 226 |
-
|
| 227 |
-
-
|
| 228 |
-
-
|
| 229 |
-
-
|
| 230 |
-
-
|
| 231 |
-
-
|
| 232 |
-
-
|
| 233 |
-
|
| 234 |
-
|
| 235 |
-
[{"question":"1.a","cell_number":15}, ...]
|
| 236 |
"""
|
| 237 |
-
# Attach image plus prompt to Gemini
|
| 238 |
img = Image.open(image_path)
|
| 239 |
response = model.generate_content([prompt, img])
|
| 240 |
raw_text = getattr(response, "text", None)
|
|
@@ -242,47 +219,20 @@ Return JSON only in the format:
|
|
| 242 |
raw_text = response.candidates[0].content.parts[0].text
|
| 243 |
if not raw_text:
|
| 244 |
raw_text = str(response)
|
| 245 |
-
# attempt to parse JSON array from model output
|
| 246 |
try:
|
| 247 |
start = raw_text.index('[')
|
| 248 |
end = raw_text.rindex(']') + 1
|
| 249 |
return json.loads(raw_text[start:end])
|
| 250 |
except Exception:
|
| 251 |
-
|
| 252 |
-
|
| 253 |
-
|
| 254 |
-
parsed = [json.loads(l) for l in lines]
|
| 255 |
-
return parsed
|
| 256 |
-
except Exception:
|
| 257 |
-
return []
|
| 258 |
-
|
| 259 |
-
def imprint_marks_using_mapping_v2(pdf_path, grading_json, output_pdf, question_scheme, model, ids_block, rows=GRID_ROWS, cols=GRID_COLS):
|
| 260 |
-
"""
|
| 261 |
-
Imprint marks onto a PDF using mapping returned by Gemini.
|
| 262 |
-
Key changes:
|
| 263 |
-
- Use the PDF's original mediabox (width_pt, height_pt) and render pages at 72 DPI,
|
| 264 |
-
so that 1 pixel == 1 point and no scaling occurs.
|
| 265 |
-
- Create annotated images at native page size and recreate PDF using those exact dimensions.
|
| 266 |
-
- Print progress/log steps.
|
| 267 |
-
"""
|
| 268 |
-
print("[IMPRINT] Reading PDF and preparing page sizes...")
|
| 269 |
reader = PdfReader(pdf_path)
|
| 270 |
-
# Use first page mediabox as canonical (works if pages share same size). For multi-size PDFs,
|
| 271 |
-
# we will read each page size when processing that page.
|
| 272 |
-
pages_info = []
|
| 273 |
-
for p_index, p in enumerate(reader.pages):
|
| 274 |
-
width_pt = float(p.mediabox.width)
|
| 275 |
-
height_pt = float(p.mediabox.height)
|
| 276 |
-
pages_info.append({"index": p_index, "width_pt": width_pt, "height_pt": height_pt})
|
| 277 |
-
|
| 278 |
-
# Render pages at 72 DPI so pixel dimensions == points (1 pt = 1 px).
|
| 279 |
-
# This avoids any rescaling.
|
| 280 |
-
print("[IMPRINT] Converting PDF pages to images at 72 DPI (1 px == 1 point)...")
|
| 281 |
-
pages = convert_from_path(pdf_path, dpi=72)
|
| 282 |
annotated_page_paths = []
|
|
|
|
|
|
|
| 283 |
temp_grid_images = []
|
| 284 |
|
| 285 |
-
# Create grid overlays (for debugging/visual confirmation) and save images used for mapping
|
| 286 |
for p_index, page_img in enumerate(pages):
|
| 287 |
img = page_img.convert("RGB")
|
| 288 |
draw = ImageDraw.Draw(img)
|
|
@@ -291,123 +241,71 @@ def imprint_marks_using_mapping_v2(pdf_path, grading_json, output_pdf, question_
|
|
| 291 |
except:
|
| 292 |
font = ImageFont.load_default()
|
| 293 |
|
| 294 |
-
|
| 295 |
-
|
| 296 |
-
cell_w = img.width / cols_local
|
| 297 |
-
cell_h = img.height / rows_local
|
| 298 |
cell_num = 1
|
| 299 |
-
for r in range(
|
| 300 |
-
for c in range(
|
| 301 |
x = int(c * cell_w + cell_w / 2)
|
| 302 |
y = int(r * cell_h + cell_h / 2)
|
| 303 |
bbox = draw.textbbox((0,0), str(cell_num), font=font)
|
| 304 |
draw.text((x - (bbox[2]-bbox[0])/2, y - (bbox[3]-bbox[1])/2), str(cell_num), fill="black", font=font)
|
| 305 |
-
cell_num +=
|
| 306 |
grid_path = f"page_{p_index+1}_grid.png"
|
| 307 |
img.save(grid_path, "PNG")
|
| 308 |
temp_grid_images.append(grid_path)
|
| 309 |
-
print(f"[IMPRINT] Grid image saved: {grid_path} (pixels: {img.width}x{img.height})")
|
| 310 |
|
| 311 |
-
# Ask Gemini (concurrently) to map question starts to cells
|
| 312 |
-
print("[IMPRINT] Sending grid images to Gemini to obtain cell mappings...")
|
| 313 |
mappings_per_page = {}
|
| 314 |
-
with ThreadPoolExecutor(max_workers=min(8,
|
| 315 |
-
futures = {
|
| 316 |
-
|
|
|
|
|
|
|
|
|
|
| 317 |
for fut in as_completed(futures):
|
| 318 |
idx = futures[fut]
|
| 319 |
try:
|
| 320 |
mapping_result = fut.result()
|
| 321 |
mappings_per_page[idx] = mapping_result
|
| 322 |
-
print(f"[IMPRINT] Mapping received for page {idx+1}: {mapping_result}")
|
| 323 |
except Exception as e:
|
| 324 |
mappings_per_page[idx] = []
|
| 325 |
-
print(f"[IMPRINT] Mapping failed for page {idx+1}: {e}")
|
| 326 |
|
| 327 |
-
# Now annotate pages with marks text using the mapping results
|
| 328 |
-
print("[IMPRINT] Annotating pages with marks...")
|
| 329 |
for p_index, page_img in enumerate(pages):
|
| 330 |
img_cv = np.array(page_img.convert("RGB"))
|
| 331 |
img_cv = cv2.cvtColor(img_cv, cv2.COLOR_RGB2BGR)
|
| 332 |
h, w, _ = img_cv.shape
|
| 333 |
-
cell_w_px, cell_h_px = w
|
| 334 |
mapping = mappings_per_page.get(p_index, [])
|
| 335 |
occupied = set()
|
| 336 |
for item in mapping:
|
| 337 |
qid = item.get("question")
|
| 338 |
cell_number = item.get("cell_number")
|
| 339 |
-
if qid is None or cell_number is None:
|
| 340 |
-
|
| 341 |
-
marks_list = next((g["marks_awarded"] for g in grading_json.get("grading", []) if g["question"] == qid), [])
|
| 342 |
marks_text = ",".join(marks_list) if marks_list else "?"
|
| 343 |
-
|
| 344 |
-
|
| 345 |
-
col = (cell_number - 1) % cols
|
| 346 |
-
# candidate placements (prefer right, then same, then left)
|
| 347 |
candidates = []
|
| 348 |
-
if col
|
| 349 |
-
candidates.append((row,
|
| 350 |
-
if col
|
| 351 |
-
chosen = next(((r,
|
| 352 |
-
occupied.add(chosen[0]
|
| 353 |
-
x_c = int((chosen[1]
|
| 354 |
-
y_c = int((chosen[0]
|
| 355 |
-
font_scale = max(0.6,
|
| 356 |
-
thickness = max(1,
|
| 357 |
-
cv2.putText(img_cv, marks_text, (x_c,
|
| 358 |
-
print(f"[IMPRINT] Placed marks '{marks_text}' for '{qid}' at page {p_index+1} cell {cell_number} -> pixel ({x_c},{y_c})")
|
| 359 |
annotated_path = f"annotated_page_{p_index+1}.png"
|
| 360 |
cv2.imwrite(annotated_path, img_cv)
|
| 361 |
annotated_page_paths.append(annotated_path)
|
| 362 |
-
|
| 363 |
-
|
| 364 |
-
|
| 365 |
-
|
| 366 |
-
|
| 367 |
-
layout_sizes = []
|
| 368 |
-
for p_info in pages_info:
|
| 369 |
-
layout_sizes.append((p_info["width_pt"], p_info["height_pt"]))
|
| 370 |
-
# If every page has same mediabox, img2pdf.get_layout_fun can be given that size; otherwise fallback to a per-image function.
|
| 371 |
-
try:
|
| 372 |
-
# We will use the mediabox of the first page for layout function if single size, else create per-image layout
|
| 373 |
-
unique_sizes = { (p["width_pt"], p["height_pt"]) for p in pages_info }
|
| 374 |
-
if len(unique_sizes) == 1:
|
| 375 |
-
w_pt, h_pt = pages_info[0]["width_pt"], pages_info[0]["height_pt"]
|
| 376 |
-
with open(output_pdf, "wb") as f:
|
| 377 |
-
f.write(img2pdf.convert(annotated_page_paths, layout_fun=img2pdf.get_layout_fun((w_pt, h_pt))))
|
| 378 |
-
else:
|
| 379 |
-
# per-page layout: build a custom layout function for each image based on index
|
| 380 |
-
# img2pdf allows layout_fun that takes (img_width_px, img_height_px, px_density) but easier approach:
|
| 381 |
-
# create PDF by converting each annotated PNG individually to single-page PDF with proper size and then merge
|
| 382 |
-
per_page_pdfs = []
|
| 383 |
-
for idx, ann_path in enumerate(annotated_page_paths):
|
| 384 |
-
w_pt = pages_info[idx]["width_pt"]
|
| 385 |
-
h_pt = pages_info[idx]["height_pt"]
|
| 386 |
-
single_pdf = f"single_{idx+1}.pdf"
|
| 387 |
-
with open(single_pdf, "wb") as f:
|
| 388 |
-
f.write(img2pdf.convert(ann_path, layout_fun=img2pdf.get_layout_fun((w_pt, h_pt))))
|
| 389 |
-
per_page_pdfs.append(single_pdf)
|
| 390 |
-
# merge them
|
| 391 |
-
merge_pdfs(per_page_pdfs, output_pdf)
|
| 392 |
-
# cleanup single_page temp pdfs
|
| 393 |
-
for p in per_page_pdfs:
|
| 394 |
-
try:
|
| 395 |
-
os.remove(p)
|
| 396 |
-
except:
|
| 397 |
-
pass
|
| 398 |
-
except Exception as e:
|
| 399 |
-
print(f"[IMPRINT] Failed to create imprinted PDF with original sizes: {e}")
|
| 400 |
-
# fallback: create naive pdf (may be resized)
|
| 401 |
-
with open(output_pdf, "wb") as f:
|
| 402 |
-
f.write(img2pdf.convert(annotated_page_paths))
|
| 403 |
-
print(f"[IMPRINT] Imprinted PDF created: {output_pdf}")
|
| 404 |
-
|
| 405 |
-
# Optionally compress result
|
| 406 |
-
compressed = compress_pdf(output_pdf)
|
| 407 |
-
if compressed != output_pdf:
|
| 408 |
-
print(f"[IMPRINT] Compressed imprinted PDF saved as: {compressed}")
|
| 409 |
-
return compressed
|
| 410 |
-
return output_pdf
|
| 411 |
|
| 412 |
# ---------------- PIPELINE ----------------
|
| 413 |
def align_and_grade_pipeline(qp_path, ms_path, ans_path, imprint=False):
|
|
@@ -415,7 +313,7 @@ def align_and_grade_pipeline(qp_path, ms_path, ans_path, imprint=False):
|
|
| 415 |
ms_path = compress_pdf(ms_path)
|
| 416 |
ans_path = compress_pdf(ans_path)
|
| 417 |
|
| 418 |
-
merged_qpms_path = os.path.splitext(qp_path)[0]
|
| 419 |
merge_pdfs([qp_path, ms_path], merged_qpms_path)
|
| 420 |
|
| 421 |
merged_uploaded = genai.upload_file(path=merged_qpms_path, display_name="QP+MS (merged)")
|
|
@@ -424,42 +322,29 @@ def align_and_grade_pipeline(qp_path, ms_path, ans_path, imprint=False):
|
|
| 424 |
model = create_model()
|
| 425 |
|
| 426 |
qpms_prompt = PROMPTS["QP_MS_TRANSCRIPTION"]["content"]
|
| 427 |
-
print("[STEP] Requesting QP+MS transcription from Gemini...")
|
| 428 |
qpms_text = gemini_generate_content(model, qpms_prompt, file_upload_obj=merged_uploaded)
|
| 429 |
-
print("[STEP] QP+MS transcription received.")
|
| 430 |
-
|
| 431 |
extracted_ids = extract_question_ids_from_qpms(qpms_text)
|
| 432 |
-
print(f"[STEP] Extracted question IDs (in order, duplicates preserved): {extracted_ids}")
|
| 433 |
|
| 434 |
as_prompt = build_as_prompt_with_expected_ids(extracted_ids)
|
| 435 |
-
print("[STEP] Requesting AS transcription from Gemini (using expected IDs block)...")
|
| 436 |
as_text = gemini_generate_content(model, as_prompt, file_upload_obj=ans_uploaded)
|
| 437 |
-
print("[STEP] AS transcription received.")
|
| 438 |
|
| 439 |
grading_input = (
|
| 440 |
-
"=== QP+MS TRANSCRIPT BEGIN ===\n"
|
| 441 |
-
"\n=== QP+MS TRANSCRIPT END ===\n\n"
|
| 442 |
-
"=== ANSWER SHEET TRANSCRIPT BEGIN ===\n"
|
| 443 |
"\n=== ANSWER SHEET TRANSCRIPT END ===\n"
|
| 444 |
)
|
| 445 |
grading_prompt_system = PROMPTS["GRADING_PROMPT"]["content"]
|
| 446 |
-
|
| 447 |
-
grading_text = gemini_generate_content(model, grading_prompt_system + "\n\nPlease grade the following transcripts:\n" + grading_input)
|
| 448 |
-
print("[STEP] Grading received from Gemini.")
|
| 449 |
|
| 450 |
-
grading_pdf_path = save_as_pdf(grading_text, os.path.splitext(os.path.basename(ans_path))[0]
|
| 451 |
grading_json = extract_marks_from_grading_exact(grading_text)
|
| 452 |
-
print(f"[STEP] Extracted grading JSON (duplicates preserved): {json.dumps(grading_json, indent=2)}")
|
| 453 |
|
| 454 |
imprinted_pdf_path = None
|
| 455 |
if imprint:
|
| 456 |
question_scheme = qpms_text
|
| 457 |
-
imprinted_pdf_path = os.path.splitext(os.path.basename(ans_path))[0]
|
| 458 |
-
|
| 459 |
-
ids_block = "{\n" + "\n".join(extracted_ids) + "\n}"
|
| 460 |
-
print("[IMPRINT] Starting imprinting with ids_block and question scheme...")
|
| 461 |
-
imprinted_pdf_path = imprint_marks_using_mapping_v2(ans_path, grading_json, imprinted_pdf_path, question_scheme, model, ids_block)
|
| 462 |
-
print(f"[IMPRINT] Completed imprinting. File: {imprinted_pdf_path}")
|
| 463 |
|
| 464 |
return qpms_text, as_text, grading_text, grading_pdf_path, imprinted_pdf_path
|
| 465 |
|
|
|
|
| 151 |
|
| 152 |
# ---------------- PARSERS ----------------
|
| 153 |
def extract_question_ids_from_qpms(text):
|
|
|
|
|
|
|
|
|
|
|
|
|
| 154 |
ids = []
|
|
|
|
| 155 |
for m in re.finditer(r"(?im)^\s*Question\s*:\s*([0-9]+(?:[a-zA-Z0-9\.\(\)]+)*)\b", text):
|
| 156 |
+
ids.append(m.group(1).strip())
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 157 |
if not ids:
|
| 158 |
+
for m in re.finditer(r"(?m)^\s*([0-9]+(?:[a-zA-Z0-9\.\(\)]+)*)\s*[\.\):\-]\s", text):
|
| 159 |
+
ids.append(m.group(1).strip())
|
| 160 |
+
return ids if ids else ["NA"]
|
| 161 |
|
| 162 |
def build_as_prompt_with_expected_ids(expected_ids):
|
| 163 |
ids_block = "{\n" + "\n".join(expected_ids) + "\n}" if expected_ids else "{NA}"
|
|
|
|
| 177 |
return prompt
|
| 178 |
|
| 179 |
def extract_marks_from_grading_exact(grading_text):
|
|
|
|
|
|
|
|
|
|
|
|
|
| 180 |
grading_json = {"grading": []}
|
|
|
|
| 181 |
question_blocks = re.split(r"##\s*Question\s+", grading_text)
|
| 182 |
for block in question_blocks[1:]:
|
|
|
|
| 183 |
first_line = block.strip().splitlines()[0].strip() if block.strip().splitlines() else ""
|
| 184 |
q_id_match = re.match(r"([0-9]+(?:[a-zA-Z]|\([^\)]+\)|(?:\.[a-zA-Z0-9]+))*)", first_line)
|
| 185 |
q_id = q_id_match.group(1).strip() if q_id_match else first_line.split()[0] if first_line else ""
|
|
|
|
| 186 |
awarded = re.findall(r"\b(M\d+|A\d+|R\d+|M0|A0|R0)\b", block)
|
| 187 |
grading_json["grading"].append({"question": q_id, "marks_awarded": awarded})
|
| 188 |
return grading_json
|
| 189 |
|
| 190 |
# ---------------- IMPRINT ----------------
|
| 191 |
+
def ask_gemini_for_mapping_for_page_v2(model, image_path, grading_json, question_scheme, expected_ids, rows=GRID_ROWS, cols=GRID_COLS):
|
| 192 |
+
ids_block = "{\n" + "\n".join(expected_ids) + "\n}" if expected_ids else "{NA}"
|
|
|
|
|
|
|
|
|
|
|
|
|
| 193 |
prompt = f"""
|
| 194 |
You are an exam marker. Identify where each question begins on this page.
|
| 195 |
The page has {rows}x{cols} grid (cells 1..{rows*cols}).
|
| 196 |
+
Authoritative question scheme:
|
|
|
|
|
|
|
|
|
|
| 197 |
{question_scheme}
|
| 198 |
|
| 199 |
+
Expected IDs (spot only these):
|
| 200 |
+
{ids_block}
|
| 201 |
+
|
| 202 |
Grading JSON:
|
| 203 |
{json.dumps(grading_json, indent=2)}
|
| 204 |
|
| 205 |
+
Instructions:
|
| 206 |
+
- Return cell number where first step begins for each question.
|
| 207 |
+
- Only include questions on this page.
|
| 208 |
+
- Handle mislabelled steps: e.g., Q4.i above Q4 may belong to Q3.ii.
|
| 209 |
+
- Avoid placing marks inside another question's answer area.
|
| 210 |
+
- Prefer blank cell to the RIGHT, else LEFT.
|
| 211 |
+
- Never above or below the answer.
|
| 212 |
+
- Return JSON only, like:
|
| 213 |
+
[{{"question":"1.a","cell_number":15}}, ...]
|
|
|
|
| 214 |
"""
|
|
|
|
| 215 |
img = Image.open(image_path)
|
| 216 |
response = model.generate_content([prompt, img])
|
| 217 |
raw_text = getattr(response, "text", None)
|
|
|
|
| 219 |
raw_text = response.candidates[0].content.parts[0].text
|
| 220 |
if not raw_text:
|
| 221 |
raw_text = str(response)
|
|
|
|
| 222 |
try:
|
| 223 |
start = raw_text.index('[')
|
| 224 |
end = raw_text.rindex(']') + 1
|
| 225 |
return json.loads(raw_text[start:end])
|
| 226 |
except Exception:
|
| 227 |
+
return []
|
| 228 |
+
|
| 229 |
+
def imprint_marks_using_mapping_v2(pdf_path, grading_json, output_pdf, question_scheme, expected_ids, model, rows=GRID_ROWS, cols=GRID_COLS):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 230 |
reader = PdfReader(pdf_path)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 231 |
annotated_page_paths = []
|
| 232 |
+
|
| 233 |
+
pages = convert_from_path(pdf_path) # keep original size
|
| 234 |
temp_grid_images = []
|
| 235 |
|
|
|
|
| 236 |
for p_index, page_img in enumerate(pages):
|
| 237 |
img = page_img.convert("RGB")
|
| 238 |
draw = ImageDraw.Draw(img)
|
|
|
|
| 241 |
except:
|
| 242 |
font = ImageFont.load_default()
|
| 243 |
|
| 244 |
+
cell_w = img.width / cols
|
| 245 |
+
cell_h = img.height / rows
|
|
|
|
|
|
|
| 246 |
cell_num = 1
|
| 247 |
+
for r in range(rows):
|
| 248 |
+
for c in range(cols):
|
| 249 |
x = int(c * cell_w + cell_w / 2)
|
| 250 |
y = int(r * cell_h + cell_h / 2)
|
| 251 |
bbox = draw.textbbox((0,0), str(cell_num), font=font)
|
| 252 |
draw.text((x - (bbox[2]-bbox[0])/2, y - (bbox[3]-bbox[1])/2), str(cell_num), fill="black", font=font)
|
| 253 |
+
cell_num +=1
|
| 254 |
grid_path = f"page_{p_index+1}_grid.png"
|
| 255 |
img.save(grid_path, "PNG")
|
| 256 |
temp_grid_images.append(grid_path)
|
|
|
|
| 257 |
|
|
|
|
|
|
|
| 258 |
mappings_per_page = {}
|
| 259 |
+
with ThreadPoolExecutor(max_workers=min(8,len(temp_grid_images))) as ex:
|
| 260 |
+
futures = {
|
| 261 |
+
ex.submit(
|
| 262 |
+
ask_gemini_for_mapping_for_page_v2, model, img_path, grading_json, question_scheme, expected_ids, rows, cols
|
| 263 |
+
): idx for idx,img_path in enumerate(temp_grid_images)
|
| 264 |
+
}
|
| 265 |
for fut in as_completed(futures):
|
| 266 |
idx = futures[fut]
|
| 267 |
try:
|
| 268 |
mapping_result = fut.result()
|
| 269 |
mappings_per_page[idx] = mapping_result
|
| 270 |
+
print(f"[IMPRINT] Mapping received for page {idx+1}: {repr(mapping_result)}")
|
| 271 |
except Exception as e:
|
| 272 |
mappings_per_page[idx] = []
|
| 273 |
+
print(f"[IMPRINT] Mapping failed for page {idx+1}: {repr(e)}")
|
| 274 |
|
|
|
|
|
|
|
| 275 |
for p_index, page_img in enumerate(pages):
|
| 276 |
img_cv = np.array(page_img.convert("RGB"))
|
| 277 |
img_cv = cv2.cvtColor(img_cv, cv2.COLOR_RGB2BGR)
|
| 278 |
h, w, _ = img_cv.shape
|
| 279 |
+
cell_w_px, cell_h_px = w/cols, h/rows
|
| 280 |
mapping = mappings_per_page.get(p_index, [])
|
| 281 |
occupied = set()
|
| 282 |
for item in mapping:
|
| 283 |
qid = item.get("question")
|
| 284 |
cell_number = item.get("cell_number")
|
| 285 |
+
if qid is None or cell_number is None: continue
|
| 286 |
+
marks_list = next((g["marks_awarded"] for g in grading_json.get("grading", []) if g["question"]==qid), [])
|
|
|
|
| 287 |
marks_text = ",".join(marks_list) if marks_list else "?"
|
| 288 |
+
row = (cell_number-1)//cols
|
| 289 |
+
col = (cell_number-1)%cols
|
|
|
|
|
|
|
| 290 |
candidates = []
|
| 291 |
+
if col+1<cols: candidates.append((row,col+1))
|
| 292 |
+
candidates.append((row,col))
|
| 293 |
+
if col-1>=0: candidates.append((row,col-1))
|
| 294 |
+
chosen = next(((r,c) for r,c in candidates if (r*cols+c+1) not in occupied), (row,col))
|
| 295 |
+
occupied.add(chosen[0]*cols+chosen[1]+1)
|
| 296 |
+
x_c = int((chosen[1]+0.5)*cell_w_px)
|
| 297 |
+
y_c = int((chosen[0]+0.5)*cell_h_px)
|
| 298 |
+
font_scale = max(0.6,min(1.6,cell_h_px/60))
|
| 299 |
+
thickness = max(1,int(font_scale*2))
|
| 300 |
+
cv2.putText(img_cv, marks_text, (x_c,y_c), cv2.FONT_HERSHEY_SIMPLEX, font_scale, (0,0,255), thickness)
|
|
|
|
| 301 |
annotated_path = f"annotated_page_{p_index+1}.png"
|
| 302 |
cv2.imwrite(annotated_path, img_cv)
|
| 303 |
annotated_page_paths.append(annotated_path)
|
| 304 |
+
|
| 305 |
+
with open(output_pdf,"wb") as f:
|
| 306 |
+
f.write(img2pdf.convert(annotated_page_paths))
|
| 307 |
+
|
| 308 |
+
return compress_pdf(output_pdf)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 309 |
|
| 310 |
# ---------------- PIPELINE ----------------
|
| 311 |
def align_and_grade_pipeline(qp_path, ms_path, ans_path, imprint=False):
|
|
|
|
| 313 |
ms_path = compress_pdf(ms_path)
|
| 314 |
ans_path = compress_pdf(ans_path)
|
| 315 |
|
| 316 |
+
merged_qpms_path = os.path.splitext(qp_path)[0]+"_merged_qp_ms.pdf"
|
| 317 |
merge_pdfs([qp_path, ms_path], merged_qpms_path)
|
| 318 |
|
| 319 |
merged_uploaded = genai.upload_file(path=merged_qpms_path, display_name="QP+MS (merged)")
|
|
|
|
| 322 |
model = create_model()
|
| 323 |
|
| 324 |
qpms_prompt = PROMPTS["QP_MS_TRANSCRIPTION"]["content"]
|
|
|
|
| 325 |
qpms_text = gemini_generate_content(model, qpms_prompt, file_upload_obj=merged_uploaded)
|
|
|
|
|
|
|
| 326 |
extracted_ids = extract_question_ids_from_qpms(qpms_text)
|
|
|
|
| 327 |
|
| 328 |
as_prompt = build_as_prompt_with_expected_ids(extracted_ids)
|
|
|
|
| 329 |
as_text = gemini_generate_content(model, as_prompt, file_upload_obj=ans_uploaded)
|
|
|
|
| 330 |
|
| 331 |
grading_input = (
|
| 332 |
+
"=== QP+MS TRANSCRIPT BEGIN ===\n"+qpms_text+
|
| 333 |
+
"\n=== QP+MS TRANSCRIPT END ===\n\n"+
|
| 334 |
+
"=== ANSWER SHEET TRANSCRIPT BEGIN ===\n"+as_text+
|
| 335 |
"\n=== ANSWER SHEET TRANSCRIPT END ===\n"
|
| 336 |
)
|
| 337 |
grading_prompt_system = PROMPTS["GRADING_PROMPT"]["content"]
|
| 338 |
+
grading_text = gemini_generate_content(model, grading_prompt_system+"\n\nPlease grade the following transcripts:\n"+grading_input)
|
|
|
|
|
|
|
| 339 |
|
| 340 |
+
grading_pdf_path = save_as_pdf(grading_text, os.path.splitext(os.path.basename(ans_path))[0]+"_graded.pdf")
|
| 341 |
grading_json = extract_marks_from_grading_exact(grading_text)
|
|
|
|
| 342 |
|
| 343 |
imprinted_pdf_path = None
|
| 344 |
if imprint:
|
| 345 |
question_scheme = qpms_text
|
| 346 |
+
imprinted_pdf_path = os.path.splitext(os.path.basename(ans_path))[0]+"_imprinted.pdf"
|
| 347 |
+
imprinted_pdf_path = imprint_marks_using_mapping_v2(ans_path, grading_json, imprinted_pdf_path, question_scheme, extracted_ids, model)
|
|
|
|
|
|
|
|
|
|
|
|
|
| 348 |
|
| 349 |
return qpms_text, as_text, grading_text, grading_pdf_path, imprinted_pdf_path
|
| 350 |
|