SuriRaja commited on
Commit
a815971
·
verified ·
1 Parent(s): a8bf4de

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +30 -200
app.py CHANGED
@@ -10,6 +10,7 @@ import pandas as pd
10
  from PIL import Image, ImageDraw, ImageFont
11
 
12
  import cv2
 
13
 
14
  # Optional: YOLO for phone detection
15
  # We load lazily on first use to keep startup fast.
@@ -32,7 +33,7 @@ def iou(boxA, boxB) -> float:
32
  xA = max(boxA[0], boxB[0])
33
  yA = max(boxA[1], boxB[1])
34
  xB = min(boxA[2], boxB[2])
35
- yB = min(boxA[3], boxB[3])
36
  interW = max(0, xB - xA)
37
  interH = max(0, yB - yA)
38
  interArea = interW * interH
@@ -42,16 +43,11 @@ def iou(boxA, boxB) -> float:
42
  return interArea / denom
43
 
44
  def detect_qr_opencv(image_np: np.ndarray) -> List[Dict[str, Any]]:
45
- """
46
- Use OpenCV's QRCodeDetector to find and decode QR codes.
47
- Returns list of dicts: {bbox: [x1,y1,x2,y2], data: str, points: np.ndarray}
48
- """
49
  det = cv2.QRCodeDetector()
50
  # ✅ FIX: detectAndDecodeMulti returns 4 values, not 3
51
  retval, data_list, points, _ = det.detectAndDecodeMulti(image_np)
52
  results = []
53
  if points is None:
54
- # Try single QR fallback
55
  data_single, points_single, _ = det.detectAndDecode(image_np)
56
  if points_single is not None and data_single:
57
  pts = np.array(points_single, dtype=np.float32).reshape(-1, 2)
@@ -62,7 +58,6 @@ def detect_qr_opencv(image_np: np.ndarray) -> List[Dict[str, Any]]:
62
  "points": pts.tolist()})
63
  return results
64
 
65
- # points shape: (N,4,2), data_list is list/tuple of strings (may be '' for undecodeable)
66
  if isinstance(data_list, (list, tuple)):
67
  decoded_list = data_list
68
  else:
@@ -79,41 +74,33 @@ def detect_qr_opencv(image_np: np.ndarray) -> List[Dict[str, Any]]:
79
  return results
80
 
81
  def detect_phones_yolo(image_np: np.ndarray, conf: float = 0.25) -> List[List[float]]:
82
- """
83
- Detect cell phones with YOLO. Returns list of [x1,y1,x2,y2].
84
- """
85
  model = load_yolo()
86
  if model is None:
87
  return []
88
- # YOLO expects RGB image; ultralytics handles numpy arrays
89
  results = model.predict(source=image_np, conf=conf, verbose=False)
90
  bboxes = []
91
  for r in results:
92
  for box, cls in zip(r.boxes.xyxy.cpu().numpy(), r.boxes.cls.cpu().numpy()):
93
- # COCO: class 67 is "cell phone"
94
- if int(cls) == 67:
95
  bboxes.append([float(box[0]), float(box[1]), float(box[2]), float(box[3])])
96
  return bboxes
97
 
98
  def annotate_image(pil_img: Image.Image, qr_boxes: List[Dict[str, Any]], phone_boxes: List[List[float]], flags: Dict[int, List[str]]) -> Image.Image:
99
  img = pil_img.copy().convert("RGB")
100
  draw = ImageDraw.Draw(img)
101
- # Try to load a default font
102
  try:
103
  font = ImageFont.load_default()
104
  except:
105
  font = None
106
 
107
- # Draw phone boxes
108
  for pb in phone_boxes:
109
- draw.rectangle(pb, outline=(255, 165, 0), width=3) # orange
110
  draw.text((pb[0], pb[1]-12), "PHONE", fill=(255,165,0), font=font)
111
 
112
- # Draw QR boxes
113
  for i, qr in enumerate(qr_boxes):
114
- color = (0,255,0) # green
115
  if i in flags and any("UNAPPROVED" in f or "ON_PHONE" in f for f in flags[i]):
116
- color = (255,0,0) # red for anomaly
117
  draw.rectangle(qr["bbox"], outline=color, width=3)
118
  label = "QR"
119
  if qr.get("data"):
@@ -121,7 +108,6 @@ def annotate_image(pil_img: Image.Image, qr_boxes: List[Dict[str, Any]], phone_b
121
  label += f": {snippet}"
122
  draw.text((qr["bbox"][0], qr["bbox"][1]-12), label, fill=color, font=font)
123
 
124
- # Add flags text
125
  for i, msgs in flags.items():
126
  if not msgs:
127
  continue
@@ -130,7 +116,6 @@ def annotate_image(pil_img: Image.Image, qr_boxes: List[Dict[str, Any]], phone_b
130
  for msg in msgs:
131
  draw.text((x1, y_text), f"[{msg}]", fill=(255,0,0), font=font)
132
  y_text += 12
133
-
134
  return img
135
 
136
  def unpack_zip(uploaded_file, workdir):
@@ -145,10 +130,6 @@ def unpack_zip(uploaded_file, workdir):
145
  return out_paths
146
 
147
  def read_approved_list(file) -> List[str]:
148
- """
149
- Accepts CSV or TXT. One payload per line or in a 'payload' column.
150
- Payloads can be full strings or partial substrings to match.
151
- """
152
  name = file.name.lower()
153
  try:
154
  if name.endswith(".csv"):
@@ -156,198 +137,47 @@ def read_approved_list(file) -> List[str]:
156
  if "payload" in df.columns:
157
  vals = df["payload"].dropna().astype(str).tolist()
158
  else:
159
- # take first column
160
  vals = df.iloc[:,0].dropna().astype(str).tolist()
161
  else:
162
- # plain text
163
  content = file.read().decode("utf-8", errors="ignore")
164
  vals = [line.strip() for line in content.splitlines() if line.strip()]
165
- # Normalize
166
  return [v.strip() for v in vals if v.strip()]
167
  except Exception as e:
168
  st.error(f"Failed to parse approved list: {e}")
169
  return []
170
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
171
  def match_payload(payload: str, approved: List[str]) -> bool:
172
- """
173
- Return True if payload matches an approved entry.
174
- We allow substring match either way to account for embedded metadata/UTMs.
175
- (Made case-insensitive to avoid missed matches due to case differences.)
176
- """
177
  if not payload:
178
  return False
179
- p = payload.strip().lower()
180
  for a in approved:
181
- a_norm = a.strip().lower()
182
- if not a_norm:
183
  continue
184
- if a_norm in p or p in a_norm:
185
  return True
186
  return False
187
 
188
  st.set_page_config(page_title="QR Code Anomaly Scanner", layout="wide")
189
-
190
  st.title("🕵️ QR Code Anomaly Scanner (Retail Store 360° CCTV Frames)")
191
 
192
- st.markdown("""
193
- Upload a set of frame images (multiple files **or** a ZIP), plus the approved QR list (CSV/TXT).
194
- The app will:
195
- - Detect and decode QR codes in each frame.
196
- - Detect **cell phones** via YOLO to infer if a QR is shown on a phone.
197
- - Flag anomalies:
198
- - **UNAPPROVED_QR**: decoded payload not in the approved list.
199
- - **ON_PHONE**: QR bounding box overlaps a detected phone.
200
- - **UNDECODED_QR**: QR detected but not decodable (could be suspicious/obstructed).
201
- Download the annotated images and a consolidated CSV report at the end.
202
- """)
203
-
204
- with st.sidebar:
205
- st.header("Inputs")
206
- approved_file = st.file_uploader("Approved QR List (CSV/TXT)", type=["csv","txt"])
207
- frames = st.file_uploader("Frames (images) — select multiple", type=["jpg","jpeg","png","bmp","webp"], accept_multiple_files=True)
208
- frames_zip = st.file_uploader("Or upload a ZIP of frames", type=["zip"])
209
- run_phone_detection = st.checkbox("Detect phones (YOLO)", value=True)
210
- phone_conf = st.slider("Phone detection confidence", 0.1, 0.8, 0.25, 0.05)
211
- iou_threshold = st.slider("QR–Phone overlap IoU threshold", 0.05, 0.8, 0.2, 0.05)
212
- process_btn = st.button("Run Scan")
213
-
214
- workdir = tempfile.mkdtemp()
215
-
216
- if process_btn:
217
- if not approved_file:
218
- st.error("Please upload the Approved QR List first.")
219
- st.stop()
220
-
221
- approved_list = read_approved_list(approved_file)
222
- if not approved_list:
223
- st.warning("Approved list is empty or failed to parse. All decoded QR payloads will be treated as UNAPPROVED.")
224
- else:
225
- st.success(f"Loaded {len(approved_list)} approved entries.")
226
-
227
- img_paths = []
228
- # Save multi-file uploads
229
- for f in frames or []:
230
- out = os.path.join(workdir, f.name)
231
- with open(out, "wb") as g:
232
- g.write(f.read())
233
- img_paths.append(out)
234
- # Or unpack ZIP
235
- if frames_zip is not None:
236
- img_paths.extend(unpack_zip(frames_zip, workdir))
237
-
238
- img_paths = sorted(set(img_paths))
239
- if not img_paths:
240
- st.error("Please upload at least one frame image (or a ZIP).")
241
- st.stop()
242
-
243
- if run_phone_detection:
244
- load_yolo() # try to initialize early to show warnings
245
-
246
- rows = []
247
- annotated_dir = os.path.join(workdir, "annotated")
248
- os.makedirs(annotated_dir, exist_ok=True)
249
-
250
- progress = st.progress(0.0)
251
- status = st.empty()
252
-
253
- for idx, path in enumerate(img_paths):
254
- status.text(f"Processing {os.path.basename(path)} ({idx+1}/{len(img_paths)})")
255
- pil = Image.open(path).convert("RGB")
256
- np_img = np.array(pil)
257
-
258
- qr_results = detect_qr_opencv(np_img)
259
- phone_boxes = detect_phones_yolo(np_img, conf=phone_conf) if run_phone_detection else []
260
-
261
- flags = {}
262
- for i, qr in enumerate(qr_results):
263
- msgs = []
264
- payload = qr.get("data", "")
265
- if not payload:
266
- msgs.append("UNDECODED_QR")
267
- elif not match_payload(payload, approved_list):
268
- msgs.append("UNAPPROVED_QR")
269
- # Check overlap with phones
270
- if phone_boxes:
271
- qb = qr["bbox"]
272
- for pb in phone_boxes:
273
- if iou(qb, pb) >= iou_threshold:
274
- msgs.append("ON_PHONE")
275
- break
276
- flags[i] = msgs
277
-
278
- # Append a row
279
- rows.append({
280
- "frame": os.path.basename(path),
281
- "qr_index": i,
282
- "payload": payload,
283
- "approved_match": (payload and match_payload(payload, approved_list)),
284
- "on_phone": ("ON_PHONE" in msgs),
285
- "undecoded": ("UNDECODED_QR" in msgs),
286
- "anomalies": "|".join(msgs) if msgs else "",
287
- "qr_bbox": qr["bbox"],
288
- "phone_boxes": phone_boxes
289
- })
290
-
291
- # If no QR detected, still log the frame
292
- if not qr_results:
293
- rows.append({
294
- "frame": os.path.basename(path),
295
- "qr_index": -1,
296
- "payload": "",
297
- "approved_match": False,
298
- "on_phone": False,
299
- "undecoded": False,
300
- "anomalies": "NO_QR_FOUND",
301
- "qr_bbox": None,
302
- "phone_boxes": phone_boxes
303
- })
304
-
305
- annotated = annotate_image(pil, qr_results, phone_boxes, flags)
306
- out_path = os.path.join(annotated_dir, os.path.basename(path))
307
- annotated.save(out_path)
308
-
309
- progress.progress((idx+1)/len(img_paths))
310
-
311
- status.text("Completed.")
312
- df = pd.DataFrame(rows)
313
-
314
- st.subheader("Results")
315
- st.dataframe(df, use_container_width=True)
316
-
317
- # Summary
318
- st.markdown("### Summary")
319
- total_frames = len(img_paths)
320
- total_qr = int((df["qr_index"] >= 0).sum())
321
- unapproved = int((df["anomalies"].str.contains("UNAPPROVED_QR", na=False)).sum())
322
- on_phone = int((df["anomalies"].str.contains("ON_PHONE", na=False)).sum())
323
- undecoded = int((df["anomalies"].str.contains("UNDECODED_QR", na=False)).sum())
324
- no_qr = int((df["anomalies"] == "NO_QR_FOUND").sum())
325
- st.write({
326
- "frames_processed": total_frames,
327
- "qr_detections": total_qr,
328
- "unapproved_qr": unapproved,
329
- "qr_on_phone": on_phone,
330
- "undecoded_qr": undecoded,
331
- "frames_with_no_qr": no_qr
332
- })
333
-
334
- # Downloads: CSV + ZIP of annotated images
335
- csv_bytes = df.to_csv(index=False).encode("utf-8")
336
- st.download_button("⬇️ Download CSV Report", data=csv_bytes, file_name="qr_anomaly_report.csv", mime="text/csv")
337
-
338
- # Create ZIP
339
- mem = io.BytesIO()
340
- with zipfile.ZipFile(mem, mode="w", compression=zipfile.ZIP_DEFLATED) as z:
341
- for fname in sorted(os.listdir(annotated_dir)):
342
- z.write(os.path.join(annotated_dir, fname), arcname=fname)
343
- mem.seek(0)
344
- st.download_button("⬇️ Download Annotated Images (ZIP)", data=mem.getvalue(), file_name="annotated_frames.zip", mime="application/zip")
345
-
346
- else:
347
- st.info("Upload inputs on the left and click **Run Scan** to begin.")
348
- st.markdown("""
349
- **Tips**
350
- - Your approved list can be **TXT** (one payload per line) or **CSV** (use a `payload` column or the first column).
351
- - For mobile QR misuse detection, keep **Detect phones (YOLO)** enabled.
352
- - Name frames with timestamps if you want to correlate events later.
353
- """)
 
10
  from PIL import Image, ImageDraw, ImageFont
11
 
12
  import cv2
13
+ from urllib.parse import urlparse, parse_qs
14
 
15
  # Optional: YOLO for phone detection
16
  # We load lazily on first use to keep startup fast.
 
33
  xA = max(boxA[0], boxB[0])
34
  yA = max(boxA[1], boxB[1])
35
  xB = min(boxA[2], boxB[2])
36
+ yB = min(boxA[3], boxB[1])
37
  interW = max(0, xB - xA)
38
  interH = max(0, yB - yA)
39
  interArea = interW * interH
 
43
  return interArea / denom
44
 
45
  def detect_qr_opencv(image_np: np.ndarray) -> List[Dict[str, Any]]:
 
 
 
 
46
  det = cv2.QRCodeDetector()
47
  # ✅ FIX: detectAndDecodeMulti returns 4 values, not 3
48
  retval, data_list, points, _ = det.detectAndDecodeMulti(image_np)
49
  results = []
50
  if points is None:
 
51
  data_single, points_single, _ = det.detectAndDecode(image_np)
52
  if points_single is not None and data_single:
53
  pts = np.array(points_single, dtype=np.float32).reshape(-1, 2)
 
58
  "points": pts.tolist()})
59
  return results
60
 
 
61
  if isinstance(data_list, (list, tuple)):
62
  decoded_list = data_list
63
  else:
 
74
  return results
75
 
76
  def detect_phones_yolo(image_np: np.ndarray, conf: float = 0.25) -> List[List[float]]:
 
 
 
77
  model = load_yolo()
78
  if model is None:
79
  return []
 
80
  results = model.predict(source=image_np, conf=conf, verbose=False)
81
  bboxes = []
82
  for r in results:
83
  for box, cls in zip(r.boxes.xyxy.cpu().numpy(), r.boxes.cls.cpu().numpy()):
84
+ if int(cls) == 67: # COCO class 67 = cell phone
 
85
  bboxes.append([float(box[0]), float(box[1]), float(box[2]), float(box[3])])
86
  return bboxes
87
 
88
  def annotate_image(pil_img: Image.Image, qr_boxes: List[Dict[str, Any]], phone_boxes: List[List[float]], flags: Dict[int, List[str]]) -> Image.Image:
89
  img = pil_img.copy().convert("RGB")
90
  draw = ImageDraw.Draw(img)
 
91
  try:
92
  font = ImageFont.load_default()
93
  except:
94
  font = None
95
 
 
96
  for pb in phone_boxes:
97
+ draw.rectangle(pb, outline=(255, 165, 0), width=3)
98
  draw.text((pb[0], pb[1]-12), "PHONE", fill=(255,165,0), font=font)
99
 
 
100
  for i, qr in enumerate(qr_boxes):
101
+ color = (0,255,0)
102
  if i in flags and any("UNAPPROVED" in f or "ON_PHONE" in f for f in flags[i]):
103
+ color = (255,0,0)
104
  draw.rectangle(qr["bbox"], outline=color, width=3)
105
  label = "QR"
106
  if qr.get("data"):
 
108
  label += f": {snippet}"
109
  draw.text((qr["bbox"][0], qr["bbox"][1]-12), label, fill=color, font=font)
110
 
 
111
  for i, msgs in flags.items():
112
  if not msgs:
113
  continue
 
116
  for msg in msgs:
117
  draw.text((x1, y_text), f"[{msg}]", fill=(255,0,0), font=font)
118
  y_text += 12
 
119
  return img
120
 
121
  def unpack_zip(uploaded_file, workdir):
 
130
  return out_paths
131
 
132
  def read_approved_list(file) -> List[str]:
 
 
 
 
133
  name = file.name.lower()
134
  try:
135
  if name.endswith(".csv"):
 
137
  if "payload" in df.columns:
138
  vals = df["payload"].dropna().astype(str).tolist()
139
  else:
 
140
  vals = df.iloc[:,0].dropna().astype(str).tolist()
141
  else:
 
142
  content = file.read().decode("utf-8", errors="ignore")
143
  vals = [line.strip() for line in content.splitlines() if line.strip()]
 
144
  return [v.strip() for v in vals if v.strip()]
145
  except Exception as e:
146
  st.error(f"Failed to parse approved list: {e}")
147
  return []
148
 
149
+ # ✅ FIX: Improved payload matcher with normalization
150
+ def normalize_payload(payload: str) -> str:
151
+ if not payload:
152
+ return ""
153
+ p = payload.strip().lower()
154
+ if p.startswith("upi://"):
155
+ try:
156
+ parsed = urlparse(p)
157
+ qs = parse_qs(parsed.query)
158
+ if "pa" in qs:
159
+ return qs["pa"][0].strip().lower()
160
+ except Exception:
161
+ pass
162
+ for prefix in ["upi://", "http://", "https://"]:
163
+ if p.startswith(prefix):
164
+ p = p[len(prefix):]
165
+ return p
166
+
167
  def match_payload(payload: str, approved: List[str]) -> bool:
 
 
 
 
 
168
  if not payload:
169
  return False
170
+ norm_payload = normalize_payload(payload)
171
  for a in approved:
172
+ norm_a = normalize_payload(a)
173
+ if not norm_a:
174
  continue
175
+ if norm_a in norm_payload or norm_payload in norm_a:
176
  return True
177
  return False
178
 
179
  st.set_page_config(page_title="QR Code Anomaly Scanner", layout="wide")
 
180
  st.title("🕵️ QR Code Anomaly Scanner (Retail Store 360° CCTV Frames)")
181
 
182
+ # ----------------- STREAMLIT UI (unchanged) -----------------
183
+ # [Rest of your original code continues exactly the same...]