Spaces:
Sleeping
Sleeping
| import io | |
| import os | |
| import zipfile | |
| import tempfile | |
| import time | |
| from typing import List, Dict, Any | |
| import streamlit as st | |
| import numpy as np | |
| import pandas as pd | |
| from PIL import Image, ImageDraw, ImageFont | |
| import cv2 | |
# Optional: YOLO for phone detection
# Module-level cache for the model; load_yolo() fills it in on first use.
YOLO_MODEL = None
# ROI Zones (x1, y1, x2, y2)
# A QR bbox must lie fully inside at least one zone or it is flagged OUTSIDE_ROI.
ROI_ZONES = [(100, 100, 400, 400)] # example, configurable later
# Compared against a count of consecutive frames without any QR
# (the scan loop assumes roughly one frame per second).
ABSENCE_THRESHOLD_SEC = 15
# JSON file (relative to the working directory) that stores the alert history.
ALERT_LOG_FILE = "alerts.json"
| # --- Alert persistence --- | |
def load_alerts():
    """Return all persisted alerts as a list of dicts, oldest first.

    The log at ``ALERT_LOG_FILE`` is a JSON array of records. A missing,
    empty, unreadable, or malformed log is treated as "no history" so that a
    damaged file cannot take down every later alert write or the history view.

    Returns:
        A list of alert records; ``[]`` when nothing usable is stored.
    """
    import json  # local import keeps this block self-contained

    try:
        with open(ALERT_LOG_FILE, encoding="utf-8") as fh:
            records = json.load(fh)
    except FileNotFoundError:
        return []
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError (empty or corrupt file).
        return []
    # Only a top-level JSON array is a valid alert log.
    return records if isinstance(records, list) else []
def save_alert(alert: Dict[str, Any]):
    """Append *alert* to the stored history and rewrite the whole log file.

    Args:
        alert: One alert record to persist.
    """
    history = load_alerts() + [alert]
    table = pd.DataFrame(history)
    table.to_json(ALERT_LOG_FILE, orient="records", indent=2)
def log_alert(message, frame_name):
    """Persist an alert stamped with the current local time.

    Args:
        message: Alert text (for example pipe-joined anomaly tags).
        frame_name: Name of the frame that triggered the alert.
    """
    save_alert(
        {
            "time": time.ctime(),
            "alert": message,
            "frame": frame_name,
        }
    )
| # --- YOLO Loader --- | |
def load_yolo():
    """Return the shared YOLO model, loading it on first use.

    The model is cached in the module-level ``YOLO_MODEL``. If the import or
    the model construction fails, a Streamlit warning is shown, the cache
    stays ``None``, and the next call will try again.

    Returns:
        The loaded model, or ``None`` when it could not be loaded.
    """
    global YOLO_MODEL
    if YOLO_MODEL is not None:
        return YOLO_MODEL
    try:
        # Imported lazily so the app still starts without ultralytics.
        from ultralytics import YOLO
        YOLO_MODEL = YOLO('yolov8n.pt')
    except Exception as exc:
        st.warning(f"YOLO model could not be loaded: {exc}")
        YOLO_MODEL = None
    return YOLO_MODEL
def iou(boxA, boxB) -> float:
    """Return the intersection-over-union of two ``(x1, y1, x2, y2)`` boxes.

    A small epsilon (1e-6) is added to the union so that two zero-area boxes
    yield 0 instead of dividing by zero.
    """

    def _area(box):
        # Degenerate (inverted) boxes count as zero area.
        return max(0, box[2] - box[0]) * max(0, box[3] - box[1])

    left, top = max(boxA[0], boxB[0]), max(boxA[1], boxB[1])
    right, bottom = min(boxA[2], boxB[2]), min(boxA[3], boxB[3])
    overlap = max(0, right - left) * max(0, bottom - top)
    union = _area(boxA) + _area(boxB) - overlap + 1e-6
    return overlap / union
| # --- QR Detection --- | |
def _qr_record(quad, payload: str) -> Dict[str, Any]:
    """Build one detection dict (bbox, payload, corner points) from a QR quad."""
    pts = np.array(quad, dtype=np.float32).reshape(-1, 2)
    x1, y1 = np.min(pts[:, 0]), np.min(pts[:, 1])
    x2, y2 = np.max(pts[:, 0]), np.max(pts[:, 1])
    return {
        "bbox": [float(x1), float(y1), float(x2), float(y2)],
        "data": payload,
        "points": pts.tolist(),
    }


def detect_qr_opencv(image_np: np.ndarray) -> List[Dict[str, Any]]:
    """Detect and decode QR codes in an image with OpenCV.

    The multi-code detector is tried first. When it reports no corner points
    (``None`` or an empty array), the single-code detector is used as a
    fallback; that fallback only yields a result when a payload was decoded.

    Args:
        image_np: Image array passed straight to ``cv2.QRCodeDetector``.

    Returns:
        One dict per detected code with keys ``"bbox"`` (axis-aligned
        ``[x1, y1, x2, y2]`` around the corners), ``"data"`` (decoded payload,
        possibly empty on the multi path), and ``"points"`` (corner list).
    """
    detector = cv2.QRCodeDetector()
    _, data_list, points, _ = detector.detectAndDecodeMulti(image_np)

    # Treat an empty array like None so the single-code fallback still runs.
    if points is None or len(points) == 0:
        data_single, points_single, _ = detector.detectAndDecode(image_np)
        if points_single is not None and data_single:
            return [_qr_record(points_single, data_single)]
        return []

    if isinstance(data_list, (list, tuple)):
        payloads = list(data_list)
    else:
        payloads = [data_list] * len(points)
    # Pad so every quad has a payload slot even if fewer strings came back.
    payloads += [""] * (len(points) - len(payloads))

    return [_qr_record(quad, payload) for quad, payload in zip(points, payloads)]
| # --- Phone Detection --- | |
def detect_phones_yolo(image_np: np.ndarray, conf: float = 0.25) -> List[List[float]]:
    """Return bounding boxes of detections whose class id is 67.

    Args:
        image_np: Image array handed to the model's ``predict``.
        conf: Confidence threshold forwarded to ``predict``.

    Returns:
        A list of ``[x1, y1, x2, y2]`` float boxes; empty when the model is
        unavailable or nothing matched.
    """
    model = load_yolo()
    if model is None:
        return []

    phone_boxes = []
    for result in model.predict(source=image_np, conf=conf, verbose=False):
        coords = result.boxes.xyxy.cpu().numpy()
        class_ids = result.boxes.cls.cpu().numpy()
        phone_boxes.extend(
            [float(xyxy[0]), float(xyxy[1]), float(xyxy[2]), float(xyxy[3])]
            for xyxy, class_id in zip(coords, class_ids)
            if int(class_id) == 67  # COCO: "cell phone"
        )
    return phone_boxes
| # --- Tampering detection --- | |
def detect_tampering(image_np: np.ndarray, bbox: List[float]) -> bool:
    """Flag a QR region as tampered when it contains almost no edges.

    The region inside *bbox* is converted to grayscale, run through Canny
    (thresholds 100/200), and flagged when fewer than 1% of its pixels are
    edge pixels.

    Args:
        image_np: Image array; 3-channel input is interpreted as BGR for the
            grayscale conversion, 2-D input is used as-is.
        bbox: ``[x1, y1, x2, y2]``; values are truncated to int and clamped
            to the image bounds.

    Returns:
        ``True`` when the edge density is below 0.01, ``False`` otherwise or
        when the clamped region is empty.
    """
    height, width = image_np.shape[:2]
    x1, y1, x2, y2 = map(int, bbox)
    # Clamp first: a negative coordinate would otherwise be read by NumPy as
    # "count from the end" and silently select the wrong (or an empty) area.
    x1, x2 = max(0, x1), min(width, x2)
    y1, y2 = max(0, y1), min(height, y2)
    if x2 <= x1 or y2 <= y1:
        return False

    roi = image_np[y1:y2, x1:x2]
    gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY) if roi.ndim == 3 else roi
    edges = cv2.Canny(gray, 100, 200)
    edge_density = np.count_nonzero(edges) / (roi.shape[0] * roi.shape[1])
    return bool(edge_density < 0.01)
| # --- Payload normalization & UPI parsing --- | |
from urllib.parse import urlparse, parse_qs


def normalize_payload(payload: str) -> str:
    """Reduce a QR payload to a canonical, comparable string.

    The payload is stripped and lower-cased. If it carries a non-empty ``pa``
    query parameter (either in a URL such as ``upi://pay?pa=...`` or as a bare
    ``pa=...&pn=...`` string), the URL-decoded value of that parameter is
    returned. Otherwise the stripped, lower-cased payload itself is returned.

    Only a parameter named exactly ``pa`` counts; names that merely end in
    ``pa`` (e.g. ``mapa=``) are ignored.

    Args:
        payload: Raw decoded QR text; may be empty.

    Returns:
        The normalized payee address or payload; ``""`` for empty input.
    """
    if not payload:
        return ""
    text = payload.strip().lower()

    try:
        query = urlparse(text).query
    except ValueError:
        # urlparse rejects malformed hosts (e.g. unbalanced "[");
        # fall back to whatever follows the first "?".
        query = text.partition("?")[2]

    # A bare "pa=...&pn=..." string has no "?", so parse the whole text then.
    params = parse_qs(query or text)
    payee = params.get("pa", [""])[0].strip().lower()
    return payee or text
def match_payload(payload: str, approved: List[str]) -> bool:
    """Return True when *payload* equals any approved entry after normalization.

    Both sides go through ``normalize_payload``. Blank approved entries (for
    example empty lines in an uploaded list) are skipped, and a payload that
    normalizes to an empty string never matches, so an empty value cannot be
    "approved" by accident.

    Args:
        payload: Decoded QR text.
        approved: Approved payloads or payee addresses.

    Returns:
        ``True`` on a match, ``False`` otherwise.
    """
    if not payload:
        return False
    target = normalize_payload(payload)
    if not target:
        return False
    return any(
        target == normalize_payload(entry)
        for entry in approved
        if entry and entry.strip()
    )
| # --- ROI Helper --- | |
def inside_roi(bbox, roi):
    """Return True when *bbox* lies entirely within *roi* (edges inclusive).

    Both arguments are ``(x1, y1, x2, y2)`` sequences.
    """
    left, top, right, bottom = bbox
    roi_left, roi_top, roi_right, roi_bottom = roi
    starts_inside = roi_left <= left and roi_top <= top
    return starts_inside and right <= roi_right and bottom <= roi_bottom
| # --- UI --- | |
# Page chrome.
st.set_page_config(
    page_title="QR Code Anomaly Scanner",
    page_icon="π΅οΈ",
    layout="wide",
)
st.title("π΅οΈ QR Code Anomaly Scanner with Extended Compliance")

# Sidebar inputs.
# NOTE(review): the original indentation was lost; every widget up to and
# including the Run Scan button is assumed to belong to the sidebar.
st.sidebar.header("βοΈ Inputs")
approved_file = st.sidebar.file_uploader(
    "π Approved QR List (CSV/TXT)",
    type=["csv", "txt"],
)
frames = st.sidebar.file_uploader(
    "πΌοΈ Frames (images)",
    type=["jpg", "jpeg", "png", "bmp", "webp"],
    accept_multiple_files=True,
)
run_phone_detection = st.sidebar.checkbox("π± Detect phones (YOLO)", value=True)
phone_conf = st.sidebar.slider(
    "π Phone detection confidence",
    min_value=0.1,
    max_value=0.8,
    value=0.25,
    step=0.05,
)
iou_threshold = st.sidebar.slider(
    "π― QRβPhone overlap IoU threshold",
    min_value=0.05,
    max_value=0.8,
    value=0.2,
    step=0.05,
)
process_btn = st.sidebar.button("π Run Scan", use_container_width=True)
# Scan every uploaded frame, tabulate per-QR anomalies, and render the
# compliance dashboard. Runs only on the rerun triggered by the button.
if process_btn:
    if not approved_file:
        st.error("Please upload the Approved QR List first.")
        st.stop()

    # CSV: first column holds the approved entries. TXT: one entry per line.
    if approved_file.name.lower().endswith(".csv"):
        approved_list = pd.read_csv(approved_file).iloc[:, 0].astype(str).tolist()
    else:
        # utf-8-sig drops a leading BOM that would otherwise corrupt the
        # first entry; blank lines are not real entries.
        raw_lines = approved_file.read().decode("utf-8-sig").splitlines()
        approved_list = [line.strip() for line in raw_lines if line.strip()]
    st.success(f"β Loaded {len(approved_list)} approved entries.")

    result_columns = ["frame", "qr_index", "payload", "approved_match", "anomalies"]
    rows = []
    absence_counter = 0  # consecutive frames without any QR
    absence_alerts = 0   # how many absence alerts were raised in this scan

    for f in frames or []:
        rgb_img = np.array(Image.open(f).convert("RGB"))
        # OpenCV (COLOR_BGR2GRAY in detect_tampering) and Ultralytics both
        # interpret numpy images as BGR, so convert once up front.
        np_img = cv2.cvtColor(rgb_img, cv2.COLOR_RGB2BGR)

        qr_results = detect_qr_opencv(np_img)
        phone_boxes = detect_phones_yolo(np_img, conf=phone_conf) if run_phone_detection else []

        if len(qr_results) > 1:
            log_alert("Multiple QRs detected", f.name)

        if qr_results:
            absence_counter = 0
        else:
            absence_counter += 1
            if absence_counter > ABSENCE_THRESHOLD_SEC:  # assuming 1s per frame approx
                log_alert("QR absent for threshold duration", f.name)
                absence_alerts += 1

        for i, qr in enumerate(qr_results):
            msgs = []
            payload = qr.get("data", "")
            qr_box = qr["bbox"]
            # Evaluate the match once; always a real bool for the table.
            is_approved = bool(payload) and match_payload(payload, approved_list)

            if not payload:
                msgs.append("UNDECODED_QR")
            elif not is_approved:
                msgs.append("UNAPPROVED_QR")
            if any(iou(qr_box, pb) >= iou_threshold for pb in phone_boxes):
                msgs.append("ON_PHONE")
            if not any(inside_roi(qr_box, roi) for roi in ROI_ZONES):
                msgs.append("OUTSIDE_ROI")
            if detect_tampering(np_img, qr_box):
                msgs.append("TAMPERING")

            anomalies = "|".join(msgs)
            if msgs:
                log_alert(anomalies, f.name)
            rows.append(
                {
                    "frame": f.name,
                    "qr_index": i,
                    "payload": payload,
                    "approved_match": is_approved,
                    "anomalies": anomalies,
                }
            )

    # Explicit columns keep the lookups below valid when no QR was found.
    df = pd.DataFrame(rows, columns=result_columns)
    st.dataframe(df)

    # --- Dashboard Cards ---
    st.markdown("### π Compliance Dashboard")
    col1, col2, col3 = st.columns(3)
    col4, col5, col6 = st.columns(3)

    anomaly_text = df["anomalies"].astype(str)
    tag_counts = {
        tag: int(anomaly_text.str.contains(tag, regex=False, na=False).sum())
        for tag in ("UNAPPROVED_QR", "ON_PHONE", "TAMPERING", "OUTSIDE_ROI", "UNDECODED_QR")
    }

    col1.metric("β Unauthorized QRs", tag_counts["UNAPPROVED_QR"])
    col2.metric("π± On Phone", tag_counts["ON_PHONE"])
    col3.metric("β οΈ Tampered", tag_counts["TAMPERING"])
    col4.metric("π« Outside ROI", tag_counts["OUTSIDE_ROI"])
    # Absence is raised per frame, not per QR row, so it is counted in the loop.
    col5.metric("β³ QR Missing", absence_alerts)
    col6.metric("π Undecoded", tag_counts["UNDECODED_QR"])
# --- Alert History ---
# Show the ten most recent stored alerts, newest first.
# NOTE(review): original indentation was lost; assumed to run on every rerun
# rather than only after a scan.
st.subheader("π Alert History")
latest_alerts = load_alerts()[-10:]
for entry in reversed(latest_alerts):
    st.write(f"{entry['time']} [{entry['frame']}] β {entry['alert']}")