# Smart Office Attendance — YOLO (Real ML) + Seeded Inputs (no DB) + InsightFace (CPU) + Daily CSV
# Flat hierarchy: app.py + requirements.txt + seed.json
# Views: Live / Employees / Reports
# Patch: sync detections to Employees & Reports, map first 3 faces to seed employees, write summary_YYYYMMDD.csv
import os, json, math, tempfile, time
from datetime import datetime, date
from pathlib import Path
from dataclasses import dataclass
import cv2
import numpy as np
import pandas as pd
import plotly.express as px
from PIL import Image
import streamlit as st
from ultralytics import YOLO
from streamlit_modal import Modal
# ---------- InsightFace (CPU) ----------
from insightface.app import FaceAnalysis
from sklearn.metrics.pairwise import cosine_similarity
# -------------------- Page / Theme --------------------
# Browser-tab metadata. The title dash and the icon were mis-encoded in the
# source; the icon glyph could not be recovered, so a camera-feed themed emoji
# is used as a stand-in.
st.set_page_config(
    page_title="Smart Office Attendance — YOLO + FaceID (CPU) + CSV",
    page_icon="📡",
    layout="wide",
)
# Custom stylesheet for the navbar, cards, badges, pills and grid helpers.
CSS = """
.block-container {padding-top: 1rem; padding-bottom: 3rem; max-width: 1200px;}
.navbar {position: sticky; top: 0; z-index: 999; backdrop-filter: blur(6px);
background: rgba(16,16,20,0.7); border-bottom: 1px solid #262626;}
.navbar-inner {display:flex; align-items:center; justify-content:space-between; padding: 10px 0;}
.navbrand {display:flex; gap:.6rem; align-items:center}
.brandbox {width:32px; height:32px; border-radius:16px; background:#0a0a0a; color:#fff; display:grid; place-items:center; font-weight:700}
.card {border:1px solid #262626; border-radius:16px; padding:14px; background: #0f1014;}
.badge {display:inline-flex; align-items:center; gap:.4rem; border:1px solid #3a3a3a; border-radius:10px; padding:2px 8px; font-size:.75rem;}
.pill {display:inline-flex; align-items:center; gap:.35rem; border-radius:999px; padding:4px 10px; font-size:.75rem;}
.pill-success {background:#065f46; color:#e6fff6}
.pill-warn {background:#92400e; color:#fff7ed}
.pill-info {background:#1e3a8a; color:#dbeafe}
.pill-danger {background:#7f1d1d; color:#fee2e2}
.grid {display:grid; gap:14px}
@media(min-width: 768px){ .grid.cols-2 {grid-template-columns: repeat(2, 1fr);} }
@media(min-width: 1200px){ .grid.cols-3 {grid-template-columns: repeat(3, 1fr);} }
.video {height: 220px; border-radius:14px; border:1px dashed #303030; display:grid; place-items:center; color:#9ca3af}
.state {text-align:center; color:#9ca3af; padding:20px}
.footerpad {height: 40px}
"""
# The rules only take effect when wrapped in a <style> element; the original
# call rendered an empty string, so CSS was defined but never applied.
st.markdown(f"<style>{CSS}</style>", unsafe_allow_html=True)
# -------------------- Seed loading (inputs only) --------------------
def load_seed():
    """Return the seed data for the app.

    Reads ``seed.json`` from the current working directory when it exists;
    otherwise returns a built-in demo data set with three cameras, three
    employees, empty detections/sessions/alerts and default app settings.
    """
    seed_file = Path("seed.json")
    if not seed_file.exists():
        # No seed file on disk: fall back to the bundled demo data.
        return {
            "cameras": {
                "cam-1":{"name":"Lobby Cam","zone":"Lobby","online":True,"latencyMs":120,"fps":15},
                "cam-2":{"name":"Corridor Cam","zone":"Corridor","online":True,"latencyMs":160,"fps":12},
                "cam-3":{"name":"Open Office","zone":"Desk","online":True,"latencyMs":90,"fps":18},
            },
            "employees":{
                "e1":{"name":"Aparna Rao","title":"PM","deskId":"D-14","avatarUrl":"https://placehold.co/64","status":"At Desk","team":"Ops"},
                "e2":{"name":"Karthik S","title":"Eng","deskId":"D-22","avatarUrl":"https://placehold.co/64","status":"At Desk","team":"Eng"},
                "e3":{"name":"Nisha M","title":"Ops","deskId":"D-07","avatarUrl":"https://placehold.co/64","status":"Away","team":"Ops"}
            },
            "detections": {},
            "sessions": {},
            "alerts": {},
            "appSettings": {"default":{"onPhoneSec":60,"idleMin":10,"blurFaces":False,"retentionDays":30}},
            "trainingImages": {}
        }
    with open(seed_file, "r", encoding="utf-8") as handle:
        return json.load(handle)
# Seed data and the per-section lookup tables derived from it. Each table
# falls back to an empty dict when the section is missing from the seed.
SEED = load_seed()
EMP, CAM, SESS, ALR = (
    SEED.get(section, {})
    for section in ("employees", "cameras", "sessions", "alerts")
)
# Application settings: the "default" profile, or hard-coded defaults.
APPS = SEED.get("appSettings", {}).get(
    "default",
    {"onPhoneSec":60,"idleMin":10,"blurFaces":False,"retentionDays":30},
)
# -------------------- Session State --------------------
# Event log: created once per session so detections accumulate across reruns.
if "events" not in st.session_state:
    st.session_state.events = pd.DataFrame(
        columns=["id","ts","camera","employee","activity","zone","confidence","run_id"]
    )
st.session_state.setdefault("current_run_id", None)

# Remaining keys, each initialised only when not already present.
for _key, _default in {
    "selected_site_floor": None,
    "selected_camera": None,
    "privacy_blur": bool(APPS.get("blurFaces", False)),
    "show_settings": False,
    "run_meta": {},
    "frame_cache": {},
    "frame_paths": {},
    "last_uploaded_names": [],
    # live per-employee summary store: name -> counts dict
    "emp_summary": {},
    # demo mapping of the first detected faces/unknowns to seed employees:
    # raw_label -> seed_name
    "face_demo_map": {},
    "face_demo_used": set(),
    # face database
    "face_db": {},
    "face_ready": False,
    "modal_open": False,
    "modal_event_id": None,
    # CSV persistence
    "persist_count": 0,
    "persisted_csv": "",
    # daily summary path for Reports
    "daily_summary_path": "",
}.items():
    st.session_state.setdefault(_key, _default)

# Last uploaded payload and file suffix for each of the three tiles.
for _i in range(3):
    st.session_state.setdefault(f"last_video_bytes_{_i}", None)
    st.session_state.setdefault(f"last_video_suffix_{_i}", ".mp4")
# -------------------- ML (YOLO + face utils) --------------------
@dataclass
class DetBox:
    """A single object detection: class label, confidence and bounding box."""

    # Class label of the detected object.
    cls: str
    # Detection confidence reported by the detector.
    conf: float
    # Bounding box corners as (x1, y1, x2, y2).
    box: tuple
@st.cache_resource(show_spinner=False)
def load_model():
    """Build the YOLO detector from the ``yolov8n.pt`` weights.

    Wrapped in ``st.cache_resource`` so the model object is reused.
    """
    detector = YOLO("yolov8n.pt")
    return detector
@st.cache_resource(show_spinner=False)
def load_face_cascade():
    """Load OpenCV's bundled frontal-face Haar cascade classifier."""
    cascade_path = cv2.data.haarcascades + "haarcascade_frontalface_default.xml"
    return cv2.CascadeClassifier(cascade_path)
@st.cache_resource(show_spinner=False)
def load_face_analysis():
    """Create and prepare the InsightFace ``buffalo_l`` analyser.

    ``ctx_id=-1`` is passed to ``prepare`` together with a 640x640 detection
    size; the prepared analyser is returned.
    """
    analyser = FaceAnalysis(name="buffalo_l")
    analyser.prepare(ctx_id=-1, det_size=(640, 640))
    return analyser
def run_yolo_on_frame(model: YOLO, frame_bgr, conf_thres=0.25):
    """Run the detector on one frame and return a list of ``DetBox``.

    Args:
        model: YOLO model whose ``predict`` is called on the frame.
        frame_bgr: Frame passed to the model unchanged.
        conf_thres: Confidence threshold forwarded as ``conf``.

    Returns:
        One ``DetBox`` per predicted box (label, confidence, integer
        ``(x1, y1, x2, y2)`` corners); an empty list when the result has no
        boxes.
    """
    result = model.predict(frame_bgr, verbose=False, conf=conf_thres)[0]
    labels = result.names
    detections = []
    if result.boxes is None:
        return detections
    for det in result.boxes:
        class_idx = int(det.cls.item())
        # A missing confidence is reported as 0.0.
        score = 0.0 if det.conf is None else float(det.conf.item())
        corners = tuple(int(v) for v in det.xyxy[0].tolist())
        detections.append(DetBox(labels[class_idx], score, corners))
    return detections
def iou(a, b):
    """Return the intersection-over-union of two ``(x1, y1, x2, y2)`` boxes.

    Returns ``0.0`` when the boxes do not overlap (including boxes that only
    touch along an edge). A small epsilon in the denominator guards against
    division by zero for degenerate boxes.
    """
    x_left, y_top = max(a[0], b[0]), max(a[1], b[1])
    # Both the right edge and the bottom edge are needed; the original only
    # computed min(a[2], b[2]) and failed while unpacking a scalar.
    x_right, y_bottom = min(a[2], b[2]), min(a[3], b[3])
    inter = max(0, x_right - x_left) * max(0, y_bottom - y_top)
    if inter == 0:
        return 0.0
    area_a = (a[2] - a[0]) * (a[3] - a[1])
    area_b = (b[2] - b[0]) * (b[3] - b[1])
    return inter / float(area_a + area_b - inter + 1e-6)
# ---------- defensive phone/face check ----------
def phone_near_head(person_box, phone_boxes, face_boxes):
    """Return True when any phone box overlaps the person's head region.

    The head region is the top 40% of ``person_box``. Each valid face box is
    padded by one sixth of its size (clamped to the person box). A phone
    counts as near the head when its IoU with a padded face exceeds 0.02 or
    its IoU with the head region exceeds 0.05.

    Malformed face boxes and phone entries are skipped, and any exception
    raised while scoring a phone is swallowed so that phone is ignored.
    """
    px1, py1, px2, py2 = person_box
    head_bottom = int(py1 + 0.4 * (py2 - py1))
    head_box = (px1, py1, px2, head_bottom)

    padded_faces = []
    for face in face_boxes:
        if not (isinstance(face, (list, tuple)) and len(face) == 4):
            continue
        try:
            fx1, fy1, fx2, fy2 = map(int, face)
        except Exception:
            continue
        pad_x = (fx2 - fx1) // 6
        pad_y = (fy2 - fy1) // 6
        padded_faces.append((
            max(px1, fx1 - pad_x), max(py1, fy1 - pad_y),
            min(px2, fx2 + pad_x), min(py2, fy2 + pad_y),
        ))

    for phone in phone_boxes:
        phone_box = getattr(phone, "box", None)
        if not isinstance(phone_box, (list, tuple)) or len(phone_box) != 4:
            continue
        try:
            # Padded faces are checked first, then the coarse head region.
            if any(iou(phone.box, face) > 0.02 for face in padded_faces) \
                    or iou(phone.box, head_box) > 0.05:
                return True
        except Exception:
            continue
    return False
def blur_faces_if_needed(frame_bgr, face_cascade, enable):
    """Return the frame with detected faces Gaussian-blurred when enabled.

    When ``enable`` is falsy the input frame object is returned untouched.
    Otherwise faces are found with ``face_cascade.detectMultiScale`` on the
    grayscale frame and blurred on a copy, which is returned.
    """
    if enable:
        gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY)
        detected = face_cascade.detectMultiScale(
            gray, scaleFactor=1.2, minNeighbors=5, minSize=(32,32)
        )
        blurred = frame_bgr.copy()
        for (fx, fy, fw, fh) in detected:
            region = (slice(fy, fy + fh), slice(fx, fx + fw))
            blurred[region] = cv2.GaussianBlur(blurred[region], (31,31), 15)
        return blurred
    return frame_bgr
def phone_like_rect_near_face(frame_bgr, gray, person_box, face_boxes):
    """Heuristically detect a dark, phone-sized rectangle in the head region.

    The head region is the top 45% of ``person_box`` clamped to the frame.
    Canny edges are extracted there and each external contour's bounding
    rectangle is tested: its area must be between 120 px and a quarter of the
    region, its width/height ratio between 0.4 and 3.5, and its mean gray
    level at least 15 below the region's mean.

    ``face_boxes`` is accepted but not used by this function.

    Returns:
        True as soon as one rectangle passes every test, else False.
    """
    px1, py1, px2, py2 = person_box
    frame_h, frame_w = frame_bgr.shape[0], frame_bgr.shape[1]
    head_bottom = int(py1 + 0.45 * (py2 - py1))
    hx1 = max(0, px1)
    hy1 = max(0, py1)
    hx2 = max(0, min(px2, frame_w - 1))
    hy2 = max(0, min(head_bottom, frame_h - 1))
    if hx2 <= hx1 or hy2 <= hy1:
        return False

    head_gray = gray[hy1:hy2, hx1:hx2]
    if head_gray.size == 0:
        return False

    edge_map = cv2.Canny(head_gray, 80, 160)
    contours, _ = cv2.findContours(edge_map, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    head_mean = float(np.mean(head_gray))
    max_area = head_gray.shape[0] * head_gray.shape[1] * 0.25

    for contour in contours:
        x, y, w, h = cv2.boundingRect(contour)
        area = w * h
        if not (120 <= area <= max_area):
            continue
        aspect = w / float(h + 1e-6)
        if not (0.4 < aspect < 3.5):
            continue
        patch = head_gray[y:y+h, x:x+w]
        if patch.size == 0:
            continue
        # A candidate must be noticeably darker than its surroundings.
        if float(np.mean(patch)) + 15 < head_mean:
            return True
    return False
def estimate_activity(person_box, phones, prev_box=None, motion_thresh=8.0):
    """Classify a person as "Idle" or "Working" from box-centre motion.

    Args:
        person_box: Current ``(x1, y1, x2, y2)`` box.
        phones: Accepted for interface compatibility; not used here.
        prev_box: The previous box for the same person, or None.
        motion_thresh: Centre displacement (pixels) below which the person is
            considered idle.

    Returns:
        "Idle" when a previous box exists and the centre moved less than
        ``motion_thresh``; otherwise "Working".
    """
    if prev_box is None:
        return "Working"
    cx0 = (prev_box[0] + prev_box[2]) / 2
    cy0 = (prev_box[1] + prev_box[3]) / 2
    cx1 = (person_box[0] + person_box[2]) / 2
    cy1 = (person_box[1] + person_box[3]) / 2
    # Euclidean distance needs both axes; the original dropped the vertical
    # component, so purely vertical motion was always reported as idle.
    displacement = math.hypot(cx1 - cx0, cy1 - cy0)
    return "Idle" if displacement < motion_thresh else "Working"
# -------------------- InsightFace helpers --------------------
def get_face_pairs_insight(rgb_frame):
    """Detect faces in an RGB frame and return ``(box, embedding)`` pairs.

    The frame is converted to BGR before being passed to the face analyser.
    Faces whose embedding is missing or empty are dropped. Boxes are integer
    ``(x1, y1, x2, y2)`` tuples and embeddings are cast to ``float32``.
    """
    analyser = load_face_analysis()
    detected = analyser.get(cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR))
    results = []
    for face in detected:
        x1, y1, x2, y2 = map(int, face.bbox)
        embedding = face.embedding
        if embedding is None or not embedding.size > 0:
            continue
        results.append(((x1, y1, x2, y2), embedding.astype(np.float32)))
    return results
def build_face_db_from_videos_insight(videos, max_people=6, frames_to_scan=200, every_n=5):
    """Build a name -> list-of-embeddings face database from videos.

    Each video object must expose ``name`` and ``read()``. Its bytes are
    written to a temporary file, every ``every_n``-th frame is scanned for
    faces, and embeddings are collected until ``max_people`` have been
    gathered or ``frames_to_scan`` sampled frames were processed per video.

    Collected embeddings are assigned round-robin to the seed employee names
    (or ``Emp1..EmpN`` when no employees are seeded). This is a demo
    assignment: no identity clustering is performed.

    Returns:
        Dict of name -> list of embeddings, containing only names that
        received at least one embedding.
    """
    names = [v.get("name", "Unknown") for v in EMP.values()] or [f"Emp{i+1}" for i in range(max_people)]
    collected = []
    for fv in videos:
        with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(fv.name)[1]) as t:
            t.write(fv.read())
            path = t.name
        cap = cv2.VideoCapture(path)
        try:
            count = 0
            while cap.isOpened() and count < frames_to_scan:
                ret, frame = cap.read()
                if not ret:
                    break
                frame_no = int(cap.get(cv2.CAP_PROP_POS_FRAMES))
                if frame_no % every_n != 0:
                    continue
                rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                for _, emb in get_face_pairs_insight(rgb):
                    collected.append(emb)
                    if len(collected) >= max_people:
                        break
                if len(collected) >= max_people:
                    break
                count += 1
        finally:
            # Always release the capture and delete the temp copy, even when
            # decoding or face analysis raises.
            cap.release()
            try:
                os.remove(path)
            except OSError:
                pass
        if len(collected) >= max_people:
            break

    # Nothing to distribute, or nobody to distribute to (avoids modulo by zero).
    if not names or not collected:
        return {}

    db = {n: [] for n in names}
    for i, emb in enumerate(collected):
        db[names[i % len(names)]].append(emb)
    return {k: v for k, v in db.items() if v}
def recognize_name_from_embedding(emb, db, cosine_thresh=0.55, l2_tolerance=1.2):
    """Return the best-matching name in ``db`` for ``emb``, or None.

    For every name the mean cosine similarity between ``emb`` and that name's
    stored embeddings is computed. The top-scoring name is accepted only when
    its score is at least ``cosine_thresh`` and the mean L2 distance to its
    embeddings is at most ``l2_tolerance``.

    NOTE(review): the L2 check is scale-dependent; confirm the embeddings
    stored in ``db`` are on a scale where ``l2_tolerance`` is meaningful.
    """
    if not db:
        return None

    top_name, top_score = None, -1.0
    for candidate, encodings in db.items():
        stacked = np.stack(encodings, axis=0)
        score = float(np.mean(cosine_similarity([emb], stacked)))
        if score > top_score:
            top_name, top_score = candidate, score

    if top_name is None or not (top_score >= cosine_thresh):
        return None

    reference = np.stack(db[top_name], axis=0)
    distances = np.linalg.norm(reference - emb, axis=1)
    if float(np.mean(distances)) <= l2_tolerance:
        return top_name
    return None
# -------- Demo mapping: first three faces/unknowns -> first three seed employees --------
def seed_name_order():
    """Return the names of the first three seed employees, in seed order.

    Employees without a ``name`` entry are reported as "Unknown".
    """
    all_names = [record.get("name","Unknown") for record in EMP.values()]
    return all_names[:3]
def demo_map_name(raw_label: str) -> str:
    """Map a raw recognition label onto a seed employee name (demo only).

    Empty labels, and labels that are already one of the first three seed
    names (and do not start with "Unknown"), are returned unchanged. Any other
    label is assigned the first seed name not yet used as a mapping target;
    the assignment is remembered in session state so it stays stable. When all
    seed names are taken the raw label is returned as is.
    """
    if not raw_label:
        return raw_label
    if not raw_label.startswith("Unknown") and raw_label in seed_name_order():
        return raw_label

    mapping = st.session_state.face_demo_map
    if raw_label in mapping:
        return mapping[raw_label]

    for seed_nm in seed_name_order():
        if seed_nm in mapping.values():
            continue
        mapping[raw_label] = seed_nm
        st.session_state.face_demo_used.add(seed_nm)
        return seed_nm
    return raw_label
# -------------------- Helpers --------------------
def latency_badge(latency_ms: int, fps: int) -> str:
    """Return an HTML badge showing camera latency and frame rate.

    The badge is tinted by latency: green below 120 ms, amber below 200 ms,
    red otherwise. The result is HTML and must be rendered with
    ``unsafe_allow_html=True``.
    """
    tone = "#10b981" if latency_ms < 120 else ("#f59e0b" if latency_ms < 200 else "#ef4444")
    # The original computed `tone` but returned plain text, so the colour
    # coding never reached the page.
    return (
        f"<span class='badge' style='border-color:{tone}; color:{tone}'>"
        f"{latency_ms}ms | {fps} FPS</span>"
    )
def kpis_from_events(df: pd.DataFrame):
    """Return the five headline KPIs as ``(label, value)`` string pairs.

    With no events every KPI except the alert count shows an em dash
    placeholder. Otherwise minutes are estimated at half a minute per event
    row; "Present %" and "Avg Working hrs" are demo formulas derived from
    those counts. The alert count is the number of seeded alerts.
    """
    alert_count = str(len(ALR))
    if df is None or df.empty:
        # Placeholders were mis-encoded in the source; restored to em dashes.
        return [
            ("Present %", "—"),
            ("Avg Working hrs", "—"),
            ("Avg Idle", "—"),
            ("Corridor time", "—"),
            ("Alerts", alert_count),
        ]
    working_min = int((df["activity"] == "Working").sum() * 0.5)
    idle_min = int((df["activity"] == "Idle").sum() * 0.5)
    corr_min = int((df["zone"] == "Corridor").sum() * 0.5)
    present_pct = f"{min(100, 40 + len(df)//2)}%"
    avg_work_hrs = round(6.5 + (working_min / 120), 1)
    return [
        ("Present %", present_pct),
        ("Avg Working hrs", str(avg_work_hrs)),
        ("Avg Idle", f"{idle_min}m"),
        ("Corridor time", f"{corr_min}m"),
        ("Alerts", alert_count),
    ]
def save_run_summary_csv(run_id: str, site_floor: str, camera_name: str) -> str:
    """Write a per-employee activity summary CSV for one run.

    Events of ``run_id`` are counted per employee and activity ("Working",
    "On Phone", "Idle", "Away"), with a total and percentage columns, and
    tagged with today's date, the run id, site/floor and camera name.

    Returns:
        Path of the written CSV in the system temp directory, or "" when the
        run has no events.
    """
    df = st.session_state.events
    if df.empty:
        return ""
    df = df[df["run_id"] == run_id]
    if df.empty:
        return ""

    activities = ["Working", "On Phone", "Idle", "Away"]
    counts = df.groupby(["employee", "activity"]).size().unstack(fill_value=0)
    for col in activities:
        if col not in counts.columns:
            counts[col] = 0
    counts["total"] = counts["Working"] + counts["On Phone"] + counts["Idle"] + counts["Away"]
    # A zero total is replaced by 1 so the division yields 0% instead of NaN.
    pct = (counts[activities].div(counts["total"].replace(0, 1), axis=0) * 100.0).round(2)

    out = counts.join(pct.add_suffix(" %")).reset_index()
    out.insert(0, "camera", camera_name)
    out.insert(0, "site_floor", site_floor)
    out.insert(0, "run_id", run_id)
    out.insert(0, "date", date.today().isoformat())

    # Use the platform temp directory instead of a hard-coded /tmp.
    out_path = os.path.join(tempfile.gettempdir(), f"run_summary_{run_id}.csv")
    out.to_csv(out_path, index=False)
    return out_path
def cache_event_frame(event_id: str, vis_bgr: np.ndarray):
    """Cache the annotated frame for an event, plus a 96px thumbnail on disk.

    The full image is kept in ``st.session_state.frame_cache`` and the
    thumbnail path in ``st.session_state.frame_paths``. Once more than 500
    frames are cached, only the 400 most recently added are kept and the
    evicted thumbnails are deleted from disk.

    This is best-effort: any failure is swallowed so that frame caching can
    never interrupt video processing.
    """
    try:
        img = Image.fromarray(cv2.cvtColor(vis_bgr, cv2.COLOR_BGR2RGB))
        frame_cache = st.session_state.frame_cache
        frame_paths = st.session_state.frame_paths
        frame_cache[event_id] = img

        thumb = img.copy()
        thumb.thumbnail((96, 96))
        # Platform temp directory instead of a hard-coded /tmp.
        thumb_path = os.path.join(tempfile.gettempdir(), f"frame_{event_id}.png")
        thumb.save(thumb_path, format="PNG")
        frame_paths[event_id] = thumb_path

        if len(frame_cache) > 500:
            for stale in list(frame_cache.keys())[:-400]:
                frame_cache.pop(stale, None)
                stale_path = frame_paths.pop(stale, None)
                # Remove the evicted thumbnail too, so files do not pile up.
                if stale_path:
                    try:
                        os.remove(stale_path)
                    except OSError:
                        pass
    except Exception:
        # Deliberate best-effort: a missing preview is preferable to a crash.
        pass
# NEW: write per-day employee summary CSV from live st.session_state.emp_summary
def write_daily_emp_summary_csv():
    """Write today's per-employee summary CSV from the live summary store.

    One row per employee in ``st.session_state.emp_summary`` with the
    "Working", "On Phone", "Idle" and "Away" counts and their percentages.
    The path is also stored in ``st.session_state.daily_summary_path``.

    Returns:
        Path of ``summary_YYYYMMDD.csv`` in the system temp directory, or ""
        when the summary store is empty.
    """
    if not st.session_state.emp_summary:
        return ""
    today = date.today().strftime("%Y%m%d")
    # Platform temp directory instead of a hard-coded /tmp.
    path = os.path.join(tempfile.gettempdir(), f"summary_{today}.csv")
    rows = []
    for emp, c in st.session_state.emp_summary.items():
        w = int(c.get("Working", 0))
        ph = int(c.get("On Phone", 0))
        idle = int(c.get("Idle", 0))
        away = int(c.get("Away", 0))
        # Clamp to 1 so an all-zero row yields 0% instead of dividing by zero.
        total = max(1, w + ph + idle + away)
        rows.append({
            "Employee": emp,
            "Working": w, "On Phone": ph, "Idle": idle, "Away": away,
            "Working %": round(w / total * 100, 2),
            "On Phone %": round(ph / total * 100, 2),
            "Idle %": round(idle / total * 100, 2),
            "Away %": round(away / total * 100, 2),
        })
    pd.DataFrame(rows).to_csv(path, index=False)
    st.session_state.daily_summary_path = path
    return path
# >>> NEW HELPER: latest 3 event frames for a given employee <<<
def recent_frames_for_employee(emp_name: str, n: int = 3):
    """Return up to ``n`` cached frames for an employee, newest event first.

    Events for ``emp_name`` are ordered by timestamp (descending) and each
    event id is looked up in the frame cache; events without a cached frame
    are skipped.

    Returns:
        List of ``(event_id, image)`` tuples; empty when there are no events.
    """
    events = st.session_state.events
    if events is None or events.empty:
        return []

    df = events.copy()
    try:
        df["ts"] = pd.to_datetime(df["ts"], errors="coerce", utc=True)
    except Exception:
        pass

    newest_first = df[df["employee"] == emp_name].sort_values("ts", ascending=False)
    found = []
    for eid in newest_first["id"]:
        cached = st.session_state.frame_cache.get(eid)
        if cached is not None:
            found.append((eid, cached))
        if len(found) >= n:
            break
    return found
# -------------------- Navbar --------------------
# Sticky top bar. The markup had been stripped from this literal, leaving only
# its text; it is rebuilt here from the navbar/brand/badge classes declared in
# the stylesheet. No blank lines inside, so Markdown treats it as one HTML block.
st.markdown("""
<div class="navbar">
<div class="navbar-inner">
<div class="navbrand">
<div class="brandbox">SO</div>
<div>
<div><b>Smart Office Attendance</b></div>
<div style="font-size:.75rem; color:#9ca3af">Live • Employees • Reports</div>
</div>
</div>
<div class="badge">Sync: Superbse (mock)</div>
</div>
</div>
""", unsafe_allow_html=True)
# -------------------- Global Toggles --------------------
# Six equal columns; only the last three hold controls (settings button,
# privacy toggle, clock).
top = st.columns([1] * 6)
settings_col, privacy_col, clock_col = top[-3], top[-2], top[-1]

with settings_col:
    if st.button("Settings"):
        st.session_state.show_settings = True

with privacy_col:
    # The toggle starts from the stored value, falling back to the seed setting.
    current_blur = st.session_state.get("privacy_blur", bool(APPS.get("blurFaces", False)))
    st.session_state.privacy_blur = st.toggle(
        "Privacy: blur faces",
        value=current_blur,
        help="Reason: Compliance / PII policy"
    )

with clock_col:
    st.write(":alarm_clock:", datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
# -------------------- Tabs --------------------
# The three top-level views. The emoji prefixes were mis-encoded in the
# source and are restored here.
tab_live, tab_emp, tab_reports = st.tabs(["🟢 Live", "👥 Employees", "📊 Reports"])
# ======================================================
# LIVE
# ======================================================
with tab_live:
    # ---- Tagging selectors: site/floor and camera must be chosen before upload
    h1, h2, h3 = st.columns([1.2, 1.0, 1.0])
    with h1:
        st.session_state.selected_site_floor = st.selectbox(
            "Site / Floor (required before upload)",
            ["HQ • L2", "HQ • L3", "Annex • L1"], index=0, key="site_selector"
        )
    with h2:
        cam_names = [v["name"] for v in CAM.values()] or ["Lobby Cam"]
        st.session_state.selected_camera = st.selectbox(
            "Camera (required before upload)", cam_names, index=0, key="cam_selector"
        )
    with h3:
        st.toggle("Maximize stream", value=st.session_state.get("stream_full", False),
                  key="stream_full", help="Show one live stream full-width")

    # ---- Stream tiles: one placeholder and one Replay button per tile
    online = [c for c in CAM.values() if c.get("online")]
    cols = st.columns(3)
    stream_canvases = [None, None, None]
    replay_btns = [False, False, False]
    if online and not st.session_state.get("stream_full", False):
        for i in range(3):
            # Tiles beyond the number of online cameras get a placeholder entry.
            cam = online[i] if i < len(online) else {"name": f"Camera {i+1}", "zone": "—", "latencyMs": 0, "fps": 0}
            with cols[i]:
                st.markdown("<div class='card'>", unsafe_allow_html=True)
                st.markdown(f"**{cam['name']}** — {cam.get('zone','—')}")
                st.markdown(latency_badge(cam.get("latencyMs",0), cam.get("fps",0)), unsafe_allow_html=True)
                stream_canvases[i] = st.empty()
                replay_btns[i] = st.button("Replay", key=f"replay_tile_{i}")
                st.markdown("</div>", unsafe_allow_html=True)
    else:
        st.markdown("<div class='card'>", unsafe_allow_html=True)
        st.caption("Live stream (maximized)")
        stream_canvases[0] = st.empty()
        replay_btns[0] = st.button("Replay", key="replay_tile_0_max")
        st.markdown("</div>", unsafe_allow_html=True)
    # Make sure every tile index has a placeholder, whichever layout was used.
    for i in range(3):
        if stream_canvases[i] is None:
            stream_canvases[i] = st.empty()

    # ---- Processing controls (read as module-level names by the run helpers)
    c1, c2, c3, c4 = st.columns(4)
    with c1: conf_thres = st.slider("Confidence", 0.1, 0.6, 0.25, 0.05)
    with c2: sample_fps = st.slider("Sample FPS", 1, 12, 6)
    with c3: max_secs = st.slider("Max seconds", 5, 120, 30)
    with c4: motion_px = st.slider("Idle motion px", 2, 20, 8)
    dd1, dd2, dd3 = st.columns(3)
    with dd1:
        stream_live = st.checkbox("Live preview (stream)", value=True)
    with dd2:
        skip_near_identical = st.checkbox("Skip near-identical frames", value=True)
    with dd3:
        diff_thresh = st.slider("Frame diff threshold", 1.0, 20.0, 6.0, 0.5)
    cd1, cd2 = st.columns(2)
    with cd1:
        event_cooldown_sec = st.slider("Event cooldown (sec)", 0.0, 5.0, 1.5, 0.5)
    with cd2:
        st.caption("Replay per placeholder (buttons above)")
    # Anomaly frames and captions collected while videos are processed.
    preview_imgs, preview_captions = [], []
def ensure_face_db(videos):
    """Build the session face database from the uploaded videos, once.

    Does nothing when the database is already marked ready or no videos were
    given. Each upload is staged to a temporary file and handed to
    ``build_face_db_from_videos_insight`` through a small wrapper exposing
    ``name`` and ``read()``, so the upload objects themselves are not
    consumed. Staged files are removed even when the build fails.
    """
    if st.session_state.face_ready or not videos:
        return
    st.info("Building face database from uploaded video… (CPU)")

    class _StagedUpload:
        """File-like wrapper exposing the upload's name and staged bytes."""

        def __init__(self, path, name):
            self._p = path
            self.name = name

        def read(self):
            with open(self._p, "rb") as fh:
                return fh.read()

    staged = []
    try:
        for fv in videos:
            data = fv.getvalue() if hasattr(fv, "getvalue") else fv.read()
            suffix = os.path.splitext(fv.name)[1]
            with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
                tmp.write(data)
            staged.append(_StagedUpload(tmp.name, fv.name))
        db = build_face_db_from_videos_insight(staged, max_people=max(3, len(EMP)))
    finally:
        # Best-effort cleanup of the staged copies.
        for item in staged:
            try:
                os.remove(item._p)
            except OSError:
                pass

    st.session_state.face_db = db
    st.session_state.face_ready = True
    if db:
        st.success(f"Face DB ready for {len(db)} person(s): {', '.join(list(db.keys())[:6])}")
    else:
        st.warning("No faces found to build DB. Events will be tagged as Unknown.")
def run_in_tile(tile_index: int, payload_bytes: bytes, suffix: str, tag_site_floor: str, tag_camera: str):
    """Process one video in a stream tile and record detection events.

    The payload is written to a temporary file and decoded with OpenCV.
    Sampled frames are run through YOLO and face analysis; every detected
    person is classified ("Working", "Idle", "On Phone"), labelled with a
    recognised or demo-mapped name, drawn on the preview, and (subject to the
    event cooldown) appended to ``st.session_state.events``. Afterwards the
    run summary CSV is offered for download and the daily summary is written.

    Reads the Live-tab controls as module-level names: ``sample_fps``,
    ``max_secs``, ``conf_thres``, ``motion_px``, ``skip_near_identical``,
    ``diff_thresh``, ``stream_live``, ``event_cooldown_sec``,
    ``stream_canvases``, ``preview_imgs`` and ``preview_captions``.

    Args:
        tile_index: Zero-based tile the video is shown in.
        payload_bytes: Raw video bytes.
        suffix: File suffix used for the temporary copy.
        tag_site_floor: Site/floor tag stored with the run.
        tag_camera: Camera tag stored with the run and each event.
    """
    st.session_state.current_run_id = datetime.utcnow().strftime("%Y%m%d-%H%M%S")
    run_id = f"{st.session_state.current_run_id}-T{tile_index+1}"
    st.session_state.run_meta[run_id] = {"site_floor": tag_site_floor, "camera": tag_camera}
    model = load_model()
    face_cascade = load_face_cascade()

    # Camera label and zone are fixed for the whole run, so resolve them once.
    cam_name = tag_camera or f"Tile {tile_index+1}"
    zone = "Desk"
    if CAM:
        seed_list = list(CAM.values())
        zone = seed_list[min(tile_index, len(seed_list)-1)].get("zone", "Desk")
    away_zone = zone in ("Corridor", "Lobby", "Common")

    with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as t:
        t.write(payload_bytes)
        tmp_path = t.name
    cap = cv2.VideoCapture(tmp_path)
    frame_no = 0
    prog = st.progress(0.0)
    try:
        src_fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
        step = int(max(1, round(src_fps / sample_fps)))
        total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        budget = int(src_fps * max_secs)
        # Some containers report no frame count; fall back to the time budget
        # instead of processing nothing.
        max_frames = min(total, budget) if total > 0 else budget
        prev_person_boxes = {}
        prev_gray = None
        last_tag_time = {}

        while cap.isOpened() and frame_no < max_frames:
            ret, frame = cap.read()
            if not ret:
                break
            pos = int(cap.get(cv2.CAP_PROP_POS_FRAMES))
            if pos % step != 0:
                frame_no += 1
                continue

            # Optionally skip frames that barely differ from the previous one.
            do_process = True
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            if skip_near_identical:
                if prev_gray is not None:
                    mad = float(np.mean(cv2.absdiff(gray, prev_gray)))
                    if mad < diff_thresh:
                        do_process = False
                prev_gray = gray
            if not do_process:
                if stream_live:
                    time.sleep(0.001)
                frame_no += 1
                continue

            dets = run_yolo_on_frame(model, frame, conf_thres)
            persons = [d for d in dets if d.cls == "person"]
            phones = [d for d in dets if d.cls in ("cell phone", "mobile phone", "phone")]
            vis = blur_faces_if_needed(frame, face_cascade, st.session_state.privacy_blur)
            rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            face_pairs = get_face_pairs_insight(rgb)
            face_boxes_only = [b for (b, _) in face_pairs]
            frame_anomaly = False
            frame_tags = set()

            for idx, p in enumerate(persons):
                act = estimate_activity(p.box, phones, prev_person_boxes.get(idx), motion_px)
                on_phone_yolo = phone_near_head(p.box, phones, face_boxes_only)
                on_phone_heur = phone_like_rect_near_face(frame, gray, p.box, face_boxes_only)
                if on_phone_yolo or on_phone_heur:
                    act = "On Phone"

                # Identify the person from the first face centred in their box.
                name = None
                for (fbox, emb) in face_pairs:
                    fx1, fy1, fx2, fy2 = fbox
                    cx = (fx1 + fx2) / 2
                    cy = (fy1 + fy2) / 2
                    if (p.box[0] <= cx <= p.box[2]) and (p.box[1] <= cy <= p.box[3]):
                        name = recognize_name_from_embedding(emb, st.session_state.face_db)
                        if name:
                            break
                if not name:
                    name = f"Unknown #{idx+1}"
                name = demo_map_name(name)

                # Draw the box and label on the preview frame.
                conf = int(p.conf * 100)
                (x1, y1, x2, y2) = p.box
                color = (16,185,129) if act == "Working" else (245,158,11) if act == "On Phone" \
                    else (113,113,122) if act == "Idle" else (239,68,68)
                cv2.rectangle(vis, (x1, y1), (x2, y2), color, 2)
                label = f"{name} ({act} {conf}%)"
                cv2.rectangle(vis, (x1, y1-20), (x1 + min(280, x2-x1), y1), color, -1)
                cv2.putText(vis, label, (x1+5, y1-6), cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0,0,0), 1, cv2.LINE_AA)

                if act in ("On Phone", "Idle") or away_zone:
                    frame_anomaly = True
                    if act == "On Phone":
                        frame_tags.add("On Phone")
                    elif act == "Idle":
                        frame_tags.add("Not Working")
                    if away_zone:
                        frame_tags.add("Away")

                # Emit an event unless the same tag fired within the cooldown.
                now_ts = datetime.utcnow()
                event_tag = f"{cam_name}|{act}|{zone}|{name}|T{tile_index+1}"
                can_emit = True
                if event_cooldown_sec > 0:
                    last_t = last_tag_time.get(event_tag)
                    if last_t is not None and (now_ts - last_t).total_seconds() < event_cooldown_sec:
                        can_emit = False
                if can_emit:
                    evt_id = f"evt-{len(st.session_state.events)+1}"
                    row = {
                        "id": evt_id,
                        "ts": now_ts,
                        "camera": cam_name,
                        "employee": name,
                        "activity": act,
                        "zone": zone,
                        "confidence": conf,
                        "run_id": run_id
                    }
                    st.session_state.events = pd.concat(
                        [st.session_state.events, pd.DataFrame([row])],
                        ignore_index=True
                    )
                    last_tag_time[event_tag] = now_ts
                    cache_event_frame(evt_id, vis)

                # ---- live per-employee aggregation (counted per processed frame)
                summary = st.session_state.emp_summary
                if name not in summary:
                    summary[name] = {"Working":0,"On Phone":0,"Idle":0,"Away":0,"Corridor":0}
                summary[name][act] = summary[name].get(act, 0) + 1
                if away_zone:
                    summary[name]["Corridor"] = summary[name].get("Corridor", 0) + 1
                prev_person_boxes[idx] = p.box

            for ph in phones:
                (x1, y1, x2, y2) = ph.box
                cv2.rectangle(vis, (x1, y1), (x2, y2), (58,131,255), 2)
                cv2.putText(vis, "phone", (x1, y1-6), cv2.FONT_HERSHEY_SIMPLEX, 0.45, (58,131,255), 1, cv2.LINE_AA)

            if stream_live and stream_canvases[tile_index] is not None:
                stream_canvases[tile_index].image(cv2.cvtColor(vis, cv2.COLOR_BGR2RGB), channels="RGB", use_column_width=True)
                time.sleep(0.001)
            if frame_anomaly:
                preview_imgs.append(Image.fromarray(cv2.cvtColor(vis, cv2.COLOR_BGR2RGB)))
                preview_captions.append(f"T{tile_index+1} • " + (" • ".join(sorted(frame_tags)) if frame_tags else "Anomaly"))
            frame_no += 1
            prog.progress(min(1.0, frame_no / max(1, max_frames)))
    finally:
        # Release the capture and delete the temp copy even when a frame fails.
        cap.release()
        try:
            os.remove(tmp_path)
        except OSError:
            pass
    prog.progress(1.0)
    st.success(f"Tile {tile_index+1}: processed {frame_no} frames")

    out_csv = save_run_summary_csv(run_id, tag_site_floor, tag_camera)
    if out_csv:
        with open(out_csv, "rb") as f:
            st.download_button(
                label=f"Download run summary CSV ({Path(out_csv).name})",
                data=f.read(),
                file_name=Path(out_csv).name,
                mime="text/csv",
                key=f"dl_{run_id}"
            )
    write_daily_emp_summary_csv()
# Re-enter the Live tab container for the upload, events and distribution UI.
with tab_live:
    # ---- Upload (multi) ----
    tagging_ready = bool(st.session_state.selected_site_floor) and bool(st.session_state.selected_camera)
    ups = st.file_uploader(
        "Upload one or more demo videos (MP4/AVI/MOV/MKV)",
        type=["mp4","avi","mov","mkv"],
        accept_multiple_files=True,
        key="uploader_multi",
        disabled=not tagging_ready
    )
    if not tagging_ready:
        st.info("Pick **Site / Floor** and **Camera** first to enable uploads.")
    names = [u.name for u in ups] if ups else []
    # Only process when the set of uploaded file names changed.
    if ups and st.session_state.last_uploaded_names != names:
        st.session_state.last_uploaded_names = names
        ensure_face_db(ups)
        for i in range(min(3, len(ups))):
            # getvalue() returns the whole payload regardless of read position.
            payload = ups[i].getvalue()
            st.session_state[f"last_video_bytes_{i}"] = payload
            st.session_state[f"last_video_suffix_{i}"] = os.path.splitext(ups[i].name)[1]
            run_in_tile(
                i, payload, st.session_state[f"last_video_suffix_{i}"],
                st.session_state.selected_site_floor, st.session_state.selected_camera
            )

    # ---- Replay the last payload of a tile on demand
    for i in range(3):
        if replay_btns[i]:
            key_b = f"last_video_bytes_{i}"
            key_s = f"last_video_suffix_{i}"
            if st.session_state.get(key_b) is None:
                st.warning(f"Tile {i+1}: no previous video to replay.")
            else:
                run_in_tile(
                    i, st.session_state[key_b], st.session_state.get(key_s, ".mp4"),
                    st.session_state.selected_site_floor or "—",
                    st.session_state.selected_camera or f"Tile {i+1}"
                )

    # ---- Anomaly frames gathered during this script run (first nine shown)
    if preview_imgs:
        st.subheader(f"Anomaly Frames ({len(preview_imgs)}) — On Phone / Not Working / Away")
        grid = st.columns(3)
        for i, img in enumerate(preview_imgs[:9]):
            with grid[i % 3]:
                st.image(img, use_column_width=True, caption=preview_captions[i] if i < len(preview_captions) else None)

    # ---- Detection events list
    st.markdown("<div class='card'>", unsafe_allow_html=True)
    st.subheader("Detection Events")
    f1, f2, f3 = st.columns(3)
    with f1:
        flt_employee = st.selectbox("Employee", ["All"] + [v.get("name") for v in EMP.values()], key="events_filter_employee")
    with f2:
        flt_activity = st.selectbox("Activity", ["All","Working","On Phone","Idle","Away"], key="events_filter_activity")
    with f3:
        flt_zone = st.selectbox("Zone", ["All","Desk","Corridor","Lobby","Common"], key="events_filter_zone")
    only_this_run = st.checkbox("Show only detections from this run", value=True, key="events_only_this_run")

    modal = Modal("Frame Review", key="frame_modal", max_width=900)
    df_show = st.session_state.events.copy()
    if not df_show.empty:
        df_show["ts"] = pd.to_datetime(df_show["ts"], errors="coerce", utc=True)
        if only_this_run and st.session_state.current_run_id:
            df_show = df_show[df_show["run_id"].str.startswith(st.session_state.current_run_id, na=False)]
        # Apply the three filters; previously they were rendered but ignored.
        for column, wanted in (("employee", flt_employee), ("activity", flt_activity), ("zone", flt_zone)):
            if wanted != "All":
                df_show = df_show[df_show[column] == wanted]
        df_show = df_show.sort_values("ts", ascending=False)
        # Newest first, so the latest 20 are at the head (tail gave the oldest).
        sub = df_show.head(20).reset_index(drop=True)
        if sub.empty:
            st.caption("No events match the current filters.")
        for _, row in sub.iterrows():
            colA, colB, colC, colD, colE = st.columns([1.6, 1.3, 0.8, 0.8, 0.6])
            with colA:
                st.markdown(f"**{row['activity']}** — {row['employee']}")
                st.caption(f"{row['camera']} | {row['zone']}")
            with colB:
                ts_str = row["ts"]
                try:
                    ts_str = row["ts"].strftime("%Y-%m-%d %H:%M:%S %Z")
                except (AttributeError, ValueError):
                    # Unparseable timestamps are shown as-is.
                    pass
                st.caption(ts_str)
            with colC:
                st.write(f"{int(row.get('confidence',0))}%")
            with colD:
                st.write(row["id"])
            with colE:
                if st.button("View", key=f"view_{row['id']}"):
                    st.session_state.modal_event_id = row["id"]
                    st.session_state.modal_open = True
                    # Open only on the click; opening on every rerun would
                    # keep re-triggering the modal instead of showing it.
                    modal.open()
            st.divider()

        if modal.is_open() and st.session_state.modal_event_id:
            img = st.session_state.frame_cache.get(st.session_state.modal_event_id)
            with modal.container():
                st.markdown(
                    f"<div class='badge'>Event: {st.session_state.modal_event_id}</div>",
                    unsafe_allow_html=True
                )
                if img is not None:
                    st.image(img, use_column_width=True)
                else:
                    st.warning("Frame not available for this event.")
                if st.button("Close", key="close_frame_modal"):
                    st.session_state.modal_open = False
                    st.session_state.modal_event_id = None
                    modal.close()
        else:
            # Keep the flag in step when the modal was dismissed by its own control.
            st.session_state.modal_open = False
    else:
        st.info("No detections yet — upload a video to generate events.")
    st.markdown("</div>", unsafe_allow_html=True)

    # ---- Per-employee activity distribution for the current run
    st.markdown("<div class='card'>", unsafe_allow_html=True)
    st.subheader("Per-Employee Distribution (current run)")
    dist_df = pd.DataFrame()
    if not st.session_state.events.empty and st.session_state.current_run_id:
        run_mask = st.session_state.events["run_id"].str.startswith(st.session_state.current_run_id, na=False)
        cur = st.session_state.events[run_mask].copy()
        if not cur.empty:
            counts = cur.groupby(["employee","activity"]).size().unstack(fill_value=0)
            for col in ["Working","On Phone","Idle","Away"]:
                if col not in counts.columns:
                    counts[col] = 0
            counts["total"] = counts.sum(axis=1)
            dist = counts[["Working","On Phone","Idle","Away"]].div(counts["total"].replace(0,1), axis=0)*100.0
            dist = dist.round(2)
            dist_df = counts.join(dist.add_suffix(" %")).reset_index().rename(columns={"employee":"Employee"})
            st.dataframe(dist_df, use_container_width=True, hide_index=True)
            emp_opts = list(dist_df["Employee"])
            if emp_opts:
                sel_emp = st.selectbox("Employee breakdown", emp_opts, key="emp_breakdown")
                rowb = dist_df[dist_df["Employee"]==sel_emp].iloc[0]
                chart_df = pd.DataFrame({
                    "Activity":["Working","On Phone","Idle","Away"],
                    "Percent":[rowb["Working %"],rowb["On Phone %"],rowb["Idle %"],rowb["Away %"]]
                })
                figb = px.bar(chart_df, x="Activity", y="Percent", range_y=[0,100])
                figb.update_layout(height=260, margin=dict(l=0,r=0,t=10,b=0),
                                   paper_bgcolor="rgba(0,0,0,0)", plot_bgcolor="rgba(0,0,0,0)")
                st.plotly_chart(figb, use_container_width=True, config={"displayModeBar": False})
        else:
            st.caption("No events in this run yet.")
    else:
        st.caption("Run a video to see per-employee distribution.")
    st.markdown("</div>", unsafe_allow_html=True)

    # ---- Maintenance / simulation buttons
    c1, c2, c3 = st.columns(3)
    with c1:
        if st.button("Clear detections (all)"):
            st.session_state.events = st.session_state.events.iloc[0:0]
            st.session_state.frame_cache = {}
            st.session_state.frame_paths = {}
            st.session_state.emp_summary = {}
            st.session_state.face_demo_map = {}
            st.session_state.face_demo_used = set()
            st.session_state.daily_summary_path = ""
            st.success("Cleared all detections.")
    with c2:
        if st.button("Simulate stream error"):
            st.error("Stream lost, reconnecting…")
    with c3:
        if st.button("Simulate Superbse token expiry"):
            # The toast icon was mis-encoded; a warning sign is used instead.
            st.toast("Superbse auth expired.", icon="⚠️")
# ======================================================
# EMPLOYEES
# ======================================================
with tab_emp:
    left, right = st.columns([1,2])

    # ---- Left: searchable employee list with a live status pill
    with left:
        st.markdown("<div class='card'>", unsafe_allow_html=True)
        q = st.text_input("Search employees")
        df_emp = pd.DataFrame([
            {"id": k, "name": v.get("name"), "title": v.get("title"), "desk": v.get("deskId"),
             "status": v.get("status"), "avatar": v.get("avatarUrl")}
            for k, v in EMP.items()
        ])
        if q and not df_emp.empty:
            # Literal, case-insensitive match: the query is user text, not a
            # regex, and missing names must not match.
            df_emp = df_emp[df_emp["name"].str.contains(q, case=False, regex=False, na=False)]
        for _, row in df_emp.iterrows():
            c = st.container()
            cols = c.columns([0.2, 1, 0.6])
            with cols[0]:
                st.image(row["avatar"], width=40)
            with cols[1]:
                st.write(f"**{row['name']}**\n\n{row['title']} • {row['desk']}")
            with cols[2]:
                # live status hint
                summ = st.session_state.emp_summary.get(row["name"], {})
                status = "At Desk"
                if summ.get("Away",0) > 0:
                    status = "Away"
                badge = "pill-success" if status=="At Desk" else "pill-danger"
                # Render as a coloured pill; the markup had been stripped, so
                # `badge` was computed but never used.
                st.markdown(f"<span class='pill {badge}'>{status}</span>", unsafe_allow_html=True)
        st.markdown("</div>", unsafe_allow_html=True)

    # ---- Right: detail panel for the first employee in the filtered list
    with right:
        st.markdown("<div class='card'>", unsafe_allow_html=True)
        if not df_emp.empty:
            active = df_emp.iloc[0]
            b1,b2,b3 = st.columns(3)
            with b1:
                if st.button("Sync to Superbse"):
                    st.toast("Synced to Superbse", icon="✅")
            with b2:
                st.button("Open Employee in Superbse")
            with b3:
                st.button("Export Day as PDF")
            st.subheader(f"{active['name']} — {active['title']}")
            st.caption(f"Desk {active['desk']}")

            # Face Gallery shows the most recent detection frames.
            st.markdown("**Face Gallery**")
            frames = recent_frames_for_employee(active["name"], n=3)
            g1, g2, g3 = st.columns(3)
            cols = [g1,g2,g3]
            for i in range(3):
                with cols[i]:
                    if i < len(frames):
                        eid, img = frames[i]
                        st.image(img, use_column_width=True, caption=f"Event {eid}")
                    else:
                        st.image("https://placehold.co/160x120?text=Face", use_column_width=True)
                    st.button("Approve", key=f"approve:{active['name']}:{i}")
                    st.button("Reject", key=f"reject:{active['name']}:{i}")

            st.markdown("**Today summary (live frames)**")
            c1,c2,c3,c4 = st.columns(4)
            t = st.session_state.emp_summary.get(active["name"], {})
            c1.metric("In / Out", "—")
            c2.metric("Working (frames)", int(t.get("Working",0)))
            c3.metric("Idle (frames)", int(t.get("Idle",0)))
            c4.metric("Corridor (frames)", int(t.get("Corridor",0)))
        else:
            st.caption("No employees in seed.json")
        st.markdown("</div>", unsafe_allow_html=True)
# ======================================================
# REPORTS
# ======================================================
# Reports view: KPI cards, two charts, and the report builder table.
with tab_reports:
# KPIs are computed from the event log, narrowed to the current run when one
# is active.
kpi_base = st.session_state.events
# Check for the column explicitly: DataFrame.get("run_id") returns None when
# the column is absent, and calling .str on None raises AttributeError. With
# no run_id column the events cannot be attributed to a run, so the filter is
# skipped and the KPIs cover the whole log.
if st.session_state.current_run_id and "run_id" in kpi_base.columns:
    kpi_base = kpi_base[
        kpi_base["run_id"].str.startswith(st.session_state.current_run_id, na=False)
    ]
KPI_LIST = kpis_from_events(kpi_base)
# Lay the KPIs out on a five-column grid; i % 5 wraps any entries beyond the
# fifth back to the first column. Each KPI_LIST item is unpacked as (label, value).
kcols = st.columns(5)
for i, (label, val) in enumerate(KPI_LIST):
with kcols[i % 5]:
st.markdown("", unsafe_allow_html=True)
st.write(label); st.subheader(val)
st.markdown("
", unsafe_allow_html=True)
# Hard-coded per-department figures for the stacked chart; nothing here is
# read from the event log.
dept_df = pd.DataFrame([
{"dept":"Eng","Working":6.8,"Idle":0.9,"OnPhone":0.5,"Away":0.3},
{"dept":"Ops","Working":7.2,"Idle":0.6,"OnPhone":0.4,"Away":0.2},
{"dept":"HR","Working":6.3,"Idle":1.1,"OnPhone":0.3,"Away":0.4},
])
c1,c2 = st.columns(2)
with c1:
st.markdown("", unsafe_allow_html=True)
st.write("By Department (Stacked)")
# Reshape wide -> long (one row per dept/state pair) so each state becomes a
# coloured segment of the stacked bar.
dfm = dept_df.melt(id_vars=["dept"], var_name="state", value_name="hours")
fig = px.bar(dfm, x="dept", y="hours", color="state", barmode="stack")
# Transparent backgrounds and zero margins on the figure.
fig.update_layout(height=320, margin=dict(l=0,r=0,t=0,b=0),
paper_bgcolor="rgba(0,0,0,0)", plot_bgcolor="rgba(0,0,0,0)")
st.plotly_chart(fig, use_container_width=True, config={"displayModeBar": False})
st.markdown("
", unsafe_allow_html=True)
with c2:
st.markdown("", unsafe_allow_html=True)
st.write("Working Hours (30 days) β Trend")
# Synthetic 30-point series: 6.5 plus a sine wave plus random noise. No seed
# is set in this section, so the noise differs each time this code runs.
trend = pd.DataFrame({"day": list(range(1,31)), "hours": [6.5 + np.sin(i/5)*0.7 + np.random.rand()*0.3 for i in range(30)]})
fig2 = px.line(trend, x="day", y="hours")
fig2.update_layout(height=320, margin=dict(l=0,r=0,t=0,b=0),
paper_bgcolor="rgba(0,0,0,0)", plot_bgcolor="rgba(0,0,0,0)")
st.plotly_chart(fig2, use_container_width=True, config={"displayModeBar": False})
st.markdown("
", unsafe_allow_html=True)
# Report builder: filter widgets, a summary table, and export controls.
st.markdown("", unsafe_allow_html=True)
st.subheader("Report Builder")
# Filter widgets. Their values are stored in session state under the given
# keys; nothing in this section reads them back to filter the table.
rb1, rb2, rb3, rb4, rb5 = st.columns([1,1,1,1,2])
with rb1: st.date_input("Date range start", key="reports_date_start")
with rb2: st.date_input("Date range end", key="reports_date_end")
with rb3: st.selectbox("Team", ["All","Eng","Ops","HR"], key="reports_team")
with rb4: st.selectbox("Zone", ["All","Desk","Corridor","Lobby","Common"], key="reports_zone")
with rb5: st.slider("Activity threshold (Idle min)", 0, 120, 30, key="reports_idle_thresh")
# Table source: the daily summary CSV when its path is set and the file
# exists on disk.
rep = None
if st.session_state.get("daily_summary_path") and os.path.exists(st.session_state.daily_summary_path):
rep = pd.read_csv(st.session_state.daily_summary_path)
# For CSV data that has Employee and Working columns: add a zero "Corridor"
# column if it is missing, and set "Alerts" to 0.
if "Employee" in rep.columns and "Working" in rep.columns:
rep = rep.copy()
if "Corridor" not in rep.columns:
rep["Corridor"] = 0
rep["Alerts"] = 0
else:
# Fixed demo rows used as the fallback table.
rep = pd.DataFrame([
{"Employee":"Aparna Rao","In":"09:12","Out":"18:07","Working":382,"Idle":46,"OnPhone":18,"Away":10,"Corridor":21,"Alerts":0},
{"Employee":"Karthik S","In":"09:15","Out":"18:05","Working":368,"Idle":43,"OnPhone":22,"Away":12,"Corridor":19,"Alerts":1},
{"Employee":"Nisha M","In":"09:20","Out":"18:10","Working":372,"Idle":40,"OnPhone":24,"Away":11,"Corridor":17,"Alerts":2},
])
st.dataframe(rep, use_container_width=True, hide_index=True)
d1, d2, d3 = st.columns([1,1,2])
with d1:
# Downloads exactly the table shown above, serialised as UTF-8 CSV.
st.download_button("Export CSV", rep.to_csv(index=False).encode("utf-8"),
file_name="report.csv", mime="text/csv")
with d2:
# Demo only: a toast is shown, no PDF is produced here.
if st.button("Export PDF"):
st.toast("PDF export queued (demo)", icon="π")
with d3:
# The toggle's return value is discarded; its help text labels it a mock.
st.toggle("Schedule to Superbse", value=False, help="Creates/refreshes daily summary (mock)")
st.markdown("
", unsafe_allow_html=True)
# -------------------- Settings Modal --------------------
# Settings panel, rendered only while session_state["show_settings"] is truthy.
if st.session_state.get("show_settings"):
st.markdown("""
Settings β Thresholds & PII
""", unsafe_allow_html=True)
# Inputs are pre-filled from APPS with the defaults shown. The number inputs
# have no key and their return values are discarded, so edits to them are
# not stored by this section.
s1, s2 = st.columns(2)
with s1: st.number_input("On Phone threshold (sec)", min_value=0, value=int(APPS.get("onPhoneSec", 60)))
with s2: st.number_input("Idle threshold (min)", min_value=0, value=int(APPS.get("idleMin", 10)))
st.toggle("Blur faces (PII)", key="settings_blur", value=bool(APPS.get("blurFaces", False)))
st.number_input("Retention days", min_value=0, value=int(APPS.get("retentionDays", 30)))
b1, b2 = st.columns(2)
with b1:
# Cancel just hides the panel.
if st.button("Cancel"):
st.session_state.show_settings = False
with b2:
# Save hides the panel and copies the blur toggle into privacy_blur; that is
# the only setting written back here.
if st.button("Save"):
st.session_state.show_settings = False
st.session_state.privacy_blur = st.session_state.get("settings_blur", False)
st.toast("Settings saved", icon="β
")
st.markdown("
", unsafe_allow_html=True)
# -------------------- Daily detections CSV (append-only existing) --------------------
def persist_events_daily():
    """Append not-yet-persisted detection events to today's CSV under /tmp.

    ``st.session_state.persist_count`` records how many rows of
    ``st.session_state.events`` have already been written. Each call writes
    the rows past that offset to ``/tmp/detections_YYYYMMDD.csv`` (with a
    header only when the file is new or empty), then advances the counter and
    stores the file path in ``st.session_state.persisted_csv``.

    When the event log is empty, or holds fewer rows than the counter (for
    example after it was cleared), the counter is rewound to 0. Without that,
    events recorded after a clear would be skipped until the log outgrew the
    stale counter, and the rows below it would never be written.

    Returns:
        None. The function acts only through the CSV file and session state.
    """
    df = st.session_state.events
    persisted = st.session_state.get("persist_count", 0)

    if df is None or df.empty:
        # Nothing to write; also forget the old offset so the next batch of
        # events is persisted from its first row.
        st.session_state.persist_count = 0
        return

    new_count = len(df)
    if new_count < persisted:
        # The log shrank, so the stored offset no longer refers to this frame.
        persisted = 0
    if new_count <= persisted:
        return

    today_str = date.today().strftime("%Y%m%d")
    file_path = f"/tmp/detections_{today_str}.csv"

    chunk = df.iloc[persisted:].copy()
    # Normalise timestamps to UTC strings; unparseable values are coerced to
    # NaT rather than raising.
    chunk["ts"] = pd.to_datetime(chunk["ts"], errors="coerce", utc=True).astype(str)

    # Append mode creates the file when needed. A zero-byte file still needs
    # its header row.
    needs_header = not os.path.exists(file_path) or os.path.getsize(file_path) == 0
    chunk.to_csv(file_path, mode="a", index=False, header=needs_header)

    st.session_state.persist_count = new_count
    st.session_state.persisted_csv = file_path
# Write any new events to the daily CSV, then show which file is being
# appended to once a path has been recorded in session state.
persist_events_daily()
if st.session_state.get("persisted_csv"):
st.caption(f"π Appending to `{Path(st.session_state.persisted_csv).name}` in /tmp (auto every batch)")
st.markdown("", unsafe_allow_html=True)