annotated pdf with tick mark
Browse files
app.py
CHANGED
|
@@ -263,11 +263,16 @@ def _init_gemini_client(key_index: int = 0) -> None:
|
|
| 263 |
|
| 264 |
|
| 265 |
def _is_rate_limit_error(error_msg: str) -> bool:
|
| 266 |
-
"""Check if the error is a rate limit error (429)."""
|
| 267 |
if not error_msg:
|
| 268 |
return False
|
| 269 |
lower = error_msg.lower()
|
| 270 |
-
return "429" in lower or
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 271 |
|
| 272 |
|
| 273 |
def _rotate_to_next_key() -> bool:
|
|
@@ -986,6 +991,110 @@ def extract_text_from_pdf(pdf_bytes: bytes, filename: str = "unknown.pdf") -> Di
|
|
| 986 |
return {"text": extracted, "used_ocr": used_ocr, "needs_ocr": False}
|
| 987 |
|
| 988 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 989 |
def create_annotated_pdf(
|
| 990 |
original_pdf_bytes: bytes,
|
| 991 |
mcq_results: List[Dict[str, Any]] = None,
|
|
@@ -995,205 +1104,200 @@ def create_annotated_pdf(
|
|
| 995 |
question_type: str = "mcq"
|
| 996 |
) -> bytes:
|
| 997 |
"""
|
| 998 |
-
|
| 999 |
-
|
| 1000 |
-
|
| 1001 |
-
|
| 1002 |
-
|
| 1003 |
-
|
| 1004 |
-
|
| 1005 |
-
student_level: Student level (Easy/Medium/Hard)
|
| 1006 |
-
question_type: Type of question ('mcq' or 'narrative')
|
| 1007 |
-
|
| 1008 |
-
Returns:
|
| 1009 |
-
Annotated PDF as bytes
|
| 1010 |
"""
|
| 1011 |
if not reportlab:
|
| 1012 |
print("[WARN] reportlab not available, returning original PDF")
|
| 1013 |
return original_pdf_bytes
|
| 1014 |
-
|
| 1015 |
try:
|
| 1016 |
from pypdf import PdfWriter, PdfReader
|
| 1017 |
from io import BytesIO
|
| 1018 |
-
|
| 1019 |
-
#
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1020 |
original_reader = PdfReader(BytesIO(original_pdf_bytes))
|
| 1021 |
writer = PdfWriter()
|
| 1022 |
-
|
| 1023 |
-
|
| 1024 |
for page_num, page in enumerate(original_reader.pages):
|
| 1025 |
-
|
| 1026 |
-
page_width = float(page.mediabox.width)
|
| 1027 |
page_height = float(page.mediabox.height)
|
| 1028 |
-
|
| 1029 |
-
# Create overlay canvas for annotations
|
| 1030 |
packet = BytesIO()
|
| 1031 |
c = canvas.Canvas(packet, pagesize=(page_width, page_height))
|
| 1032 |
-
|
| 1033 |
-
# Draw tickmarks for MCQ questions
|
| 1034 |
-
# Position marks along the right margin
|
| 1035 |
-
if mcq_results:
|
| 1036 |
-
y_start = page_height - 50
|
| 1037 |
-
y_spacing = 30
|
| 1038 |
-
|
| 1039 |
-
# Calculate which questions to show on this page
|
| 1040 |
-
# (show first few on first page, rest on subsequent pages)
|
| 1041 |
-
marks_per_page = int((page_height - 100) / y_spacing)
|
| 1042 |
-
|
| 1043 |
-
start_idx = page_num * marks_per_page
|
| 1044 |
-
end_idx = min(start_idx + marks_per_page, len(mcq_results))
|
| 1045 |
-
|
| 1046 |
-
for i in range(start_idx, end_idx):
|
| 1047 |
-
result = mcq_results[i]
|
| 1048 |
-
qid = result.get('qid', f'Q{i+1}')
|
| 1049 |
-
is_correct = result.get('correct', False)
|
| 1050 |
-
|
| 1051 |
-
y_pos = y_start - ((i - start_idx) * y_spacing)
|
| 1052 |
-
x_pos = page_width - 60
|
| 1053 |
-
|
| 1054 |
-
# Draw marks based on question type and correctness
|
| 1055 |
-
# Three states:
|
| 1056 |
-
# unattempted=True β plain CIRCLE (orange) β question was skipped
|
| 1057 |
-
# correct=True β TICK β (green) β answered correctly
|
| 1058 |
-
# correct=False β CROSS β (red) β answered incorrectly
|
| 1059 |
-
is_unattempted = result.get('unattempted', False)
|
| 1060 |
-
|
| 1061 |
-
if is_unattempted:
|
| 1062 |
-
# Plain circle β unattempted question
|
| 1063 |
-
c.setStrokeColor(colors.Color(1.0, 0.55, 0.0)) # orange
|
| 1064 |
-
c.setFillColor(colors.Color(1.0, 0.55, 0.0))
|
| 1065 |
-
c.setLineWidth(2.5)
|
| 1066 |
-
c.circle(x_pos, y_pos, 12, fill=0)
|
| 1067 |
-
# No symbol inside β the empty circle IS the mark
|
| 1068 |
-
|
| 1069 |
-
elif question_type == "narrative":
|
| 1070 |
-
# Narrative: green tick for Verified, red circle for others
|
| 1071 |
-
if is_correct:
|
| 1072 |
-
c.setStrokeColor(colors.green)
|
| 1073 |
-
c.setFillColor(colors.green)
|
| 1074 |
-
c.setLineWidth(2)
|
| 1075 |
-
c.circle(x_pos, y_pos, 12, fill=0)
|
| 1076 |
-
c.setFont("Helvetica-Bold", 14)
|
| 1077 |
-
c.drawString(x_pos - 5, y_pos - 5, "β")
|
| 1078 |
-
elif status == "Partial":
|
| 1079 |
-
c.setStrokeColor(colors.orange)
|
| 1080 |
-
c.setFillColor(colors.orange)
|
| 1081 |
-
c.setLineWidth(2)
|
| 1082 |
-
c.circle(x_pos, y_pos, 12, fill=0)
|
| 1083 |
-
else:
|
| 1084 |
-
c.setStrokeColor(colors.red)
|
| 1085 |
-
c.setFillColor(colors.red)
|
| 1086 |
-
c.setLineWidth(2)
|
| 1087 |
-
c.circle(x_pos, y_pos, 12, fill=0)
|
| 1088 |
|
| 1089 |
-
|
| 1090 |
-
|
| 1091 |
-
|
| 1092 |
-
|
| 1093 |
-
|
| 1094 |
-
|
| 1095 |
-
|
| 1096 |
-
|
| 1097 |
-
|
| 1098 |
-
|
| 1099 |
-
|
| 1100 |
-
|
| 1101 |
-
|
| 1102 |
-
|
| 1103 |
-
|
| 1104 |
-
|
| 1105 |
-
|
| 1106 |
-
|
| 1107 |
-
|
| 1108 |
-
|
| 1109 |
-
|
| 1110 |
-
|
| 1111 |
-
|
| 1112 |
-
|
| 1113 |
-
|
| 1114 |
-
|
| 1115 |
-
|
| 1116 |
-
|
| 1117 |
-
|
| 1118 |
-
|
| 1119 |
-
|
| 1120 |
-
|
| 1121 |
-
|
| 1122 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1123 |
if page_num == 0:
|
| 1124 |
-
|
| 1125 |
-
c.setFillColor(colors.
|
| 1126 |
-
c.rect(0, page_height -
|
| 1127 |
-
|
| 1128 |
-
# Draw status circle and text - LARGER FONT
|
| 1129 |
-
c.setFillColor(colors.black)
|
| 1130 |
-
c.setFont("Helvetica-Bold", 20)
|
| 1131 |
-
|
| 1132 |
-
# Determine circle color based on status
|
| 1133 |
if status == "Verified":
|
| 1134 |
-
|
| 1135 |
elif status == "Partial":
|
| 1136 |
-
|
| 1137 |
else:
|
| 1138 |
-
|
| 1139 |
-
|
| 1140 |
-
|
| 1141 |
-
c.
|
| 1142 |
-
c.
|
| 1143 |
-
c.
|
| 1144 |
-
|
| 1145 |
-
|
| 1146 |
-
# Draw status text
|
| 1147 |
-
c.setFillColor(status_circle_color)
|
| 1148 |
-
c.drawString(45, page_height - 30, f"Status: {status}")
|
| 1149 |
-
|
| 1150 |
c.setFillColor(colors.black)
|
| 1151 |
-
c.setFont("Helvetica-Bold",
|
| 1152 |
-
c.drawString(
|
| 1153 |
-
c.drawString(
|
| 1154 |
-
|
| 1155 |
-
|
| 1156 |
-
|
| 1157 |
-
|
| 1158 |
-
|
| 1159 |
-
|
| 1160 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1161 |
if question_type == "narrative":
|
| 1162 |
-
c.drawString(
|
|
|
|
| 1163 |
else:
|
| 1164 |
-
c.drawString(
|
| 1165 |
-
|
| 1166 |
-
|
| 1167 |
-
|
|
|
|
|
|
|
| 1168 |
c.setFont("Helvetica", 9)
|
| 1169 |
-
c.setFillColor(colors.
|
| 1170 |
-
c.drawString(
|
| 1171 |
-
c.setFillColor(colors.
|
| 1172 |
-
c.drawString(
|
| 1173 |
c.setFillColor(colors.Color(1.0, 0.55, 0.0))
|
| 1174 |
-
c.drawString(
|
| 1175 |
-
|
| 1176 |
c.save()
|
| 1177 |
packet.seek(0)
|
| 1178 |
-
|
| 1179 |
-
# Merge overlay with original page
|
| 1180 |
overlay_reader = PdfReader(packet)
|
| 1181 |
if overlay_reader.pages:
|
| 1182 |
page.merge_page(overlay_reader.pages[0])
|
| 1183 |
-
|
| 1184 |
writer.add_page(page)
|
| 1185 |
-
|
| 1186 |
-
# Write the final PDF
|
| 1187 |
output = BytesIO()
|
| 1188 |
writer.write(output)
|
| 1189 |
output.seek(0)
|
| 1190 |
return output.read()
|
| 1191 |
-
|
| 1192 |
except Exception as e:
|
| 1193 |
print(f"[ERROR] Failed to create annotated PDF: {e}")
|
| 1194 |
return original_pdf_bytes
|
| 1195 |
|
| 1196 |
-
|
| 1197 |
async def extract_text_from_upload(file: UploadFile) -> Dict[str, Any]:
|
| 1198 |
filename = getattr(file, "filename", "") or "upload"
|
| 1199 |
content_type = (getattr(file, "content_type", "") or "").lower()
|
|
|
|
| 263 |
|
| 264 |
|
| 265 |
def _is_rate_limit_error(error_msg: str) -> bool:
|
| 266 |
+
"""Check if the error is a rate limit error (429) or service unavailable (503)."""
|
| 267 |
if not error_msg:
|
| 268 |
return False
|
| 269 |
lower = error_msg.lower()
|
| 270 |
+
return ("429" in lower or
|
| 271 |
+
"503" in lower or
|
| 272 |
+
"rate_limit" in lower or
|
| 273 |
+
"resource_exhausted" in lower or
|
| 274 |
+
"rate limit" in lower or
|
| 275 |
+
"unavailable" in lower)
|
| 276 |
|
| 277 |
|
| 278 |
def _rotate_to_next_key() -> bool:
|
|
|
|
| 991 |
return {"text": extracted, "used_ocr": used_ocr, "needs_ocr": False}
|
| 992 |
|
| 993 |
|
| 994 |
+
def get_question_positions_from_pdf(pdf_bytes: bytes) -> Dict[int, List[Dict]]:
    """
    Detect question number positions in a PDF.

    Two strategies are tried for every page:

    Strategy 1: pypdf text-layer visitor (fast, for PDFs with a text layer).
        Each non-empty text fragment is recorded with the x/y translation of
        its text matrix; fragments starting with ``Q<n>``, ``Question <n>``
        or ``<n>`` followed by ``.``, ``)``, ``:`` or whitespace are treated
        as question labels.
    Strategy 2: pdf2image + pytesseract OCR (for image-based PDFs).
        Used only when strategy 1 found nothing on the page. The page is
        rendered at 72 dpi and OCR tokens that look like ``Q<n>`` are kept.

    Args:
        pdf_bytes: Raw bytes of the PDF document.

    Returns:
        Dict mapping page_num (0-based) -> list of {qid, y_pos, x_pos},
        sorted by descending y_pos, where y_pos/x_pos are in PDF coordinate
        units (origin at bottom-left). Only the first occurrence of a qid on
        a page is kept, and pages without any detected question are omitted.
        Any unexpected failure (including a missing pypdf) is logged and
        yields ``{}``; a failure inside one strategy is logged and that
        strategy is skipped for the page.
    """
    try:
        from pypdf import PdfReader
        from io import BytesIO

        reader = PdfReader(BytesIO(pdf_bytes))
        question_positions: Dict[int, List[Dict]] = {}

        # Patterns are compiled once here instead of once per page/token.
        tl_patterns = [
            re.compile(r'\bQ\s*(\d+)\b', re.IGNORECASE),
            re.compile(r'\bQuestion\s*(\d+)\b', re.IGNORECASE),
            re.compile(r'^(\d+)[.):\s]'),
        ]
        ocr_direct_pat = re.compile(r'^[Qq]\s*(\d+)[.:]?$')
        ocr_plain_pat = re.compile(r'^[Qq]\s*(\d+)$')
        ocr_single_pat = re.compile(r'^[Qq]([a-zA-Z\d])$')
        # Letters that OCR commonly produces in place of a digit.
        ocr_map = {'i': '1', 'I': '1', 'l': '1', 'o': '0', 'O': '0',
                   'z': '2', 'Z': '2', 's': '5', 'S': '5', 'g': '9'}

        def _normalise_ocr_qid(token: str):
            """Map an OCR token such as 'Q3.' or 'Ql' to 'Q3'/'Q1', else None."""
            t = token.strip().rstrip('.')
            m = ocr_plain_pat.match(t)
            if m:
                return f"Q{m.group(1)}"
            m2 = ocr_single_pat.match(t)
            if m2:
                digit = ocr_map.get(m2.group(1), m2.group(1))
                if digit.isdigit():
                    return f"Q{digit}"
            return None

        for page_num, page in enumerate(reader.pages):
            # Fall back to fixed dimensions when the media box lacks the attributes.
            page_height = float(page.mediabox.height) if hasattr(page.mediabox, 'height') else 792
            page_width = float(page.mediabox.width) if hasattr(page.mediabox, 'width') else 595
            found: List[Dict] = []
            existing_qids: set = set()

            # Strategy 1: text layer
            try:
                parts = []

                def _visitor(text, cm, tm, font_dict, font_size):
                    # NOTE(review): only the text matrix translation is used;
                    # the current transformation matrix (cm) is ignored —
                    # confirm this is accurate enough for the PDFs in use.
                    if text and text.strip():
                        x = float(tm[4]) if tm else 0
                        y = float(tm[5]) if tm else 0
                        parts.append((text.strip(), x, y))

                page.extract_text(visitor_text=_visitor)

                for text_frag, x, y in parts:
                    for pat in tl_patterns:
                        m = pat.match(text_frag)
                        if m:
                            qid = f"Q{m.group(1)}"
                            if qid not in existing_qids:
                                existing_qids.add(qid)
                                found.append({'qid': qid, 'y_pos': y, 'x_pos': x})
                            break
            except Exception as tl_err:
                print(f"[WARN] text-layer page {page_num}: {tl_err}")

            # Strategy 2: OCR fallback
            if not found:
                try:
                    from pdf2image import convert_from_bytes as _c2b
                    import pytesseract

                    rendered = _c2b(pdf_bytes, dpi=72, first_page=page_num+1, last_page=page_num+1)
                    if rendered:
                        img = rendered[0]
                        img_w, img_h = img.size
                        # Convert image pixels back to PDF units.
                        scale_x = page_width / img_w
                        scale_y = page_height / img_h
                        ocr_data = pytesseract.image_to_data(img, output_type=pytesseract.Output.DICT)
                        for i, token in enumerate(ocr_data['text']):
                            if not token or not token.strip():
                                continue
                            # The confidence may arrive as an int, a float or
                            # a decimal string; parse it as float so one odd
                            # value does not abort OCR for the whole page.
                            try:
                                confidence = float(ocr_data['conf'][i])
                            except (TypeError, ValueError):
                                continue
                            if confidence < 20:
                                continue
                            dm = ocr_direct_pat.match(token.strip())
                            if dm:
                                qid = f"Q{dm.group(1)}"
                            else:
                                qid = _normalise_ocr_qid(token)
                            if qid and qid not in existing_qids:
                                img_x = ocr_data['left'][i]
                                img_y = ocr_data['top'][i]
                                img_h_tok = ocr_data['height'][i]
                                pdf_x = img_x * scale_x
                                # Image y grows downwards; flip to PDF space
                                # and use the vertical centre of the token.
                                pdf_y = page_height - (img_y + img_h_tok * 0.5) * scale_y
                                existing_qids.add(qid)
                                found.append({'qid': qid, 'y_pos': pdf_y, 'x_pos': pdf_x})
                except Exception as ocr_err:
                    print(f"[WARN] OCR fallback page {page_num}: {ocr_err}")

            if found:
                # Larger y is higher on the page, so this orders top to bottom.
                found.sort(key=lambda d: -d['y_pos'])
                question_positions[page_num] = found

        return question_positions

    except Exception as e:
        print(f"[WARN] Failed to get question positions: {e}")
        return {}
|
| 1097 |
+
|
| 1098 |
def create_annotated_pdf(
|
| 1099 |
original_pdf_bytes: bytes,
|
| 1100 |
mcq_results: List[Dict[str, Any]] = None,
|
|
|
|
| 1104 |
question_type: str = "mcq"
|
| 1105 |
) -> bytes:
|
| 1106 |
"""
|
| 1107 |
+
Annotate every question number found in the PDF with a coloured mark:
|
| 1108 |
+
Correct -> filled green circle + white tick (✓)
|
| 1109 |
+
Wrong -> filled red circle + white cross (✗)
|
| 1110 |
+
Unattempted -> hollow orange circle (○)
|
| 1111 |
+
|
| 1112 |
+
Any question detected in the PDF that has NO entry in mcq_results is
|
| 1113 |
+
automatically treated as unattempted (hollow orange circle).
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1114 |
"""
|
| 1115 |
if not reportlab:
|
| 1116 |
print("[WARN] reportlab not available, returning original PDF")
|
| 1117 |
return original_pdf_bytes
|
| 1118 |
+
|
| 1119 |
try:
|
| 1120 |
from pypdf import PdfWriter, PdfReader
|
| 1121 |
from io import BytesIO
|
| 1122 |
+
|
| 1123 |
+
# ββ Detect question positions ββββββββββββββββββββββββββββββββββββββ
|
| 1124 |
+
question_positions = get_question_positions_from_pdf(original_pdf_bytes)
|
| 1125 |
+
print(f"[INFO] Detected question positions: {question_positions}")
|
| 1126 |
+
|
| 1127 |
+
# Build lookup: qid -> (page_num, pdf_y, pdf_x)
|
| 1128 |
+
qid_location: Dict[str, tuple] = {}
|
| 1129 |
+
for pg, items in question_positions.items():
|
| 1130 |
+
for item in items:
|
| 1131 |
+
qid_location[item["qid"]] = (pg, item["y_pos"], item["x_pos"])
|
| 1132 |
+
|
| 1133 |
+
# Build a quick lookup from mcq_results: qid -> result dict
|
| 1134 |
+
results_by_qid: Dict[str, Dict] = {}
|
| 1135 |
+
for r in (mcq_results or []):
|
| 1136 |
+
qid = r.get("qid", "")
|
| 1137 |
+
if qid:
|
| 1138 |
+
results_by_qid[qid] = r
|
| 1139 |
+
|
| 1140 |
+
# ββ Helpers ββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
|
| 1141 |
+
def _draw_mark(c, x, y, is_correct, is_unattempted, radius=14):
|
| 1142 |
+
"""Draw a mark symbol centred at PDF coordinates (x, y)."""
|
| 1143 |
+
if is_unattempted:
|
| 1144 |
+
c.setStrokeColor(colors.Color(1.0, 0.55, 0.0))
|
| 1145 |
+
c.setFillColor(colors.Color(1.0, 0.55, 0.0))
|
| 1146 |
+
c.setLineWidth(3)
|
| 1147 |
+
c.circle(x, y, radius, fill=0)
|
| 1148 |
+
elif is_correct is None:
|
| 1149 |
+
c.setStrokeColor(colors.grey)
|
| 1150 |
+
c.setFillColor(colors.grey)
|
| 1151 |
+
c.setLineWidth(2)
|
| 1152 |
+
c.circle(x, y, radius, fill=0)
|
| 1153 |
+
c.setFont("Helvetica-Bold", int(radius * 0.9))
|
| 1154 |
+
c.drawString(x - radius * 0.3, y - radius * 0.4, "?")
|
| 1155 |
+
elif is_correct:
|
| 1156 |
+
c.setStrokeColor(colors.Color(0.0, 0.65, 0.0))
|
| 1157 |
+
c.setFillColor(colors.Color(0.0, 0.65, 0.0))
|
| 1158 |
+
c.setLineWidth(2)
|
| 1159 |
+
c.circle(x, y, radius, fill=1)
|
| 1160 |
+
c.setFillColor(colors.white)
|
| 1161 |
+
c.setFont("Helvetica-Bold", int(radius * 1.5))
|
| 1162 |
+
c.drawString(x - radius * 0.5, y - radius * 0.45, "\u2713")
|
| 1163 |
+
else:
|
| 1164 |
+
c.setStrokeColor(colors.Color(0.85, 0.1, 0.1))
|
| 1165 |
+
c.setFillColor(colors.Color(0.85, 0.1, 0.1))
|
| 1166 |
+
c.setLineWidth(2)
|
| 1167 |
+
c.circle(x, y, radius, fill=1)
|
| 1168 |
+
c.setFillColor(colors.white)
|
| 1169 |
+
c.setFont("Helvetica-Bold", int(radius * 1.5))
|
| 1170 |
+
c.drawString(x - radius * 0.5, y - radius * 0.45, "\u2717")
|
| 1171 |
+
|
| 1172 |
+
MARK_RADIUS = 14
|
| 1173 |
+
# Mark is drawn just to the LEFT of the detected Q-number text
|
| 1174 |
+
# x_pos from detection = left edge of "Q1." text; offset left by radius+4
|
| 1175 |
+
MARK_X_OFFSET = -(MARK_RADIUS + 4)
|
| 1176 |
+
|
| 1177 |
original_reader = PdfReader(BytesIO(original_pdf_bytes))
|
| 1178 |
writer = PdfWriter()
|
| 1179 |
+
total_pages = len(original_reader.pages)
|
| 1180 |
+
|
| 1181 |
for page_num, page in enumerate(original_reader.pages):
|
| 1182 |
+
page_width = float(page.mediabox.width)
|
|
|
|
| 1183 |
page_height = float(page.mediabox.height)
|
| 1184 |
+
|
|
|
|
| 1185 |
packet = BytesIO()
|
| 1186 |
c = canvas.Canvas(packet, pagesize=(page_width, page_height))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1187 |
|
| 1188 |
+
# ββ Draw a mark for every detected question on this page ββββββββ
|
| 1189 |
+
page_detected = question_positions.get(page_num, [])
|
| 1190 |
+
|
| 1191 |
+
for item in page_detected:
|
| 1192 |
+
qid = item["qid"]
|
| 1193 |
+
y_pos = item["y_pos"]
|
| 1194 |
+
x_pos = item["x_pos"]
|
| 1195 |
+
|
| 1196 |
+
# Get result for this qid (default = unattempted if not in results)
|
| 1197 |
+
result = results_by_qid.get(qid)
|
| 1198 |
+
is_unattempted = True # default: no result entry = unattempted
|
| 1199 |
+
is_correct = False
|
| 1200 |
+
|
| 1201 |
+
if result is not None:
|
| 1202 |
+
is_unattempted = bool(result.get("unattempted", False))
|
| 1203 |
+
is_correct = result.get("correct", False)
|
| 1204 |
+
# If no explicit unattempted flag but chosen is empty -> unattempted
|
| 1205 |
+
if not is_unattempted and not result.get("chosen", ""):
|
| 1206 |
+
is_unattempted = True
|
| 1207 |
+
|
| 1208 |
+
mark_x = max(MARK_RADIUS + 2, x_pos + MARK_X_OFFSET)
|
| 1209 |
+
mark_y = y_pos + MARK_RADIUS * 0.3
|
| 1210 |
+
_draw_mark(c, mark_x, mark_y, is_correct, is_unattempted, MARK_RADIUS)
|
| 1211 |
+
|
| 1212 |
+
# ββ Fallback marks for results whose qid was NOT detected βββββββ
|
| 1213 |
+
# (edge case: question number in results but OCR/text-layer missed it)
|
| 1214 |
+
undetected_results = [r for r in (mcq_results or [])
|
| 1215 |
+
if r.get("qid") not in qid_location]
|
| 1216 |
+
if undetected_results:
|
| 1217 |
+
per_page = max(1, (len(undetected_results) + total_pages - 1) // total_pages)
|
| 1218 |
+
start_idx = page_num * per_page
|
| 1219 |
+
page_slice = undetected_results[start_idx: start_idx + per_page]
|
| 1220 |
+
y_start = page_height - 100
|
| 1221 |
+
y_spacing = max(20, (page_height - 130) / max(1, per_page))
|
| 1222 |
+
for i, result in enumerate(page_slice):
|
| 1223 |
+
is_unattempted = bool(result.get("unattempted", False))
|
| 1224 |
+
if not is_unattempted and not result.get("chosen", ""):
|
| 1225 |
+
is_unattempted = True
|
| 1226 |
+
is_correct = result.get("correct", False)
|
| 1227 |
+
y_pos = y_start - i * y_spacing
|
| 1228 |
+
if y_pos < 30:
|
| 1229 |
+
break
|
| 1230 |
+
_draw_mark(c, 18, y_pos, is_correct, is_unattempted, 9)
|
| 1231 |
+
|
| 1232 |
+
# ββ Header bar (first page) βββββββββββββββββββββββββββββββββββββ
|
| 1233 |
if page_num == 0:
|
| 1234 |
+
header_h = 58
|
| 1235 |
+
c.setFillColor(colors.Color(0.93, 0.93, 0.93))
|
| 1236 |
+
c.rect(0, page_height - header_h, page_width, header_h, fill=1, stroke=0)
|
| 1237 |
+
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1238 |
if status == "Verified":
|
| 1239 |
+
sc = colors.Color(0.0, 0.65, 0.0)
|
| 1240 |
elif status == "Partial":
|
| 1241 |
+
sc = colors.Color(1.0, 0.55, 0.0)
|
| 1242 |
else:
|
| 1243 |
+
sc = colors.Color(0.85, 0.1, 0.1)
|
| 1244 |
+
|
| 1245 |
+
c.setFillColor(sc); c.setStrokeColor(sc)
|
| 1246 |
+
c.circle(18, page_height - 22, 8, fill=1)
|
| 1247 |
+
c.setFont("Helvetica-Bold", 14)
|
| 1248 |
+
c.drawString(34, page_height - 27, f"Status: {status}")
|
| 1249 |
+
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1250 |
c.setFillColor(colors.black)
|
| 1251 |
+
c.setFont("Helvetica-Bold", 14)
|
| 1252 |
+
c.drawString(page_width * 0.42, page_height - 27, f"Score: {match_percentage}%")
|
| 1253 |
+
c.drawString(page_width * 0.72, page_height - 27, f"Level: {student_level}")
|
| 1254 |
+
|
| 1255 |
+
if mcq_results or page_detected:
|
| 1256 |
+
# Count across ALL detected questions (not just those in results)
|
| 1257 |
+
all_qids_detected = [item["qid"] for pg_items in question_positions.values()
|
| 1258 |
+
for item in pg_items]
|
| 1259 |
+
correct_count = sum(1 for r in (mcq_results or []) if r.get("correct"))
|
| 1260 |
+
wrong_count = sum(1 for r in (mcq_results or [])
|
| 1261 |
+
if not r.get("correct") and not r.get("unattempted")
|
| 1262 |
+
and r.get("chosen", ""))
|
| 1263 |
+
unattempted_count = len(all_qids_detected) - correct_count - wrong_count
|
| 1264 |
+
total_count = len(all_qids_detected) or len(mcq_results or [])
|
| 1265 |
+
|
| 1266 |
+
c.setFont("Helvetica-Bold", 11)
|
| 1267 |
if question_type == "narrative":
|
| 1268 |
+
c.drawString(18, page_height - 46,
|
| 1269 |
+
f"Narrative Evaluation: Score {match_percentage}%")
|
| 1270 |
else:
|
| 1271 |
+
c.drawString(18, page_height - 46,
|
| 1272 |
+
f"MCQ: {correct_count} correct | "
|
| 1273 |
+
f"{wrong_count} wrong | "
|
| 1274 |
+
f"{unattempted_count} unattempted (of {total_count})")
|
| 1275 |
+
|
| 1276 |
+
lx = page_width - 240
|
| 1277 |
c.setFont("Helvetica", 9)
|
| 1278 |
+
c.setFillColor(colors.Color(0.0, 0.65, 0.0))
|
| 1279 |
+
c.drawString(lx, page_height - 46, "\u2713 Correct")
|
| 1280 |
+
c.setFillColor(colors.Color(0.85, 0.1, 0.1))
|
| 1281 |
+
c.drawString(lx + 68, page_height - 46, "\u2717 Wrong")
|
| 1282 |
c.setFillColor(colors.Color(1.0, 0.55, 0.0))
|
| 1283 |
+
c.drawString(lx + 130, page_height - 46, "\u25cb Unattempted")
|
| 1284 |
+
|
| 1285 |
c.save()
|
| 1286 |
packet.seek(0)
|
|
|
|
|
|
|
| 1287 |
overlay_reader = PdfReader(packet)
|
| 1288 |
if overlay_reader.pages:
|
| 1289 |
page.merge_page(overlay_reader.pages[0])
|
|
|
|
| 1290 |
writer.add_page(page)
|
| 1291 |
+
|
|
|
|
| 1292 |
output = BytesIO()
|
| 1293 |
writer.write(output)
|
| 1294 |
output.seek(0)
|
| 1295 |
return output.read()
|
| 1296 |
+
|
| 1297 |
except Exception as e:
|
| 1298 |
print(f"[ERROR] Failed to create annotated PDF: {e}")
|
| 1299 |
return original_pdf_bytes
|
| 1300 |
|
|
|
|
| 1301 |
async def extract_text_from_upload(file: UploadFile) -> Dict[str, Any]:
|
| 1302 |
filename = getattr(file, "filename", "") or "upload"
|
| 1303 |
content_type = (getattr(file, "content_type", "") or "").lower()
|