SuriRaja commited on
Commit
52e86cf
·
verified ·
1 Parent(s): dc24cfa

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +52 -324
app.py CHANGED
@@ -1,348 +1,76 @@
1
- import io
2
- import os
3
- import zipfile
4
- import tempfile
5
- from typing import List, Tuple, Dict, Any
6
-
7
- import streamlit as st
8
  import numpy as np
9
  import pandas as pd
10
- from PIL import Image, ImageDraw, ImageFont
11
-
12
- import cv2
13
-
14
- # Optional: YOLO for phone detection
15
- # We load lazily on first use to keep startup fast.
16
- YOLO_MODEL = None
17
-
18
- def load_yolo():
19
- global YOLO_MODEL
20
- if YOLO_MODEL is None:
21
- try:
22
- from ultralytics import YOLO
23
- # Use lightweight pretrained model; supports "cell phone" class via COCO.
24
- YOLO_MODEL = YOLO('yolov8n.pt') # automatically downloads on first run
25
- except Exception as e:
26
- st.warning(f"YOLO model could not be loaded: {e}")
27
- YOLO_MODEL = None
28
- return YOLO_MODEL
29
 
30
- def iou(boxA, boxB) -> float:
31
- # boxes in [x1,y1,x2,y2]
32
- xA = max(boxA[0], boxB[0])
33
- yA = max(boxA[1], boxB[1])
34
- xB = min(boxA[2], boxB[2])
35
- yB = min(boxA[3], boxB[3])
36
- interW = max(0, xB - xA)
37
- interH = max(0, yB - yA)
38
- interArea = interW * interH
39
- areaA = max(0, boxA[2] - boxA[0]) * max(0, boxA[3] - boxA[1])
40
- areaB = max(0, boxB[2] - boxB[0]) * max(0, boxB[3] - boxB[1])
41
- denom = areaA + areaB - interArea + 1e-6
42
- return interArea / denom
 
 
43
 
44
- def detect_qr_opencv(image_np: np.ndarray) -> List[Dict[str, Any]]:
45
- """
46
- Use OpenCV's QRCodeDetector to find and decode QR codes.
47
- Returns list of dicts: {bbox: [x1,y1,x2,y2], data: str, points: np.ndarray}
48
- """
49
- def detect_qr_opencv(image_np):
50
  det = cv2.QRCodeDetector()
51
-
52
  try:
53
- # For OpenCV >= 4.5.3
54
  retval, decoded_info, points, _ = det.detectAndDecodeMulti(image_np)
55
  if retval:
56
  return decoded_info, points
57
  else:
58
  return [], []
59
  except:
60
- # Fallback to single QR detection
61
  data, points, _ = det.detectAndDecode(image_np)
62
  return [data] if data else [], points
63
 
64
- # points shape: (N,4,2), data is list/tuple of strings (may be '' for undecodeable)
65
- if isinstance(data, (list, tuple)):
66
- decoded_list = data
67
- else:
68
- decoded_list = [data] * len(points)
69
-
70
- for i, quad in enumerate(points):
71
- pts = np.array(quad, dtype=np.float32).reshape(-1,2)
72
- x1, y1 = np.min(pts[:,0]), np.min(pts[:,1])
73
- x2, y2 = np.max(pts[:,0]), np.max(pts[:,1])
74
- payload = decoded_list[i] if i < len(decoded_list) else ""
75
- results.append({"bbox": [float(x1), float(y1), float(x2), float(y2)],
76
- "data": payload,
77
- "points": pts.tolist()})
78
- return results
79
 
80
- def detect_phones_yolo(image_np: np.ndarray, conf: float = 0.25) -> List[List[float]]:
81
- """
82
- Detect cell phones with YOLO. Returns list of [x1,y1,x2,y2].
83
- """
84
- model = load_yolo()
85
- if model is None:
86
- return []
87
- # YOLO expects RGB image; ultralytics handles numpy arrays
88
- results = model.predict(source=image_np, conf=conf, verbose=False)
89
- bboxes = []
90
- for r in results:
91
- for box, cls in zip(r.boxes.xyxy.cpu().numpy(), r.boxes.cls.cpu().numpy()):
92
- # COCO: class 67 is "cell phone"
93
- if int(cls) == 67:
94
- bboxes.append([float(box[0]), float(box[1]), float(box[2]), float(box[3])])
95
- return bboxes
96
 
97
- def annotate_image(pil_img: Image.Image, qr_boxes: List[Dict[str, Any]], phone_boxes: List[List[float]], flags: Dict[int, List[str]]) -> Image.Image:
98
- img = pil_img.copy().convert("RGB")
99
- draw = ImageDraw.Draw(img)
100
- # Try to load a default font
101
- try:
102
- font = ImageFont.load_default()
103
- except:
104
- font = None
105
-
106
- # Draw phone boxes
107
- for pb in phone_boxes:
108
- draw.rectangle(pb, outline=(255, 165, 0), width=3) # orange
109
- draw.text((pb[0], pb[1]-12), "PHONE", fill=(255,165,0), font=font)
110
 
111
- # Draw QR boxes
112
- for i, qr in enumerate(qr_boxes):
113
- color = (0,255,0) # green
114
- if i in flags and any("UNAPPROVED" in f or "ON_PHONE" in f for f in flags[i]):
115
- color = (255,0,0) # red for anomaly
116
- draw.rectangle(qr["bbox"], outline=color, width=3)
117
- label = "QR"
118
- if qr.get("data"):
119
- snippet = qr["data"][:32].replace("\n"," ")
120
- label += f": {snippet}"
121
- draw.text((qr["bbox"][0], qr["bbox"][1]-12), label, fill=color, font=font)
122
 
123
- # Add flags text
124
- for i, msgs in flags.items():
125
- if not msgs:
126
- continue
127
- x1, y1, x2, y2 = qr_boxes[i]["bbox"]
128
- y_text = y2 + 4
129
- for msg in msgs:
130
- draw.text((x1, y_text), f"[{msg}]", fill=(255,0,0), font=font)
131
- y_text += 12
132
 
133
- return img
 
134
 
135
- def unpack_zip(uploaded_file, workdir):
136
- zf = zipfile.ZipFile(uploaded_file)
137
- out_paths = []
138
- for name in zf.namelist():
139
- if name.lower().endswith((".jpg", ".jpeg", ".png", ".bmp", ".webp")):
140
- p = os.path.join(workdir, os.path.basename(name))
141
- with open(p, "wb") as f:
142
- f.write(zf.read(name))
143
- out_paths.append(p)
144
- return out_paths
145
 
146
- def read_approved_list(file) -> List[str]:
147
- """
148
- Accepts CSV or TXT. One payload per line or in a 'payload' column.
149
- Payloads can be full strings or partial substrings to match.
150
- """
151
- name = file.name.lower()
152
- try:
153
- if name.endswith(".csv"):
154
- df = pd.read_csv(file)
155
- if "payload" in df.columns:
156
- vals = df["payload"].dropna().astype(str).tolist()
157
  else:
158
- # take first column
159
- vals = df.iloc[:,0].dropna().astype(str).tolist()
160
- else:
161
- # plain text
162
- content = file.read().decode("utf-8", errors="ignore")
163
- vals = [line.strip() for line in content.splitlines() if line.strip()]
164
- # Normalize
165
- return [v.strip() for v in vals if v.strip()]
166
- except Exception as e:
167
- st.error(f"Failed to parse approved list: {e}")
168
- return []
169
-
170
- def match_payload(payload: str, approved: List[str]) -> bool:
171
- """
172
- Return True if payload matches an approved entry.
173
- We allow substring match either way to account for embedded metadata/UTMs.
174
- """
175
- if not payload:
176
- return False
177
- p = payload.strip()
178
- for a in approved:
179
- if a in p or p in a:
180
- return True
181
- return False
182
-
183
- st.set_page_config(page_title="QR Code Anomaly Scanner", layout="wide")
184
-
185
- st.title("🕵️ QR Code Anomaly Scanner (Retail Store 360° CCTV Frames)")
186
-
187
- st.markdown("""
188
- Upload a set of frame images (multiple files **or** a ZIP), plus the approved QR list (CSV/TXT).
189
- The app will:
190
- - Detect and decode QR codes in each frame.
191
- - Detect **cell phones** via YOLO to infer if a QR is shown on a phone.
192
- - Flag anomalies:
193
- - **UNAPPROVED_QR**: decoded payload not in the approved list.
194
- - **ON_PHONE**: QR bounding box overlaps a detected phone.
195
- - **UNDECODED_QR**: QR detected but not decodable (could be suspicious/obstructed).
196
- Download the annotated images and a consolidated CSV report at the end.
197
- """)
198
-
199
- with st.sidebar:
200
- st.header("Inputs")
201
- approved_file = st.file_uploader("Approved QR List (CSV/TXT)", type=["csv","txt"])
202
- frames = st.file_uploader("Frames (images) — select multiple", type=["jpg","jpeg","png","bmp","webp"], accept_multiple_files=True)
203
- frames_zip = st.file_uploader("Or upload a ZIP of frames", type=["zip"])
204
- run_phone_detection = st.checkbox("Detect phones (YOLO)", value=True)
205
- phone_conf = st.slider("Phone detection confidence", 0.1, 0.8, 0.25, 0.05)
206
- iou_threshold = st.slider("QR–Phone overlap IoU threshold", 0.05, 0.8, 0.2, 0.05)
207
- process_btn = st.button("Run Scan")
208
-
209
- workdir = tempfile.mkdtemp()
210
-
211
- if process_btn:
212
- if not approved_file:
213
- st.error("Please upload the Approved QR List first.")
214
- st.stop()
215
-
216
- approved_list = read_approved_list(approved_file)
217
- if not approved_list:
218
- st.warning("Approved list is empty or failed to parse. All decoded QR payloads will be treated as UNAPPROVED.")
219
  else:
220
- st.success(f"Loaded {len(approved_list)} approved entries.")
221
-
222
- img_paths = []
223
- # Save multi-file uploads
224
- for f in frames or []:
225
- out = os.path.join(workdir, f.name)
226
- with open(out, "wb") as g:
227
- g.write(f.read())
228
- img_paths.append(out)
229
- # Or unpack ZIP
230
- if frames_zip is not None:
231
- img_paths.extend(unpack_zip(frames_zip, workdir))
232
-
233
- img_paths = sorted(set(img_paths))
234
- if not img_paths:
235
- st.error("Please upload at least one frame image (or a ZIP).")
236
- st.stop()
237
-
238
- if run_phone_detection:
239
- load_yolo() # try to initialize early to show warnings
240
-
241
- rows = []
242
- annotated_dir = os.path.join(workdir, "annotated")
243
- os.makedirs(annotated_dir, exist_ok=True)
244
-
245
- progress = st.progress(0.0)
246
- status = st.empty()
247
-
248
- for idx, path in enumerate(img_paths):
249
- status.text(f"Processing {os.path.basename(path)} ({idx+1}/{len(img_paths)})")
250
- pil = Image.open(path).convert("RGB")
251
- np_img = np.array(pil)
252
-
253
- qr_results = detect_qr_opencv(np_img)
254
- phone_boxes = detect_phones_yolo(np_img, conf=phone_conf) if run_phone_detection else []
255
-
256
- flags = {}
257
- for i, qr in enumerate(qr_results):
258
- msgs = []
259
- payload = qr.get("data", "")
260
- if not payload:
261
- msgs.append("UNDECODED_QR")
262
- elif not match_payload(payload, approved_list):
263
- msgs.append("UNAPPROVED_QR")
264
- # Check overlap with phones
265
- if phone_boxes:
266
- qb = qr["bbox"]
267
- for pb in phone_boxes:
268
- if iou(qb, pb) >= iou_threshold:
269
- msgs.append("ON_PHONE")
270
- break
271
- flags[i] = msgs
272
-
273
- # Append a row
274
- rows.append({
275
- "frame": os.path.basename(path),
276
- "qr_index": i,
277
- "payload": payload,
278
- "approved_match": (payload and match_payload(payload, approved_list)),
279
- "on_phone": ("ON_PHONE" in msgs),
280
- "undecoded": ("UNDECODED_QR" in msgs),
281
- "anomalies": "|".join(msgs) if msgs else "",
282
- "qr_bbox": qr["bbox"],
283
- "phone_boxes": phone_boxes
284
- })
285
-
286
- # If no QR detected, still log the frame
287
- if not qr_results:
288
- rows.append({
289
- "frame": os.path.basename(path),
290
- "qr_index": -1,
291
- "payload": "",
292
- "approved_match": False,
293
- "on_phone": False,
294
- "undecoded": False,
295
- "anomalies": "NO_QR_FOUND",
296
- "qr_bbox": None,
297
- "phone_boxes": phone_boxes
298
- })
299
-
300
- annotated = annotate_image(pil, qr_results, phone_boxes, flags)
301
- out_path = os.path.join(annotated_dir, os.path.basename(path))
302
- annotated.save(out_path)
303
-
304
- progress.progress((idx+1)/len(img_paths))
305
-
306
- status.text("Completed.")
307
- df = pd.DataFrame(rows)
308
-
309
- st.subheader("Results")
310
- st.dataframe(df, use_container_width=True)
311
-
312
- # Summary
313
- st.markdown("### Summary")
314
- total_frames = len(img_paths)
315
- total_qr = int((df["qr_index"] >= 0).sum())
316
- unapproved = int((df["anomalies"].str.contains("UNAPPROVED_QR", na=False)).sum())
317
- on_phone = int((df["anomalies"].str.contains("ON_PHONE", na=False)).sum())
318
- undecoded = int((df["anomalies"].str.contains("UNDECODED_QR", na=False)).sum())
319
- no_qr = int((df["anomalies"] == "NO_QR_FOUND").sum())
320
- st.write({
321
- "frames_processed": total_frames,
322
- "qr_detections": total_qr,
323
- "unapproved_qr": unapproved,
324
- "qr_on_phone": on_phone,
325
- "undecoded_qr": undecoded,
326
- "frames_with_no_qr": no_qr
327
- })
328
-
329
- # Downloads: CSV + ZIP of annotated images
330
- csv_bytes = df.to_csv(index=False).encode("utf-8")
331
- st.download_button("⬇️ Download CSV Report", data=csv_bytes, file_name="qr_anomaly_report.csv", mime="text/csv")
332
-
333
- # Create ZIP
334
- mem = io.BytesIO()
335
- with zipfile.ZipFile(mem, mode="w", compression=zipfile.ZIP_DEFLATED) as z:
336
- for fname in sorted(os.listdir(annotated_dir)):
337
- z.write(os.path.join(annotated_dir, fname), arcname=fname)
338
- mem.seek(0)
339
- st.download_button("⬇️ Download Annotated Images (ZIP)", data=mem.getvalue(), file_name="annotated_frames.zip", mime="application/zip")
340
-
341
- else:
342
- st.info("Upload inputs on the left and click **Run Scan** to begin.")
343
- st.markdown("""
344
- **Tips**
345
- - Your approved list can be **TXT** (one payload per line) or **CSV** (use a `payload` column or the first column).
346
- - For mobile QR misuse detection, keep **Detect phones (YOLO)** enabled.
347
- - Name frames with timestamps if you want to correlate events later.
348
- """)
 
1
+ import cv2
 
 
 
 
 
 
2
  import numpy as np
3
  import pandas as pd
4
+ import streamlit as st
5
+ from PIL import Image
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6
 
7
+ # ============================
8
+ # Load Approved QR List
9
+ # ============================
10
+ @st.cache_data
11
+ def load_whitelist(file_path):
12
+ try:
13
+ if file_path.endswith(".csv"):
14
+ df = pd.read_csv(file_path)
15
+ return set(df["QR_Data"].astype(str).tolist())
16
+ elif file_path.endswith(".txt"):
17
+ with open(file_path, "r") as f:
18
+ return set([line.strip().split(":")[-1].strip() for line in f.readlines()])
19
+ except Exception as e:
20
+ st.error(f"Error loading whitelist: {e}")
21
+ return set()
22
 
23
+ # ============================
24
+ # QR Detection Function (fixed)
25
+ # ============================
26
+ def detect_qr_opencv(image_np):
 
 
27
  det = cv2.QRCodeDetector()
 
28
  try:
29
+ # For OpenCV >= 4.5.3 (multi QR support)
30
  retval, decoded_info, points, _ = det.detectAndDecodeMulti(image_np)
31
  if retval:
32
  return decoded_info, points
33
  else:
34
  return [], []
35
  except:
36
+ # Fallback for single QR code detection
37
  data, points, _ = det.detectAndDecode(image_np)
38
  return [data] if data else [], points
39
 
40
+ # ============================
41
+ # Streamlit UI
42
+ # ============================
43
+ st.title("AI-Powered QR Code Surveillance")
 
 
 
 
 
 
 
 
 
 
 
44
 
45
+ st.sidebar.header("Configuration")
46
+ whitelist_file = st.sidebar.file_uploader("Upload Approved QR List (CSV or TXT)", type=["csv", "txt"])
 
 
 
 
 
 
 
 
 
 
 
 
 
 
47
 
48
+ if whitelist_file:
49
+ whitelist = load_whitelist(whitelist_file.name)
50
+ st.sidebar.success("Whitelist loaded successfully ✅")
51
+ else:
52
+ whitelist = set()
53
+ st.sidebar.warning("No whitelist loaded ⚠️")
 
 
 
 
 
 
 
54
 
55
+ uploaded_file = st.file_uploader("Upload an image with QR codes", type=["jpg", "jpeg", "png"])
 
 
 
 
 
 
 
 
 
 
56
 
57
+ if uploaded_file is not None:
58
+ # Convert to OpenCV format
59
+ image = Image.open(uploaded_file).convert("RGB")
60
+ image_np = np.array(image)
 
 
 
 
 
61
 
62
+ # Detect QR Codes
63
+ decoded_data, points_all = detect_qr_opencv(image_np)
64
 
65
+ # Display image
66
+ st.image(image, caption="Uploaded Frame", use_column_width=True)
 
 
 
 
 
 
 
 
67
 
68
+ if decoded_data:
69
+ st.subheader("🔍 Detected QR Codes")
70
+ for qr in decoded_data:
71
+ if qr in whitelist:
72
+ st.success(f"✅ APPROVED: {qr}")
 
 
 
 
 
 
73
  else:
74
+ st.error(f"🚫 UNAUTHORIZED: {qr}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
75
  else:
76
+ st.warning("No QR Codes detected ❌")