SuriRaja commited on
Commit
06f54b5
·
verified ·
1 Parent(s): 52e86cf

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +332 -60
app.py CHANGED
@@ -1,76 +1,348 @@
1
- import cv2
 
 
 
 
 
 
2
  import numpy as np
3
  import pandas as pd
4
- import streamlit as st
5
- from PIL import Image
6
 
7
- # ============================
8
- # Load Approved QR List
9
- # ============================
10
- @st.cache_data
11
- def load_whitelist(file_path):
12
- try:
13
- if file_path.endswith(".csv"):
14
- df = pd.read_csv(file_path)
15
- return set(df["QR_Data"].astype(str).tolist())
16
- elif file_path.endswith(".txt"):
17
- with open(file_path, "r") as f:
18
- return set([line.strip().split(":")[-1].strip() for line in f.readlines()])
19
- except Exception as e:
20
- st.error(f"Error loading whitelist: {e}")
21
- return set()
 
 
22
 
23
- # ============================
24
- # QR Detection Function (fixed)
25
- # ============================
26
- def detect_qr_opencv(image_np):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
27
  det = cv2.QRCodeDetector()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
28
  try:
29
- # For OpenCV >= 4.5.3 (multi QR support)
30
- retval, decoded_info, points, _ = det.detectAndDecodeMulti(image_np)
31
- if retval:
32
- return decoded_info, points
33
- else:
34
- return [], []
35
  except:
36
- # Fallback for single QR code detection
37
- data, points, _ = det.detectAndDecode(image_np)
38
- return [data] if data else [], points
39
 
40
- # ============================
41
- # Streamlit UI
42
- # ============================
43
- st.title("AI-Powered QR Code Surveillance")
44
 
45
- st.sidebar.header("Configuration")
46
- whitelist_file = st.sidebar.file_uploader("Upload Approved QR List (CSV or TXT)", type=["csv", "txt"])
 
 
 
 
 
 
 
 
 
47
 
48
- if whitelist_file:
49
- whitelist = load_whitelist(whitelist_file.name)
50
- st.sidebar.success("Whitelist loaded successfully ✅")
51
- else:
52
- whitelist = set()
53
- st.sidebar.warning("No whitelist loaded ⚠️")
 
 
 
 
 
54
 
55
- uploaded_file = st.file_uploader("Upload an image with QR codes", type=["jpg", "jpeg", "png"])
 
 
 
 
 
 
 
 
 
56
 
57
- if uploaded_file is not None:
58
- # Convert to OpenCV format
59
- image = Image.open(uploaded_file).convert("RGB")
60
- image_np = np.array(image)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
61
 
62
- # Detect QR Codes
63
- decoded_data, points_all = detect_qr_opencv(image_np)
 
 
 
 
 
 
 
 
 
 
64
 
65
- # Display image
66
- st.image(image, caption="Uploaded Frame", use_column_width=True)
67
 
68
- if decoded_data:
69
- st.subheader("🔍 Detected QR Codes")
70
- for qr in decoded_data:
71
- if qr in whitelist:
72
- st.success(f"✅ APPROVED: {qr}")
73
- else:
74
- st.error(f"🚫 UNAUTHORIZED: {qr}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
75
  else:
76
- st.warning("No QR Codes detected ❌")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import io
2
+ import os
3
+ import zipfile
4
+ import tempfile
5
+ from typing import List, Tuple, Dict, Any
6
+
7
+ import streamlit as st
8
  import numpy as np
9
  import pandas as pd
10
+ from PIL import Image, ImageDraw, ImageFont
 
11
 
12
+ import cv2
13
+
14
+ # Optional: YOLO for phone detection
15
+ # We load lazily on first use to keep startup fast.
16
+ YOLO_MODEL = None
17
+
18
+ def load_yolo():
19
+ global YOLO_MODEL
20
+ if YOLO_MODEL is None:
21
+ try:
22
+ from ultralytics import YOLO
23
+ # Use lightweight pretrained model; supports "cell phone" class via COCO.
24
+ YOLO_MODEL = YOLO('yolov8n.pt') # automatically downloads on first run
25
+ except Exception as e:
26
+ st.warning(f"YOLO model could not be loaded: {e}")
27
+ YOLO_MODEL = None
28
+ return YOLO_MODEL
29
 
30
+ def iou(boxA, boxB) -> float:
31
+ # boxes in [x1,y1,x2,y2]
32
+ xA = max(boxA[0], boxB[0])
33
+ yA = max(boxA[1], boxB[1])
34
+ xB = min(boxA[2], boxB[2])
35
+ yB = min(boxA[3], boxB[3])
36
+ interW = max(0, xB - xA)
37
+ interH = max(0, yB - yA)
38
+ interArea = interW * interH
39
+ areaA = max(0, boxA[2] - boxA[0]) * max(0, boxA[3] - boxA[1])
40
+ areaB = max(0, boxB[2] - boxB[0]) * max(0, boxB[3] - boxB[1])
41
+ denom = areaA + areaB - interArea + 1e-6
42
+ return interArea / denom
43
+
44
+ def detect_qr_opencv(image_np: np.ndarray) -> List[Dict[str, Any]]:
45
+ """
46
+ Use OpenCV's QRCodeDetector to find and decode QR codes.
47
+ Returns list of dicts: {bbox: [x1,y1,x2,y2], data: str, points: np.ndarray}
48
+ """
49
  det = cv2.QRCodeDetector()
50
+ data, points, _ = det.detectAndDecodeMulti(image_np)
51
+ results = []
52
+ if points is None:
53
+ # Try single QR fallback
54
+ data_single, points_single, _ = det.detectAndDecode(image_np)
55
+ if points_single is not None and data_single:
56
+ pts = np.array(points_single, dtype=np.float32).reshape(-1, 2)
57
+ x1, y1 = np.min(pts[:,0]), np.min(pts[:,1])
58
+ x2, y2 = np.max(pts[:,0]), np.max(pts[:,1])
59
+ results.append({"bbox": [float(x1), float(y1), float(x2), float(y2)],
60
+ "data": data_single,
61
+ "points": pts.tolist()})
62
+ return results
63
+
64
+ # points shape: (N,4,2), data is list/tuple of strings (may be '' for undecodeable)
65
+ if isinstance(data, (list, tuple)):
66
+ decoded_list = data
67
+ else:
68
+ decoded_list = [data] * len(points)
69
+
70
+ for i, quad in enumerate(points):
71
+ pts = np.array(quad, dtype=np.float32).reshape(-1,2)
72
+ x1, y1 = np.min(pts[:,0]), np.min(pts[:,1])
73
+ x2, y2 = np.max(pts[:,0]), np.max(pts[:,1])
74
+ payload = decoded_list[i] if i < len(decoded_list) else ""
75
+ results.append({"bbox": [float(x1), float(y1), float(x2), float(y2)],
76
+ "data": payload,
77
+ "points": pts.tolist()})
78
+ return results
79
+
80
+ def detect_phones_yolo(image_np: np.ndarray, conf: float = 0.25) -> List[List[float]]:
81
+ """
82
+ Detect cell phones with YOLO. Returns list of [x1,y1,x2,y2].
83
+ """
84
+ model = load_yolo()
85
+ if model is None:
86
+ return []
87
+ # YOLO expects RGB image; ultralytics handles numpy arrays
88
+ results = model.predict(source=image_np, conf=conf, verbose=False)
89
+ bboxes = []
90
+ for r in results:
91
+ for box, cls in zip(r.boxes.xyxy.cpu().numpy(), r.boxes.cls.cpu().numpy()):
92
+ # COCO: class 67 is "cell phone"
93
+ if int(cls) == 67:
94
+ bboxes.append([float(box[0]), float(box[1]), float(box[2]), float(box[3])])
95
+ return bboxes
96
+
97
+ def annotate_image(pil_img: Image.Image, qr_boxes: List[Dict[str, Any]], phone_boxes: List[List[float]], flags: Dict[int, List[str]]) -> Image.Image:
98
+ img = pil_img.copy().convert("RGB")
99
+ draw = ImageDraw.Draw(img)
100
+ # Try to load a default font
101
  try:
102
+ font = ImageFont.load_default()
 
 
 
 
 
103
  except:
104
+ font = None
 
 
105
 
106
+ # Draw phone boxes
107
+ for pb in phone_boxes:
108
+ draw.rectangle(pb, outline=(255, 165, 0), width=3) # orange
109
+ draw.text((pb[0], pb[1]-12), "PHONE", fill=(255,165,0), font=font)
110
 
111
+ # Draw QR boxes
112
+ for i, qr in enumerate(qr_boxes):
113
+ color = (0,255,0) # green
114
+ if i in flags and any("UNAPPROVED" in f or "ON_PHONE" in f for f in flags[i]):
115
+ color = (255,0,0) # red for anomaly
116
+ draw.rectangle(qr["bbox"], outline=color, width=3)
117
+ label = "QR"
118
+ if qr.get("data"):
119
+ snippet = qr["data"][:32].replace("\n"," ")
120
+ label += f": {snippet}"
121
+ draw.text((qr["bbox"][0], qr["bbox"][1]-12), label, fill=color, font=font)
122
 
123
+ # Add flags text
124
+ for i, msgs in flags.items():
125
+ if not msgs:
126
+ continue
127
+ x1, y1, x2, y2 = qr_boxes[i]["bbox"]
128
+ y_text = y2 + 4
129
+ for msg in msgs:
130
+ draw.text((x1, y_text), f"[{msg}]", fill=(255,0,0), font=font)
131
+ y_text += 12
132
+
133
+ return img
134
 
135
+ def unpack_zip(uploaded_file, workdir):
136
+ zf = zipfile.ZipFile(uploaded_file)
137
+ out_paths = []
138
+ for name in zf.namelist():
139
+ if name.lower().endswith((".jpg", ".jpeg", ".png", ".bmp", ".webp")):
140
+ p = os.path.join(workdir, os.path.basename(name))
141
+ with open(p, "wb") as f:
142
+ f.write(zf.read(name))
143
+ out_paths.append(p)
144
+ return out_paths
145
 
146
+ def read_approved_list(file) -> List[str]:
147
+ """
148
+ Accepts CSV or TXT. One payload per line or in a 'payload' column.
149
+ Payloads can be full strings or partial substrings to match.
150
+ """
151
+ name = file.name.lower()
152
+ try:
153
+ if name.endswith(".csv"):
154
+ df = pd.read_csv(file)
155
+ if "payload" in df.columns:
156
+ vals = df["payload"].dropna().astype(str).tolist()
157
+ else:
158
+ # take first column
159
+ vals = df.iloc[:,0].dropna().astype(str).tolist()
160
+ else:
161
+ # plain text
162
+ content = file.read().decode("utf-8", errors="ignore")
163
+ vals = [line.strip() for line in content.splitlines() if line.strip()]
164
+ # Normalize
165
+ return [v.strip() for v in vals if v.strip()]
166
+ except Exception as e:
167
+ st.error(f"Failed to parse approved list: {e}")
168
+ return []
169
 
170
+ def match_payload(payload: str, approved: List[str]) -> bool:
171
+ """
172
+ Return True if payload matches an approved entry.
173
+ We allow substring match either way to account for embedded metadata/UTMs.
174
+ """
175
+ if not payload:
176
+ return False
177
+ p = payload.strip()
178
+ for a in approved:
179
+ if a in p or p in a:
180
+ return True
181
+ return False
182
 
183
+ st.set_page_config(page_title="QR Code Anomaly Scanner", layout="wide")
 
184
 
185
+ st.title("🕵️ QR Code Anomaly Scanner (Retail Store 360° CCTV Frames)")
186
+
187
+ st.markdown("""
188
+ Upload a set of frame images (multiple files **or** a ZIP), plus the approved QR list (CSV/TXT).
189
+ The app will:
190
+ - Detect and decode QR codes in each frame.
191
+ - Detect **cell phones** via YOLO to infer if a QR is shown on a phone.
192
+ - Flag anomalies:
193
+ - **UNAPPROVED_QR**: decoded payload not in the approved list.
194
+ - **ON_PHONE**: QR bounding box overlaps a detected phone.
195
+ - **UNDECODED_QR**: QR detected but not decodable (could be suspicious/obstructed).
196
+ Download the annotated images and a consolidated CSV report at the end.
197
+ """)
198
+
199
+ with st.sidebar:
200
+ st.header("Inputs")
201
+ approved_file = st.file_uploader("Approved QR List (CSV/TXT)", type=["csv","txt"])
202
+ frames = st.file_uploader("Frames (images) — select multiple", type=["jpg","jpeg","png","bmp","webp"], accept_multiple_files=True)
203
+ frames_zip = st.file_uploader("Or upload a ZIP of frames", type=["zip"])
204
+ run_phone_detection = st.checkbox("Detect phones (YOLO)", value=True)
205
+ phone_conf = st.slider("Phone detection confidence", 0.1, 0.8, 0.25, 0.05)
206
+ iou_threshold = st.slider("QR–Phone overlap IoU threshold", 0.05, 0.8, 0.2, 0.05)
207
+ process_btn = st.button("Run Scan")
208
+
209
+ workdir = tempfile.mkdtemp()
210
+
211
+ if process_btn:
212
+ if not approved_file:
213
+ st.error("Please upload the Approved QR List first.")
214
+ st.stop()
215
+
216
+ approved_list = read_approved_list(approved_file)
217
+ if not approved_list:
218
+ st.warning("Approved list is empty or failed to parse. All decoded QR payloads will be treated as UNAPPROVED.")
219
  else:
220
+ st.success(f"Loaded {len(approved_list)} approved entries.")
221
+
222
+ img_paths = []
223
+ # Save multi-file uploads
224
+ for f in frames or []:
225
+ out = os.path.join(workdir, f.name)
226
+ with open(out, "wb") as g:
227
+ g.write(f.read())
228
+ img_paths.append(out)
229
+ # Or unpack ZIP
230
+ if frames_zip is not None:
231
+ img_paths.extend(unpack_zip(frames_zip, workdir))
232
+
233
+ img_paths = sorted(set(img_paths))
234
+ if not img_paths:
235
+ st.error("Please upload at least one frame image (or a ZIP).")
236
+ st.stop()
237
+
238
+ if run_phone_detection:
239
+ load_yolo() # try to initialize early to show warnings
240
+
241
+ rows = []
242
+ annotated_dir = os.path.join(workdir, "annotated")
243
+ os.makedirs(annotated_dir, exist_ok=True)
244
+
245
+ progress = st.progress(0.0)
246
+ status = st.empty()
247
+
248
+ for idx, path in enumerate(img_paths):
249
+ status.text(f"Processing {os.path.basename(path)} ({idx+1}/{len(img_paths)})")
250
+ pil = Image.open(path).convert("RGB")
251
+ np_img = np.array(pil)
252
+
253
+ qr_results = detect_qr_opencv(np_img)
254
+ phone_boxes = detect_phones_yolo(np_img, conf=phone_conf) if run_phone_detection else []
255
+
256
+ flags = {}
257
+ for i, qr in enumerate(qr_results):
258
+ msgs = []
259
+ payload = qr.get("data", "")
260
+ if not payload:
261
+ msgs.append("UNDECODED_QR")
262
+ elif not match_payload(payload, approved_list):
263
+ msgs.append("UNAPPROVED_QR")
264
+ # Check overlap with phones
265
+ if phone_boxes:
266
+ qb = qr["bbox"]
267
+ for pb in phone_boxes:
268
+ if iou(qb, pb) >= iou_threshold:
269
+ msgs.append("ON_PHONE")
270
+ break
271
+ flags[i] = msgs
272
+
273
+ # Append a row
274
+ rows.append({
275
+ "frame": os.path.basename(path),
276
+ "qr_index": i,
277
+ "payload": payload,
278
+ "approved_match": (payload and match_payload(payload, approved_list)),
279
+ "on_phone": ("ON_PHONE" in msgs),
280
+ "undecoded": ("UNDECODED_QR" in msgs),
281
+ "anomalies": "|".join(msgs) if msgs else "",
282
+ "qr_bbox": qr["bbox"],
283
+ "phone_boxes": phone_boxes
284
+ })
285
+
286
+ # If no QR detected, still log the frame
287
+ if not qr_results:
288
+ rows.append({
289
+ "frame": os.path.basename(path),
290
+ "qr_index": -1,
291
+ "payload": "",
292
+ "approved_match": False,
293
+ "on_phone": False,
294
+ "undecoded": False,
295
+ "anomalies": "NO_QR_FOUND",
296
+ "qr_bbox": None,
297
+ "phone_boxes": phone_boxes
298
+ })
299
+
300
+ annotated = annotate_image(pil, qr_results, phone_boxes, flags)
301
+ out_path = os.path.join(annotated_dir, os.path.basename(path))
302
+ annotated.save(out_path)
303
+
304
+ progress.progress((idx+1)/len(img_paths))
305
+
306
+ status.text("Completed.")
307
+ df = pd.DataFrame(rows)
308
+
309
+ st.subheader("Results")
310
+ st.dataframe(df, use_container_width=True)
311
+
312
+ # Summary
313
+ st.markdown("### Summary")
314
+ total_frames = len(img_paths)
315
+ total_qr = int((df["qr_index"] >= 0).sum())
316
+ unapproved = int((df["anomalies"].str.contains("UNAPPROVED_QR", na=False)).sum())
317
+ on_phone = int((df["anomalies"].str.contains("ON_PHONE", na=False)).sum())
318
+ undecoded = int((df["anomalies"].str.contains("UNDECODED_QR", na=False)).sum())
319
+ no_qr = int((df["anomalies"] == "NO_QR_FOUND").sum())
320
+ st.write({
321
+ "frames_processed": total_frames,
322
+ "qr_detections": total_qr,
323
+ "unapproved_qr": unapproved,
324
+ "qr_on_phone": on_phone,
325
+ "undecoded_qr": undecoded,
326
+ "frames_with_no_qr": no_qr
327
+ })
328
+
329
+ # Downloads: CSV + ZIP of annotated images
330
+ csv_bytes = df.to_csv(index=False).encode("utf-8")
331
+ st.download_button("⬇️ Download CSV Report", data=csv_bytes, file_name="qr_anomaly_report.csv", mime="text/csv")
332
+
333
+ # Create ZIP
334
+ mem = io.BytesIO()
335
+ with zipfile.ZipFile(mem, mode="w", compression=zipfile.ZIP_DEFLATED) as z:
336
+ for fname in sorted(os.listdir(annotated_dir)):
337
+ z.write(os.path.join(annotated_dir, fname), arcname=fname)
338
+ mem.seek(0)
339
+ st.download_button("⬇️ Download Annotated Images (ZIP)", data=mem.getvalue(), file_name="annotated_frames.zip", mime="application/zip")
340
+
341
+ else:
342
+ st.info("Upload inputs on the left and click **Run Scan** to begin.")
343
+ st.markdown("""
344
+ **Tips**
345
+ - Your approved list can be **TXT** (one payload per line) or **CSV** (use a `payload` column or the first column).
346
+ - For mobile QR misuse detection, keep **Detect phones (YOLO)** enabled.
347
+ - Name frames with timestamps if you want to correlate events later.
348
+ """)