Spaces:
Sleeping
Sleeping
| import io | |
| import os | |
| import zipfile | |
| import tempfile | |
| from typing import List, Tuple, Dict, Any | |
| import streamlit as st | |
| import numpy as np | |
| import pandas as pd | |
| from PIL import Image, ImageDraw, ImageFont | |
| import cv2 | |
| # Optional: YOLO for phone detection | |
| # We load lazily on first use to keep startup fast. | |
| YOLO_MODEL = None | |
| def load_yolo(): | |
| global YOLO_MODEL | |
| if YOLO_MODEL is None: | |
| try: | |
| from ultralytics import YOLO | |
| # Use lightweight pretrained model; supports "cell phone" class via COCO. | |
| YOLO_MODEL = YOLO('yolov8n.pt') # automatically downloads on first run | |
| except Exception as e: | |
| st.warning(f"YOLO model could not be loaded: {e}") | |
| YOLO_MODEL = None | |
| return YOLO_MODEL | |
| def iou(boxA, boxB) -> float: | |
| # boxes in [x1,y1,x2,y2] | |
| xA = max(boxA[0], boxB[0]) | |
| yA = max(boxA[1], boxB[1]) | |
| xB = min(boxA[2], boxB[2]) | |
| yB = min(boxA[3], boxB[3]) | |
| interW = max(0, xB - xA) | |
| interH = max(0, yB - yA) | |
| interArea = interW * interH | |
| areaA = max(0, boxA[2] - boxA[0]) * max(0, boxA[3] - boxA[1]) | |
| areaB = max(0, boxB[2] - boxB[0]) * max(0, boxB[3] - boxB[1]) | |
| denom = areaA + areaB - interArea + 1e-6 | |
| return interArea / denom | |
| def detect_qr_opencv(image_np: np.ndarray) -> List[Dict[str, Any]]: | |
| """ | |
| Use OpenCV's QRCodeDetector to find and decode QR codes. | |
| Returns list of dicts: {bbox: [x1,y1,x2,y2], data: str, points: np.ndarray} | |
| """ | |
| det = cv2.QRCodeDetector() | |
| data, points, _ = det.detectAndDecodeMulti(image_np) | |
| results = [] | |
| if points is None: | |
| # Try single QR fallback | |
| data_single, points_single, _ = det.detectAndDecode(image_np) | |
| if points_single is not None and data_single: | |
| pts = np.array(points_single, dtype=np.float32).reshape(-1, 2) | |
| x1, y1 = np.min(pts[:,0]), np.min(pts[:,1]) | |
| x2, y2 = np.max(pts[:,0]), np.max(pts[:,1]) | |
| results.append({"bbox": [float(x1), float(y1), float(x2), float(y2)], | |
| "data": data_single, | |
| "points": pts.tolist()}) | |
| return results | |
| # points shape: (N,4,2), data is list/tuple of strings (may be '' for undecodeable) | |
| if isinstance(data, (list, tuple)): | |
| decoded_list = data | |
| else: | |
| decoded_list = [data] * len(points) | |
| for i, quad in enumerate(points): | |
| pts = np.array(quad, dtype=np.float32).reshape(-1,2) | |
| x1, y1 = np.min(pts[:,0]), np.min(pts[:,1]) | |
| x2, y2 = np.max(pts[:,0]), np.max(pts[:,1]) | |
| payload = decoded_list[i] if i < len(decoded_list) else "" | |
| results.append({"bbox": [float(x1), float(y1), float(x2), float(y2)], | |
| "data": payload, | |
| "points": pts.tolist()}) | |
| return results | |
| def detect_phones_yolo(image_np: np.ndarray, conf: float = 0.25) -> List[List[float]]: | |
| """ | |
| Detect cell phones with YOLO. Returns list of [x1,y1,x2,y2]. | |
| """ | |
| model = load_yolo() | |
| if model is None: | |
| return [] | |
| # YOLO expects RGB image; ultralytics handles numpy arrays | |
| results = model.predict(source=image_np, conf=conf, verbose=False) | |
| bboxes = [] | |
| for r in results: | |
| for box, cls in zip(r.boxes.xyxy.cpu().numpy(), r.boxes.cls.cpu().numpy()): | |
| # COCO: class 67 is "cell phone" | |
| if int(cls) == 67: | |
| bboxes.append([float(box[0]), float(box[1]), float(box[2]), float(box[3])]) | |
| return bboxes | |
| def annotate_image(pil_img: Image.Image, qr_boxes: List[Dict[str, Any]], phone_boxes: List[List[float]], flags: Dict[int, List[str]]) -> Image.Image: | |
| img = pil_img.copy().convert("RGB") | |
| draw = ImageDraw.Draw(img) | |
| # Try to load a default font | |
| try: | |
| font = ImageFont.load_default() | |
| except: | |
| font = None | |
| # Draw phone boxes | |
| for pb in phone_boxes: | |
| draw.rectangle(pb, outline=(255, 165, 0), width=3) # orange | |
| draw.text((pb[0], pb[1]-12), "PHONE", fill=(255,165,0), font=font) | |
| # Draw QR boxes | |
| for i, qr in enumerate(qr_boxes): | |
| color = (0,255,0) # green | |
| if i in flags and any("UNAPPROVED" in f or "ON_PHONE" in f for f in flags[i]): | |
| color = (255,0,0) # red for anomaly | |
| draw.rectangle(qr["bbox"], outline=color, width=3) | |
| label = "QR" | |
| if qr.get("data"): | |
| snippet = qr["data"][:32].replace("\n"," ") | |
| label += f": {snippet}" | |
| draw.text((qr["bbox"][0], qr["bbox"][1]-12), label, fill=color, font=font) | |
| # Add flags text | |
| for i, msgs in flags.items(): | |
| if not msgs: | |
| continue | |
| x1, y1, x2, y2 = qr_boxes[i]["bbox"] | |
| y_text = y2 + 4 | |
| for msg in msgs: | |
| draw.text((x1, y_text), f"[{msg}]", fill=(255,0,0), font=font) | |
| y_text += 12 | |
| return img | |
| def unpack_zip(uploaded_file, workdir): | |
| zf = zipfile.ZipFile(uploaded_file) | |
| out_paths = [] | |
| for name in zf.namelist(): | |
| if name.lower().endswith((".jpg", ".jpeg", ".png", ".bmp", ".webp")): | |
| p = os.path.join(workdir, os.path.basename(name)) | |
| with open(p, "wb") as f: | |
| f.write(zf.read(name)) | |
| out_paths.append(p) | |
| return out_paths | |
| def read_approved_list(file) -> List[str]: | |
| """ | |
| Accepts CSV or TXT. One payload per line or in a 'payload' column. | |
| Payloads can be full strings or partial substrings to match. | |
| """ | |
| name = file.name.lower() | |
| try: | |
| if name.endswith(".csv"): | |
| df = pd.read_csv(file) | |
| if "payload" in df.columns: | |
| vals = df["payload"].dropna().astype(str).tolist() | |
| else: | |
| # take first column | |
| vals = df.iloc[:,0].dropna().astype(str).tolist() | |
| else: | |
| # plain text | |
| content = file.read().decode("utf-8", errors="ignore") | |
| vals = [line.strip() for line in content.splitlines() if line.strip()] | |
| # Normalize | |
| return [v.strip() for v in vals if v.strip()] | |
| except Exception as e: | |
| st.error(f"Failed to parse approved list: {e}") | |
| return [] | |
| def match_payload(payload: str, approved: List[str]) -> bool: | |
| """ | |
| Return True if payload matches an approved entry. | |
| We allow substring match either way to account for embedded metadata/UTMs. | |
| """ | |
| if not payload: | |
| return False | |
| p = payload.strip() | |
| for a in approved: | |
| if a in p or p in a: | |
| return True | |
| return False | |
| st.set_page_config(page_title="QR Code Anomaly Scanner", layout="wide") | |
| st.title("🕵️ QR Code Anomaly Scanner (Retail Store 360° CCTV Frames)") | |
| st.markdown(""" | |
| Upload a set of frame images (multiple files **or** a ZIP), plus the approved QR list (CSV/TXT). | |
| The app will: | |
| - Detect and decode QR codes in each frame. | |
| - Detect **cell phones** via YOLO to infer if a QR is shown on a phone. | |
| - Flag anomalies: | |
| - **UNAPPROVED_QR**: decoded payload not in the approved list. | |
| - **ON_PHONE**: QR bounding box overlaps a detected phone. | |
| - **UNDECODED_QR**: QR detected but not decodable (could be suspicious/obstructed). | |
| Download the annotated images and a consolidated CSV report at the end. | |
| """) | |
| with st.sidebar: | |
| st.header("Inputs") | |
| approved_file = st.file_uploader("Approved QR List (CSV/TXT)", type=["csv","txt"]) | |
| frames = st.file_uploader("Frames (images) — select multiple", type=["jpg","jpeg","png","bmp","webp"], accept_multiple_files=True) | |
| frames_zip = st.file_uploader("Or upload a ZIP of frames", type=["zip"]) | |
| run_phone_detection = st.checkbox("Detect phones (YOLO)", value=True) | |
| phone_conf = st.slider("Phone detection confidence", 0.1, 0.8, 0.25, 0.05) | |
| iou_threshold = st.slider("QR–Phone overlap IoU threshold", 0.05, 0.8, 0.2, 0.05) | |
| process_btn = st.button("Run Scan") | |
| workdir = tempfile.mkdtemp() | |
| if process_btn: | |
| if not approved_file: | |
| st.error("Please upload the Approved QR List first.") | |
| st.stop() | |
| approved_list = read_approved_list(approved_file) | |
| if not approved_list: | |
| st.warning("Approved list is empty or failed to parse. All decoded QR payloads will be treated as UNAPPROVED.") | |
| else: | |
| st.success(f"Loaded {len(approved_list)} approved entries.") | |
| img_paths = [] | |
| # Save multi-file uploads | |
| for f in frames or []: | |
| out = os.path.join(workdir, f.name) | |
| with open(out, "wb") as g: | |
| g.write(f.read()) | |
| img_paths.append(out) | |
| # Or unpack ZIP | |
| if frames_zip is not None: | |
| img_paths.extend(unpack_zip(frames_zip, workdir)) | |
| img_paths = sorted(set(img_paths)) | |
| if not img_paths: | |
| st.error("Please upload at least one frame image (or a ZIP).") | |
| st.stop() | |
| if run_phone_detection: | |
| load_yolo() # try to initialize early to show warnings | |
| rows = [] | |
| annotated_dir = os.path.join(workdir, "annotated") | |
| os.makedirs(annotated_dir, exist_ok=True) | |
| progress = st.progress(0.0) | |
| status = st.empty() | |
| for idx, path in enumerate(img_paths): | |
| status.text(f"Processing {os.path.basename(path)} ({idx+1}/{len(img_paths)})") | |
| pil = Image.open(path).convert("RGB") | |
| np_img = np.array(pil) | |
| qr_results = detect_qr_opencv(np_img) | |
| phone_boxes = detect_phones_yolo(np_img, conf=phone_conf) if run_phone_detection else [] | |
| flags = {} | |
| for i, qr in enumerate(qr_results): | |
| msgs = [] | |
| payload = qr.get("data", "") | |
| if not payload: | |
| msgs.append("UNDECODED_QR") | |
| elif not match_payload(payload, approved_list): | |
| msgs.append("UNAPPROVED_QR") | |
| # Check overlap with phones | |
| if phone_boxes: | |
| qb = qr["bbox"] | |
| for pb in phone_boxes: | |
| if iou(qb, pb) >= iou_threshold: | |
| msgs.append("ON_PHONE") | |
| break | |
| flags[i] = msgs | |
| # Append a row | |
| rows.append({ | |
| "frame": os.path.basename(path), | |
| "qr_index": i, | |
| "payload": payload, | |
| "approved_match": (payload and match_payload(payload, approved_list)), | |
| "on_phone": ("ON_PHONE" in msgs), | |
| "undecoded": ("UNDECODED_QR" in msgs), | |
| "anomalies": "|".join(msgs) if msgs else "", | |
| "qr_bbox": qr["bbox"], | |
| "phone_boxes": phone_boxes | |
| }) | |
| # If no QR detected, still log the frame | |
| if not qr_results: | |
| rows.append({ | |
| "frame": os.path.basename(path), | |
| "qr_index": -1, | |
| "payload": "", | |
| "approved_match": False, | |
| "on_phone": False, | |
| "undecoded": False, | |
| "anomalies": "NO_QR_FOUND", | |
| "qr_bbox": None, | |
| "phone_boxes": phone_boxes | |
| }) | |
| annotated = annotate_image(pil, qr_results, phone_boxes, flags) | |
| out_path = os.path.join(annotated_dir, os.path.basename(path)) | |
| annotated.save(out_path) | |
| progress.progress((idx+1)/len(img_paths)) | |
| status.text("Completed.") | |
| df = pd.DataFrame(rows) | |
| st.subheader("Results") | |
| st.dataframe(df, use_container_width=True) | |
| # Summary | |
| st.markdown("### Summary") | |
| total_frames = len(img_paths) | |
| total_qr = int((df["qr_index"] >= 0).sum()) | |
| unapproved = int((df["anomalies"].str.contains("UNAPPROVED_QR", na=False)).sum()) | |
| on_phone = int((df["anomalies"].str.contains("ON_PHONE", na=False)).sum()) | |
| undecoded = int((df["anomalies"].str.contains("UNDECODED_QR", na=False)).sum()) | |
| no_qr = int((df["anomalies"] == "NO_QR_FOUND").sum()) | |
| st.write({ | |
| "frames_processed": total_frames, | |
| "qr_detections": total_qr, | |
| "unapproved_qr": unapproved, | |
| "qr_on_phone": on_phone, | |
| "undecoded_qr": undecoded, | |
| "frames_with_no_qr": no_qr | |
| }) | |
| # Downloads: CSV + ZIP of annotated images | |
| csv_bytes = df.to_csv(index=False).encode("utf-8") | |
| st.download_button("⬇️ Download CSV Report", data=csv_bytes, file_name="qr_anomaly_report.csv", mime="text/csv") | |
| # Create ZIP | |
| mem = io.BytesIO() | |
| with zipfile.ZipFile(mem, mode="w", compression=zipfile.ZIP_DEFLATED) as z: | |
| for fname in sorted(os.listdir(annotated_dir)): | |
| z.write(os.path.join(annotated_dir, fname), arcname=fname) | |
| mem.seek(0) | |
| st.download_button("⬇️ Download Annotated Images (ZIP)", data=mem.getvalue(), file_name="annotated_frames.zip", mime="application/zip") | |
| else: | |
| st.info("Upload inputs on the left and click **Run Scan** to begin.") | |
| st.markdown(""" | |
| **Tips** | |
| - Your approved list can be **TXT** (one payload per line) or **CSV** (use a `payload` column or the first column). | |
| - For mobile QR misuse detection, keep **Detect phones (YOLO)** enabled. | |
| - Name frames with timestamps if you want to correlate events later. | |
| """) |