Spaces:
Sleeping
Sleeping
File size: 8,908 Bytes
06f54b5 9cfa247 06f54b5 a49b8cc 06f54b5 9cfa247 06f54b5 9cfa247 06f54b5 9b0126c 06f54b5 9a392e7 06f54b5 9cfa247 06f54b5 a49b8cc 9cfa247 06f54b5 9cfa247 06f54b5 9cfa247 06f54b5 9cfa247 06f54b5 9cfa247 06f54b5 9cfa247 06f54b5 9cfa247 77e861a 9cfa247 3e09b48 06f54b5 9cfa247 06f54b5 9cfa247 06f54b5 032e63d 9cfa247 a449a02 9cfa247 77e861a 9cfa247 a449a02 9a392e7 9cfa247 9a392e7 9cfa247 9a392e7 9cfa247 9a392e7 9cfa247 9a392e7 9cfa247 9a392e7 9cfa247 9a392e7 9cfa247 77e861a 9cfa247 77e861a 9cfa247 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 |
import io
import os
import zipfile
import tempfile
import time
from typing import List, Dict, Any
import streamlit as st
import numpy as np
import pandas as pd
from PIL import Image, ImageDraw, ImageFont
import cv2
# Optional: YOLO for phone detection
YOLO_MODEL = None
# ROI Zones (x1, y1, x2, y2)
ROI_ZONES = [(100, 100, 400, 400)] # example, configurable later
ABSENCE_THRESHOLD_SEC = 15
ALERT_LOG_FILE = "alerts.json"
# --- Alert persistence ---
def load_alerts():
if os.path.exists(ALERT_LOG_FILE):
return pd.read_json(ALERT_LOG_FILE).to_dict(orient="records")
return []
def save_alert(alert: Dict[str, Any]):
alerts = load_alerts()
alerts.append(alert)
pd.DataFrame(alerts).to_json(ALERT_LOG_FILE, orient="records", indent=2)
def log_alert(message, frame_name):
alert = {"time": time.ctime(), "alert": message, "frame": frame_name}
save_alert(alert)
# --- YOLO Loader ---
def load_yolo():
global YOLO_MODEL
if YOLO_MODEL is None:
try:
from ultralytics import YOLO
YOLO_MODEL = YOLO('yolov8n.pt')
except Exception as e:
st.warning(f"YOLO model could not be loaded: {e}")
YOLO_MODEL = None
return YOLO_MODEL
def iou(boxA, boxB) -> float:
xA = max(boxA[0], boxB[0])
yA = max(boxA[1], boxB[1])
xB = min(boxA[2], boxB[2])
yB = min(boxA[3], boxB[3])
interW = max(0, xB - xA)
interH = max(0, yB - yA)
interArea = interW * interH
areaA = max(0, boxA[2] - boxA[0]) * max(0, boxA[3] - boxA[1])
areaB = max(0, boxB[2] - boxB[0]) * max(0, boxB[3] - boxB[1])
denom = areaA + areaB - interArea + 1e-6
return interArea / denom
# --- QR Detection ---
def detect_qr_opencv(image_np: np.ndarray) -> List[Dict[str, Any]]:
det = cv2.QRCodeDetector()
retval, data_list, points, _ = det.detectAndDecodeMulti(image_np)
results = []
if points is None:
data_single, points_single, _ = det.detectAndDecode(image_np)
if points_single is not None and data_single:
pts = np.array(points_single, dtype=np.float32).reshape(-1, 2)
x1, y1 = np.min(pts[:,0]), np.min(pts[:,1])
x2, y2 = np.max(pts[:,0]), np.max(pts[:,1])
results.append({"bbox": [float(x1), float(y1), float(x2), float(y2)],"data": data_single,"points": pts.tolist()})
return results
decoded_list = data_list if isinstance(data_list, (list, tuple)) else [data_list] * len(points)
for i, quad in enumerate(points):
pts = np.array(quad, dtype=np.float32).reshape(-1,2)
x1, y1 = np.min(pts[:,0]), np.min(pts[:,1])
x2, y2 = np.max(pts[:,0]), np.max(pts[:,1])
payload = decoded_list[i] if i < len(decoded_list) else ""
results.append({"bbox": [float(x1), float(y1), float(x2), float(y2)],"data": payload,"points": pts.tolist()})
return results
# --- Phone Detection ---
def detect_phones_yolo(image_np: np.ndarray, conf: float = 0.25) -> List[List[float]]:
model = load_yolo()
if model is None:
return []
results = model.predict(source=image_np, conf=conf, verbose=False)
bboxes = []
for r in results:
for box, cls in zip(r.boxes.xyxy.cpu().numpy(), r.boxes.cls.cpu().numpy()):
if int(cls) == 67: # COCO: "cell phone"
bboxes.append([float(box[0]), float(box[1]), float(box[2]), float(box[3])])
return bboxes
# --- Tampering detection ---
def detect_tampering(image_np: np.ndarray, bbox: List[float]) -> bool:
x1, y1, x2, y2 = map(int, bbox)
roi = image_np[y1:y2, x1:x2]
if roi.size == 0:
return False
gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)
edges = cv2.Canny(gray, 100, 200)
edge_density = np.sum(edges > 0) / (roi.shape[0] * roi.shape[1])
return edge_density < 0.01
# --- Payload normalization & UPI parsing ---
from urllib.parse import urlparse, parse_qs
def normalize_payload(payload: str) -> str:
if not payload:
return ""
p = payload.strip().lower()
if p.startswith("upi://"):
try:
parsed = urlparse(p)
qs = parse_qs(parsed.query)
if "pa" in qs:
return qs["pa"][0].strip().lower()
except Exception:
pass
if "pa=" in p:
try:
part = p.split("pa=")[1].split("&")[0]
return part.strip().lower()
except Exception:
pass
return p
def match_payload(payload: str, approved: List[str]) -> bool:
if not payload:
return False
norm_payload = normalize_payload(payload)
for a in approved:
norm_a = normalize_payload(a)
if norm_payload == norm_a:
return True
return False
# --- ROI Helper ---
def inside_roi(bbox, roi):
x1, y1, x2, y2 = bbox
rx1, ry1, rx2, ry2 = roi
return (x1 >= rx1 and y1 >= ry1 and x2 <= rx2 and y2 <= ry2)
# --- UI ---
st.set_page_config(page_title="QR Code Anomaly Scanner", page_icon="π΅οΈ", layout="wide")
st.title("π΅οΈ QR Code Anomaly Scanner with Extended Compliance")
with st.sidebar:
st.header("βοΈ Inputs")
approved_file = st.file_uploader("π Approved QR List (CSV/TXT)", type=["csv","txt"])
frames = st.file_uploader("πΌοΈ Frames (images)", type=["jpg","jpeg","png","bmp","webp"], accept_multiple_files=True)
run_phone_detection = st.checkbox("π± Detect phones (YOLO)", value=True)
phone_conf = st.slider("π Phone detection confidence", 0.1, 0.8, 0.25, 0.05)
iou_threshold = st.slider("π― QRβPhone overlap IoU threshold", 0.05, 0.8, 0.2, 0.05)
process_btn = st.button("π Run Scan", use_container_width=True)
if process_btn:
if not approved_file:
st.error("Please upload the Approved QR List first.")
st.stop()
approved_list = pd.read_csv(approved_file).iloc[:,0].astype(str).tolist() if approved_file.name.endswith(".csv") else approved_file.read().decode().splitlines()
st.success(f"β
Loaded {len(approved_list)} approved entries.")
rows = []
absence_counter = 0
for f in frames or []:
pil = Image.open(f).convert("RGB")
np_img = np.array(pil)
qr_results = detect_qr_opencv(np_img)
phone_boxes = detect_phones_yolo(np_img, conf=phone_conf) if run_phone_detection else []
flags = {}
if len(qr_results) > 1:
log_alert("Multiple QRs detected", f.name)
if not qr_results:
absence_counter += 1
if absence_counter * 1 > ABSENCE_THRESHOLD_SEC: # assuming 1s per frame approx
log_alert("QR absent for threshold duration", f.name)
else:
absence_counter = 0
for i, qr in enumerate(qr_results):
msgs = []
payload = qr.get("data", "")
if not payload:
msgs.append("UNDECODED_QR")
elif not match_payload(payload, approved_list):
msgs.append("UNAPPROVED_QR")
if phone_boxes:
qb = qr["bbox"]
for pb in phone_boxes:
if iou(qb, pb) >= iou_threshold:
msgs.append("ON_PHONE")
break
if not any(inside_roi(qr["bbox"], roi) for roi in ROI_ZONES):
msgs.append("OUTSIDE_ROI")
if detect_tampering(np_img, qr["bbox"]):
msgs.append("TAMPERING")
if msgs:
log_alert("|".join(msgs), f.name)
flags[i] = msgs
rows.append({"frame": f.name,"qr_index": i,"payload": payload,"approved_match": (payload and match_payload(payload, approved_list)),"anomalies": "|".join(msgs) if msgs else ""})
df = pd.DataFrame(rows)
st.dataframe(df)
# --- Dashboard Cards ---
st.markdown("### π Compliance Dashboard")
col1, col2, col3 = st.columns(3)
col4, col5, col6 = st.columns(3)
unapproved_count = int((df["anomalies"].str.contains("UNAPPROVED_QR", na=False)).sum())
on_phone_count = int((df["anomalies"].str.contains("ON_PHONE", na=False)).sum())
tampering_count = int((df["anomalies"].str.contains("TAMPERING", na=False)).sum())
roi_count = int((df["anomalies"].str.contains("OUTSIDE_ROI", na=False)).sum())
absence_count = int((df["anomalies"].str.contains("ABSENCE", na=False)).sum())
undecoded_count = int((df["anomalies"].str.contains("UNDECODED_QR", na=False)).sum())
col1.metric("β Unauthorized QRs", unapproved_count)
col2.metric("π± On Phone", on_phone_count)
col3.metric("β οΈ Tampered", tampering_count)
col4.metric("π« Outside ROI", roi_count)
col5.metric("β³ QR Missing", absence_count)
col6.metric("π Undecoded", undecoded_count)
# --- Alert History ---
st.subheader("π Alert History")
for a in load_alerts()[-10:][::-1]:
st.write(f"{a['time']} [{a['frame']}] β {a['alert']}")
|