SuriRaja commited on
Commit
9cfa247
Β·
verified Β·
1 Parent(s): 77e861a

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +136 -332
app.py CHANGED
@@ -1,9 +1,9 @@
1
-
2
  import io
3
  import os
4
  import zipfile
5
  import tempfile
6
- from typing import List, Tuple, Dict, Any
 
7
 
8
  import streamlit as st
9
  import numpy as np
@@ -14,20 +14,39 @@ import cv2
14
  # Optional: YOLO for phone detection
15
  YOLO_MODEL = None
16
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
17
  def load_yolo():
18
  global YOLO_MODEL
19
  if YOLO_MODEL is None:
20
  try:
21
  from ultralytics import YOLO
22
- # Use lightweight pretrained model; supports "cell phone" class via COCO.
23
- YOLO_MODEL = YOLO("yolov8n.pt") # will use local file if present, or download on first run
24
  except Exception as e:
25
  st.warning(f"YOLO model could not be loaded: {e}")
26
  YOLO_MODEL = None
27
  return YOLO_MODEL
28
 
29
  def iou(boxA, boxB) -> float:
30
- # boxes in [x1,y1,x2,y2]
31
  xA = max(boxA[0], boxB[0])
32
  yA = max(boxA[1], boxB[1])
33
  xB = min(boxA[2], boxB[2])
@@ -40,396 +59,181 @@ def iou(boxA, boxB) -> float:
40
  denom = areaA + areaB - interArea + 1e-6
41
  return interArea / denom
42
 
 
43
  def detect_qr_opencv(image_np: np.ndarray) -> List[Dict[str, Any]]:
44
- """
45
- Use OpenCV's QRCodeDetector to find and decode QR codes.
46
- Returns list of dicts: {bbox: [x1,y1,x2,y2], data: str, points: np.ndarray}
47
- """
48
  det = cv2.QRCodeDetector()
49
- data, points, _ = det.detectAndDecodeMulti(image_np)
50
  results = []
51
-
52
  if points is None:
53
- # Try single QR fallback
54
  data_single, points_single, _ = det.detectAndDecode(image_np)
55
  if points_single is not None and data_single:
56
  pts = np.array(points_single, dtype=np.float32).reshape(-1, 2)
57
- x1, y1 = np.min(pts[:, 0]), np.min(pts[:, 1])
58
- x2, y2 = np.max(pts[:, 0]), np.max(pts[:, 1])
59
- results.append({
60
- "bbox": [float(x1), float(y1), float(x2), float(y2)],
61
- "data": data_single,
62
- "points": pts.tolist()
63
- })
64
  return results
65
-
66
- # points shape: (N,4,2), data is list/tuple of strings (may be '' for undecodable)
67
- if isinstance(data, (list, tuple)):
68
- decoded_list = data
69
- else:
70
- decoded_list = [data] * len(points)
71
-
72
  for i, quad in enumerate(points):
73
- pts = np.array(quad, dtype=np.float32).reshape(-1, 2)
74
- x1, y1 = np.min(pts[:, 0]), np.min(pts[:, 1])
75
- x2, y2 = np.max(pts[:, 0]), np.max(pts[:, 1])
76
  payload = decoded_list[i] if i < len(decoded_list) else ""
77
- results.append({
78
- "bbox": [float(x1), float(y1), float(x2), float(y2)],
79
- "data": payload,
80
- "points": pts.tolist()
81
- })
82
  return results
83
 
 
84
  def detect_phones_yolo(image_np: np.ndarray, conf: float = 0.25) -> List[List[float]]:
85
- """
86
- Detect cell phones with YOLO. Returns list of [x1,y1,x2,y2].
87
- """
88
  model = load_yolo()
89
  if model is None:
90
  return []
91
- # YOLO expects RGB image; ultralytics handles numpy arrays
92
  results = model.predict(source=image_np, conf=conf, verbose=False)
93
  bboxes = []
94
  for r in results:
95
- # handle cases where no boxes are returned
96
- if getattr(r, "boxes", None) is None:
97
- continue
98
- # collect cell phone boxes (COCO class 67)
99
- try:
100
- for box, cls in zip(r.boxes.xyxy.cpu().numpy(), r.boxes.cls.cpu().numpy()):
101
- if int(cls) == 67:
102
- bboxes.append([float(box[0]), float(box[1]), float(box[2]), float(box[3])])
103
- except Exception:
104
- # Fail-open if structure is different
105
- continue
106
  return bboxes
107
 
108
- def annotate_image(pil_img: Image.Image, qr_boxes: List[Dict[str, Any]], phone_boxes: List[List[float]], flags: Dict[int, List[str]]) -> Image.Image:
109
- img = pil_img.copy().convert("RGB")
110
- draw = ImageDraw.Draw(img)
111
- # Try to load a default font
112
- try:
113
- font = ImageFont.load_default()
114
- except Exception:
115
- font = None
116
-
117
- # Draw phone boxes
118
- for pb in phone_boxes:
119
- draw.rectangle(pb, outline=(255, 165, 0), width=3) # orange
120
- draw.text((pb[0], pb[1] - 12), "PHONE", fill=(255, 165, 0), font=font)
121
-
122
- # Draw QR boxes
123
- for i, qr in enumerate(qr_boxes):
124
- color = (0, 255, 0) # green
125
- if i in flags and any("UNAPPROVED" in f or "ON_PHONE" in f for f in flags[i]):
126
- color = (255, 0, 0) # red for anomaly
127
- draw.rectangle(qr["bbox"], outline=color, width=3)
128
- label = "QR"
129
- if qr.get("data"):
130
- snippet = qr["data"][:32].replace("\n", " ")
131
- label += f": {snippet}"
132
- draw.text((qr["bbox"][0], qr["bbox"][1] - 12), label, fill=color, font=font)
133
-
134
- # Add flags text
135
- for i, msgs in flags.items():
136
- if not msgs:
137
- continue
138
- x1, y1, x2, y2 = qr_boxes[i]["bbox"]
139
- y_text = y2 + 4
140
- for msg in msgs:
141
- draw.text((x1, y_text), f"[{msg}]", fill=(255, 0, 0), font=font)
142
- y_text += 12
143
- return img
144
-
145
- def unpack_zip(uploaded_file, workdir):
146
- zf = zipfile.ZipFile(uploaded_file)
147
- out_paths = []
148
- for name in zf.namelist():
149
- if name.lower().endswith((".jpg", ".jpeg", ".png", ".bmp", ".webp")):
150
- p = os.path.join(workdir, os.path.basename(name))
151
- with open(p, "wb") as f:
152
- f.write(zf.read(name))
153
- out_paths.append(p)
154
- return out_paths
155
-
156
- def read_approved_list(file) -> List[str]:
157
- """
158
- Accepts CSV or TXT. One payload per line or in a 'payload' column.
159
- Payloads can be full strings or partial substrings to match.
160
- """
161
- name = file.name.lower()
162
- try:
163
- if name.endswith(".csv"):
164
- df = pd.read_csv(file)
165
- if "payload" in df.columns:
166
- vals = df["payload"].dropna().astype(str).tolist()
167
- else:
168
- # take first column
169
- vals = df.iloc[:, 0].dropna().astype(str).tolist()
170
- else:
171
- # plain text
172
- content = file.read().decode("utf-8", errors="ignore")
173
- vals = [line.strip() for line in content.splitlines() if line.strip()]
174
- # Normalize
175
- return [v.strip() for v in vals if v.strip()]
176
-
177
- except Exception as e:
178
- st.error(f"Failed to parse approved list: {e}")
179
- return []
180
 
181
  def match_payload(payload: str, approved: List[str]) -> bool:
182
- """
183
- Return True if payload matches an approved entry.
184
- We allow substring match either way to account for embedded metadata/UTMs.
185
- """
186
  if not payload:
187
  return False
188
- p = payload.strip()
189
  for a in approved:
190
- if a in p or p in a:
 
191
  return True
192
  return False
193
 
194
- # -------------------- UI --------------------
195
- st.set_page_config(page_title="QR Code Anomaly Scanner", page_icon="πŸ•΅", layout="wide")
196
- st.title("πŸ•΅ QR Code Anomaly Scanner (Retail Store 360Β° CCTV Frames)")
197
-
198
- st.markdown("""
199
- Upload a set of frame images (multiple files **or** a ZIP), plus the approved QR list (CSV/TXT).
200
- The app will:
201
- - Detect and decode QR codes in each frame.
202
- - Detect **cell phones** via YOLO to infer if a QR is shown on a phone.
203
- - Flag anomalies:
204
- - **UNAPPROVED_QR**: decoded payload not in the approved list.
205
- - **ON_PHONE**: QR bounding box overlaps a detected phone.
206
- - **UNDECODED_QR**: QR detected but not decodable (could be suspicious/obstructed).
207
- - Download the annotated images and a consolidated CSV report at the end.
208
- """)
209
 
210
- with st.sidebar:
211
- st.header("Inputs")
212
- approved_file = st.file_uploader("Approved QR List (CSV/TXT)", type=["csv", "txt"])
213
- frames = st.file_uploader("Frames (images) β€” select multiple", type=["jpg", "jpeg", "png", "bmp", "webp"], accept_multiple_files=True)
214
- frames_zip = st.file_uploader("Or upload a ZIP of frames", type=["zip"])
215
- run_phone_detection = st.checkbox("Detect phones (YOLO)", value=True)
216
- phone_conf = st.slider("Phone detection confidence", 0.1, 0.8, 0.25, 0.05)
217
- iou_threshold = st.slider("QR–Phone overlap IoU threshold", 0.05, 0.8, 0.2, 0.05)
218
- process_btn = st.button("Run Scan", use_container_width=True)
219
 
220
- workdir = tempfile.mkdtemp()
 
 
 
 
 
 
 
 
 
221
 
222
  if process_btn:
223
  if not approved_file:
224
  st.error("Please upload the Approved QR List first.")
225
  st.stop()
226
 
227
- # Read approved list
228
- approved_list = read_approved_list(approved_file)
229
- if not approved_list:
230
- st.warning("Approved list is empty or failed to parse. All decoded QR payloads will be treated as UNAPPROVED.")
231
- else:
232
- st.success(f"Loaded {len(approved_list)} approved entries.")
233
-
234
- # Gather images
235
- img_paths = []
236
- # Save multi-file uploads
237
- for f in frames or []:
238
- out = os.path.join(workdir, f.name)
239
- with open(out, "wb") as g:
240
- g.write(f.read())
241
- img_paths.append(out)
242
- # Or unpack ZIP
243
- if frames_zip is not None:
244
- img_paths.extend(unpack_zip(frames_zip, workdir))
245
-
246
- img_paths = sorted(set(img_paths))
247
- if not img_paths:
248
- st.error("Please upload at least one frame image (or a ZIP).")
249
- st.stop()
250
-
251
- if run_phone_detection:
252
- load_yolo() # try to initialize early to show warnings
253
 
254
  rows = []
255
- annotated_dir = os.path.join(workdir, "annotated")
256
- os.makedirs(annotated_dir, exist_ok=True)
257
 
258
- progress = st.progress(0.0)
259
- status = st.empty()
260
-
261
- for idx, path in enumerate(img_paths):
262
- status.text(f"Processing {os.path.basename(path)} ({idx + 1}/{len(img_paths)})")
263
- pil = Image.open(path).convert("RGB")
264
  np_img = np.array(pil)
265
 
266
  qr_results = detect_qr_opencv(np_img)
267
  phone_boxes = detect_phones_yolo(np_img, conf=phone_conf) if run_phone_detection else []
268
 
269
  flags = {}
 
 
 
 
 
 
 
 
 
 
270
  for i, qr in enumerate(qr_results):
271
  msgs = []
272
  payload = qr.get("data", "")
273
-
274
  if not payload:
275
  msgs.append("UNDECODED_QR")
276
  elif not match_payload(payload, approved_list):
277
  msgs.append("UNAPPROVED_QR")
278
-
279
- # Check overlap with phones
280
  if phone_boxes:
281
  qb = qr["bbox"]
282
  for pb in phone_boxes:
283
  if iou(qb, pb) >= iou_threshold:
284
  msgs.append("ON_PHONE")
285
  break
286
-
 
 
 
 
 
287
  flags[i] = msgs
288
 
289
- # Append a row
290
- rows.append({
291
- "frame": os.path.basename(path),
292
- "qr_index": i,
293
- "payload": payload,
294
- "approved_match": (payload and match_payload(payload, approved_list)),
295
- "on_phone": ("ON_PHONE" in msgs),
296
- "undecoded": ("UNDECODED_QR" in msgs),
297
- "anomalies": "|".join(msgs) if msgs else "",
298
- "qr_bbox": qr["bbox"],
299
- "phone_boxes": phone_boxes
300
- })
301
-
302
- # If no QR detected, still log the frame
303
- if not qr_results:
304
- rows.append({
305
- "frame": os.path.basename(path),
306
- "qr_index": -1,
307
- "payload": "",
308
- "approved_match": False,
309
- "on_phone": False,
310
- "undecoded": False,
311
- "anomalies": "NO_QR_FOUND",
312
- "qr_bbox": None,
313
- "phone_boxes": phone_boxes
314
- })
315
-
316
- annotated = annotate_image(pil, qr_results, phone_boxes, flags)
317
- out_path = os.path.join(annotated_dir, os.path.basename(path))
318
- annotated.save(out_path)
319
-
320
- progress.progress((idx + 1) / len(img_paths))
321
-
322
- status.text("Completed.")
323
 
324
  df = pd.DataFrame(rows)
325
- st.subheader("Results")
326
- st.dataframe(df, use_container_width=True)
327
-
328
- # Summary
329
- st.markdown("### Summary")
330
- total_frames = len(img_paths)
331
- total_qr = int((df["qr_index"] >= 0).sum())
332
- unapproved = int((df["anomalies"].str.contains("UNAPPROVED_QR", na=False)).sum())
333
- on_phone = int((df["anomalies"].str.contains("ON_PHONE", na=False)).sum())
334
- undecoded = int((df["anomalies"].str.contains("UNDECODED_QR", na=False)).sum())
335
- no_qr = int((df["anomalies"] == "NO_QR_FOUND").sum())
336
- st.write({
337
- "frames_processed": total_frames,
338
- "qr_detections": total_qr,
339
- "unapproved_qr": unapproved,
340
- "qr_on_phone": on_phone,
341
- "undecoded_qr": undecoded,
342
- "frames_with_no_qr": no_qr
343
- })
344
-
345
- # --- Dashboard Cards (Microsoft-style colored cards) ---
346
  st.markdown("### πŸ“Š Compliance Dashboard")
 
 
347
 
348
- # minimal CSS injected once for card styling
349
- card_css = """
350
- <style>
351
- .dashboard-grid { display: grid; grid-template-columns: repeat(3, 1fr); gap: 12px; }
352
- .card {
353
- border-radius: 12px;
354
- border: 2px solid rgba(0,0,0,0.12);
355
- padding: 16px;
356
- background: #fff;
357
- box-shadow: 0 2px 6px rgba(0,0,0,0.06);
358
- text-align: center;
359
- }
360
- .card .label { font-size: 14px; font-weight: 600; opacity: 0.8; margin-bottom: 6px; }
361
- .card .value { font-size: 28px; font-weight: 800; }
362
- .card.red { border-color: #B71C1C; background: #FFEBEE; }
363
- .card.orange { border-color: #EF6C00; background: #FFF3E0; }
364
- .card.purple { border-color: #6A1B9A; background: #F3E5F5; }
365
- .card.blue { border-color: #01579B; background: #E1F5FE; }
366
- .card.green { border-color: #1B5E20; background: #E8F5E9; }
367
- .card.gray { border-color: #263238; background: #ECEFF1; }
368
- </style>
369
- """
370
- st.markdown(card_css, unsafe_allow_html=True)
371
-
372
- # compute counts from the current df (keeping existing anomaly labels)
373
  unapproved_count = int((df["anomalies"].str.contains("UNAPPROVED_QR", na=False)).sum())
374
- on_phone_count = int((df["anomalies"].str.contains("ON_PHONE", na=False)).sum())
375
- tampering_count = int((df["anomalies"].str.contains("TAMPERING", na=False)).sum()) # likely 0 if not used
376
- roi_count = int((df["anomalies"].str.contains("OUTSIDE_ROI", na=False)).sum()) # likely 0 if not used
377
- absence_count = int((df["anomalies"].str.contains("NO_QR_FOUND", na=False)).sum())
378
- undecoded_count = int((df["anomalies"].str.contains("UNDECODED_QR", na=False)).sum())
379
-
380
- cards_html = f"""
381
- <div class="dashboard-grid">
382
- <div class="card red">
383
- <div class="label">❌ Unauthorized QRs</div>
384
- <div class="value">{unapproved_count}</div>
385
- </div>
386
- <div class="card orange">
387
- <div class="label">πŸ“± On Phone</div>
388
- <div class="value">{on_phone_count}</div>
389
- </div>
390
- <div class="card purple">
391
- <div class="label">⚠️ Tampered</div>
392
- <div class="value">{tampering_count}</div>
393
- </div>
394
- <div class="card blue">
395
- <div class="label">🚫 Outside ROI</div>
396
- <div class="value">{roi_count}</div>
397
- </div>
398
- <div class="card green">
399
- <div class="label">⏳ QR Missing</div>
400
- <div class="value">{absence_count}</div>
401
- </div>
402
- <div class="card gray">
403
- <div class="label">πŸ” Undecoded</div>
404
- <div class="value">{undecoded_count}</div>
405
- </div>
406
- </div>
407
- """
408
- st.markdown(cards_html, unsafe_allow_html=True)
409
- # --- end Dashboard Cards ---
410
-
411
- # Downloads: CSV + ZIP of annotated images
412
- csv_bytes = df.to_csv(index=False).encode("utf-8")
413
- st.download_button("⬇️ Download CSV Report", data=csv_bytes, file_name="qr_anomaly_report.csv", mime="text/csv")
414
-
415
- # Create ZIP of annotations
416
- mem = io.BytesIO()
417
- with zipfile.ZipFile(mem, mode="w", compression=zipfile.ZIP_DEFLATED) as z:
418
- for fname in sorted(os.listdir(annotated_dir)):
419
- z.write(os.path.join(annotated_dir, fname), arcname=fname)
420
- mem.seek(0)
421
- st.download_button(
422
- "⬇️ Download Annotated Images (ZIP)",
423
- data=mem.getvalue(),
424
- file_name="annotated_frames.zip",
425
- mime="application/zip"
426
- )
427
-
428
- else:
429
- st.info("Upload inputs on the left and click **Run Scan** to begin.")
430
- st.markdown("""
431
- **Tips**
432
- - Your approved list can be **TXT** (one payload per line) or **CSV** (use a `payload` column or the first column).
433
- - For mobile QR misuse detection, keep **Detect phones (YOLO)** enabled.
434
- - Name frames with timestamps if you want to correlate events later.
435
- """)
 
 
1
  import io
2
  import os
3
  import zipfile
4
  import tempfile
5
+ import time
6
+ from typing import List, Dict, Any
7
 
8
  import streamlit as st
9
  import numpy as np
 
14
  # Optional: YOLO for phone detection
15
  YOLO_MODEL = None
16
 
17
+ # ROI Zones (x1, y1, x2, y2)
18
+ ROI_ZONES = [(100, 100, 400, 400)] # example, configurable later
19
+ ABSENCE_THRESHOLD_SEC = 15
20
+ ALERT_LOG_FILE = "alerts.json"
21
+
22
+ # --- Alert persistence ---
23
+ def load_alerts():
24
+ if os.path.exists(ALERT_LOG_FILE):
25
+ return pd.read_json(ALERT_LOG_FILE).to_dict(orient="records")
26
+ return []
27
+
28
+ def save_alert(alert: Dict[str, Any]):
29
+ alerts = load_alerts()
30
+ alerts.append(alert)
31
+ pd.DataFrame(alerts).to_json(ALERT_LOG_FILE, orient="records", indent=2)
32
+
33
+ def log_alert(message, frame_name):
34
+ alert = {"time": time.ctime(), "alert": message, "frame": frame_name}
35
+ save_alert(alert)
36
+
37
+ # --- YOLO Loader ---
38
  def load_yolo():
39
  global YOLO_MODEL
40
  if YOLO_MODEL is None:
41
  try:
42
  from ultralytics import YOLO
43
+ YOLO_MODEL = YOLO('yolov8n.pt')
 
44
  except Exception as e:
45
  st.warning(f"YOLO model could not be loaded: {e}")
46
  YOLO_MODEL = None
47
  return YOLO_MODEL
48
 
49
  def iou(boxA, boxB) -> float:
 
50
  xA = max(boxA[0], boxB[0])
51
  yA = max(boxA[1], boxB[1])
52
  xB = min(boxA[2], boxB[2])
 
59
  denom = areaA + areaB - interArea + 1e-6
60
  return interArea / denom
61
 
62
+ # --- QR Detection ---
63
  def detect_qr_opencv(image_np: np.ndarray) -> List[Dict[str, Any]]:
 
 
 
 
64
  det = cv2.QRCodeDetector()
65
+ retval, data_list, points, _ = det.detectAndDecodeMulti(image_np)
66
  results = []
 
67
  if points is None:
 
68
  data_single, points_single, _ = det.detectAndDecode(image_np)
69
  if points_single is not None and data_single:
70
  pts = np.array(points_single, dtype=np.float32).reshape(-1, 2)
71
+ x1, y1 = np.min(pts[:,0]), np.min(pts[:,1])
72
+ x2, y2 = np.max(pts[:,0]), np.max(pts[:,1])
73
+ results.append({"bbox": [float(x1), float(y1), float(x2), float(y2)],"data": data_single,"points": pts.tolist()})
 
 
 
 
74
  return results
75
+ decoded_list = data_list if isinstance(data_list, (list, tuple)) else [data_list] * len(points)
 
 
 
 
 
 
76
  for i, quad in enumerate(points):
77
+ pts = np.array(quad, dtype=np.float32).reshape(-1,2)
78
+ x1, y1 = np.min(pts[:,0]), np.min(pts[:,1])
79
+ x2, y2 = np.max(pts[:,0]), np.max(pts[:,1])
80
  payload = decoded_list[i] if i < len(decoded_list) else ""
81
+ results.append({"bbox": [float(x1), float(y1), float(x2), float(y2)],"data": payload,"points": pts.tolist()})
 
 
 
 
82
  return results
83
 
84
+ # --- Phone Detection ---
85
  def detect_phones_yolo(image_np: np.ndarray, conf: float = 0.25) -> List[List[float]]:
 
 
 
86
  model = load_yolo()
87
  if model is None:
88
  return []
 
89
  results = model.predict(source=image_np, conf=conf, verbose=False)
90
  bboxes = []
91
  for r in results:
92
+ for box, cls in zip(r.boxes.xyxy.cpu().numpy(), r.boxes.cls.cpu().numpy()):
93
+ if int(cls) == 67: # COCO: "cell phone"
94
+ bboxes.append([float(box[0]), float(box[1]), float(box[2]), float(box[3])])
 
 
 
 
 
 
 
 
95
  return bboxes
96
 
97
+ # --- Tampering detection ---
98
+ def detect_tampering(image_np: np.ndarray, bbox: List[float]) -> bool:
99
+ x1, y1, x2, y2 = map(int, bbox)
100
+ roi = image_np[y1:y2, x1:x2]
101
+ if roi.size == 0:
102
+ return False
103
+ gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)
104
+ edges = cv2.Canny(gray, 100, 200)
105
+ edge_density = np.sum(edges > 0) / (roi.shape[0] * roi.shape[1])
106
+ return edge_density < 0.01
107
+
108
+ # --- Payload normalization & UPI parsing ---
109
+ from urllib.parse import urlparse, parse_qs
110
+
111
+ def normalize_payload(payload: str) -> str:
112
+ if not payload:
113
+ return ""
114
+ p = payload.strip().lower()
115
+ if p.startswith("upi://"):
116
+ try:
117
+ parsed = urlparse(p)
118
+ qs = parse_qs(parsed.query)
119
+ if "pa" in qs:
120
+ return qs["pa"][0].strip().lower()
121
+ except Exception:
122
+ pass
123
+ if "pa=" in p:
124
+ try:
125
+ part = p.split("pa=")[1].split("&")[0]
126
+ return part.strip().lower()
127
+ except Exception:
128
+ pass
129
+ return p
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
130
 
131
  def match_payload(payload: str, approved: List[str]) -> bool:
 
 
 
 
132
  if not payload:
133
  return False
134
+ norm_payload = normalize_payload(payload)
135
  for a in approved:
136
+ norm_a = normalize_payload(a)
137
+ if norm_payload == norm_a:
138
  return True
139
  return False
140
 
141
+ # --- ROI Helper ---
142
+ def inside_roi(bbox, roi):
143
+ x1, y1, x2, y2 = bbox
144
+ rx1, ry1, rx2, ry2 = roi
145
+ return (x1 >= rx1 and y1 >= ry1 and x2 <= rx2 and y2 <= ry2)
 
 
 
 
 
 
 
 
 
 
146
 
147
+ # --- UI ---
148
+ st.set_page_config(page_title="QR Code Anomaly Scanner", page_icon="πŸ•΅οΈ", layout="wide")
 
 
 
 
 
 
 
149
 
150
+ st.title("πŸ•΅οΈ QR Code Anomaly Scanner with Extended Compliance")
151
+
152
+ with st.sidebar:
153
+ st.header("βš™οΈ Inputs")
154
+ approved_file = st.file_uploader("πŸ“‘ Approved QR List (CSV/TXT)", type=["csv","txt"])
155
+ frames = st.file_uploader("πŸ–ΌοΈ Frames (images)", type=["jpg","jpeg","png","bmp","webp"], accept_multiple_files=True)
156
+ run_phone_detection = st.checkbox("πŸ“± Detect phones (YOLO)", value=True)
157
+ phone_conf = st.slider("πŸ“ Phone detection confidence", 0.1, 0.8, 0.25, 0.05)
158
+ iou_threshold = st.slider("🎯 QR–Phone overlap IoU threshold", 0.05, 0.8, 0.2, 0.05)
159
+ process_btn = st.button("πŸš€ Run Scan", use_container_width=True)
160
 
161
  if process_btn:
162
  if not approved_file:
163
  st.error("Please upload the Approved QR List first.")
164
  st.stop()
165
 
166
+ approved_list = pd.read_csv(approved_file).iloc[:,0].astype(str).tolist() if approved_file.name.endswith(".csv") else approved_file.read().decode().splitlines()
167
+ st.success(f"βœ… Loaded {len(approved_list)} approved entries.")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
168
 
169
  rows = []
170
+ absence_counter = 0
 
171
 
172
+ for f in frames or []:
173
+ pil = Image.open(f).convert("RGB")
 
 
 
 
174
  np_img = np.array(pil)
175
 
176
  qr_results = detect_qr_opencv(np_img)
177
  phone_boxes = detect_phones_yolo(np_img, conf=phone_conf) if run_phone_detection else []
178
 
179
  flags = {}
180
+ if len(qr_results) > 1:
181
+ log_alert("Multiple QRs detected", f.name)
182
+
183
+ if not qr_results:
184
+ absence_counter += 1
185
+ if absence_counter * 1 > ABSENCE_THRESHOLD_SEC: # assuming 1s per frame approx
186
+ log_alert("QR absent for threshold duration", f.name)
187
+ else:
188
+ absence_counter = 0
189
+
190
  for i, qr in enumerate(qr_results):
191
  msgs = []
192
  payload = qr.get("data", "")
 
193
  if not payload:
194
  msgs.append("UNDECODED_QR")
195
  elif not match_payload(payload, approved_list):
196
  msgs.append("UNAPPROVED_QR")
 
 
197
  if phone_boxes:
198
  qb = qr["bbox"]
199
  for pb in phone_boxes:
200
  if iou(qb, pb) >= iou_threshold:
201
  msgs.append("ON_PHONE")
202
  break
203
+ if not any(inside_roi(qr["bbox"], roi) for roi in ROI_ZONES):
204
+ msgs.append("OUTSIDE_ROI")
205
+ if detect_tampering(np_img, qr["bbox"]):
206
+ msgs.append("TAMPERING")
207
+ if msgs:
208
+ log_alert("|".join(msgs), f.name)
209
  flags[i] = msgs
210
 
211
+ rows.append({"frame": f.name,"qr_index": i,"payload": payload,"approved_match": (payload and match_payload(payload, approved_list)),"anomalies": "|".join(msgs) if msgs else ""})
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
212
 
213
  df = pd.DataFrame(rows)
214
+ st.dataframe(df)
215
+
216
+ # --- Dashboard Cards ---
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
217
  st.markdown("### πŸ“Š Compliance Dashboard")
218
+ col1, col2, col3 = st.columns(3)
219
+ col4, col5, col6 = st.columns(3)
220
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
221
  unapproved_count = int((df["anomalies"].str.contains("UNAPPROVED_QR", na=False)).sum())
222
+ on_phone_count = int((df["anomalies"].str.contains("ON_PHONE", na=False)).sum())
223
+ tampering_count = int((df["anomalies"].str.contains("TAMPERING", na=False)).sum())
224
+ roi_count = int((df["anomalies"].str.contains("OUTSIDE_ROI", na=False)).sum())
225
+ absence_count = int((df["anomalies"].str.contains("ABSENCE", na=False)).sum())
226
+ undecoded_count = int((df["anomalies"].str.contains("UNDECODED_QR", na=False)).sum())
227
+
228
+ col1.metric("❌ Unauthorized QRs", unapproved_count)
229
+ col2.metric("πŸ“± On Phone", on_phone_count)
230
+ col3.metric("⚠️ Tampered", tampering_count)
231
+
232
+ col4.metric("🚫 Outside ROI", roi_count)
233
+ col5.metric("⏳ QR Missing", absence_count)
234
+ col6.metric("πŸ” Undecoded", undecoded_count)
235
+
236
+ # --- Alert History ---
237
+ st.subheader("πŸ“œ Alert History")
238
+ for a in load_alerts()[-10:][::-1]:
239
+ st.write(f"{a['time']} [{a['frame']}] β†’ {a['alert']}")