# QRCode-Raja / app.py
# SuriRaja's picture
# Update app.py
# 9cfa247 verified
import io
import os
import zipfile
import tempfile
import time
from typing import List, Dict, Any
import streamlit as st
import numpy as np
import pandas as pd
from PIL import Image, ImageDraw, ImageFont
import cv2
# Optional: YOLO for phone detection.
# Cached model handle; stays None until load_yolo() manages to load a model.
YOLO_MODEL = None
# ROI Zones (x1, y1, x2, y2). A QR bounding box that is not fully contained
# in at least one of these rectangles is flagged OUTSIDE_ROI by the scan loop.
ROI_ZONES = [(100, 100, 400, 400)] # example, configurable later
# Number of consecutive frames without any QR that is tolerated before an
# absence alert is logged (the scan loop assumes roughly one frame per second).
ABSENCE_THRESHOLD_SEC = 15
# JSON file (relative path) where load_alerts()/save_alert() persist alerts.
ALERT_LOG_FILE = "alerts.json"
# --- Alert persistence ---
def load_alerts():
    """Return every persisted alert as a list of dicts.

    Reads ``ALERT_LOG_FILE`` with pandas and converts it to one dict per
    record. When the file does not exist yet, an empty list is returned.
    """
    if not os.path.exists(ALERT_LOG_FILE):
        return []
    stored = pd.read_json(ALERT_LOG_FILE)
    return stored.to_dict(orient="records")
def save_alert(alert: Dict[str, Any]):
    """Append ``alert`` to the persisted alert log.

    The whole log is re-read, extended with the new record and written back
    to ``ALERT_LOG_FILE`` as a JSON array of records (2-space indent).
    """
    history = load_alerts()
    history.append(alert)
    table = pd.DataFrame(history)
    table.to_json(ALERT_LOG_FILE, orient="records", indent=2)
def log_alert(message, frame_name):
    """Persist an alert stamped with the current local time.

    Args:
        message: Alert text stored under the ``"alert"`` key.
        frame_name: Name of the frame that triggered the alert, stored under
            the ``"frame"`` key.
    """
    record = dict(time=time.ctime(), alert=message, frame=frame_name)
    save_alert(record)
# --- YOLO Loader ---
def load_yolo():
    """Return the cached YOLO model, loading it on first use.

    The model is stored in the module-level ``YOLO_MODEL``. If importing
    ``ultralytics`` or constructing the model fails, a Streamlit warning is
    shown and ``None`` is returned; the next call will try again because the
    cache is still empty.
    """
    global YOLO_MODEL
    if YOLO_MODEL is not None:
        return YOLO_MODEL

    try:
        # Imported lazily so the app still runs without ultralytics installed.
        from ultralytics import YOLO
        YOLO_MODEL = YOLO('yolov8n.pt')
    except Exception as exc:
        st.warning(f"YOLO model could not be loaded: {exc}")
        YOLO_MODEL = None
    return YOLO_MODEL
def iou(boxA, boxB) -> float:
    """Compute intersection-over-union of two ``(x1, y1, x2, y2)`` boxes.

    Widths, heights and the overlap are clamped at zero, and a small epsilon
    (1e-6) is added to the union so two zero-area boxes yield 0.0 instead of
    dividing by zero.
    """
    ax1, ay1, ax2, ay2 = boxA[0], boxA[1], boxA[2], boxA[3]
    bx1, by1, bx2, by2 = boxB[0], boxB[1], boxB[2], boxB[3]

    # Overlap rectangle: innermost left/top edge to innermost right/bottom edge.
    overlap_w = max(0, min(ax2, bx2) - max(ax1, bx1))
    overlap_h = max(0, min(ay2, by2) - max(ay1, by1))
    overlap = overlap_w * overlap_h

    area_a = max(0, ax2 - ax1) * max(0, ay2 - ay1)
    area_b = max(0, bx2 - bx1) * max(0, by2 - by1)

    union = area_a + area_b - overlap + 1e-6
    return overlap / union
# --- QR Detection ---
def detect_qr_opencv(image_np: np.ndarray) -> List[Dict[str, Any]]:
    """Detect QR codes in an image with OpenCV's ``QRCodeDetector``.

    Multi-code detection is tried first. If it reports no corner points, the
    single-code detector is used as a fallback, and its result is kept only
    when a payload was actually decoded.

    Returns:
        One dict per QR code with keys ``"bbox"`` (axis-aligned
        ``[x1, y1, x2, y2]`` around the corner points), ``"data"`` (decoded
        payload, possibly empty on the multi-code path) and ``"points"``
        (corner coordinates as nested lists).
    """

    def _describe(quad, payload):
        # Flatten whatever shape OpenCV returned into an (N, 2) corner array.
        corners = np.array(quad, dtype=np.float32).reshape(-1, 2)
        xs, ys = corners[:, 0], corners[:, 1]
        return {
            "bbox": [float(np.min(xs)), float(np.min(ys)), float(np.max(xs)), float(np.max(ys))],
            "data": payload,
            "points": corners.tolist(),
        }

    detector = cv2.QRCodeDetector()
    _found, data_list, points, _ = detector.detectAndDecodeMulti(image_np)

    if points is None:
        single_payload, single_points, _ = detector.detectAndDecode(image_np)
        if single_points is None or not single_payload:
            return []
        return [_describe(single_points, single_payload)]

    # A scalar payload is repeated so every detected quad gets an entry.
    if isinstance(data_list, (list, tuple)):
        payloads = data_list
    else:
        payloads = [data_list] * len(points)

    return [
        _describe(quad, payloads[idx] if idx < len(payloads) else "")
        for idx, quad in enumerate(points)
    ]
# --- Phone Detection ---
def detect_phones_yolo(image_np: np.ndarray, conf: float = 0.25) -> List[List[float]]:
    """Return bounding boxes of cell phones found by the YOLO model.

    Args:
        image_np: Image passed straight to the model's ``predict``.
        conf: Confidence threshold forwarded to ``predict``.

    Returns:
        A list of ``[x1, y1, x2, y2]`` float boxes, or an empty list when the
        model is unavailable or nothing was detected.
    """
    detector = load_yolo()
    if detector is None:
        return []

    phone_class = 67  # COCO: "cell phone"
    phones = []
    for prediction in detector.predict(source=image_np, conf=conf, verbose=False):
        coords = prediction.boxes.xyxy.cpu().numpy()
        labels = prediction.boxes.cls.cpu().numpy()
        phones.extend(
            [float(xyxy[0]), float(xyxy[1]), float(xyxy[2]), float(xyxy[3])]
            for xyxy, label in zip(coords, labels)
            if int(label) == phone_class
        )
    return phones
# --- Tampering detection ---
def detect_tampering(image_np: np.ndarray, bbox: List[float]) -> bool:
    """Heuristically flag a QR region as tampered when it has almost no edges.

    The region described by ``bbox`` is cropped, converted to grayscale and
    run through Canny edge detection; fewer than 1% edge pixels is treated
    as tampering (e.g. a blanked or covered code).

    Args:
        image_np: Image array indexed as ``[row, column, ...]``.
        bbox: ``[x1, y1, x2, y2]`` in pixel coordinates; values are truncated
            to integers and clamped to the image bounds.

    Returns:
        ``True`` when the edge density is below 0.01, ``False`` otherwise or
        when the clamped region is empty.
    """
    height, width = image_np.shape[:2]
    x1, y1, x2, y2 = map(int, bbox)

    # Clamp to the image: a negative index would otherwise wrap around in
    # NumPy slicing and silently analyse the wrong part of the frame.
    x1 = min(max(x1, 0), width)
    x2 = min(max(x2, 0), width)
    y1 = min(max(y1, 0), height)
    y2 = min(max(y2, 0), height)

    roi = image_np[y1:y2, x1:x2]
    if roi.size == 0:
        return False

    # NOTE(review): COLOR_BGR2GRAY assumes BGR channel order; with an RGB
    # input only the R/B luminance weights are swapped, which should not
    # matter for an edge-density heuristic -- confirm if precision matters.
    gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)
    edges = cv2.Canny(gray, 100, 200)
    edge_density = np.sum(edges > 0) / (roi.shape[0] * roi.shape[1])
    return bool(edge_density < 0.01)
# --- Payload normalization & UPI parsing ---
from urllib.parse import urlparse, parse_qs
def normalize_payload(payload: str) -> str:
    """Reduce a QR payload to a canonical string for comparison.

    The payload is stripped and lower-cased. If it carries a UPI payee
    address (the ``pa`` query parameter), only that address is returned so
    that different encodings of the same payee compare equal.

    Args:
        payload: Raw decoded QR text; may be empty or ``None``.

    Returns:
        The lower-cased payee address when a ``pa`` parameter is present,
        otherwise the stripped, lower-cased payload. Empty input gives ``""``.
    """
    import re  # local import: only this helper needs it

    if not payload:
        return ""
    text = payload.strip().lower()

    if text.startswith("upi://"):
        try:
            params = parse_qs(urlparse(text).query)
        except ValueError:
            # Malformed URL: fall through to the raw-text scan below.
            params = {}
        if "pa" in params:
            return params["pa"][0].strip().lower()

    # Fallback for non-UPI schemes or bare query strings. "pa" must be a
    # whole key (at the start, or right after "?" / "&"); a plain substring
    # test would also accept keys such as "mpa=" or "xpa=" and could make an
    # unrelated payload look like an approved payee.
    found = re.search(r"(?:^|[?&])pa=([^&]*)", text)
    if found:
        return found.group(1).strip().lower()
    return text
def match_payload(payload: str, approved: List[str]) -> bool:
    """Tell whether ``payload`` matches any entry of the approved list.

    Both sides are passed through ``normalize_payload`` before comparison.

    Args:
        payload: Decoded QR text.
        approved: Approved payloads or payee addresses.

    Returns:
        ``True`` when the normalized payload equals a normalized approved
        entry. An empty payload, or one that normalizes to an empty string
        (e.g. whitespace only), never matches -- otherwise a blank line in
        the approved list would approve it.
    """
    if not payload:
        return False
    target = normalize_payload(payload)
    if not target:
        return False
    return any(normalize_payload(entry) == target for entry in approved)
# --- ROI Helper ---
def inside_roi(bbox, roi):
    """Return whether ``bbox`` lies entirely within ``roi``.

    Both arguments are ``(x1, y1, x2, y2)`` rectangles; touching the ROI
    border still counts as inside.
    """
    left, top, right, bottom = bbox
    roi_left, roi_top, roi_right, roi_bottom = roi
    return (
        roi_left <= left
        and roi_top <= top
        and right <= roi_right
        and bottom <= roi_bottom
    )
# --- UI ---
# Page setup and sidebar inputs. The emoji and the en dash in the labels are
# written as real Unicode characters (they had been stored as mis-decoded
# UTF-8 byte sequences, which rendered as garbage in the UI).
st.set_page_config(page_title="QR Code Anomaly Scanner", page_icon="🕵️", layout="wide")
st.title("🕵️ QR Code Anomaly Scanner with Extended Compliance")
with st.sidebar:
    st.header("⚙️ Inputs")
    # Approved payloads: first CSV column, or one entry per line of a TXT file.
    approved_file = st.file_uploader("📑 Approved QR List (CSV/TXT)", type=["csv", "txt"])
    frames = st.file_uploader(
        "🖼️ Frames (images)",
        type=["jpg", "jpeg", "png", "bmp", "webp"],
        accept_multiple_files=True,
    )
    run_phone_detection = st.checkbox("📱 Detect phones (YOLO)", value=True)
    # Slider arguments: min, max, default, step.
    phone_conf = st.slider("📏 Phone detection confidence", 0.1, 0.8, 0.25, 0.05)
    iou_threshold = st.slider("🎯 QR–Phone overlap IoU threshold", 0.05, 0.8, 0.2, 0.05)
    process_btn = st.button("🚀 Run Scan", use_container_width=True)
if process_btn:
    if not approved_file:
        st.error("Please upload the Approved QR List first.")
        st.stop()

    # Extension check is case-insensitive so "LIST.CSV" is parsed as CSV too.
    if approved_file.name.lower().endswith(".csv"):
        # NOTE(review): read_csv treats the first row as a header, so a
        # header-less CSV loses its first entry -- confirm the file format.
        approved_list = pd.read_csv(approved_file).iloc[:, 0].astype(str).tolist()
    else:
        approved_list = approved_file.read().decode().splitlines()
    st.success(f"✅ Loaded {len(approved_list)} approved entries.")

    rows = []
    absence_counter = 0  # consecutive frames without any QR code
    for f in frames or []:
        pil = Image.open(f).convert("RGB")
        np_img = np.array(pil)
        qr_results = detect_qr_opencv(np_img)
        phone_boxes = detect_phones_yolo(np_img, conf=phone_conf) if run_phone_detection else []

        if len(qr_results) > 1:
            log_alert("Multiple QRs detected", f.name)

        if not qr_results:
            absence_counter += 1
            # Assumes roughly one frame per second, so the frame count is
            # compared directly against the threshold in seconds.
            if absence_counter > ABSENCE_THRESHOLD_SEC:
                log_alert("QR absent for threshold duration", f.name)
                # Record the absence in the table as well; the dashboard's
                # "QR Missing" card counts the ABSENCE tag in this column.
                rows.append({
                    "frame": f.name,
                    "qr_index": None,
                    "payload": "",
                    "approved_match": False,
                    "anomalies": "ABSENCE",
                })
        else:
            absence_counter = 0

        for i, qr in enumerate(qr_results):
            msgs = []
            payload = qr.get("data", "")
            # Evaluated once and always a real bool for the results table.
            approved_match = bool(payload) and match_payload(payload, approved_list)

            if not payload:
                msgs.append("UNDECODED_QR")
            elif not approved_match:
                msgs.append("UNAPPROVED_QR")

            # A QR code overlapping any detected phone is flagged once.
            if any(iou(qr["bbox"], pb) >= iou_threshold for pb in phone_boxes):
                msgs.append("ON_PHONE")

            if not any(inside_roi(qr["bbox"], roi) for roi in ROI_ZONES):
                msgs.append("OUTSIDE_ROI")

            if detect_tampering(np_img, qr["bbox"]):
                msgs.append("TAMPERING")

            if msgs:
                log_alert("|".join(msgs), f.name)

            rows.append({
                "frame": f.name,
                "qr_index": i,
                "payload": payload,
                "approved_match": approved_match,
                "anomalies": "|".join(msgs),
            })

    # Fixed column list: with no rows the frame would otherwise have no
    # "anomalies" column and the dashboard lookups below would raise KeyError.
    df = pd.DataFrame(
        rows,
        columns=["frame", "qr_index", "payload", "approved_match", "anomalies"],
    )
    st.dataframe(df)

    # --- Dashboard Cards ---
    st.markdown("### 📊 Compliance Dashboard")
    col1, col2, col3 = st.columns(3)
    col4, col5, col6 = st.columns(3)

    anomaly_text = df["anomalies"]
    unapproved_count = int(anomaly_text.str.contains("UNAPPROVED_QR", na=False).sum())
    on_phone_count = int(anomaly_text.str.contains("ON_PHONE", na=False).sum())
    tampering_count = int(anomaly_text.str.contains("TAMPERING", na=False).sum())
    roi_count = int(anomaly_text.str.contains("OUTSIDE_ROI", na=False).sum())
    absence_count = int(anomaly_text.str.contains("ABSENCE", na=False).sum())
    undecoded_count = int(anomaly_text.str.contains("UNDECODED_QR", na=False).sum())

    col1.metric("❌ Unauthorized QRs", unapproved_count)
    col2.metric("📱 On Phone", on_phone_count)
    col3.metric("⚠️ Tampered", tampering_count)
    col4.metric("🚫 Outside ROI", roi_count)
    col5.metric("⏳ QR Missing", absence_count)
    col6.metric("🔍 Undecoded", undecoded_count)

    # --- Alert History ---
    # Ten most recent persisted alerts, newest first.
    st.subheader("📜 Alert History")
    for a in reversed(load_alerts()[-10:]):
        st.write(f"{a['time']} [{a['frame']}] → {a['alert']}")