SuriRaja commited on
Commit
9a392e7
·
verified ·
1 Parent(s): a449a02

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +155 -41
app.py CHANGED
@@ -10,7 +10,6 @@ import pandas as pd
10
  from PIL import Image, ImageDraw, ImageFont
11
 
12
  import cv2
13
- from urllib.parse import urlparse, parse_qs
14
 
15
  # Optional: YOLO for phone detection
16
  YOLO_MODEL = None
@@ -20,20 +19,17 @@ def load_yolo():
20
  if YOLO_MODEL is None:
21
  try:
22
  from ultralytics import YOLO
23
- YOLO_MODEL = YOLO('yolov8n.pt') # lightweight pretrained model
24
  except Exception as e:
25
  st.warning(f"YOLO model could not be loaded: {e}")
26
  YOLO_MODEL = None
27
  return YOLO_MODEL
28
 
29
- # ✅ FIXED iou function
30
  def iou(boxA, boxB) -> float:
31
- # boxes in [x1,y1,x2,y2]
32
  xA = max(boxA[0], boxB[0])
33
  yA = max(boxA[1], boxB[1])
34
  xB = min(boxA[2], boxB[2])
35
- yB = min(boxA[3], boxB[3]) # ✅ fixed
36
-
37
  interW = max(0, xB - xA)
38
  interH = max(0, yB - yA)
39
  interArea = interW * interH
@@ -44,6 +40,7 @@ def iou(boxA, boxB) -> float:
44
 
45
  def detect_qr_opencv(image_np: np.ndarray) -> List[Dict[str, Any]]:
46
  det = cv2.QRCodeDetector()
 
47
  retval, data_list, points, _ = det.detectAndDecodeMulti(image_np)
48
  results = []
49
  if points is None:
@@ -52,9 +49,11 @@ def detect_qr_opencv(image_np: np.ndarray) -> List[Dict[str, Any]]:
52
  pts = np.array(points_single, dtype=np.float32).reshape(-1, 2)
53
  x1, y1 = np.min(pts[:,0]), np.min(pts[:,1])
54
  x2, y2 = np.max(pts[:,0]), np.max(pts[:,1])
55
- results.append({"bbox": [float(x1), float(y1), float(x2), float(y2)],
56
- "data": data_single,
57
- "points": pts.tolist()})
 
 
58
  return results
59
 
60
  if isinstance(data_list, (list, tuple)):
@@ -67,9 +66,11 @@ def detect_qr_opencv(image_np: np.ndarray) -> List[Dict[str, Any]]:
67
  x1, y1 = np.min(pts[:,0]), np.min(pts[:,1])
68
  x2, y2 = np.max(pts[:,0]), np.max(pts[:,1])
69
  payload = decoded_list[i] if i < len(decoded_list) else ""
70
- results.append({"bbox": [float(x1), float(y1), float(x2), float(y2)],
71
- "data": payload,
72
- "points": pts.tolist()})
 
 
73
  return results
74
 
75
  def detect_phones_yolo(image_np: np.ndarray, conf: float = 0.25) -> List[List[float]]:
@@ -80,11 +81,12 @@ def detect_phones_yolo(image_np: np.ndarray, conf: float = 0.25) -> List[List[fl
80
  bboxes = []
81
  for r in results:
82
  for box, cls in zip(r.boxes.xyxy.cpu().numpy(), r.boxes.cls.cpu().numpy()):
83
- if int(cls) == 67: # COCO: class 67 = cell phone
84
  bboxes.append([float(box[0]), float(box[1]), float(box[2]), float(box[3])])
85
  return bboxes
86
 
87
- def annotate_image(pil_img: Image.Image, qr_boxes: List[Dict[str, Any]], phone_boxes: List[List[float]], flags: Dict[int, List[str]]) -> Image.Image:
 
88
  img = pil_img.copy().convert("RGB")
89
  draw = ImageDraw.Draw(img)
90
  try:
@@ -108,13 +110,13 @@ def annotate_image(pil_img: Image.Image, qr_boxes: List[Dict[str, Any]], phone_b
108
  draw.text((qr["bbox"][0], qr["bbox"][1]-12), label, fill=color, font=font)
109
 
110
  for i, msgs in flags.items():
111
- if not msgs:
112
- continue
113
  x1, y1, x2, y2 = qr_boxes[i]["bbox"]
114
  y_text = y2 + 4
115
  for msg in msgs:
116
  draw.text((x1, y_text), f"[{msg}]", fill=(255,0,0), font=font)
117
  y_text += 12
 
118
  return img
119
 
120
  def unpack_zip(uploaded_file, workdir):
@@ -139,43 +141,23 @@ def read_approved_list(file) -> List[str]:
139
  vals = df.iloc[:,0].dropna().astype(str).tolist()
140
  else:
141
  content = file.read().decode("utf-8", errors="ignore")
 
142
  vals = [line.strip() for line in content.splitlines() if line.strip()]
143
  return [v.strip() for v in vals if v.strip()]
144
  except Exception as e:
145
  st.error(f"Failed to parse approved list: {e}")
146
  return []
147
 
148
- # ✅ FIXED payload normalization
149
- def normalize_payload(payload: str) -> str:
150
- if not payload:
151
- return ""
152
- p = payload.strip().lower()
153
- if p.startswith("upi://"):
154
- try:
155
- parsed = urlparse(p)
156
- qs = parse_qs(parsed.query)
157
- if "pa" in qs:
158
- return qs["pa"][0].strip().lower()
159
- except Exception:
160
- pass
161
- for prefix in ["upi://", "http://", "https://"]:
162
- if p.startswith(prefix):
163
- p = p[len(prefix):]
164
- return p
165
-
166
  def match_payload(payload: str, approved: List[str]) -> bool:
167
  if not payload:
168
  return False
169
- norm_payload = normalize_payload(payload)
170
  for a in approved:
171
- norm_a = normalize_payload(a)
172
- if not norm_a:
173
- continue
174
- if norm_a in norm_payload or norm_payload in norm_a:
175
  return True
176
  return False
177
 
178
- # ---------------- STREAMLIT UI (unchanged) -----------------
179
  st.set_page_config(page_title="QR Code Anomaly Scanner", layout="wide")
180
  st.title("🕵️ QR Code Anomaly Scanner (Retail Store 360° CCTV Frames)")
181
 
@@ -188,6 +170,7 @@ The app will:
188
  - **UNAPPROVED_QR**: decoded payload not in the approved list.
189
  - **ON_PHONE**: QR bounding box overlaps a detected phone.
190
  - **UNDECODED_QR**: QR detected but not decodable.
 
191
  """)
192
 
193
  with st.sidebar:
@@ -202,4 +185,135 @@ with st.sidebar:
202
 
203
  workdir = tempfile.mkdtemp()
204
 
205
- # (rest of your original processing loop stays exactly the same…)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
10
  from PIL import Image, ImageDraw, ImageFont
11
 
12
  import cv2
 
13
 
14
  # Optional: YOLO for phone detection
15
  YOLO_MODEL = None
 
19
  if YOLO_MODEL is None:
20
  try:
21
  from ultralytics import YOLO
22
+ YOLO_MODEL = YOLO('yolov8n.pt')
23
  except Exception as e:
24
  st.warning(f"YOLO model could not be loaded: {e}")
25
  YOLO_MODEL = None
26
  return YOLO_MODEL
27
 
 
28
  def iou(boxA, boxB) -> float:
 
29
  xA = max(boxA[0], boxB[0])
30
  yA = max(boxA[1], boxB[1])
31
  xB = min(boxA[2], boxB[2])
32
+ yB = min(boxA[3], boxB[3])
 
33
  interW = max(0, xB - xA)
34
  interH = max(0, yB - yA)
35
  interArea = interW * interH
 
40
 
41
  def detect_qr_opencv(image_np: np.ndarray) -> List[Dict[str, Any]]:
42
  det = cv2.QRCodeDetector()
43
+ # ✅ Correct unpack
44
  retval, data_list, points, _ = det.detectAndDecodeMulti(image_np)
45
  results = []
46
  if points is None:
 
49
  pts = np.array(points_single, dtype=np.float32).reshape(-1, 2)
50
  x1, y1 = np.min(pts[:,0]), np.min(pts[:,1])
51
  x2, y2 = np.max(pts[:,0]), np.max(pts[:,1])
52
+ results.append({
53
+ "bbox": [float(x1), float(y1), float(x2), float(y2)],
54
+ "data": data_single,
55
+ "points": pts.tolist()
56
+ })
57
  return results
58
 
59
  if isinstance(data_list, (list, tuple)):
 
66
  x1, y1 = np.min(pts[:,0]), np.min(pts[:,1])
67
  x2, y2 = np.max(pts[:,0]), np.max(pts[:,1])
68
  payload = decoded_list[i] if i < len(decoded_list) else ""
69
+ results.append({
70
+ "bbox": [float(x1), float(y1), float(x2), float(y2)],
71
+ "data": payload,
72
+ "points": pts.tolist()
73
+ })
74
  return results
75
 
76
  def detect_phones_yolo(image_np: np.ndarray, conf: float = 0.25) -> List[List[float]]:
 
81
  bboxes = []
82
  for r in results:
83
  for box, cls in zip(r.boxes.xyxy.cpu().numpy(), r.boxes.cls.cpu().numpy()):
84
+ if int(cls) == 67: # COCO: "cell phone"
85
  bboxes.append([float(box[0]), float(box[1]), float(box[2]), float(box[3])])
86
  return bboxes
87
 
88
+ def annotate_image(pil_img: Image.Image, qr_boxes: List[Dict[str, Any]],
89
+ phone_boxes: List[List[float]], flags: Dict[int, List[str]]) -> Image.Image:
90
  img = pil_img.copy().convert("RGB")
91
  draw = ImageDraw.Draw(img)
92
  try:
 
110
  draw.text((qr["bbox"][0], qr["bbox"][1]-12), label, fill=color, font=font)
111
 
112
  for i, msgs in flags.items():
113
+ if not msgs: continue
 
114
  x1, y1, x2, y2 = qr_boxes[i]["bbox"]
115
  y_text = y2 + 4
116
  for msg in msgs:
117
  draw.text((x1, y_text), f"[{msg}]", fill=(255,0,0), font=font)
118
  y_text += 12
119
+
120
  return img
121
 
122
  def unpack_zip(uploaded_file, workdir):
 
141
  vals = df.iloc[:,0].dropna().astype(str).tolist()
142
  else:
143
  content = file.read().decode("utf-8", errors="ignore")
144
+ file.seek(0) # ✅ reset pointer so file can be reused
145
  vals = [line.strip() for line in content.splitlines() if line.strip()]
146
  return [v.strip() for v in vals if v.strip()]
147
  except Exception as e:
148
  st.error(f"Failed to parse approved list: {e}")
149
  return []
150
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
151
  def match_payload(payload: str, approved: List[str]) -> bool:
152
  if not payload:
153
  return False
154
+ p = payload.strip().lower()
155
  for a in approved:
156
+ a_norm = a.strip().lower()
157
+ if a_norm in p or p in a_norm:
 
 
158
  return True
159
  return False
160
 
 
161
  st.set_page_config(page_title="QR Code Anomaly Scanner", layout="wide")
162
  st.title("🕵️ QR Code Anomaly Scanner (Retail Store 360° CCTV Frames)")
163
 
 
170
  - **UNAPPROVED_QR**: decoded payload not in the approved list.
171
  - **ON_PHONE**: QR bounding box overlaps a detected phone.
172
  - **UNDECODED_QR**: QR detected but not decodable.
173
+ Download the annotated images and a consolidated CSV report at the end.
174
  """)
175
 
176
  with st.sidebar:
 
185
 
186
  workdir = tempfile.mkdtemp()
187
 
188
+ if process_btn:
189
+ if not approved_file:
190
+ st.error("Please upload the Approved QR List first.")
191
+ st.stop()
192
+
193
+ approved_list = read_approved_list(approved_file)
194
+ if not approved_list:
195
+ st.warning("Approved list is empty or failed to parse. All decoded QR payloads will be treated as UNAPPROVED.")
196
+ else:
197
+ st.success(f"Loaded {len(approved_list)} approved entries.")
198
+
199
+ img_paths = []
200
+ for f in frames or []:
201
+ out = os.path.join(workdir, f.name)
202
+ with open(out, "wb") as g:
203
+ g.write(f.read())
204
+ img_paths.append(out)
205
+ if frames_zip is not None:
206
+ img_paths.extend(unpack_zip(frames_zip, workdir))
207
+
208
+ img_paths = sorted(set(img_paths))
209
+ if not img_paths:
210
+ st.error("Please upload at least one frame image (or a ZIP).")
211
+ st.stop()
212
+
213
+ if run_phone_detection:
214
+ load_yolo()
215
+
216
+ rows = []
217
+ annotated_dir = os.path.join(workdir, "annotated")
218
+ os.makedirs(annotated_dir, exist_ok=True)
219
+
220
+ progress = st.progress(0.0)
221
+ status = st.empty()
222
+
223
+ for idx, path in enumerate(img_paths):
224
+ status.text(f"Processing {os.path.basename(path)} ({idx+1}/{len(img_paths)})")
225
+ pil = Image.open(path).convert("RGB")
226
+ np_img = np.array(pil)
227
+
228
+ qr_results = detect_qr_opencv(np_img)
229
+ phone_boxes = detect_phones_yolo(np_img, conf=phone_conf) if run_phone_detection else []
230
+
231
+ flags = {}
232
+ for i, qr in enumerate(qr_results):
233
+ msgs = []
234
+ payload = qr.get("data", "")
235
+ if not payload:
236
+ msgs.append("UNDECODED_QR")
237
+ elif not match_payload(payload, approved_list):
238
+ msgs.append("UNAPPROVED_QR")
239
+ if phone_boxes:
240
+ qb = qr["bbox"]
241
+ for pb in phone_boxes:
242
+ if iou(qb, pb) >= iou_threshold:
243
+ msgs.append("ON_PHONE")
244
+ break
245
+ flags[i] = msgs
246
+
247
+ rows.append({
248
+ "frame": os.path.basename(path),
249
+ "qr_index": i,
250
+ "payload": payload,
251
+ "approved_match": (payload and match_payload(payload, approved_list)),
252
+ "on_phone": ("ON_PHONE" in msgs),
253
+ "undecoded": ("UNDECODED_QR" in msgs),
254
+ "anomalies": "|".join(msgs) if msgs else "",
255
+ "qr_bbox": qr["bbox"],
256
+ "phone_boxes": phone_boxes
257
+ })
258
+
259
+ if not qr_results:
260
+ rows.append({
261
+ "frame": os.path.basename(path),
262
+ "qr_index": -1,
263
+ "payload": "",
264
+ "approved_match": False,
265
+ "on_phone": False,
266
+ "undecoded": False,
267
+ "anomalies": "NO_QR_FOUND",
268
+ "qr_bbox": None,
269
+ "phone_boxes": phone_boxes
270
+ })
271
+
272
+ annotated = annotate_image(pil, qr_results, phone_boxes, flags)
273
+ out_path = os.path.join(annotated_dir, os.path.basename(path))
274
+ annotated.save(out_path)
275
+
276
+ progress.progress((idx+1)/len(img_paths))
277
+
278
+ status.text("Completed.")
279
+ df = pd.DataFrame(rows)
280
+
281
+ st.subheader("Results")
282
+ st.dataframe(df, use_container_width=True)
283
+
284
+ st.markdown("### Summary")
285
+ total_frames = len(img_paths)
286
+ total_qr = int((df["qr_index"] >= 0).sum())
287
+ unapproved = int((df["anomalies"].str.contains("UNAPPROVED_QR", na=False)).sum())
288
+ on_phone = int((df["anomalies"].str.contains("ON_PHONE", na=False)).sum())
289
+ undecoded = int((df["anomalies"].str.contains("UNDECODED_QR", na=False)).sum())
290
+ no_qr = int((df["anomalies"] == "NO_QR_FOUND").sum())
291
+ st.write({
292
+ "frames_processed": total_frames,
293
+ "qr_detections": total_qr,
294
+ "unapproved_qr": unapproved,
295
+ "qr_on_phone": on_phone,
296
+ "undecoded_qr": undecoded,
297
+ "frames_with_no_qr": no_qr
298
+ })
299
+
300
+ csv_bytes = df.to_csv(index=False).encode("utf-8")
301
+ st.download_button("⬇️ Download CSV Report", data=csv_bytes,
302
+ file_name="qr_anomaly_report.csv", mime="text/csv")
303
+
304
+ mem = io.BytesIO()
305
+ with zipfile.ZipFile(mem, mode="w", compression=zipfile.ZIP_DEFLATED) as z:
306
+ for fname in sorted(os.listdir(annotated_dir)):
307
+ z.write(os.path.join(annotated_dir, fname), arcname=fname)
308
+ mem.seek(0)
309
+ st.download_button("⬇️ Download Annotated Images (ZIP)", data=mem.getvalue(),
310
+ file_name="annotated_frames.zip", mime="application/zip")
311
+
312
+ else:
313
+ st.info("Upload inputs on the left and click **Run Scan** to begin.")
314
+ st.markdown("""
315
+ **Tips**
316
+ - Approved list can be **TXT** (one payload per line) or **CSV** (use a `payload` column or first column).
317
+ - For QR-on-phone detection, keep **Detect phones (YOLO)** enabled.
318
+ - Name frames with timestamps to correlate events later.
319
+ """)