atz21 commited on
Commit
6941b48
·
verified ·
1 Parent(s): ca54958

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +412 -378
app.py CHANGED
@@ -1,387 +1,421 @@
1
- import os
2
- import re
3
- import json
4
- import subprocess
5
- import img2pdf
6
- import gradio as gr
7
- import google.generativeai as genai
8
- from pdf2image import convert_from_path
9
- from PIL import Image, ImageDraw, ImageFont
10
- import cv2
11
- import numpy as np
12
- from concurrent.futures import ThreadPoolExecutor, as_completed
13
- from PyPDF2 import PdfReader, PdfWriter
14
- from markdown_pdf import MarkdownPdf, Section
15
-
16
- # ---------------- CONFIG ----------------
17
- genai.configure(api_key=os.getenv("GEMINI_API_KEY"))
18
- GRID_ROWS, GRID_COLS = 20, 14
19
-
20
- # ---------------- PROMPTS ----------------
21
- PROMPTS = {
22
- "QP_MS_TRANSCRIPTION": {
23
- "role": "system",
24
- "content": """You are a high-quality OCR/Transcription assistant.
25
- INPUT: This file is a PDF that first contains the Question Paper and immediately after it the Markscheme.
26
- TASK:
27
- 1. Transcribe EXACTLY all the questions FIRST (with their total marks).
28
- 2. After ALL questions, transcribe the Markscheme exactly, preserving M/A/R notation in brackets.
29
- 3. Always number the questions sequentially (Question 1, Question 2, Question 3, …) **in the order they appear in the PDF**, even if the PDF shows a different number or leaves it blank. Do NOT skip or leave Question: blank.
30
- FORMAT:
31
- ==== PAPER TOTAL MARKS ====
32
- <total marks>
33
- ==== QUESTIONS BEGIN ====
34
- Question 1.i
35
- Total Marks: <number>
36
- QP: <question text>
37
- --QUESTION-END--
38
- (repeat for all questions)
39
- ==== QUESTIONS END ====
40
- ==== MARKSCHEME BEGIN ====
41
- Answer 1.i:
42
- <exact MS for Q1.i with notations M1, A1, R1 etc>
43
- (repeat for all answers)
44
- ==== MARKSCHEME END ====
45
- """
46
- },
47
- "GRADING_PROMPT": {
48
- "role": "system",
49
- "content": """Developer: You are an official examiner. Apply the following grading rules precisely.
50
- ### Abbreviations:
51
- - **M**: Marks for Method
52
- - **A**: Marks for Accuracy/Answer
53
- - **R**: Marks for Reasoning
54
- - **AG**: Answer given in question—no marks
55
- - **FT**: Follow Through marks (if error carried forward correctly)
56
- - **MR**: Deduct for misread (once only)
57
- ---
58
- ## Grading Instructions
59
- 1. Award marks using official annotations (e.g., M1, A2).
60
- 2. Do not award full marks for answers alone; check for method marks.
61
- 3. A marks usually require a valid M mark first.
62
- 4. Accept valid equivalent forms unless otherwise specified.
63
- 5. Apply FT where appropriate.
64
- 6. Use proper notation: M1A0, A1, etc.
65
- 7. Any lost mark: use red `<span style="color:red">M0</span>` and make Reason red.
66
- ---
67
- ## Output Format
68
- Produce two sections per question/sub-question, following this structure:
69
- ## Question <id>
70
- ### Markscheme vs Student Answer
71
- | Mark ID | Markscheme Expectation | Student’s Response | Awarded |
72
- |---------|------------------------|--------------------|---------|
73
- | M1_1 | Recognise GP | "r=0.9" | M1 |
74
- ➡️ **Total: X/Y**
75
- ---
76
- ### Examiner’s Report
77
- At the very end, provide a summary table:
78
- | Question Number | Marks | Remark |
79
- |-----------------|-------|--------|
80
- | 1 | X/Y | <remark> |
81
- Then show total clearly as a final line:
82
- `Total: <obtained_marks>/<max_marks>`
83
- NOTES:
84
- - The assistant will receive two transcripts: (1) QP+MS transcription (questions then markscheme) and (2) AS transcription (student answers). Use the QP+MS transcript as the authoritative source.
85
- """
86
- }
87
- }
88
-
89
- # ---------------- HELPERS ----------------
90
- def save_as_pdf(text, filename="output.pdf"):
91
- pdf = MarkdownPdf()
92
- pdf.add_section(Section(text, toc=False))
93
- pdf.save(filename)
94
- return filename
95
-
96
- def compress_pdf(input_path, output_path=None, max_size=20*1024*1024):
97
- if output_path is None:
98
- base, ext = os.path.splitext(input_path)
99
- output_path = f"{base}_compressed{ext}"
100
- try:
101
- size = os.path.getsize(input_path)
102
- except Exception:
103
- return input_path
104
- if size <= max_size:
105
- return input_path
106
- try:
107
- gs_cmd = [
108
- "gs", "-sDEVICE=pdfwrite",
109
- "-dCompatibilityLevel=1.4",
110
- "-dPDFSETTINGS=/ebook",
111
- "-dNOPAUSE", "-dQUIET", "-dBATCH",
112
- f"-sOutputFile={output_path}", input_path
113
- ]
114
- subprocess.run(gs_cmd, check=True)
115
- new_size = os.path.getsize(output_path)
116
- if new_size <= max_size:
117
- return output_path
118
- return input_path
119
- except Exception:
120
- return input_path
121
-
122
- def create_model():
123
- try:
124
- return genai.GenerativeModel("gemini-2.5-pro", generation_config={"temperature": 0})
125
- except Exception:
126
- return genai.GenerativeModel("gemini-2.5-flash", generation_config={"temperature": 0})
127
-
128
- def merge_pdfs(paths, output_path):
129
- writer = PdfWriter()
130
- for p in paths:
131
- reader = PdfReader(p)
132
- for page in reader.pages:
133
- writer.add_page(page)
134
- with open(output_path, "wb") as f:
135
- writer.write(f)
136
- return output_path
137
-
138
- def gemini_generate_content(model, prompt_text, file_upload_obj=None, image_obj=None):
139
- inputs = [prompt_text]
140
- if file_upload_obj:
141
- inputs.append(file_upload_obj)
142
- if image_obj:
143
- inputs.append(image_obj)
144
- response = model.generate_content(inputs)
145
- raw_text = getattr(response, "text", None)
146
- if not raw_text and getattr(response, "candidates", None):
147
- raw_text = response.candidates[0].content.parts[0].text
148
- if raw_text is None:
149
- raw_text = str(response)
150
- return raw_text
151
-
152
- # ---------------- PARSERS ----------------
153
- def extract_question_ids_from_qpms(text):
154
- ids = []
155
- for m in re.finditer(r"(?im)^\s*Question\s*:\s*([0-9]+(?:[a-zA-Z0-9\.\(\)]+)*)\b", text):
156
- ids.append(m.group(1).strip())
157
- if not ids:
158
- for m in re.finditer(r"(?m)^\s*([0-9]+(?:[a-zA-Z0-9\.\(\)]+)*)\s*[\.\):\-]\s", text):
159
- ids.append(m.group(1).strip())
160
- return ids if ids else ["NA"]
161
-
162
- def build_as_prompt_with_expected_ids(expected_ids):
163
- ids_block = "{\n" + "\n".join(expected_ids) + "\n}" if expected_ids else "{NA}"
164
- prompt = f"""You are a high-quality handwritten transcription assistant.
165
- INPUT: This PDF contains a student's handwritten answer sheet.
166
- TASK: Transcribe the student's answers exactly (as text), preserving step order and line breaks.
167
- Attempt to assign each answer to a question ID if student labelled it; else mark as INFERRED.
168
- Enclose math in ``` blocks, diagrams as [Graph omitted], unreadable as [illegible].
169
- Expected questions:
170
- {ids_block}
171
- -----------------------
172
- OUTPUT FORMAT:
173
- Question <id>
174
- AS:
175
- <transcribed answer or placeholder>
176
- """
177
- return prompt
178
-
179
- def extract_marks_from_grading_exact(grading_text):
180
- grading_json = {"grading": []}
181
- question_blocks = re.split(r"##\s*Question\s+", grading_text)
182
- for block in question_blocks[1:]:
183
- first_line = block.strip().splitlines()[0].strip() if block.strip().splitlines() else ""
184
- q_id_match = re.match(r"([0-9]+(?:[a-zA-Z]|\([^\)]+\)|(?:\.[a-zA-Z0-9]+))*)", first_line)
185
- q_id = q_id_match.group(1).strip() if q_id_match else first_line.split()[0] if first_line else ""
186
- awarded = re.findall(r"\b(M\d+|A\d+|R\d+|M0|A0|R0)\b", block)
187
- grading_json["grading"].append({"question": q_id, "marks_awarded": awarded})
188
- return grading_json
189
-
190
- # ---------------- IMPRINT ----------------
191
- def ask_gemini_for_mapping_for_page_v2(model, image_path, grading_json, question_scheme, expected_ids, rows=GRID_ROWS, cols=GRID_COLS):
192
- ids_block = "{\n" + "\n".join(expected_ids) + "\n}" if expected_ids else "{NA}"
193
- prompt = f"""
194
- You are an exam marker. Identify where each question begins on this page.
195
- The page has {rows}x{cols} grid (cells 1..{rows*cols}).
196
- Authoritative question scheme:
197
- {question_scheme}
198
-
199
- Expected IDs (spot only these):
200
- {ids_block}
201
-
202
- Grading JSON:
203
- {json.dumps(grading_json, indent=2)}
204
-
205
- Instructions:
206
- - Return cell number where first step begins for each question.
207
- - Only include questions on this page.
208
- - Handle mislabelled steps: e.g., Q4.i above Q4 may belong to Q3.ii.
209
- - Avoid placing marks inside another question's answer area.
210
- - Prefer blank cell to the RIGHT, else LEFT.
211
- - Never above or below the answer.
212
- - Return JSON only, like:
213
- [{{"question":"1.a","cell_number":15}}, ...]
214
- """
215
- img = Image.open(image_path)
216
- response = model.generate_content([prompt, img])
217
- raw_text = getattr(response, "text", None)
218
- if not raw_text and getattr(response, "candidates", None):
219
- raw_text = response.candidates[0].content.parts[0].text
220
- if not raw_text:
221
- raw_text = str(response)
222
- try:
223
- start = raw_text.index('[')
224
- end = raw_text.rindex(']') + 1
225
- return json.loads(raw_text[start:end])
226
- except Exception:
227
- return []
228
-
229
- def imprint_marks_using_mapping_v2(pdf_path, grading_json, output_pdf, question_scheme, expected_ids, model, rows=GRID_ROWS, cols=GRID_COLS):
230
- reader = PdfReader(pdf_path)
231
- annotated_page_paths = []
232
-
233
- pages = convert_from_path(pdf_path) # keep original size
234
- temp_grid_images = []
235
-
236
- for p_index, page_img in enumerate(pages):
237
- img = page_img.convert("RGB")
238
- draw = ImageDraw.Draw(img)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
239
  try:
240
- font = ImageFont.truetype("arial.ttf", 16)
241
- except:
242
- font = ImageFont.load_default()
243
-
244
- cell_w = img.width / cols
245
- cell_h = img.height / rows
246
- cell_num = 1
247
- for r in range(rows):
248
- for c in range(cols):
249
- x = int(c * cell_w + cell_w / 2)
250
- y = int(r * cell_h + cell_h / 2)
251
- bbox = draw.textbbox((0,0), str(cell_num), font=font)
252
- draw.text((x - (bbox[2]-bbox[0])/2, y - (bbox[3]-bbox[1])/2), str(cell_num), fill="black", font=font)
253
- cell_num +=1
254
- grid_path = f"page_{p_index+1}_grid.png"
255
- img.save(grid_path, "PNG")
256
- temp_grid_images.append(grid_path)
257
-
258
- mappings_per_page = {}
259
- with ThreadPoolExecutor(max_workers=min(8,len(temp_grid_images))) as ex:
260
- futures = {
261
- ex.submit(
262
- ask_gemini_for_mapping_for_page_v2, model, img_path, grading_json, question_scheme, expected_ids, rows, cols
263
- ): idx for idx,img_path in enumerate(temp_grid_images)
264
- }
265
- for fut in as_completed(futures):
266
- idx = futures[fut]
267
- try:
268
- mapping_result = fut.result()
269
- mappings_per_page[idx] = mapping_result
270
- print(f"[IMPRINT] Mapping received for page {idx+1}: {repr(mapping_result)}")
271
- except Exception as e:
272
- mappings_per_page[idx] = []
273
- print(f"[IMPRINT] Mapping failed for page {idx+1}: {repr(e)}")
274
-
275
- for p_index, page_img in enumerate(pages):
276
- img_cv = np.array(page_img.convert("RGB"))
277
- img_cv = cv2.cvtColor(img_cv, cv2.COLOR_RGB2BGR)
278
- h, w, _ = img_cv.shape
279
- cell_w_px, cell_h_px = w/cols, h/rows
280
- mapping = mappings_per_page.get(p_index, [])
281
- occupied = set()
282
- for item in mapping:
283
- qid = item.get("question")
284
- cell_number = item.get("cell_number")
285
- if qid is None or cell_number is None: continue
286
- marks_list = next((g["marks_awarded"] for g in grading_json.get("grading", []) if g["question"]==qid), [])
287
- marks_text = ",".join(marks_list) if marks_list else "?"
288
- row = (cell_number-1)//cols
289
- col = (cell_number-1)%cols
290
- candidates = []
291
- if col+1<cols: candidates.append((row,col+1))
292
- candidates.append((row,col))
293
- if col-1>=0: candidates.append((row,col-1))
294
- chosen = next(((r,c) for r,c in candidates if (r*cols+c+1) not in occupied), (row,col))
295
- occupied.add(chosen[0]*cols+chosen[1]+1)
296
- x_c = int((chosen[1]+0.5)*cell_w_px)
297
- y_c = int((chosen[0]+0.5)*cell_h_px)
298
- font_scale = max(0.6,min(1.6,cell_h_px/60))
299
- thickness = max(1,int(font_scale*2))
300
- cv2.putText(img_cv, marks_text, (x_c,y_c), cv2.FONT_HERSHEY_SIMPLEX, font_scale, (0,0,255), thickness)
301
- annotated_path = f"annotated_page_{p_index+1}.png"
302
- cv2.imwrite(annotated_path, img_cv)
303
- annotated_page_paths.append(annotated_path)
304
-
305
- with open(output_pdf,"wb") as f:
306
- f.write(img2pdf.convert(annotated_page_paths))
307
-
308
- return compress_pdf(output_pdf)
309
-
310
- # ---------------- PIPELINE ----------------
311
- def align_and_grade_pipeline(qp_path, ms_path, ans_path, imprint=False):
312
- qp_path = compress_pdf(qp_path)
313
- ms_path = compress_pdf(ms_path)
314
- ans_path = compress_pdf(ans_path)
315
-
316
- merged_qpms_path = os.path.splitext(qp_path)[0]+"_merged_qp_ms.pdf"
317
- merge_pdfs([qp_path, ms_path], merged_qpms_path)
318
 
319
- merged_uploaded = genai.upload_file(path=merged_qpms_path, display_name="QP+MS (merged)")
320
- ans_uploaded = genai.upload_file(path=ans_path, display_name="Answer Sheet")
321
 
 
 
322
  model = create_model()
323
 
324
- qpms_prompt = PROMPTS["QP_MS_TRANSCRIPTION"]["content"]
325
- qpms_text = gemini_generate_content(model, qpms_prompt, file_upload_obj=merged_uploaded)
326
- extracted_ids = extract_question_ids_from_qpms(qpms_text)
327
-
328
- as_prompt = build_as_prompt_with_expected_ids(extracted_ids)
329
- as_text = gemini_generate_content(model, as_prompt, file_upload_obj=ans_uploaded)
330
-
331
- grading_input = (
332
- "=== QP+MS TRANSCRIPT BEGIN ===\n"+qpms_text+
333
- "\n=== QP+MS TRANSCRIPT END ===\n\n"+
334
- "=== ANSWER SHEET TRANSCRIPT BEGIN ===\n"+as_text+
335
- "\n=== ANSWER SHEET TRANSCRIPT END ===\n"
336
- )
337
- grading_prompt_system = PROMPTS["GRADING_PROMPT"]["content"]
338
- grading_text = gemini_generate_content(model, grading_prompt_system+"\n\nPlease grade the following transcripts:\n"+grading_input)
339
-
340
- grading_pdf_path = save_as_pdf(grading_text, os.path.splitext(os.path.basename(ans_path))[0]+"_graded.pdf")
341
- grading_json = extract_marks_from_grading_exact(grading_text)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
342
 
343
- imprinted_pdf_path = None
344
- if imprint:
345
- question_scheme = qpms_text
346
- imprinted_pdf_path = os.path.splitext(os.path.basename(ans_path))[0]+"_imprinted.pdf"
347
- imprinted_pdf_path = imprint_marks_using_mapping_v2(ans_path, grading_json, imprinted_pdf_path, question_scheme, extracted_ids, model)
348
-
349
- return qpms_text, as_text, grading_text, grading_pdf_path, imprinted_pdf_path
350
-
351
- # ---------------- GRADIO ----------------
352
- with gr.Blocks(title="LeadIB AI Grading (Updated Imprint)") as demo:
353
- gr.Markdown("## 📘 LeadIB AI Grading — Updated Imprint Pipeline\nUpload QP, Markscheme, and Student Answer Sheet.")
354
-
355
- with gr.Row():
356
- qp_file = gr.File(label="📄 Question Paper (PDF)")
357
- ms_file = gr.File(label="📄 Markscheme (PDF)")
358
- ans_file = gr.File(label="📝 Student Answer Sheet (PDF)")
359
-
360
- imprint_toggle = gr.Checkbox(label="✍ Imprint Marks", value=False)
361
- run_button = gr.Button("🚀 Run Pipeline")
362
-
363
- with gr.Row():
364
- qpms_box = gr.Textbox(label="📑 QP+MS Transcript", lines=12)
365
- as_box = gr.Textbox(label="📝 AS Transcript", lines=12)
366
-
367
- grading_output_box = gr.Textbox(label="🧾 Grading Markdown", lines=20)
368
- grading_pdf_file = gr.File(label="📥 Download Grading PDF")
369
- imprint_pdf_file = gr.File(label="📥 Download Imprinted PDF (Optional)")
370
-
371
- def run_pipeline(qp_file_obj, ms_file_obj, ans_file_obj, imprint_flag):
372
- qp_path = qp_file_obj.name
373
- ms_path = ms_file_obj.name
374
- ans_path = ans_file_obj.name
375
- qpms_text, as_text, grading_text, grading_pdf_path, imprinted_pdf_path = align_and_grade_pipeline(
376
- qp_path, ms_path, ans_path, imprint=imprint_flag
377
- )
378
- return qpms_text or "", as_text or "", grading_text or "", grading_pdf_path, imprinted_pdf_path
379
 
380
- run_button.click(
381
- fn=run_pipeline,
382
- inputs=[qp_file, ms_file, ans_file, imprint_toggle],
383
- outputs=[qpms_box, as_box, grading_output_box, grading_pdf_file, imprint_pdf_file]
384
- )
385
 
386
  if __name__ == "__main__":
387
- demo.launch()
 
1
+ import os
2
+ import re
3
+ import json
4
+ import subprocess
5
+ import tempfile
6
+ import time
7
+ import img2pdf
8
+ import gradio as gr
9
+ import google.generativeai as genai
10
+ from markdown_pdf import MarkdownPdf, Section
11
+ from pdf2image import convert_from_path
12
+ from PIL import Image, ImageDraw, ImageFont
13
+ import cv2
14
+ import numpy as np
15
+ from concurrent.futures import ThreadPoolExecutor, as_completed
16
+ from PyPDF2 import PdfReader, PdfWriter
17
+
18
+ # ---------------- CONFIG ----------------
19
+ genai.configure(api_key=os.getenv("GEMINI_API_KEY"))
20
+ GRID_ROWS, GRID_COLS = 20, 14
21
+
22
+ # ---------------- PROMPTS ----------------
23
+ PROMPTS = {
24
+ "QP_MS_TRANSCRIPTION" : {
25
+ "role": "system",
26
+ "content": """You are a high-quality OCR/Transcription assistant.
27
+ INPUT: This file is a PDF that first contains the Question Paper and immediately after it the Markscheme.
28
+ TASK:
29
+ 1. Transcribe EXACTLY all the questions FIRST (with their total marks).
30
+ 2. After ALL questions, transcribe the Markscheme exactly, preserving M/A/R notation in brackets.
31
+ 3. Always number the questions sequentially (Question 1, Question 2, Question 3, …) **in the order they appear in the PDF**, even if the PDF shows a different number or leaves it blank. Do NOT skip or leave Question: blank.
32
+ FORMAT:
33
+ ==== PAPER TOTAL MARKS ====
34
+ <total marks>
35
+ ==== QUESTIONS BEGIN ====
36
+ Question 1.i
37
+ Total Marks: <number>
38
+ QP: <question text>
39
+ --QUESTION-END--
40
+ Question 1.ii
41
+ Total Marks: <number>
42
+ QP: <question text>
43
+ --QUESTION-END--
44
+ (repeat for all questions in order of appearance)
45
+ ==== QUESTIONS END ====
46
+ ==== MARKSCHEME BEGIN ====
47
+ Answer 1.i:
48
+ <exact MS for Q1.i with notations M1, A1, R1 etc>
49
+ Answer 1.ii:
50
+ <exact MS for Q1.ii with notations>
51
+ Answer 2 :
52
+ <exact MS for Q2 with notations>
53
+ (repeat for all answers)
54
+ ==== MARKSCHEME END ====
55
+ """
56
+ }
57
+ ,
58
+
59
+ "GRADING_PROMPT": {
60
+ "role": "system",
61
+ "content": """Developer: You are an official examiner. Apply the following grading rules precisely.
62
+ ### Abbreviations:
63
+ - **M**: Marks for Method
64
+ - **A**: Marks for Accuracy/Answer
65
+ - **R**: Marks for Reasoning
66
+ - **AG**: Answer given in question—no marks
67
+ - **FT**: Follow Through marks (if error carried forward correctly)
68
+ - **MR**: Deduct for misread (once only)
69
+ ---
70
+ ## Grading Instructions
71
+ 1. Award marks using official annotations (e.g., M1, A2).
72
+ 2. Do not award full marks for answers alone; check for method marks.
73
+ 3. A marks usually require a valid M mark first.
74
+ 4. Accept valid equivalent forms unless otherwise specified.
75
+ 5. Apply FT where appropriate.
76
+ 6. Use proper notation: M1A0, A1, etc.
77
+ 7. Any lost mark: use red `<span style="color:red">M0</span>` and make Reason red.
78
+ ---
79
+ ## Output Format
80
+ Produce two sections per question/sub-question, following this structure:
81
+ ## Question <id>
82
+ ### Markscheme vs Student Answer
83
+ | Mark ID | Markscheme Expectation | Student’s Response | Awarded |
84
+ |---------|------------------------|--------------------|---------|
85
+ | M1_1 | Recognise GP | "r=0.9" | M1 |
86
+ ➡️ **Total: X/Y**
87
+ ---
88
+ ### Examiner’s Report
89
+ At the very end, provide a summary table:
90
+ | Question Number | Marks | Remark |
91
+ |-----------------|-------|--------|
92
+ | 1 | X/Y | <remark> |
93
+ Then show total clearly as a final line:
94
+ `Total: <obtained_marks>/<max_marks>`
95
+ NOTES:
96
+ - The assistant will receive two transcripts: (1) QP+MS transcription (questions then markscheme) and (2) AS transcription (student answers). Use the QP+MS transcript as the authoritative source of question wording, total marks, and verbatim markscheme entries (M/A/R mark IDs).
97
+ - Match student answers to question IDs and grade according to the provided verbatim markscheme.
98
+ - Produce full markdown as above. Ensure mark IDs used in the grading are present and consistent with the markscheme.
99
+ """
100
+ }
101
+ }
102
+
103
+ # ---------------- HELPERS ----------------
104
+ def save_as_pdf(text, filename="output.pdf"):
105
+ pdf = MarkdownPdf()
106
+ pdf.add_section(Section(text, toc=False))
107
+ pdf.save(filename)
108
+ return filename
109
+
110
+ def compress_pdf(input_path, output_path=None, max_size=20*1024*1024):
111
+ if output_path is None:
112
+ base, ext = os.path.splitext(input_path)
113
+ output_path = f"{base}_compressed{ext}"
114
+
115
+ try:
116
+ size = os.path.getsize(input_path)
117
+ except Exception:
118
+ return input_path
119
+
120
+ if size <= max_size:
121
+ print(f"ℹ️ Not compressing {input_path} ({size/1024/1024:.2f} MB <= {max_size/1024/1024} MB)")
122
+ return input_path
123
+
124
+ print(f"🔎 Compressing {input_path} ({size/1024/1024:.2f} MB) -> {output_path}")
125
+ try:
126
+ gs_cmd = [
127
+ "gs", "-sDEVICE=pdfwrite",
128
+ "-dCompatibilityLevel=1.4",
129
+ "-dPDFSETTINGS=/ebook",
130
+ "-dNOPAUSE", "-dQUIET", "-dBATCH",
131
+ f"-sOutputFile={output_path}", input_path
132
+ ]
133
+ subprocess.run(gs_cmd, check=True)
134
+ new_size = os.path.getsize(output_path)
135
+ print(f"✅ Compression done. New size: {new_size/1024/1024:.2f} MB")
136
+ if new_size <= max_size:
137
+ return output_path
138
+ else:
139
+ print("⚠️ Compressed file still larger than threshold; returning original")
140
+ return input_path
141
+ except Exception as e:
142
+ print("❌ Compression error:", e)
143
+ return input_path
144
+
145
+ def create_model():
146
+ try:
147
+ print("⚡ Attempting to use gemini-2.5-pro model")
148
+ model = genai.GenerativeModel("gemini-2.5-pro", generation_config={"temperature": 0})
149
+ print("✅ Selected model: gemini-2.5-pro")
150
+ return model
151
+ except Exception as e:
152
+ print("⚠️ Could not use gemini-2.5-pro:", e)
153
+ try:
154
+ print("⚡ Falling back to gemini-2.5-flash model")
155
+ model = genai.GenerativeModel("gemini-2.5-flash", generation_config={"temperature": 0})
156
+ print("✅ Selected model: gemini-2.5-flash")
157
+ return model
158
+ except Exception as e:
159
+ print("❌ Failed to create any Gemini model:", e)
160
+ raise
161
+
162
+ def merge_pdfs(paths, output_path):
163
+ writer = PdfWriter()
164
+ for p in paths:
165
+ reader = PdfReader(p)
166
+ for page in reader.pages:
167
+ writer.add_page(page)
168
+ with open(output_path, "wb") as f:
169
+ writer.write(f)
170
+ return output_path
171
+
172
+ def gemini_generate_content(model, prompt_text, file_upload_obj=None, image_obj=None):
173
+ inputs = [prompt_text]
174
+ if file_upload_obj:
175
+ inputs.append(file_upload_obj)
176
+ if image_obj:
177
+ inputs.append(image_obj)
178
+ print("📡 Sending request to Gemini (prompt length:", len(prompt_text), "chars )")
179
+ response = model.generate_content(inputs)
180
+ raw_text = getattr(response, "text", None)
181
+ if not raw_text and getattr(response, "candidates", None):
182
+ raw_text = response.candidates[0].content.parts[0].text
183
+ if raw_text is None:
184
+ raw_text = str(response)
185
+ print("📥 Received response (chars):", len(raw_text))
186
+ return raw_text
187
+
188
+ # ---------------- PARSERS ----------------
189
+ def extract_question_ids_from_qpms(text):
190
+ print("🔎 Extracting question IDs from QP+MS transcript using regex...")
191
+ ids = []
192
+ for m in re.finditer(r"(?im)^\s*Question\s*:\s*([0-9]+(?:(?:\.[a-zA-Z0-9]+)+|(?:\([a-zA-Z0-9]+\))+|[a-zA-Z])*)\b", text):
193
+ qid = m.group(1).strip()
194
+ ids.append(qid)
195
+ if ids:
196
+ print(f"✅ Extracted {len(ids)} question IDs.")
197
+ print("IDs:", ids)
198
+ return ids
199
+
200
+ for m in re.finditer(r"(?m)^\s*([0-9]+(?:(?:\.[a-zA-Z0-9]+)+|(?:\([a-zA-Z0-9]+\))+|[a-zA-Z])*)\s*[\.\):\-]\s", text):
201
+ qid = m.group(1).strip()
202
+ ids.append(qid)
203
+ if ids:
204
+ print(f"✅ Extracted {len(ids)} question IDs (fallback heuristic).")
205
+ print("IDs:", ids)
206
+ else:
207
+ print("⚠️ No question IDs extracted; will send NA placeholder.")
208
+ return ids
209
+
210
+ def build_as_prompt_with_expected_ids(expected_ids):
211
+ if not expected_ids:
212
+ ids_block = "{NA}"
213
+ else:
214
+ ids_block = "{\n" + "\n".join(expected_ids) + "\n}"
215
+ prompt = f"""You are a high-quality handwritten transcription assistant.
216
+ INPUT: This PDF contains a student's handwritten answer sheet.
217
+ TASK: Transcribe the student's answers exactly (as text). Preserve step order and line breaks. Attempt to assign each answer to a question ID if the student has labelled it (e.g., "1", "1a", "2(b)", "3"). If the student hasn't labelled answers, segment contiguous answer blocks and attempt to infer question IDs from context — but mark inferred IDs clearly as "INFERRED: <id>"
218
+ Enclose all mathematical expressions in Markdown fenced code blocks (``` triple backticks).
219
+ If a diagram/graph is omitted, write [Graph omitted].
220
+ Unreadable parts: [illegible].
221
+ Unanswered: [No response].
222
+ Do NOT recreate diagrams.
223
+ Ensure consistency and determinism in formatting so subsequent models can grade directly from this aligned format.
224
+ Expected questions (if missing, write NA):
225
+ {ids_block}
226
+ -----------------------
227
+ OUTPUT FORMAT:
228
+ Question <id>
229
+ AS:
230
+ <transcribed answer or placeholder>
231
+ """
232
+ return prompt
233
+
234
+ def extract_marks_from_grading(grading_text):
235
+ print("🔎 Extracting awarded marks from grading output...")
236
+ grading_json = {"grading": []}
237
+ question_blocks = re.split(r"##\s*Question\s+", grading_text)
238
+ for block in question_blocks[1:]:
239
+ first_line = block.strip().splitlines()[0].strip() if block.strip().splitlines() else ""
240
+ q_id_match = re.match(r"([0-9]+(?:[a-zA-Z]|\([^\)]+\)|(?:\.[a-zA-Z0-9]+))*)", first_line)
241
+ if not q_id_match:
242
+ q_id = first_line.split()[0] if first_line else ""
243
+ else:
244
+ q_id = q_id_match.group(1).strip()
245
+ awarded = re.findall(r"\b(M\d+|A\d+|R\d+|M0|A0|R0)\b", block)
246
+ # 🔴 Change 1: DO NOT deduplicate, keep all marks in sequence
247
+ grading_json["grading"].append({
248
+ "question": q_id,
249
+ "marks_awarded": awarded
250
+ })
251
+ print("✅ Extracted grading marks for", len(grading_json["grading"]), "question blocks.")
252
+ print(json.dumps(grading_json, indent=2))
253
+ return grading_json
254
+
255
+ # ---------------- MAPPING/IMPRINT HELPERS ----------------
256
+ def ask_gemini_for_mapping_for_page(model, image_path, grading_json, rows=GRID_ROWS, cols=GRID_COLS, expected_ids=None):
257
+ if not expected_ids:
258
+ ids_block = "{NA}"
259
+ else:
260
+ ids_block = "{\n" + "\n".join(expected_ids) + "\n}"
261
+
262
+ prompt = f"""
263
+ You are an exam marker. Your role is to identify where each question begins on the page.
264
+ The page is divided into a {rows} x {cols} grid. Each cell has a RUNNING NUMBER label (1..{rows*cols}).
265
+
266
+ The only questions you should spot are listed here:
267
+ {ids_block}
268
+
269
+ For each question in the grading JSON, return the cell NUMBER where the FIRST STEP of that question begins.
270
+
271
+ IMPORTANT RULES:
272
+ - Do not place marks inside another question's answer area.
273
+ - Prefer placing the marks in a BLANK cell immediately to the RIGHT of the answer step. If no blank cell is available to the right, then place in a blank cell to the LEFT.
274
+ - Never place marks above or below the answer.
275
+ - If you find something like Q4.i but above it you see "ii)", interpret it as belonging to Q3.ii instead.
276
+
277
+ Return JSON only, like:
278
+ [{{"question": "1.a", "cell_number": 15}}, ...]
279
+
280
+ Grading JSON:
281
+ {json.dumps(grading_json, indent=2)}
282
+ """
283
+ print(f"📡 Sending mapping request for image {image_path} to Gemini...")
284
+ img = Image.open(image_path)
285
+ response = model.generate_content([prompt, img])
286
+ raw_text = getattr(response, "text", None)
287
+ if not raw_text and getattr(response, "candidates", None):
288
+ raw_text = response.candidates[0].content.parts[0].text
289
+ if not raw_text:
290
+ raw_text = str(response)
291
+ print("📥 Mapping response (chars):", len(raw_text))
292
+ try:
293
+ start = raw_text.index('[')
294
+ end = raw_text.rindex(']') + 1
295
+ json_part = raw_text[start:end]
296
+ mapping = json.loads(json_part)
297
+ print("✅ Parsed mapping JSON for", image_path, "| entries:", len(mapping))
298
+ return mapping
299
+ except Exception:
300
+ match = re.search(r'(\[.*\])', raw_text, re.DOTALL)
301
+ if match:
302
+ try:
303
+ mapping = json.loads(match.group(1))
304
+ print("✅ Parsed mapping JSON (alt) for", image_path, "| entries:", len(mapping))
305
+ return mapping
306
+ except Exception:
307
+ pass
308
+ print("⚠️ Failed to parse mapping JSON for", image_path)
309
+ return []
310
+ # ---------------- IMPRINTING ----------------
311
+ def imprint_marks_using_mapping(image_path, mapping, output_path, rows=GRID_ROWS, cols=GRID_COLS):
312
+ print(f"🖊️ Imprinting marks on {image_path} -> {output_path}")
313
+ img = cv2.imread(image_path)
314
+ h, w, _ = img.shape
315
+ cell_h, cell_w = h // rows, w // cols
316
+
317
+ for entry in mapping:
318
  try:
319
+ q = entry["question"]
320
+ cell_num = int(entry["cell_number"])
321
+ awarded = entry.get("marks_awarded", [])
322
+ row = (cell_num - 1) // cols
323
+ col = (cell_num - 1) % cols
324
+ x = col * cell_w + 5
325
+ y = row * cell_h + 20
326
+ mark_text = f"{q}: {' '.join(awarded)}"
327
+ cv2.putText(img, mark_text, (x, y),
328
+ cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
329
+ except Exception as e:
330
+ print("⚠️ Imprint error for entry:", entry, "|", e)
331
+
332
+ cv2.imwrite(output_path, img)
333
+ return output_path
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
334
 
 
 
335
 
336
+ # ---------------- MAIN PIPELINE ----------------
337
+ def align_and_grade_pipeline(qp_ms_pdf, as_pdf):
338
  model = create_model()
339
 
340
+ # Step 1: Transcribe QP + MS
341
+ print("📄 Transcribing QP+MS PDF...")
342
+ qpms_text = gemini_generate_content(model, PROMPTS["QP_MS_TRANSCRIPTION"]["content"], file_upload_obj=qp_ms_pdf)
343
+
344
+ # Step 2: Extract IDs
345
+ expected_ids = extract_question_ids_from_qpms(qpms_text)
346
+
347
+ # Step 3: Transcribe AS
348
+ print("📄 Transcribing Answer Sheet PDF...")
349
+ as_prompt = build_as_prompt_with_expected_ids(expected_ids)
350
+ as_text = gemini_generate_content(model, as_prompt, file_upload_obj=as_pdf)
351
+
352
+ # Step 4: Grade
353
+ grading_prompt = PROMPTS["GRADING_PROMPT"]["content"] + "\n\n" + \
354
+ "QP+MS Transcript:\n" + qpms_text + "\n\nAS Transcript:\n" + as_text
355
+ grading_text = gemini_generate_content(model, grading_prompt)
356
+
357
+ # Step 5: Extract marks JSON
358
+ grading_json = extract_marks_from_grading(grading_text)
359
+
360
+ # Step 6: Convert AS to images
361
+ images = convert_from_path(as_pdf, dpi=200)
362
+ temp_dir = tempfile.mkdtemp()
363
+ image_paths = []
364
+ for i, img in enumerate(images):
365
+ img_path = os.path.join(temp_dir, f"page_{i+1}.png")
366
+ img.save(img_path, "PNG")
367
+ image_paths.append(img_path)
368
+
369
+ # Step 7: Mapping for each page
370
+ mappings = []
371
+ for img_path in image_paths:
372
+ mapping = ask_gemini_for_mapping_for_page(model, img_path, grading_json,
373
+ rows=GRID_ROWS, cols=GRID_COLS,
374
+ expected_ids=expected_ids)
375
+ # Merge awarded marks into mapping
376
+ for entry in mapping:
377
+ for g in grading_json["grading"]:
378
+ if g["question"] == entry["question"]:
379
+ entry["marks_awarded"] = g["marks_awarded"]
380
+ mappings.append((img_path, mapping))
381
+
382
+ # Step 8: Imprint marks
383
+ imprinted_paths = []
384
+ for img_path, mapping in mappings:
385
+ out_path = img_path.replace(".png", "_imprinted.png")
386
+ imprint_marks_using_mapping(img_path, mapping, out_path)
387
+ imprinted_paths.append(out_path)
388
+
389
+ # Step 9: Convert to PDF
390
+ output_pdf = os.path.join(temp_dir, "final_output.pdf")
391
+ with open(output_pdf, "wb") as f:
392
+ f.write(img2pdf.convert(imprinted_paths))
393
+
394
+ compressed_pdf = compress_pdf(output_pdf)
395
+ return grading_text, compressed_pdf
396
+
397
+
398
+ # ---------------- GRADIO UI ----------------
399
+ def run_gradio():
400
+ with gr.Blocks() as demo:
401
+ gr.Markdown("# 📘 Automated Exam Grader (QP + MS + AS)")
402
+
403
+ with gr.Row():
404
+ qpms_file = gr.File(label="Upload Question Paper + Markscheme PDF", file_types=[".pdf"])
405
+ as_file = gr.File(label="Upload Student Answer Sheet PDF", file_types=[".pdf"])
406
+
407
+ run_btn = gr.Button("Run Alignment + Grading")
408
+ grading_output = gr.Textbox(label="Grading Report (Markdown)", lines=20)
409
+ final_pdf = gr.File(label="Download Final Imprinted PDF")
410
+
411
+ def process(qpms_pdf, as_pdf):
412
+ grading_text, pdf_path = align_and_grade_pipeline(qpms_pdf, as_pdf)
413
+ return grading_text, pdf_path
414
+
415
+ run_btn.click(process, inputs=[qpms_file, as_file], outputs=[grading_output, final_pdf])
416
 
417
+ demo.launch()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
418
 
 
 
 
 
 
419
 
420
  if __name__ == "__main__":
421
+ run_gradio()