import os import io import cv2 import glob import time import base64 import shutil import uuid import numpy as np import pandas as pd import streamlit as st from datetime import datetime, timedelta from collections import defaultdict, deque # Optional but lightweight classical face recognizer # Requires opencv-contrib-python def make_lbph(): try: return cv2.face.LBPHFaceRecognizer_create() except Exception as e: return None # -------------------- THEME & STYLES -------------------- st.set_page_config(page_title="FitHub AI - Gym Ops", layout="wide", page_icon="💪") DARK_CSS = """ """ st.markdown(DARK_CSS, unsafe_allow_html=True) st.markdown(""" """, unsafe_allow_html=True) # -------------------- CONFIG -------------------- SLOTS = [ ("05:00","07:00"), ("07:00","09:00"), ("09:00","11:00"), ("11:00","13:00"), ("13:00","15:00"), ("15:00","17:00"), ("17:00","19:00"), ("19:00","21:00"), ] def slot_index_for(ts: datetime): t = ts.time() for i,(a,b) in enumerate(SLOTS): a_dt = datetime.combine(ts.date(), datetime.strptime(a,"%H:%M").time()) b_dt = datetime.combine(ts.date(), datetime.strptime(b,"%H:%M").time()) if a_dt.time() <= t < b_dt.time(): return i return None DATA_DIR = "data" FACES_DIR = os.path.join(DATA_DIR, "faces") INTRUDERS_DIR = os.path.join(DATA_DIR, "intruders") EVENTS_CSV = os.path.join(DATA_DIR, "events.csv") MEMBERS_CSV = os.path.join(DATA_DIR, "members.csv") SUBS_CSV = os.path.join(DATA_DIR, "subscriptions.csv") os.makedirs(FACES_DIR, exist_ok=True) os.makedirs(INTRUDERS_DIR, exist_ok=True) os.makedirs(DATA_DIR, exist_ok=True) # Ensure CSVs exist for fp, cols in [ (MEMBERS_CSV, ["member_id","name","phone","plan","status"]), (SUBS_CSV, ["plan","price","active_members"]), (EVENTS_CSV, ["ts","type","member_id","slot_idx","details","img_path"]), ]: if not os.path.exists(fp): pd.DataFrame(columns=cols).to_csv(fp, index=False) # -------------------- HELPERS -------------------- def load_df(path): return pd.read_csv(path) if os.path.exists(path) else pd.DataFrame() def save_df(df, path): df.to_csv(path, index=False) def add_event(evt_type, member_id=None, slot_idx=None, details="", img=None, ts=None): ts = ts or datetime.utcnow().isoformat() img_path = "" if img is not None: uid = str(uuid.uuid4())[:8] + ".jpg" img_path = os.path.join(INTRUDERS_DIR if evt_type=="intruder" else DATA_DIR, uid) cv2.imwrite(img_path, img) df = load_df(EVENTS_CSV) row = {"ts": ts, "type": evt_type, "member_id": member_id or "", "slot_idx": slot_idx if slot_idx is not None else "", "details": details, "img_path": img_path} df = pd.concat([df, pd.DataFrame([row])], ignore_index=True) save_df(df, EVENTS_CSV) # -------------------- ML: DETECTORS -------------------- try: import mediapipe as mp mp_face = mp.solutions.face_detection.FaceDetection(model_selection=0, min_detection_confidence=0.5) mp_pose = mp.solutions.pose.Pose(static_image_mode=False, min_detection_confidence=0.5, min_tracking_confidence=0.5) except Exception as e: mp_face = None mp_pose = None def detect_faces(frame_bgr): if mp_face is None: return [] img_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB) res = mp_face.process(img_rgb) boxes = [] if res.detections: h, w = frame_bgr.shape[:2] for det in res.detections: bbox = det.location_data.relative_bounding_box x, y, bw, bh = bbox.xmin, bbox.ymin, bbox.width, bbox.height x1 = max(int(x * w), 0); y1 = max(int(y * h), 0) x2 = min(int((x + bw) * w), w); y2 = min(int((y + bh) * h), h) boxes.append((x1, y1, x2, y2, float(det.score[0]))) return boxes def detect_fall(frame_bgr, history: dict, person_id=0, ts=None): if mp_pose is None: return False, {} ts = ts or datetime.utcnow() img_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB) res = mp_pose.process(img_rgb) if not res.pose_landmarks: return False, {} h, w = frame_bgr.shape[:2] lm = res.pose_landmarks.landmark def p(i): return np.array([lm[i].x*w, lm[i].y*h]) nose = p(0); lhip = p(23); rhip = p(24); lsh = p(11); rsh = p(12) hip = (lhip + rhip) / 2 shoulder = (lsh + rsh) / 2 body_vec = shoulder - hip angle = abs(np.degrees(np.arctan2(body_vec[1], body_vec[0]))) q = history.setdefault("hip_y", deque(maxlen=10)) q.append(float(hip[1])) sudden_drop = len(q) >= 5 and (q[-5] - q[-1]) < -40 horizontal = angle < 25 low_head = abs(nose[1] - hip[1]) < 80 fallen = (horizontal and low_head) or sudden_drop return fallen, {"angle": angle, "hip_y": float(hip[1])} def train_lbph_from_faces(): rec = make_lbph() if rec is None: return None, {} images, labels = [], [] label_to_member, label = {}, 0 for member_id in os.listdir(FACES_DIR): p = os.path.join(FACES_DIR, member_id) if not os.path.isdir(p): continue for imgp in glob.glob(os.path.join(p, "*.jpg")): img = cv2.imread(imgp, cv2.IMREAD_GRAYSCALE) if img is None: continue images.append(img) labels.append(label) label_to_member[label] = member_id label += 1 if not images: return None, {} rec.train(images, np.array(labels)) return rec, label_to_member @st.cache_resource(show_spinner=False) def get_face_model(): return train_lbph_from_faces() def recognize_or_intruder(face_img_bgr, rec, label_map, conf_thresh=65): g = cv2.cvtColor(cv2.resize(face_img_bgr, (200,200)), cv2.COLOR_BGR2GRAY) try: pred, conf = rec.predict(g) except Exception as e: return None, 1000.0 member_id = label_map.get(int(pred)) return member_id, conf # -------------------- SIDEBAR -------------------- with st.sidebar: st.markdown("### FitHub • Admin") st.write("AI-powered gym operations — visitors, intruders, attendance, and anomaly detection from CCTV frames.") st.markdown("---") st.markdown("**Time Slots (2h, 5am–9pm)**") for i,(a,b) in enumerate(SLOTS): st.caption(f"{i+1}. {a}–{b}") st.markdown("---") st.caption("Upload videos or a frames folder. The app detects faces, recognizes members if trained, logs visitors, flags intruders, and detects possible falls.") # -------------------- DATA LOAD -------------------- def metric_card(label, value, delta=None, help_txt=None): delta_html = f'
{delta}
' if delta else "" html = f"""
{label}
{value}
{delta_html}
""" st.markdown(html, unsafe_allow_html=True) def load_df(path): return pd.read_csv(path) if os.path.exists(path) else pd.DataFrame() members = load_df(MEMBERS_CSV) subs = load_df(SUBS_CSV) events = load_df(EVENTS_CSV) # -------------------- TABS -------------------- tab_dash, tab_att, tab_members, tab_subs, tab_ingest, tab_incidents = st.tabs([ "📊 Dashboard","🧑‍🤝‍🧑 Attendance","👥 Members","💳 Subscriptions","📹 Process CCTV","🚨 Incidents" ]) # -------------------- DASHBOARD -------------------- with tab_dash: st.markdown("## Admin Dashboard") c1,c2,c3,c4 = st.columns(4) with c1: metric_card("Total Members", len(members)) month = datetime.utcnow().strftime("%Y-%m") month_events = events[events["ts"].astype(str).str.startswith(month)] if not events.empty else pd.DataFrame() with c2: metric_card("Monthly Visitors", len(month_events[month_events["type"]=="visit"]) if not month_events.empty else 0) with c3: metric_card("Active Subscriptions", int(subs["active_members"].sum()) if not subs.empty else 0) today_str = datetime.utcnow().date().isoformat() with c4: metric_card("Check-ins Today", len(events[events["ts"].astype(str).str.startswith(today_str)]) if not events.empty else 0) st.markdown("### Expected vs Actual per Slot (Today)") expected_rate = st.slider("Expected % of active members per slot", 0, 100, 18, 1) active_members = int(subs["active_members"].sum()) if not subs.empty else len(members) expected = [int(active_members * expected_rate / 100) for _ in SLOTS] slot_counts = [0]*len(SLOTS) if not events.empty: today = events[events["ts"].astype(str).str.startswith(today_str)] for idx in today["slot_idx"].dropna().astype(int).values: if 0 <= idx < len(slot_counts): slot_counts[idx] += 1 else: today = pd.DataFrame() df = pd.DataFrame({ "Slot": [f"{a}-{b}" for a,b in SLOTS], "Expected": expected, "Actual": slot_counts }) import altair as alt df_long = df.melt("Slot", var_name="Type", value_name="Count") chart = ( alt.Chart(df_long) .mark_bar() .encode( x=alt.X("Slot:N", sort=list(df["Slot"])), y=alt.Y("Count:Q"), color=alt.Color("Type:N"), tooltip=["Slot","Type","Count"] ) ) st.altair_chart(chart, use_container_width=True) st.markdown("### Less Attendance (Last 7 days)") low_df = pd.DataFrame(columns=["member_id","checkins_7d"]) if not events.empty: mask = (pd.to_datetime(events["ts"]) >= (datetime.utcnow()-timedelta(days=7))) vis = events[mask & (events["type"]=="visit")] if not vis.empty: cnt = vis.groupby("member_id").size().reset_index(name="checkins_7d") low_df = cnt[cnt["checkins_7d"] < st.number_input("Threshold check-ins/week", 0, 20, 2)] st.dataframe(low_df, use_container_width=True) st.markdown("### Intruders (Today)") if not today.empty: intr_today = today[today["type"]=="intruder"] if not intr_today.empty: img_cols = st.columns(6) for i,(_,row) in enumerate(intr_today.head(24).iterrows()): c = img_cols[i%6] with c: if row["img_path"] and os.path.exists(row["img_path"]): st.image(row["img_path"], caption=row["ts"].split("T")[1][:8], use_column_width=True) # -------------------- ATTENDANCE -------------------- with tab_att: st.markdown("## Attendance Tracker") member_ids = members["member_id"].tolist() if not members.empty else [] colA, colB = st.columns([1,2]) with colA: sel_member = st.selectbox("Select member", ["-"]+member_ids) period = st.selectbox("Time Period", ["Today","This Week","This Month"]) if sel_member != "-": dfm = events[(events["type"]=="visit") & (events["member_id"]==sel_member)].copy() if not events.empty else pd.DataFrame() if period == "Today": dfm = dfm[dfm["ts"].astype(str).str.startswith(datetime.utcnow().date().isoformat())] elif period == "This Week": start = (datetime.utcnow()-timedelta(days=7)).date().isoformat() dfm = dfm[dfm["ts"] >= start] elif period == "This Month": month = datetime.utcnow().strftime("%Y-%m") dfm = dfm[dfm["ts"].astype(str).str.startswith(month)] st.write(f"Check-ins: **{len(dfm)}**") if not dfm.empty: dfm["Slot"] = dfm["slot_idx"].astype(int).map(lambda i: f"{SLOTS[i][0]}-{SLOTS[i][1]}" if 0<=i/*.jpg` and click **Train Face Model**.") st.write("### Roster") st.dataframe(members, use_container_width=True) with st.expander("➕ Add Member"): mid = st.text_input("Member ID") name = st.text_input("Name") phone = st.text_input("Phone") plan = st.text_input("Plan", value="Basic") if st.button("Add Member"): if mid and (members.empty or mid not in members["member_id"].values): new = pd.DataFrame([{"member_id":mid,"name":name,"phone":phone,"plan":plan,"status":"active"}]) df_all = pd.concat([members,new], ignore_index=True) if not members.empty else new df_all.to_csv(MEMBERS_CSV, index=False) st.success("Member added. Reload the app to refresh.") else: st.error("Member ID exists or invalid.") st.markdown("### Face Recognition") if st.button("Train Face Model"): rec, label_map = train_lbph_from_faces() if rec: st.success(f"Trained on {len(label_map)} members") else: st.warning("No training data found or recognizer unavailable.") # -------------------- SUBSCRIPTIONS -------------------- with tab_subs: st.markdown("## Subscriptions") st.dataframe(subs, use_container_width=True) with st.expander("➕ Add/Update Plan Summary"): plan = st.text_input("Plan Name", "Basic") price = st.number_input("Price (₹/30 days)", 0, 100000, 999) active = st.number_input("Active Members", 0, 100000, 0) if st.button("Save Plan"): df = load_df(SUBS_CSV) if not df.empty and plan in df["plan"].values: df.loc[df["plan"]==plan, ["price","active_members"]] = [price, active] else: df = pd.concat([df, pd.DataFrame([{"plan":plan,"price":price,"active_members":active}])], ignore_index=True) df.to_csv(SUBS_CSV, index=False) st.success("Saved. Reload to refresh.") # -------------------- PROCESSING -------------------- with tab_ingest: st.markdown("## Process CCTV Footage") st.caption("Upload a **video** or a **frames zip/folder**. We detect faces, recognize members if trained, log visits per 2h slot, capture intruders, and flag possible falls.") input_type = st.radio("Input Type", ["Video (mp4/avi)","Frames Folder"], horizontal=True) face_model = None label_map = {} rec_cached = get_face_model() if rec_cached and rec_cached[0] is not None: face_model, label_map = rec_cached conf_thresh = st.slider("LBPH confidence threshold (lower = stricter)", 40, 120, 65) def process_frame(frame, ts): slot_idx = slot_index_for(ts) boxes = detect_faces(frame) visited_members = set() for (x1,y1,x2,y2,score) in boxes: crop = frame[y1:y2, x1:x2].copy() member_id = None if face_model is not None and len(label_map): mid, conf = recognize_or_intruder(crop, face_model, label_map, conf_thresh) if mid is not None and conf <= conf_thresh: member_id = mid if member_id: add_event("visit", member_id=member_id, slot_idx=slot_idx, details=f"face_conf_ok", img=None, ts=ts.isoformat()) visited_members.add(member_id) else: add_event("intruder", member_id="", slot_idx=slot_idx, details="unknown_face", img=crop, ts=ts.isoformat()) fell, debug = detect_fall(frame, history=process_frame.history, ts=ts) if fell: add_event("anomaly_fall", slot_idx=slot_idx, details=str(debug), img=frame, ts=ts.isoformat()) return boxes, visited_members process_frame.history = {} if input_type == "Video (mp4/avi)": vid = st.file_uploader("Upload video", type=["mp4","avi","mov","mkv"]) fps = st.number_input("Assumed FPS (if missing)", 1, 60, 10) if st.button("Run Video Analysis") and vid is not None: t0 = datetime.utcnow() tmp_path = os.path.join(DATA_DIR, f"upload_{uuid.uuid4().hex}.mp4") with open(tmp_path, "wb") as f: f.write(vid.read()) cap = cv2.VideoCapture(tmp_path) frame_idx = 0 while True: ok, frame = cap.read() if not ok: break ts = t0 + timedelta(seconds=frame_idx/float(fps)) boxes, visited = process_frame(frame, ts) frame_idx += 1 cap.release() st.success("Video processed. Check Dashboard & Attendance tabs.") else: z = st.file_uploader("Upload a ZIP of frames (named by time order)", type=["zip"]) if st.button("Run Frames Analysis") and z is not None: tmp_zip = os.path.join(DATA_DIR, f"frames_{uuid.uuid4().hex}.zip") with open(tmp_zip, "wb") as f: f.write(z.read()) extract_dir = tmp_zip.replace(".zip","") shutil.unpack_archive(tmp_zip, extract_dir) paths = sorted([p for p in glob.glob(os.path.join(extract_dir, "**/*.*"), recursive=True) if p.lower().endswith((".jpg",".png",".jpeg"))]) t0 = datetime.utcnow() for i,p in enumerate(paths): frame = cv2.imread(p) if frame is None: continue ts = t0 + timedelta(seconds=i*0.5) # assume 2 FPS process_frame(frame, ts) st.success("Frames processed.") # -------------------- INCIDENTS -------------------- with tab_incidents: st.markdown("## Security Incidents") if not events.empty: anomalies = events[events["type"].str.contains("anomaly", na=False)] st.write(f"Anomalies recorded: **{len(anomalies)}**") st.dataframe(anomalies.sort_values("ts", ascending=False), use_container_width=True) else: st.info("No incidents recorded yet.") # -------------------- UTIL: EXPORT -------------------- st.markdown("---") c1,c2 = st.columns(2) with c1: if os.path.exists(EVENTS_CSV): with open(EVENTS_CSV, "rb") as fh: st.download_button("Download events.csv", data=fh, file_name="events.csv") with c2: st.caption("Pro tip: mount a Space Storage to persist `/data` across runs.")