QRCode-Raja / app.py
SuriRaja's picture
Upload 2 files
a49b8cc verified
raw
history blame
13 kB
import io
import os
import zipfile
import tempfile
from typing import List, Tuple, Dict, Any
import streamlit as st
import numpy as np
import pandas as pd
from PIL import Image, ImageDraw, ImageFont
import cv2
# Optional: YOLO for phone detection.
# The model is loaded lazily on first use to keep startup fast.
YOLO_MODEL = None
# Set after a failed load attempt so the failure is not retried (and the
# warning not repeated) on every later call within the same script run.
_YOLO_LOAD_FAILED = False


def load_yolo():
    """Return the shared YOLO model, loading it on first use.

    The first call imports ``ultralytics`` and builds the ``yolov8n.pt``
    model; later calls return the cached instance. If loading fails, a
    Streamlit warning is shown once and every call returns ``None`` without
    trying again.

    Returns:
        The loaded YOLO model, or ``None`` when it could not be loaded.
    """
    global YOLO_MODEL, _YOLO_LOAD_FAILED
    if YOLO_MODEL is None and not _YOLO_LOAD_FAILED:
        try:
            from ultralytics import YOLO
            # Use lightweight pretrained model; supports "cell phone" class via COCO.
            YOLO_MODEL = YOLO('yolov8n.pt')  # automatically downloads on first run
        except Exception as e:
            # Best-effort feature: report the problem and let callers
            # continue without phone detection.
            _YOLO_LOAD_FAILED = True
            st.warning(f"YOLO model could not be loaded: {e}")
            YOLO_MODEL = None
    return YOLO_MODEL
def iou(boxA, boxB) -> float:
    """Return the intersection-over-union of two axis-aligned boxes.

    Both boxes are indexable as ``[x1, y1, x2, y2]``. Negative widths and
    heights are clamped to zero, and a small epsilon is added to the
    denominator so two zero-area boxes yield 0 instead of dividing by zero.
    """
    def _area(box):
        # Clamp each side so an inverted box contributes no area.
        return max(0, box[2] - box[0]) * max(0, box[3] - box[1])

    # Corners of the overlap rectangle (may be inverted when disjoint).
    left = max(boxA[0], boxB[0])
    top = max(boxA[1], boxB[1])
    right = min(boxA[2], boxB[2])
    bottom = min(boxA[3], boxB[3])

    overlap = max(0, right - left) * max(0, bottom - top)
    union = _area(boxA) + _area(boxB) - overlap + 1e-6
    return overlap / union
def _qr_record(quad, payload: str) -> Dict[str, Any]:
    """Build one result dict from a QR corner array and its decoded text."""
    pts = np.array(quad, dtype=np.float32).reshape(-1, 2)
    x1, y1 = np.min(pts[:, 0]), np.min(pts[:, 1])
    x2, y2 = np.max(pts[:, 0]), np.max(pts[:, 1])
    return {
        "bbox": [float(x1), float(y1), float(x2), float(y2)],
        "data": payload,
        "points": pts.tolist(),
    }


def detect_qr_opencv(image_np: np.ndarray) -> List[Dict[str, Any]]:
    """
    Use OpenCV's QRCodeDetector to find and decode QR codes.

    The multi-code detector is tried first. If it reports no codes, the
    single-code detector is used as a fallback, and its result is kept only
    when a payload was actually decoded.

    Returns list of dicts:
        {bbox: [x1,y1,x2,y2] (floats), data: str ('' when a code was found
        by the multi detector but could not be decoded), points: list of
        [x, y] corner coordinates}
    """
    det = cv2.QRCodeDetector()
    # The Python binding returns FOUR values:
    # (retval, decoded_info, points, straight_qrcode). Unpacking three
    # raises ValueError on every call.
    _found, data, points, _ = det.detectAndDecodeMulti(image_np)

    if points is None or len(points) == 0:
        # Try single QR fallback
        data_single, points_single, _ = det.detectAndDecode(image_np)
        if points_single is not None and data_single:
            return [_qr_record(points_single, data_single)]
        return []

    # points shape: (N,4,2), data is list/tuple of strings (may be '' for undecodeable)
    if isinstance(data, (list, tuple)):
        decoded_list = data
    else:
        decoded_list = [data] * len(points)

    results = []
    for i, quad in enumerate(points):
        # Guard against a payload list shorter than the list of quads.
        payload = decoded_list[i] if i < len(decoded_list) else ""
        results.append(_qr_record(quad, payload))
    return results
def detect_phones_yolo(image_np: np.ndarray, conf: float = 0.25) -> List[List[float]]:
    """
    Detect cell phones with YOLO. Returns list of [x1,y1,x2,y2].

    Args:
        image_np: Image array. The caller in this file passes an RGB array
            built from a PIL image; 3-channel input is treated as RGB here.
        conf: Minimum detection confidence forwarded to ``model.predict``.

    Returns:
        One ``[x1, y1, x2, y2]`` float list per detected cell phone, or an
        empty list when the YOLO model is unavailable.
    """
    model = load_yolo()
    if model is None:
        return []

    # Ultralytics interprets numpy sources as BGR (OpenCV convention) and
    # swaps channels itself, so an RGB array must be flipped to BGR first or
    # the model sees red and blue exchanged.
    source = image_np
    if image_np.ndim == 3 and image_np.shape[2] == 3:
        source = np.ascontiguousarray(image_np[:, :, ::-1])

    results = model.predict(source=source, conf=conf, verbose=False)

    # COCO: class 67 is "cell phone"
    cell_phone_class = 67
    bboxes = []
    for r in results:
        for box, cls in zip(r.boxes.xyxy.cpu().numpy(), r.boxes.cls.cpu().numpy()):
            if int(cls) == cell_phone_class:
                bboxes.append([float(box[0]), float(box[1]), float(box[2]), float(box[3])])
    return bboxes
def annotate_image(pil_img: Image.Image, qr_boxes: List[Dict[str, Any]], phone_boxes: List[List[float]], flags: Dict[int, List[str]]) -> Image.Image:
    """Return an RGB copy of ``pil_img`` with detections drawn on it.

    Args:
        pil_img: Source image; it is copied, not modified.
        qr_boxes: QR results, each with a ``"bbox"`` of ``[x1, y1, x2, y2]``
            and optionally ``"data"`` (decoded payload).
        phone_boxes: Phone boxes as ``[x1, y1, x2, y2]``.
        flags: Maps an index into ``qr_boxes`` to its anomaly messages.

    Phones are outlined in orange. QR codes are outlined in green, or red
    when any of their flags contains "UNAPPROVED" or "ON_PHONE". Flag
    messages are listed in red below the QR box.
    """
    img = pil_img.copy().convert("RGB")
    draw = ImageDraw.Draw(img)

    # Try to load a default font; fall back to Pillow's implicit default.
    try:
        font = ImageFont.load_default()
    except Exception:
        font = None

    def _label_y(box_top):
        # Labels sit 12px above the box, clamped so that boxes touching the
        # top edge still get a visible label.
        return max(0, box_top - 12)

    # Draw phone boxes
    orange = (255, 165, 0)
    for pb in phone_boxes:
        draw.rectangle(pb, outline=orange, width=3)
        draw.text((pb[0], _label_y(pb[1])), "PHONE", fill=orange, font=font)

    # Draw QR boxes
    for i, qr in enumerate(qr_boxes):
        color = (0, 255, 0)  # green
        if i in flags and any("UNAPPROVED" in f or "ON_PHONE" in f for f in flags[i]):
            color = (255, 0, 0)  # red for anomaly
        draw.rectangle(qr["bbox"], outline=color, width=3)
        label = "QR"
        if qr.get("data"):
            # Keep the label short and on a single line.
            snippet = qr["data"][:32].replace("\n", " ")
            label += f": {snippet}"
        draw.text((qr["bbox"][0], _label_y(qr["bbox"][1])), label, fill=color, font=font)

    # Add flags text
    for i, msgs in flags.items():
        # Skip empty flag lists and indices that do not refer to a QR box.
        if not msgs or not 0 <= i < len(qr_boxes):
            continue
        x1, _y1, _x2, y2 = qr_boxes[i]["bbox"]
        y_text = y2 + 4
        for msg in msgs:
            draw.text((x1, y_text), f"[{msg}]", fill=(255, 0, 0), font=font)
            y_text += 12
    return img
def unpack_zip(uploaded_file, workdir):
    """Extract the image files of a ZIP archive into ``workdir``.

    Only entries ending in .jpg/.jpeg/.png/.bmp/.webp (case-insensitive) are
    extracted. Files are written flat, by basename, so archive paths cannot
    escape ``workdir``. macOS metadata entries (``__MACOSX/`` folder and
    ``._*`` sidecar files) are skipped because they are not real images.
    When two entries share a basename, later ones get a ``_1``, ``_2``, ...
    suffix instead of overwriting the earlier file.

    Args:
        uploaded_file: Path or binary file-like object accepted by
            ``zipfile.ZipFile``.
        workdir: Existing directory to write the extracted files into.

    Returns:
        List of paths of the extracted files, in archive order.
    """
    image_exts = (".jpg", ".jpeg", ".png", ".bmp", ".webp")
    out_paths = []
    used_names = set()  # lower-cased, to stay safe on case-insensitive filesystems

    # Context manager guarantees the archive handle is closed.
    with zipfile.ZipFile(uploaded_file) as zf:
        for name in zf.namelist():
            base = os.path.basename(name)
            if not base.lower().endswith(image_exts):
                continue
            if name.startswith("__MACOSX/") or base.startswith("._"):
                continue

            # Disambiguate same-named files coming from different folders.
            stem, ext = os.path.splitext(base)
            candidate = base
            suffix = 1
            while candidate.lower() in used_names:
                candidate = f"{stem}_{suffix}{ext}"
                suffix += 1
            used_names.add(candidate.lower())

            p = os.path.join(workdir, candidate)
            with open(p, "wb") as f:
                f.write(zf.read(name))
            out_paths.append(p)
    return out_paths
def read_approved_list(file) -> List[str]:
    """
    Accepts CSV or TXT. One payload per line or in a 'payload' column.
    Payloads can be full strings or partial substrings to match.

    Args:
        file: Uploaded file object exposing ``.name`` and ``.read()``.
            Names ending in ``.csv`` are parsed as CSV (first row is the
            header); anything else is treated as UTF-8 text.

    Returns:
        Stripped, non-empty payload strings. On any parse failure an error
        is shown via Streamlit and an empty list is returned.
    """
    name = file.name.lower()
    try:
        if name.endswith(".csv"):
            # dtype=str keeps payloads verbatim; type inference would turn
            # "00123" into 123.
            df = pd.read_csv(file, dtype=str)
            if "payload" in df.columns:
                column = df["payload"]
            else:
                # Tolerate headers such as "Payload" or " payload ".
                lookalikes = [c for c in df.columns if str(c).strip().lower() == "payload"]
                # Otherwise take the first column.
                column = df[lookalikes[0]] if lookalikes else df.iloc[:, 0]
            vals = column.dropna().astype(str).tolist()
        else:
            # Plain text. "utf-8-sig" drops a leading BOM, which would
            # otherwise stick to the first entry and prevent it matching.
            content = file.read().decode("utf-8-sig", errors="ignore")
            vals = content.splitlines()
        # Normalize
        return [v.strip() for v in vals if v.strip()]
    except Exception as e:
        # UI boundary: report the problem instead of crashing the app.
        st.error(f"Failed to parse approved list: {e}")
        return []
def match_payload(payload: str, approved: List[str]) -> bool:
    """
    Return True if payload matches an approved entry.
    We allow substring match either way to account for embedded metadata/UTMs.

    Leading/trailing whitespace is ignored on both sides. An empty or
    whitespace-only payload never matches, and empty approved entries are
    skipped: the empty string is a substring of everything, so either case
    would otherwise approve unconditionally.

    NOTE(review): the "payload inside approved entry" direction means a very
    short payload (e.g. "http") matches any approved entry containing it.
    Kept as-is because it is the documented behavior; confirm it is intended.
    """
    p = payload.strip() if payload else ""
    if not p:
        return False
    entries = (a.strip() for a in approved)
    return any(a and (a in p or p in a) for a in entries)
# ---------------------------------------------------------------------------
# Streamlit page: collect inputs in the sidebar, run the scan on demand, then
# show a results table, a summary and download buttons.
# ---------------------------------------------------------------------------
st.set_page_config(page_title="QR Code Anomaly Scanner", layout="wide")
st.title("🕵️ QR Code Anomaly Scanner (Retail Store 360° CCTV Frames)")
st.markdown("""
Upload a set of frame images (multiple files **or** a ZIP), plus the approved QR list (CSV/TXT).
The app will:
- Detect and decode QR codes in each frame.
- Detect **cell phones** via YOLO to infer if a QR is shown on a phone.
- Flag anomalies:
- **UNAPPROVED_QR**: decoded payload not in the approved list.
- **ON_PHONE**: QR bounding box overlaps a detected phone.
- **UNDECODED_QR**: QR detected but not decodable (could be suspicious/obstructed).
Download the annotated images and a consolidated CSV report at the end.
""")

with st.sidebar:
    st.header("Inputs")
    approved_file = st.file_uploader("Approved QR List (CSV/TXT)", type=["csv","txt"])
    frames = st.file_uploader("Frames (images) — select multiple", type=["jpg","jpeg","png","bmp","webp"], accept_multiple_files=True)
    frames_zip = st.file_uploader("Or upload a ZIP of frames", type=["zip"])
    run_phone_detection = st.checkbox("Detect phones (YOLO)", value=True)
    phone_conf = st.slider("Phone detection confidence", 0.1, 0.8, 0.25, 0.05)
    iou_threshold = st.slider("QR–Phone overlap IoU threshold", 0.05, 0.8, 0.2, 0.05)
    process_btn = st.button("Run Scan")

if process_btn:
    if not approved_file:
        st.error("Please upload the Approved QR List first.")
        st.stop()
    approved_list = read_approved_list(approved_file)
    if not approved_list:
        st.warning("Approved list is empty or failed to parse. All decoded QR payloads will be treated as UNAPPROVED.")
    else:
        st.success(f"Loaded {len(approved_list)} approved entries.")

    # The scratch directory is created only when a scan actually runs and is
    # removed when the block exits (also when st.stop() aborts the run), so
    # script reruns no longer leave temp directories behind. Both downloads
    # are built in memory, so nothing on disk is needed afterwards.
    with tempfile.TemporaryDirectory() as workdir:
        img_paths = []
        # Save multi-file uploads
        for f in frames or []:
            # basename(): never let an uploaded name point outside workdir.
            out = os.path.join(workdir, os.path.basename(f.name))
            with open(out, "wb") as g:
                g.write(f.read())
            img_paths.append(out)
        # Or unpack ZIP
        if frames_zip is not None:
            img_paths.extend(unpack_zip(frames_zip, workdir))
        img_paths = sorted(set(img_paths))
        if not img_paths:
            st.error("Please upload at least one frame image (or a ZIP).")
            st.stop()

        if run_phone_detection:
            load_yolo()  # try to initialize early to show warnings

        rows = []
        annotated_dir = os.path.join(workdir, "annotated")
        os.makedirs(annotated_dir, exist_ok=True)
        progress = st.progress(0.0)
        status = st.empty()

        for idx, path in enumerate(img_paths):
            frame_name = os.path.basename(path)
            status.text(f"Processing {frame_name} ({idx+1}/{len(img_paths)})")

            # One unreadable file must not abort the whole batch: log it as
            # its own row and move on. The context manager closes the file
            # handle once the pixels are converted.
            try:
                with Image.open(path) as src:
                    pil = src.convert("RGB")
            except OSError as e:
                st.warning(f"Skipping unreadable image {frame_name}: {e}")
                rows.append({
                    "frame": frame_name,
                    "qr_index": -1,
                    "payload": "",
                    "approved_match": False,
                    "on_phone": False,
                    "undecoded": False,
                    "anomalies": "UNREADABLE_IMAGE",
                    "qr_bbox": None,
                    "phone_boxes": []
                })
                progress.progress((idx+1)/len(img_paths))
                continue

            np_img = np.array(pil)
            qr_results = detect_qr_opencv(np_img)
            phone_boxes = detect_phones_yolo(np_img, conf=phone_conf) if run_phone_detection else []

            flags = {}
            for i, qr in enumerate(qr_results):
                msgs = []
                payload = qr.get("data", "")
                # Computed once and always a real bool, so the CSV column
                # never holds '' for undecoded codes.
                is_approved = bool(payload) and bool(match_payload(payload, approved_list))
                if not payload:
                    msgs.append("UNDECODED_QR")
                elif not is_approved:
                    msgs.append("UNAPPROVED_QR")
                # Check overlap with phones
                if any(iou(qr["bbox"], pb) >= iou_threshold for pb in phone_boxes):
                    msgs.append("ON_PHONE")
                flags[i] = msgs
                # Append a row
                rows.append({
                    "frame": frame_name,
                    "qr_index": i,
                    "payload": payload,
                    "approved_match": is_approved,
                    "on_phone": ("ON_PHONE" in msgs),
                    "undecoded": ("UNDECODED_QR" in msgs),
                    "anomalies": "|".join(msgs) if msgs else "",
                    "qr_bbox": qr["bbox"],
                    "phone_boxes": phone_boxes
                })

            # If no QR detected, still log the frame
            if not qr_results:
                rows.append({
                    "frame": frame_name,
                    "qr_index": -1,
                    "payload": "",
                    "approved_match": False,
                    "on_phone": False,
                    "undecoded": False,
                    "anomalies": "NO_QR_FOUND",
                    "qr_bbox": None,
                    "phone_boxes": phone_boxes
                })

            annotated = annotate_image(pil, qr_results, phone_boxes, flags)
            out_path = os.path.join(annotated_dir, frame_name)
            annotated.save(out_path)
            progress.progress((idx+1)/len(img_paths))

        status.text("Completed.")
        df = pd.DataFrame(rows)
        st.subheader("Results")
        st.dataframe(df, use_container_width=True)

        # Summary
        st.markdown("### Summary")
        total_frames = len(img_paths)
        total_qr = int((df["qr_index"] >= 0).sum())
        unapproved = int((df["anomalies"].str.contains("UNAPPROVED_QR", na=False)).sum())
        on_phone = int((df["anomalies"].str.contains("ON_PHONE", na=False)).sum())
        undecoded = int((df["anomalies"].str.contains("UNDECODED_QR", na=False)).sum())
        no_qr = int((df["anomalies"] == "NO_QR_FOUND").sum())
        st.write({
            "frames_processed": total_frames,
            "qr_detections": total_qr,
            "unapproved_qr": unapproved,
            "qr_on_phone": on_phone,
            "undecoded_qr": undecoded,
            "frames_with_no_qr": no_qr
        })

        # Downloads: CSV + ZIP of annotated images
        csv_bytes = df.to_csv(index=False).encode("utf-8")
        st.download_button("⬇️ Download CSV Report", data=csv_bytes, file_name="qr_anomaly_report.csv", mime="text/csv")

        # Create ZIP (in memory, so it outlives the scratch directory)
        mem = io.BytesIO()
        with zipfile.ZipFile(mem, mode="w", compression=zipfile.ZIP_DEFLATED) as z:
            for fname in sorted(os.listdir(annotated_dir)):
                z.write(os.path.join(annotated_dir, fname), arcname=fname)
        mem.seek(0)
        st.download_button("⬇️ Download Annotated Images (ZIP)", data=mem.getvalue(), file_name="annotated_frames.zip", mime="application/zip")
else:
    st.info("Upload inputs on the left and click **Run Scan** to begin.")

st.markdown("""
**Tips**
- Your approved list can be **TXT** (one payload per line) or **CSV** (use a `payload` column or the first column).
- For mobile QR misuse detection, keep **Detect phones (YOLO)** enabled.
- Name frames with timestamps if you want to correlate events later.
""")