SuriRaja commited on
Commit
469c083
ยท
verified ยท
1 Parent(s): 032e63d

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +71 -204
app.py CHANGED
@@ -2,6 +2,7 @@ import io
2
  import os
3
  import zipfile
4
  import tempfile
 
5
  from typing import List, Dict, Any
6
 
7
  import streamlit as st
@@ -13,6 +14,27 @@ import cv2
13
  # Optional: YOLO for phone detection
14
  YOLO_MODEL = None
15
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
16
  def load_yolo():
17
  global YOLO_MODEL
18
  if YOLO_MODEL is None:
@@ -37,6 +59,7 @@ def iou(boxA, boxB) -> float:
37
  denom = areaA + areaB - interArea + 1e-6
38
  return interArea / denom
39
 
 
40
  def detect_qr_opencv(image_np: np.ndarray) -> List[Dict[str, Any]]:
41
  det = cv2.QRCodeDetector()
42
  retval, data_list, points, _ = det.detectAndDecodeMulti(image_np)
@@ -47,26 +70,18 @@ def detect_qr_opencv(image_np: np.ndarray) -> List[Dict[str, Any]]:
47
  pts = np.array(points_single, dtype=np.float32).reshape(-1, 2)
48
  x1, y1 = np.min(pts[:,0]), np.min(pts[:,1])
49
  x2, y2 = np.max(pts[:,0]), np.max(pts[:,1])
50
- results.append({
51
- "bbox": [float(x1), float(y1), float(x2), float(y2)],
52
- "data": data_single,
53
- "points": pts.tolist()
54
- })
55
  return results
56
-
57
  decoded_list = data_list if isinstance(data_list, (list, tuple)) else [data_list] * len(points)
58
  for i, quad in enumerate(points):
59
  pts = np.array(quad, dtype=np.float32).reshape(-1,2)
60
  x1, y1 = np.min(pts[:,0]), np.min(pts[:,1])
61
  x2, y2 = np.max(pts[:,0]), np.max(pts[:,1])
62
  payload = decoded_list[i] if i < len(decoded_list) else ""
63
- results.append({
64
- "bbox": [float(x1), float(y1), float(x2), float(y2)],
65
- "data": payload,
66
- "points": pts.tolist()
67
- })
68
  return results
69
 
 
70
  def detect_phones_yolo(image_np: np.ndarray, conf: float = 0.25) -> List[List[float]]:
71
  model = load_yolo()
72
  if model is None:
@@ -79,71 +94,20 @@ def detect_phones_yolo(image_np: np.ndarray, conf: float = 0.25) -> List[List[fl
79
  bboxes.append([float(box[0]), float(box[1]), float(box[2]), float(box[3])])
80
  return bboxes
81
 
82
- def annotate_image(pil_img: Image.Image, qr_boxes: List[Dict[str, Any]],
83
- phone_boxes: List[List[float]], flags: Dict[int, List[str]]) -> Image.Image:
84
- img = pil_img.copy().convert("RGB")
85
- draw = ImageDraw.Draw(img)
86
- try:
87
- font = ImageFont.load_default()
88
- except:
89
- font = None
90
-
91
- for pb in phone_boxes:
92
- draw.rectangle(pb, outline=(255, 165, 0), width=3)
93
- draw.text((pb[0], pb[1]-12), "PHONE", fill=(255,165,0), font=font)
94
-
95
- for i, qr in enumerate(qr_boxes):
96
- color = (0,255,0)
97
- if i in flags and any("UNAPPROVED" in f or "ON_PHONE" in f for f in flags[i]):
98
- color = (255,0,0)
99
- draw.rectangle(qr["bbox"], outline=color, width=3)
100
- label = "QR"
101
- if qr.get("data"):
102
- snippet = qr["data"][:32].replace("\n"," ")
103
- label += f": {snippet}"
104
- draw.text((qr["bbox"][0], qr["bbox"][1]-12), label, fill=color, font=font)
105
-
106
- for i, msgs in flags.items():
107
- if not msgs: continue
108
- x1, y1, x2, y2 = qr_boxes[i]["bbox"]
109
- y_text = y2 + 4
110
- for msg in msgs:
111
- draw.text((x1, y_text), f"[{msg}]", fill=(255,0,0), font=font)
112
- y_text += 12
113
-
114
- return img
115
-
116
- def unpack_zip(uploaded_file, workdir):
117
- zf = zipfile.ZipFile(uploaded_file)
118
- out_paths = []
119
- for name in zf.namelist():
120
- if name.lower().endswith((".jpg", ".jpeg", ".png", ".bmp", ".webp")):
121
- p = os.path.join(workdir, os.path.basename(name))
122
- with open(p, "wb") as f:
123
- f.write(zf.read(name))
124
- out_paths.append(p)
125
- return out_paths
126
-
127
- def read_approved_list(file) -> List[str]:
128
- name = file.name.lower()
129
- try:
130
- if name.endswith(".csv"):
131
- df = pd.read_csv(file)
132
- if "payload" in df.columns:
133
- vals = df["payload"].dropna().astype(str).tolist()
134
- else:
135
- vals = df.iloc[:,0].dropna().astype(str).tolist()
136
- else:
137
- content = file.read().decode("utf-8", errors="ignore")
138
- file.seek(0) # reset pointer
139
- vals = [line.strip() for line in content.splitlines() if line.strip()]
140
- return [v.strip() for v in vals if v.strip()]
141
- except Exception as e:
142
- st.error(f"Failed to parse approved list: {e}")
143
- return []
144
 
145
- # --- FIXED match with normalization ---
146
  from urllib.parse import urlparse, parse_qs
 
147
  def normalize_payload(payload: str) -> str:
148
  if not payload:
149
  return ""
@@ -162,9 +126,6 @@ def normalize_payload(payload: str) -> str:
162
  return part.strip().lower()
163
  except Exception:
164
  pass
165
- for prefix in ["upi://", "http://", "https://"]:
166
- if p.startswith(prefix):
167
- p = p[len(prefix):]
168
  return p
169
 
170
  def match_payload(payload: str, approved: List[str]) -> bool:
@@ -177,75 +138,55 @@ def match_payload(payload: str, approved: List[str]) -> bool:
177
  return True
178
  return False
179
 
 
 
 
 
 
 
180
  # --- UI ---
181
  st.set_page_config(page_title="QR Code Anomaly Scanner", page_icon="๐Ÿ•ต๏ธ", layout="wide")
182
 
183
- st.markdown(
184
- """
185
- <div style="background-color:#4B8BBE;padding:15px;border-radius:10px;margin-bottom:20px;">
186
- <h1 style="color:white;text-align:center;">๐Ÿ•ต๏ธ QR Code Anomaly Scanner</h1>
187
- <p style="color:white;text-align:center;">AI-Powered Surveillance for Retail Store 360ยฐ CCTV Frames</p>
188
- </div>
189
- """,
190
- unsafe_allow_html=True
191
- )
192
 
193
  with st.sidebar:
194
  st.header("โš™๏ธ Inputs")
195
  approved_file = st.file_uploader("๐Ÿ“‘ Approved QR List (CSV/TXT)", type=["csv","txt"])
196
  frames = st.file_uploader("๐Ÿ–ผ๏ธ Frames (images)", type=["jpg","jpeg","png","bmp","webp"], accept_multiple_files=True)
197
- frames_zip = st.file_uploader("๐Ÿ“ฆ Or upload a ZIP of frames", type=["zip"])
198
  run_phone_detection = st.checkbox("๐Ÿ“ฑ Detect phones (YOLO)", value=True)
199
  phone_conf = st.slider("๐Ÿ“ Phone detection confidence", 0.1, 0.8, 0.25, 0.05)
200
  iou_threshold = st.slider("๐ŸŽฏ QRโ€“Phone overlap IoU threshold", 0.05, 0.8, 0.2, 0.05)
201
  process_btn = st.button("๐Ÿš€ Run Scan", use_container_width=True)
202
 
203
- workdir = tempfile.mkdtemp()
204
-
205
  if process_btn:
206
  if not approved_file:
207
  st.error("Please upload the Approved QR List first.")
208
  st.stop()
209
 
210
- approved_list = read_approved_list(approved_file)
211
- if not approved_list:
212
- st.warning("Approved list is empty or failed to parse. All decoded QR payloads will be treated as UNAPPROVED.")
213
- else:
214
- st.success(f"โœ… Loaded {len(approved_list)} approved entries.")
215
-
216
- img_paths = []
217
- for f in frames or []:
218
- out = os.path.join(workdir, f.name)
219
- with open(out, "wb") as g:
220
- g.write(f.read())
221
- img_paths.append(out)
222
- if frames_zip is not None:
223
- img_paths.extend(unpack_zip(frames_zip, workdir))
224
-
225
- img_paths = sorted(set(img_paths))
226
- if not img_paths:
227
- st.error("Please upload at least one frame image (or a ZIP).")
228
- st.stop()
229
-
230
- if run_phone_detection:
231
- load_yolo()
232
 
233
  rows = []
234
- annotated_dir = os.path.join(workdir, "annotated")
235
- os.makedirs(annotated_dir, exist_ok=True)
236
-
237
- progress = st.progress(0.0)
238
- status = st.empty()
239
 
240
- for idx, path in enumerate(img_paths):
241
- status.text(f"Processing {os.path.basename(path)} ({idx+1}/{len(img_paths)})")
242
- pil = Image.open(path).convert("RGB")
243
  np_img = np.array(pil)
244
 
245
  qr_results = detect_qr_opencv(np_img)
246
  phone_boxes = detect_phones_yolo(np_img, conf=phone_conf) if run_phone_detection else []
247
 
248
  flags = {}
 
 
 
 
 
 
 
 
 
 
249
  for i, qr in enumerate(qr_results):
250
  msgs = []
251
  payload = qr.get("data", "")
@@ -259,93 +200,19 @@ if process_btn:
259
  if iou(qb, pb) >= iou_threshold:
260
  msgs.append("ON_PHONE")
261
  break
 
 
 
 
 
 
262
  flags[i] = msgs
263
 
264
- rows.append({
265
- "frame": os.path.basename(path),
266
- "qr_index": i,
267
- "payload": payload,
268
- "approved_match": (payload and match_payload(payload, approved_list)),
269
- "on_phone": ("ON_PHONE" in msgs),
270
- "undecoded": ("UNDECODED_QR" in msgs),
271
- "anomalies": "|".join(msgs) if msgs else "",
272
- "qr_bbox": qr["bbox"],
273
- "phone_boxes": phone_boxes
274
- })
275
-
276
- if not qr_results:
277
- rows.append({
278
- "frame": os.path.basename(path),
279
- "qr_index": -1,
280
- "payload": "",
281
- "approved_match": False,
282
- "on_phone": False,
283
- "undecoded": False,
284
- "anomalies": "NO_QR_FOUND",
285
- "qr_bbox": None,
286
- "phone_boxes": phone_boxes
287
- })
288
-
289
- annotated = annotate_image(pil, qr_results, phone_boxes, flags)
290
- out_path = os.path.join(annotated_dir, os.path.basename(path))
291
- annotated.save(out_path)
292
-
293
- progress.progress((idx+1)/len(img_paths))
294
 
295
- status.text("โœ… Completed.")
296
  df = pd.DataFrame(rows)
 
297
 
298
- # --- Summary metrics ---
299
- st.markdown("### ๐Ÿ“Š Summary Dashboard")
300
- col1, col2, col3, col4, col5, col6 = st.columns(6)
301
- total_frames = len(img_paths)
302
- total_qr = int((df["qr_index"] >= 0).sum())
303
- unapproved = int((df["anomalies"].str.contains("UNAPPROVED_QR", na=False)).sum())
304
- on_phone = int((df["anomalies"].str.contains("ON_PHONE", na=False)).sum())
305
- undecoded = int((df["anomalies"].str.contains("UNDECODED_QR", na=False)).sum())
306
- no_qr = int((df["anomalies"] == "NO_QR_FOUND").sum())
307
-
308
- col1.metric("Frames Processed", total_frames)
309
- col2.metric("QR Detections", total_qr)
310
- col3.metric("โŒ Unapproved", unapproved)
311
- col4.metric("๐Ÿ“ฑ On Phone", on_phone)
312
- col5.metric("โš ๏ธ Undecoded", undecoded)
313
- col6.metric("๐Ÿšซ No QR Found", no_qr)
314
-
315
- # --- Results Table ---
316
- st.markdown("### ๐Ÿ“ Detailed Results")
317
- def highlight_anomalies(val):
318
- if "UNAPPROVED" in str(val):
319
- return "background-color: #FFB6C1; color: black;"
320
- elif "ON_PHONE" in str(val):
321
- return "background-color: #FFD580; color: black;"
322
- elif "UNDECODED" in str(val):
323
- return "background-color: #B0C4DE; color: black;"
324
- elif "NO_QR_FOUND" in str(val):
325
- return "background-color: #D3D3D3; color: black;"
326
- return ""
327
-
328
- st.dataframe(df.style.applymap(highlight_anomalies, subset=["anomalies"]), use_container_width=True)
329
-
330
- # --- Image Gallery ---
331
- st.markdown("### ๐Ÿ–ผ๏ธ Annotated Frames Preview")
332
- cols = st.columns(3)
333
- for idx, fname in enumerate(sorted(os.listdir(annotated_dir))):
334
- with cols[idx % 3]:
335
- st.image(os.path.join(annotated_dir, fname), caption=fname, use_container_width=True)
336
-
337
- # --- Downloads ---
338
- csv_bytes = df.to_csv(index=False).encode("utf-8")
339
- st.download_button("โฌ‡๏ธ Download CSV Report", data=csv_bytes,
340
- file_name="qr_anomaly_report.csv", mime="text/csv")
341
-
342
- mem = io.BytesIO()
343
- with zipfile.ZipFile(mem, mode="w", compression=zipfile.ZIP_DEFLATED) as z:
344
- for fname in sorted(os.listdir(annotated_dir)):
345
- z.write(os.path.join(annotated_dir, fname), arcname=fname)
346
- mem.seek(0)
347
- st.download_button("โฌ‡๏ธ Download Annotated Images (ZIP)", data=mem.getvalue(),
348
- file_name="annotated_frames.zip", mime="application/zip")
349
-
350
- else:
351
- st.info("Upload inputs on the left and click **Run Scan** to begin.")
 
2
  import os
3
  import zipfile
4
  import tempfile
5
+ import time
6
  from typing import List, Dict, Any
7
 
8
  import streamlit as st
 
14
  # Optional: YOLO for phone detection
15
  YOLO_MODEL = None
16
 
17
+ # ROI Zones (x1, y1, x2, y2)
18
+ ROI_ZONES = [(100, 100, 400, 400)] # example, configurable later
19
+ ABSENCE_THRESHOLD_SEC = 15
20
+ ALERT_LOG_FILE = "alerts.json"
21
+
22
+ # --- Alert persistence ---
23
+ def load_alerts():
24
+ if os.path.exists(ALERT_LOG_FILE):
25
+ return pd.read_json(ALERT_LOG_FILE).to_dict(orient="records")
26
+ return []
27
+
28
+ def save_alert(alert: Dict[str, Any]):
29
+ alerts = load_alerts()
30
+ alerts.append(alert)
31
+ pd.DataFrame(alerts).to_json(ALERT_LOG_FILE, orient="records", indent=2)
32
+
33
+ def log_alert(message, frame_name):
34
+ alert = {"time": time.ctime(), "alert": message, "frame": frame_name}
35
+ save_alert(alert)
36
+
37
+ # --- YOLO Loader ---
38
  def load_yolo():
39
  global YOLO_MODEL
40
  if YOLO_MODEL is None:
 
59
  denom = areaA + areaB - interArea + 1e-6
60
  return interArea / denom
61
 
62
+ # --- QR Detection ---
63
  def detect_qr_opencv(image_np: np.ndarray) -> List[Dict[str, Any]]:
64
  det = cv2.QRCodeDetector()
65
  retval, data_list, points, _ = det.detectAndDecodeMulti(image_np)
 
70
  pts = np.array(points_single, dtype=np.float32).reshape(-1, 2)
71
  x1, y1 = np.min(pts[:,0]), np.min(pts[:,1])
72
  x2, y2 = np.max(pts[:,0]), np.max(pts[:,1])
73
+ results.append({"bbox": [float(x1), float(y1), float(x2), float(y2)],"data": data_single,"points": pts.tolist()})
 
 
 
 
74
  return results
 
75
  decoded_list = data_list if isinstance(data_list, (list, tuple)) else [data_list] * len(points)
76
  for i, quad in enumerate(points):
77
  pts = np.array(quad, dtype=np.float32).reshape(-1,2)
78
  x1, y1 = np.min(pts[:,0]), np.min(pts[:,1])
79
  x2, y2 = np.max(pts[:,0]), np.max(pts[:,1])
80
  payload = decoded_list[i] if i < len(decoded_list) else ""
81
+ results.append({"bbox": [float(x1), float(y1), float(x2), float(y2)],"data": payload,"points": pts.tolist()})
 
 
 
 
82
  return results
83
 
84
+ # --- Phone Detection ---
85
  def detect_phones_yolo(image_np: np.ndarray, conf: float = 0.25) -> List[List[float]]:
86
  model = load_yolo()
87
  if model is None:
 
94
  bboxes.append([float(box[0]), float(box[1]), float(box[2]), float(box[3])])
95
  return bboxes
96
 
97
+ # --- Tampering detection ---
98
+ def detect_tampering(image_np: np.ndarray, bbox: List[float]) -> bool:
99
+ x1, y1, x2, y2 = map(int, bbox)
100
+ roi = image_np[y1:y2, x1:x2]
101
+ if roi.size == 0:
102
+ return False
103
+ gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)
104
+ edges = cv2.Canny(gray, 100, 200)
105
+ edge_density = np.sum(edges > 0) / (roi.shape[0] * roi.shape[1])
106
+ return edge_density < 0.01
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
107
 
108
+ # --- Payload normalization & UPI parsing ---
109
  from urllib.parse import urlparse, parse_qs
110
+
111
  def normalize_payload(payload: str) -> str:
112
  if not payload:
113
  return ""
 
126
  return part.strip().lower()
127
  except Exception:
128
  pass
 
 
 
129
  return p
130
 
131
  def match_payload(payload: str, approved: List[str]) -> bool:
 
138
  return True
139
  return False
140
 
141
+ # --- ROI Helper ---
142
+ def inside_roi(bbox, roi):
143
+ x1, y1, x2, y2 = bbox
144
+ rx1, ry1, rx2, ry2 = roi
145
+ return (x1 >= rx1 and y1 >= ry1 and x2 <= rx2 and y2 <= ry2)
146
+
147
  # --- UI ---
148
  st.set_page_config(page_title="QR Code Anomaly Scanner", page_icon="๐Ÿ•ต๏ธ", layout="wide")
149
 
150
+ st.title("๐Ÿ•ต๏ธ QR Code Anomaly Scanner with Extended Compliance")
 
 
 
 
 
 
 
 
151
 
152
  with st.sidebar:
153
  st.header("โš™๏ธ Inputs")
154
  approved_file = st.file_uploader("๐Ÿ“‘ Approved QR List (CSV/TXT)", type=["csv","txt"])
155
  frames = st.file_uploader("๐Ÿ–ผ๏ธ Frames (images)", type=["jpg","jpeg","png","bmp","webp"], accept_multiple_files=True)
 
156
  run_phone_detection = st.checkbox("๐Ÿ“ฑ Detect phones (YOLO)", value=True)
157
  phone_conf = st.slider("๐Ÿ“ Phone detection confidence", 0.1, 0.8, 0.25, 0.05)
158
  iou_threshold = st.slider("๐ŸŽฏ QRโ€“Phone overlap IoU threshold", 0.05, 0.8, 0.2, 0.05)
159
  process_btn = st.button("๐Ÿš€ Run Scan", use_container_width=True)
160
 
 
 
161
  if process_btn:
162
  if not approved_file:
163
  st.error("Please upload the Approved QR List first.")
164
  st.stop()
165
 
166
+ approved_list = pd.read_csv(approved_file).iloc[:,0].astype(str).tolist() if approved_file.name.endswith(".csv") else approved_file.read().decode().splitlines()
167
+ st.success(f"โœ… Loaded {len(approved_list)} approved entries.")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
168
 
169
  rows = []
170
+ absence_counter = 0
 
 
 
 
171
 
172
+ for f in frames or []:
173
+ pil = Image.open(f).convert("RGB")
 
174
  np_img = np.array(pil)
175
 
176
  qr_results = detect_qr_opencv(np_img)
177
  phone_boxes = detect_phones_yolo(np_img, conf=phone_conf) if run_phone_detection else []
178
 
179
  flags = {}
180
+ if len(qr_results) > 1:
181
+ log_alert("Multiple QRs detected", f.name)
182
+
183
+ if not qr_results:
184
+ absence_counter += 1
185
+ if absence_counter * 1 > ABSENCE_THRESHOLD_SEC: # assuming 1s per frame approx
186
+ log_alert("QR absent for threshold duration", f.name)
187
+ else:
188
+ absence_counter = 0
189
+
190
  for i, qr in enumerate(qr_results):
191
  msgs = []
192
  payload = qr.get("data", "")
 
200
  if iou(qb, pb) >= iou_threshold:
201
  msgs.append("ON_PHONE")
202
  break
203
+ if not any(inside_roi(qr["bbox"], roi) for roi in ROI_ZONES):
204
+ msgs.append("OUTSIDE_ROI")
205
+ if detect_tampering(np_img, qr["bbox"]):
206
+ msgs.append("TAMPERING")
207
+ if msgs:
208
+ log_alert("|".join(msgs), f.name)
209
  flags[i] = msgs
210
 
211
+ rows.append({"frame": f.name,"qr_index": i,"payload": payload,"approved_match": (payload and match_payload(payload, approved_list)),"anomalies": "|".join(msgs) if msgs else ""})
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
212
 
 
213
  df = pd.DataFrame(rows)
214
+ st.dataframe(df)
215
 
216
+ st.subheader("๐Ÿ“œ Alert History")
217
+ for a in load_alerts()[-10:][::-1]:
218
+ st.write(f"{a['time']} [{a['frame']}] โ†’ {a['alert']}")