"""Streamlit app that scans uploaded frames for QR-code compliance anomalies.

For every uploaded frame the app detects QR codes, optionally detects phones
with YOLO, and flags codes that are unapproved, undecodable, shown on a phone,
outside the configured ROI zones, or that look tampered with.  Alerts are
appended to a JSON log file and summarised on a small dashboard.
"""

import io
import json
import os
import tempfile
import time
import zipfile
from collections import Counter
from typing import Any, Dict, List
from urllib.parse import parse_qs, urlparse

import cv2
import numpy as np
import pandas as pd
import streamlit as st
from PIL import Image, ImageDraw, ImageFont

# Optional: YOLO for phone detection (loaded lazily by load_yolo()).
YOLO_MODEL = None
# Set once a load attempt has failed so we do not retry (and re-warn) per frame.
_YOLO_LOAD_FAILED = False

# ROI zones as (x1, y1, x2, y2) pixel rectangles.
ROI_ZONES = [(100, 100, 400, 400)]  # example, configurable later
ABSENCE_THRESHOLD_SEC = 15
ALERT_LOG_FILE = "alerts.json"

# Frames carry no timestamps; each frame is assumed to represent ~1 second.
SECONDS_PER_FRAME = 1
# COCO class index for "cell phone".
COCO_CELL_PHONE_CLASS_ID = 67
# Fixed column order so the results table exists even when no row is produced.
RESULT_COLUMNS = ["frame", "qr_index", "payload", "approved_match", "anomalies"]


# --- Alert persistence ---
def load_alerts() -> List[Dict[str, Any]]:
    """Return all alerts stored in ``ALERT_LOG_FILE``.

    Returns an empty list when the file is missing, empty, unreadable as JSON,
    or does not contain a JSON array.
    """
    try:
        with open(ALERT_LOG_FILE, encoding="utf-8") as fh:
            alerts = json.load(fh)
    except FileNotFoundError:
        return []
    except json.JSONDecodeError as exc:
        # An empty or corrupt log must not take the whole scan down.
        st.warning(f"Alert log {ALERT_LOG_FILE} could not be parsed: {exc}")
        return []
    return alerts if isinstance(alerts, list) else []


def save_alert(alert: Dict[str, Any]) -> None:
    """Append *alert* to the JSON alert log, rewriting the whole file."""
    alerts = load_alerts()
    alerts.append(alert)
    with open(ALERT_LOG_FILE, "w", encoding="utf-8") as fh:
        json.dump(alerts, fh, indent=2)


def log_alert(message, frame_name) -> None:
    """Persist an alert with the current local time, message and frame name."""
    save_alert({"time": time.ctime(), "alert": message, "frame": frame_name})


# --- YOLO Loader ---
def load_yolo():
    """Return the shared YOLO model, loading it on first use.

    Returns ``None`` (after showing one warning) when ``ultralytics`` or the
    weights cannot be loaded; later calls then return ``None`` immediately.
    """
    global YOLO_MODEL, _YOLO_LOAD_FAILED
    if YOLO_MODEL is None and not _YOLO_LOAD_FAILED:
        try:
            from ultralytics import YOLO

            YOLO_MODEL = YOLO('yolov8n.pt')
        except Exception as e:  # optional dependency: degrade, don't crash
            st.warning(f"YOLO model could not be loaded: {e}")
            _YOLO_LOAD_FAILED = True
    return YOLO_MODEL


def iou(boxA, boxB) -> float:
    """Return the intersection-over-union of two ``(x1, y1, x2, y2)`` boxes.

    A small epsilon in the denominator avoids division by zero for
    degenerate (zero-area) boxes.
    """
    inter_w = max(0, min(boxA[2], boxB[2]) - max(boxA[0], boxB[0]))
    inter_h = max(0, min(boxA[3], boxB[3]) - max(boxA[1], boxB[1]))
    inter_area = inter_w * inter_h
    area_a = max(0, boxA[2] - boxA[0]) * max(0, boxA[3] - boxA[1])
    area_b = max(0, boxB[2] - boxB[0]) * max(0, boxB[3] - boxB[1])
    return inter_area / (area_a + area_b - inter_area + 1e-6)


# --- QR Detection ---
def _quad_to_result(quad, payload: str) -> Dict[str, Any]:
    """Build one detection record from a QR corner quadrilateral."""
    pts = np.array(quad, dtype=np.float32).reshape(-1, 2)
    x1, y1 = pts.min(axis=0)
    x2, y2 = pts.max(axis=0)
    return {
        "bbox": [float(x1), float(y1), float(x2), float(y2)],
        "data": payload,
        "points": pts.tolist(),
    }


def detect_qr_opencv(image_np: np.ndarray) -> List[Dict[str, Any]]:
    """Detect and decode QR codes in *image_np* with OpenCV.

    Multi-code detection is tried first; if it finds nothing, single-code
    detection is used as a fallback (which only reports a code it could
    decode).

    Returns:
        A list of dicts with keys ``bbox`` (axis-aligned ``[x1, y1, x2, y2]``),
        ``data`` (decoded payload, ``""`` when undecoded) and ``points``
        (corner coordinates).
    """
    det = cv2.QRCodeDetector()
    _, data_list, points, _ = det.detectAndDecodeMulti(image_np)

    if points is None or len(points) == 0:
        data_single, points_single, _ = det.detectAndDecode(image_np)
        if points_single is not None and data_single:
            return [_quad_to_result(points_single, data_single)]
        return []

    if isinstance(data_list, (list, tuple)):
        decoded = list(data_list)
    else:
        decoded = [data_list] * len(points)
    return [
        _quad_to_result(quad, decoded[i] if i < len(decoded) else "")
        for i, quad in enumerate(points)
    ]


# --- Phone Detection ---
def detect_phones_yolo(image_np: np.ndarray, conf: float = 0.25) -> List[List[float]]:
    """Return ``[x1, y1, x2, y2]`` boxes of cell phones found in *image_np*.

    Args:
        image_np: RGB image as an ``H x W x 3`` array (as produced from PIL).
        conf: Minimum detection confidence passed to the model.

    Returns:
        A list of boxes; empty when the YOLO model is unavailable.
    """
    model = load_yolo()
    if model is None:
        return []
    # Ultralytics interprets numpy input as BGR, so swap the channel order.
    bgr = np.ascontiguousarray(image_np[:, :, ::-1])
    results = model.predict(source=bgr, conf=conf, verbose=False)
    bboxes = []
    for r in results:
        boxes = r.boxes.xyxy.cpu().numpy()
        classes = r.boxes.cls.cpu().numpy()
        bboxes.extend(
            [float(v) for v in box[:4]]
            for box, cls in zip(boxes, classes)
            if int(cls) == COCO_CELL_PHONE_CLASS_ID
        )
    return bboxes


# --- Tampering detection ---
def detect_tampering(image_np: np.ndarray, bbox: List[float]) -> bool:
    """Heuristically flag a QR region whose edge density is suspiciously low.

    Args:
        image_np: RGB image as an ``H x W x 3`` array.
        bbox: ``[x1, y1, x2, y2]`` region to inspect, in pixels.

    Returns:
        ``True`` when fewer than 1% of the region's pixels are Canny edges;
        ``False`` otherwise or when the region is empty.
    """
    height, width = image_np.shape[:2]
    x1, y1, x2, y2 = (int(v) for v in bbox)
    # Clamp to the image: a negative start index would otherwise wrap around.
    x1, x2 = max(0, x1), min(width, x2)
    y1, y2 = max(0, y1), min(height, y2)
    roi = image_np[y1:y2, x1:x2]
    if roi.size == 0:
        return False
    gray = cv2.cvtColor(roi, cv2.COLOR_RGB2GRAY)
    edges = cv2.Canny(gray, 100, 200)
    edge_density = np.count_nonzero(edges) / edges.size
    return bool(edge_density < 0.01)


# --- Payload normalization & UPI parsing ---
def normalize_payload(payload: str) -> str:
    """Reduce a QR payload to a comparable key.

    The payload is stripped and lower-cased.  For UPI payloads (or any text
    containing ``pa=``) only the payee address is returned; anything else is
    returned whole.
    """
    if not payload:
        return ""
    text = payload.strip().lower()
    if text.startswith("upi://"):
        try:
            query = parse_qs(urlparse(text).query)
        except ValueError:
            query = {}  # malformed URL: fall through to the plain-text scan
        if "pa" in query:
            return query["pa"][0].strip()
    if "pa=" in text:
        return text.split("pa=", 1)[1].split("&", 1)[0].strip()
    return text


def match_payload(payload: str, approved: List[str]) -> bool:
    """Return ``True`` when *payload* normalizes to one of the *approved* entries."""
    if not payload:
        return False
    target = normalize_payload(payload)
    if not target:
        # e.g. "upi://pay?pa=" must never match a blank approved entry.
        return False
    return any(normalize_payload(entry) == target for entry in approved)


# --- ROI Helper ---
def inside_roi(bbox, roi) -> bool:
    """Return ``True`` when *bbox* lies entirely within *roi* (both x1, y1, x2, y2)."""
    x1, y1, x2, y2 = bbox
    rx1, ry1, rx2, ry2 = roi
    return x1 >= rx1 and y1 >= ry1 and x2 <= rx2 and y2 <= ry2


# --- UI helpers ---
def _load_approved_list(uploaded) -> List[str]:
    """Read approved payloads from an uploaded CSV (first column) or text file.

    Blank and missing entries are dropped.  For CSV input the first row is
    treated as a header (pandas default).
    """
    if uploaded.name.lower().endswith(".csv"):
        try:
            column = pd.read_csv(uploaded).iloc[:, 0]
        except pd.errors.EmptyDataError:
            return []
        # dropna first: astype(str) would turn NaN into an approved "nan".
        entries = column.dropna().astype(str).tolist()
    else:
        entries = uploaded.read().decode("utf-8-sig").splitlines()
    return [entry.strip() for entry in entries if entry.strip()]


def _qr_anomalies(qr, np_img, phone_boxes, approved, iou_threshold) -> List[str]:
    """Return the anomaly tags that apply to one detected QR code."""
    msgs = []
    payload = qr.get("data", "")
    if not payload:
        msgs.append("UNDECODED_QR")
    elif not match_payload(payload, approved):
        msgs.append("UNAPPROVED_QR")
    if any(iou(qr["bbox"], pb) >= iou_threshold for pb in phone_boxes):
        msgs.append("ON_PHONE")
    if not any(inside_roi(qr["bbox"], roi) for roi in ROI_ZONES):
        msgs.append("OUTSIDE_ROI")
    if detect_tampering(np_img, qr["bbox"]):
        msgs.append("TAMPERING")
    return msgs


# --- UI ---
def main() -> None:
    """Render the sidebar, run the scan on demand and show the results."""
    st.set_page_config(page_title="QR Code Anomaly Scanner", page_icon="🕵️", layout="wide")
    st.title("🕵️ QR Code Anomaly Scanner with Extended Compliance")

    with st.sidebar:
        st.header("⚙️ Inputs")
        approved_file = st.file_uploader("📑 Approved QR List (CSV/TXT)", type=["csv", "txt"])
        frames = st.file_uploader(
            "🖼️ Frames (images)",
            type=["jpg", "jpeg", "png", "bmp", "webp"],
            accept_multiple_files=True,
        )
        run_phone_detection = st.checkbox("📱 Detect phones (YOLO)", value=True)
        phone_conf = st.slider("📏 Phone detection confidence", 0.1, 0.8, 0.25, 0.05)
        iou_threshold = st.slider("🎯 QR–Phone overlap IoU threshold", 0.05, 0.8, 0.2, 0.05)
        process_btn = st.button("🚀 Run Scan", use_container_width=True)

    if not process_btn:
        return

    if not approved_file:
        st.error("Please upload the Approved QR List first.")
        st.stop()

    approved_list = _load_approved_list(approved_file)
    st.success(f"✅ Loaded {len(approved_list)} approved entries.")

    rows = []
    absence_counter = 0  # consecutive frames without any QR code

    for f in frames or []:
        try:
            with Image.open(f) as img:
                np_img = np.array(img.convert("RGB"))
        except OSError as exc:  # includes PIL.UnidentifiedImageError
            st.warning(f"Skipping unreadable image {f.name}: {exc}")
            continue

        qr_results = detect_qr_opencv(np_img)
        phone_boxes = detect_phones_yolo(np_img, conf=phone_conf) if run_phone_detection else []

        if len(qr_results) > 1:
            log_alert("Multiple QRs detected", f.name)

        if qr_results:
            absence_counter = 0
        else:
            absence_counter += 1
            if absence_counter * SECONDS_PER_FRAME > ABSENCE_THRESHOLD_SEC:
                log_alert("QR absent for threshold duration", f.name)
                # Record the absence so the "QR Missing" metric can count it.
                rows.append({
                    "frame": f.name,
                    "qr_index": None,
                    "payload": "",
                    "approved_match": False,
                    "anomalies": "ABSENCE",
                })

        for i, qr in enumerate(qr_results):
            payload = qr.get("data", "")
            msgs = _qr_anomalies(qr, np_img, phone_boxes, approved_list, iou_threshold)
            if msgs:
                log_alert("|".join(msgs), f.name)
            rows.append({
                "frame": f.name,
                "qr_index": i,
                "payload": payload,
                "approved_match": match_payload(payload, approved_list),
                "anomalies": "|".join(msgs),
            })

    df = pd.DataFrame(rows, columns=RESULT_COLUMNS)
    # Nullable integer keeps indices integral next to the index-less ABSENCE rows.
    df["qr_index"] = df["qr_index"].astype("Int64")
    st.dataframe(df)

    # --- Dashboard Cards ---
    st.markdown("### 📊 Compliance Dashboard")
    col1, col2, col3 = st.columns(3)
    col4, col5, col6 = st.columns(3)

    # Each tag occurs at most once per row, so this equals the per-row count.
    tag_counts = Counter(
        tag for row in rows for tag in row["anomalies"].split("|") if tag
    )
    col1.metric("❌ Unauthorized QRs", tag_counts["UNAPPROVED_QR"])
    col2.metric("📱 On Phone", tag_counts["ON_PHONE"])
    col3.metric("⚠️ Tampered", tag_counts["TAMPERING"])
    col4.metric("🚫 Outside ROI", tag_counts["OUTSIDE_ROI"])
    col5.metric("⏳ QR Missing", tag_counts["ABSENCE"])
    col6.metric("🔍 Undecoded", tag_counts["UNDECODED_QR"])

    # --- Alert History ---
    st.subheader("📜 Alert History")
    # Ten most recent alerts, newest first.
    for a in reversed(load_alerts()[-10:]):
        st.write(f"{a.get('time', '')} [{a.get('frame', '')}] → {a.get('alert', '')}")


# Called unconditionally so the script behaves exactly as before when executed.
main()