| |
|
| | import os |
| | import io |
| | import cv2 |
| | import glob |
| | import time |
| | import base64 |
| | import shutil |
| | import uuid |
| | import numpy as np |
| | import pandas as pd |
| | import streamlit as st |
| | from datetime import datetime, timedelta |
| | from collections import defaultdict, deque |
| |
|
| | |
| | |
def make_lbph():
    """Create an OpenCV LBPH face recognizer, or return None when that fails.

    Any exception raised while reaching ``cv2.face`` or constructing the
    recognizer is swallowed on purpose so callers can treat face
    recognition as optional.
    """
    recognizer = None
    try:
        recognizer = cv2.face.LBPHFaceRecognizer_create()
    except Exception:
        # Best effort: fall through and report "no recognizer".
        pass
    return recognizer
| |
|
| | |
# Page chrome: must be configured before any other Streamlit element is drawn.
st.set_page_config(
    layout="wide",
    page_icon="💪",
    page_title="FitHub AI - Gym Ops",
)

# Dark theme: palette variables, card/metric classes and button styling.
DARK_CSS = """
<style>
:root {
--bg: #0f172a;
--panel: #111827;
--muted: #1f2937;
--card: #0b1220;
--text: #e5e7eb;
--subtext: #9ca3af;
--primary: #7c3aed;
--primary-2: #a78bfa;
--good: #22c55e;
--warn: #f59e0b;
--bad: #ef4444;
}

html, body, [class^="css"] {
background-color: var(--bg);
color: var(--text);
}
section.main > div { padding-top: 0.5rem; }
.block-container { padding-top: 1rem; max-width: 1400px; }

.card {
background: linear-gradient(180deg, rgba(124,58,237,0.06), rgba(10,10,10,0.4));
border: 1px solid rgba(167,139,250,0.25);
border-radius: 16px;
padding: 18px 18px 14px 18px;
box-shadow: 0 8px 20px rgba(0,0,0,0.35);
}
.metric { font-size: 28px; font-weight: 800; letter-spacing: .5px; }
.label { color: var(--subtext); font-size: 12px; text-transform: uppercase; }

.stButton>button {
background: radial-gradient(120% 120% at 50% 0%, var(--primary), #5b21b6);
border: 0;
color: white;
padding: .6rem 1rem;
border-radius: 12px;
}
.stDownloadButton>button { border-radius: 12px; }
</style>
"""

# Heading weight and the thin separator rule used between sections.
_HEADING_CSS = """
<style>
h2, .stMarkdown h2 { letter-spacing: .3px; font-weight: 800; }
hr.sep { border: 0; height: 1px; background: rgba(255,255,255,0.08); margin: 8px 0 16px; }
</style>
"""

# Inject both style sheets in the same order as before.
for _css in (DARK_CSS, _HEADING_CSS):
    st.markdown(_css, unsafe_allow_html=True)
| |
|
| | |
# Two-hour attendance windows as ("HH:MM" start, "HH:MM" end) pairs.
SLOTS = [
    ("05:00", "07:00"),
    ("07:00", "09:00"),
    ("09:00", "11:00"),
    ("11:00", "13:00"),
    ("13:00", "15:00"),
    ("15:00", "17:00"),
    ("17:00", "19:00"),
    ("19:00", "21:00"),
]


def slot_index_for(ts: datetime):
    """Return the index of the SLOTS window containing ``ts``, else None.

    Only the time of day of ``ts`` is considered. Each window is
    half-open: the start is inclusive and the end is exclusive.
    """
    clock = ts.time()

    def _as_time(hhmm):
        return datetime.strptime(hhmm, "%H:%M").time()

    matches = (
        idx
        for idx, (start, end) in enumerate(SLOTS)
        if _as_time(start) <= clock < _as_time(end)
    )
    return next(matches, None)
| |
|
# On-disk layout: everything lives under DATA_DIR.
DATA_DIR = "data"
FACES_DIR = os.path.join(DATA_DIR, "faces")
INTRUDERS_DIR = os.path.join(DATA_DIR, "intruders")
EVENTS_CSV = os.path.join(DATA_DIR, "events.csv")
MEMBERS_CSV = os.path.join(DATA_DIR, "members.csv")
SUBS_CSV = os.path.join(DATA_DIR, "subscriptions.csv")

# Make sure every directory exists before anything reads or writes it.
for _directory in (FACES_DIR, INTRUDERS_DIR, DATA_DIR):
    os.makedirs(_directory, exist_ok=True)

# Seed each CSV with a header row the first time the app runs.
_CSV_SCHEMAS = {
    MEMBERS_CSV: ["member_id", "name", "phone", "plan", "status"],
    SUBS_CSV: ["plan", "price", "active_members"],
    EVENTS_CSV: ["ts", "type", "member_id", "slot_idx", "details", "img_path"],
}
for fp, cols in _CSV_SCHEMAS.items():
    if os.path.exists(fp):
        continue
    pd.DataFrame(columns=cols).to_csv(fp, index=False)
| |
|
| | |
def load_df(path):
    """Read the CSV at ``path`` into a DataFrame.

    Returns an empty DataFrame when the file does not exist or exists but
    contains no data at all (zero bytes), instead of letting
    ``pandas.errors.EmptyDataError`` propagate.
    """
    if not os.path.exists(path):
        return pd.DataFrame()
    try:
        return pd.read_csv(path)
    except pd.errors.EmptyDataError:
        # A truncated or zero-byte file is treated the same as "no data yet".
        return pd.DataFrame()
| |
|
def save_df(df, path):
    """Write ``df`` to ``path`` as CSV, omitting the index column."""
    df.to_csv(path_or_buf=path, index=False)
| |
|
def add_event(evt_type, member_id=None, slot_idx=None, details="", img=None, ts=None):
    """Append one event row to EVENTS_CSV, optionally saving a snapshot image.

    Args:
        evt_type: Event type string stored in the ``type`` column.
        member_id: Member identifier; stored as "" when falsy.
        slot_idx: Slot index; stored as "" when None.
        details: Free-form text stored in the ``details`` column.
        img: Optional image array. Saved as a JPEG under INTRUDERS_DIR for
            ``"intruder"`` events and under DATA_DIR otherwise.
        ts: Timestamp string; defaults to the current naive UTC time in
            ISO format.

    The ``img_path`` column is only populated when the image was actually
    written. An empty image or a failed write leaves it as "".
    """
    ts = ts or datetime.utcnow().isoformat()
    img_path = ""
    # cv2.imwrite raises on an empty array, so skip zero-sized crops up front.
    if img is not None and getattr(img, "size", 0) > 0:
        target_dir = INTRUDERS_DIR if evt_type == "intruder" else DATA_DIR
        candidate = os.path.join(target_dir, str(uuid.uuid4())[:8] + ".jpg")
        try:
            written = cv2.imwrite(candidate, img)
        except cv2.error:
            written = False
        if written:
            img_path = candidate
    row = {
        "ts": ts,
        "type": evt_type,
        "member_id": member_id or "",
        "slot_idx": slot_idx if slot_idx is not None else "",
        "details": details,
        "img_path": img_path,
    }
    df = pd.concat([load_df(EVENTS_CSV), pd.DataFrame([row])], ignore_index=True)
    save_df(df, EVENTS_CSV)
| |
|
| | |
# Optional dependency: when mediapipe cannot be imported or initialised,
# both detectors are set to None.
try:
    import mediapipe as mp

    _mp_solutions = mp.solutions
    mp_face = _mp_solutions.face_detection.FaceDetection(
        min_detection_confidence=0.5,
        model_selection=0,
    )
    mp_pose = _mp_solutions.pose.Pose(
        min_tracking_confidence=0.5,
        min_detection_confidence=0.5,
        static_image_mode=False,
    )
except Exception:
    # Reset both, even if only the second constructor failed.
    mp_face = mp_pose = None
| |
|
def detect_faces(frame_bgr):
    """Detect faces in a BGR frame with the module-level ``mp_face`` detector.

    Returns a list of ``(x1, y1, x2, y2, score)`` tuples in pixel
    coordinates, clamped to the frame. Returns an empty list when the
    detector is unavailable or nothing is detected. Boxes that collapse to
    zero area after clamping are dropped so callers never get an empty crop.
    """
    if mp_face is None:
        return []
    res = mp_face.process(cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB))
    if not res.detections:
        return []

    h, w = frame_bgr.shape[:2]

    def _clamp(value, upper):
        return min(max(int(value), 0), upper)

    boxes = []
    for det in res.detections:
        bbox = det.location_data.relative_bounding_box
        # The relative box is scaled by the frame size to get pixels.
        x1 = _clamp(bbox.xmin * w, w)
        y1 = _clamp(bbox.ymin * h, h)
        x2 = _clamp((bbox.xmin + bbox.width) * w, w)
        y2 = _clamp((bbox.ymin + bbox.height) * h, h)
        if x2 <= x1 or y2 <= y1:
            continue
        boxes.append((x1, y1, x2, y2, float(det.score[0])))
    return boxes
| |
|
def detect_fall(frame_bgr, history: dict, person_id=0, ts=None):
    """Heuristically decide whether the pose in ``frame_bgr`` looks like a fall.

    Args:
        frame_bgr: BGR image array.
        history: Mutable dict carried across calls. A deque of recent hip
            heights is kept under the ``"hip_y"`` key.
        person_id: Accepted for interface compatibility; currently unused.
        ts: Accepted for interface compatibility; currently unused.

    Returns:
        ``(fallen, debug)`` where ``debug`` holds ``"angle"`` and
        ``"hip_y"``. Returns ``(False, {})`` when the pose model is
        unavailable or no landmarks are found.

    A fall is flagged when the torso is close to horizontal and the nose is
    near hip height, or when the hip's pixel y grew by more than 40 over
    the last five samples.
    """
    if mp_pose is None:
        return False, {}
    res = mp_pose.process(cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB))
    if not res.pose_landmarks:
        return False, {}
    h, w = frame_bgr.shape[:2]
    lm = res.pose_landmarks.landmark

    def p(i):
        # Landmark i scaled from relative coordinates to pixels.
        return np.array([lm[i].x * w, lm[i].y * h])

    nose = p(0)
    lhip, rhip = p(23), p(24)
    lsh, rsh = p(11), p(12)
    hip = (lhip + rhip) / 2
    shoulder = (lsh + rsh) / 2
    body_vec = shoulder - hip
    # Absolute angle of the hip-to-shoulder vector against the x axis, 0..180.
    angle = float(abs(np.degrees(np.arctan2(body_vec[1], body_vec[0]))))

    q = history.setdefault("hip_y", deque(maxlen=10))
    q.append(float(hip[1]))
    # Image y grows downward, so a negative difference means the hip moved down.
    sudden_drop = len(q) >= 5 and (q[-5] - q[-1]) < -40

    # A lying torso points along the x axis in either direction: the angle is
    # near 0 one way and near 180 the other, so both ends must be checked.
    horizontal = angle < 25 or angle > 155
    low_head = abs(nose[1] - hip[1]) < 80
    fallen = (horizontal and low_head) or sudden_drop

    return fallen, {"angle": angle, "hip_y": float(hip[1])}
| |
|
def train_lbph_from_faces():
    """Train an LBPH recognizer from ``FACES_DIR/<member_id>/*.jpg``.

    Each member directory gets one integer label. Images are loaded as
    grayscale and resized to 200x200, the same size
    ``recognize_or_intruder`` uses before predicting, so training and
    inference see identically sized input.

    Returns:
        ``(recognizer, label_to_member)``, or ``(None, {})`` when the
        recognizer is unavailable or no readable training image exists.
    """
    rec = make_lbph()
    if rec is None:
        return None, {}

    images, labels = [], []
    label_to_member = {}
    # Sorted listings keep label assignment stable across runs.
    member_dirs = sorted(
        name for name in os.listdir(FACES_DIR)
        if os.path.isdir(os.path.join(FACES_DIR, name))
    )
    for label, member_id in enumerate(member_dirs):
        pattern = os.path.join(FACES_DIR, member_id, "*.jpg")
        for img_path in sorted(glob.glob(pattern)):
            img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
            if img is None:
                continue
            images.append(cv2.resize(img, (200, 200)))
            labels.append(label)
            label_to_member[label] = member_id

    if not images:
        return None, {}
    # Labels are passed as 32-bit integers.
    rec.train(images, np.array(labels, dtype=np.int32))
    return rec, label_to_member
| |
|
@st.cache_resource(show_spinner=False)
def get_face_model():
    """Return the ``train_lbph_from_faces()`` result, cached by Streamlit."""
    trained = train_lbph_from_faces()
    return trained
| |
|
def recognize_or_intruder(face_img_bgr, rec, label_map, conf_thresh=65):
    """Predict which member a face crop belongs to.

    Args:
        face_img_bgr: BGR face crop. May be None or empty.
        rec: Recognizer exposing ``predict(gray_image) -> (label, conf)``.
        label_map: Mapping from integer label to member id.
        conf_thresh: Accepted for interface compatibility. Thresholding is
            left to the caller and is not applied here.

    Returns:
        ``(member_id, conf)``. ``member_id`` is None when the label is
        unknown. ``(None, 1000.0)`` is returned when the crop is empty or
        when preprocessing or prediction fails.
    """
    # An empty crop would make cv2.resize raise, so treat it as no match.
    if face_img_bgr is None or getattr(face_img_bgr, "size", 0) == 0:
        return None, 1000.0
    try:
        gray = cv2.cvtColor(cv2.resize(face_img_bgr, (200, 200)), cv2.COLOR_BGR2GRAY)
        pred, conf = rec.predict(gray)
    except Exception:
        # Best effort: any failure is reported as no match.
        return None, 1000.0
    return label_map.get(int(pred)), conf
| |
|
| | |
# Sidebar: product blurb plus a numbered list of the configured time slots.
with st.sidebar:
    st.markdown("### FitHub • Admin")
    st.write("AI-powered gym operations — visitors, intruders, attendance, and anomaly detection from CCTV frames.")
    st.markdown("---")
    st.markdown("**Time Slots (2h, 5am–9pm)**")
    for slot_number, (slot_start, slot_end) in enumerate(SLOTS, start=1):
        st.caption(f"{slot_number}. {slot_start}–{slot_end}")
    st.markdown("---")
    st.caption("Upload videos or a frames folder. The app detects faces, recognizes members if trained, logs visitors, flags intruders, and detects possible falls.")
| |
|
| | |
def metric_card(label, value, delta=None, help_txt=None):
    """Render a styled metric card through ``st.markdown``.

    ``delta`` adds a small extra line when truthy. ``help_txt`` is accepted
    but not used by the rendering.
    """
    delta_html = ""
    if delta:
        delta_html = f'<div class="label">{delta}</div>'
    # Leading and trailing empty items reproduce the newlines that wrap the markup.
    pieces = [
        "",
        '<div class="card">',
        f'<div class="label">{label}</div>',
        f'<div class="metric">{value}</div>',
        delta_html,
        "</div>",
        "",
    ]
    st.markdown("\n".join(pieces), unsafe_allow_html=True)
| |
|
# NOTE(review): load_df is already defined earlier in this file; this later
# definition is the one in effect for everything that runs after it.
def load_df(path):
    """Read the CSV at ``path`` into a DataFrame.

    Returns an empty DataFrame when the file does not exist or exists but
    contains no data at all (zero bytes), instead of letting
    ``pandas.errors.EmptyDataError`` propagate.
    """
    if not os.path.exists(path):
        return pd.DataFrame()
    try:
        return pd.read_csv(path)
    except pd.errors.EmptyDataError:
        # A truncated or zero-byte file is treated the same as "no data yet".
        return pd.DataFrame()
| |
|
# Snapshot of the three CSV tables for this script run.
members, subs, events = (
    load_df(csv_path) for csv_path in (MEMBERS_CSV, SUBS_CSV, EVENTS_CSV)
)

# Top-level navigation, one tab per section of the app.
_TAB_TITLES = [
    "📊 Dashboard",
    "🧑🤝🧑 Attendance",
    "👥 Members",
    "💳 Subscriptions",
    "📹 Process CCTV",
    "🚨 Incidents",
]
tab_dash, tab_att, tab_members, tab_subs, tab_ingest, tab_incidents = st.tabs(_TAB_TITLES)
| |
|
| | |
# Dashboard: headline metrics, expected-vs-actual slot chart, low-attendance
# list and today's intruder snapshots.
with tab_dash:
    st.markdown("## Admin Dashboard")

    now = datetime.utcnow()
    month = now.strftime("%Y-%m")
    today_str = now.date().isoformat()

    # Slice events once. Timestamps are ISO strings, so a prefix match selects
    # a month or a day.
    if events.empty:
        month_events = pd.DataFrame()
        today = pd.DataFrame()
        today_visits = pd.DataFrame()
    else:
        ts_text = events["ts"].astype(str)
        month_events = events[ts_text.str.startswith(month)]
        today = events[ts_text.str.startswith(today_str)]
        # Only "visit" rows are check-ins. Intruder and anomaly rows are not.
        today_visits = today[today["type"] == "visit"]

    c1, c2, c3, c4 = st.columns(4)
    with c1:
        metric_card("Total Members", len(members))
    with c2:
        metric_card(
            "Monthly Visitors",
            len(month_events[month_events["type"] == "visit"]) if not month_events.empty else 0,
        )
    with c3:
        metric_card("Active Subscriptions", int(subs["active_members"].sum()) if not subs.empty else 0)
    with c4:
        metric_card("Check-ins Today", len(today_visits))

    st.markdown("### Expected vs Actual per Slot (Today)")
    expected_rate = st.slider("Expected % of active members per slot", 0, 100, 18, 1)
    active_members = int(subs["active_members"].sum()) if not subs.empty else len(members)
    expected = [int(active_members * expected_rate / 100) for _ in SLOTS]

    # "Actual" counts today's visits per slot. Rows without a usable slot
    # index are ignored.
    slot_counts = [0] * len(SLOTS)
    if not today_visits.empty:
        slot_values = pd.to_numeric(today_visits["slot_idx"], errors="coerce").dropna().astype(int)
        for idx in slot_values.values:
            if 0 <= idx < len(slot_counts):
                slot_counts[idx] += 1

    df = pd.DataFrame({
        "Slot": [f"{a}-{b}" for a, b in SLOTS],
        "Expected": expected,
        "Actual": slot_counts,
    })
    import altair as alt
    df_long = df.melt("Slot", var_name="Type", value_name="Count")
    chart = (
        alt.Chart(df_long)
        .mark_bar()
        .encode(
            x=alt.X("Slot:N", sort=list(df["Slot"])),
            y=alt.Y("Count:Q"),
            color=alt.Color("Type:N"),
            tooltip=["Slot", "Type", "Count"],
        )
    )
    st.altair_chart(chart, use_container_width=True)

    st.markdown("### Less Attendance (Last 7 days)")
    low_df = pd.DataFrame(columns=["member_id", "checkins_7d"])
    if not events.empty:
        # Unparseable timestamps become NaT and never satisfy the comparison.
        mask = pd.to_datetime(events["ts"], errors="coerce") >= (now - timedelta(days=7))
        vis = events[mask & (events["type"] == "visit")]
        if not vis.empty:
            cnt = vis.groupby("member_id").size().reset_index(name="checkins_7d")
            low_df = cnt[cnt["checkins_7d"] < st.number_input("Threshold check-ins/week", 0, 20, 2)]
    st.dataframe(low_df, use_container_width=True)

    st.markdown("### Intruders (Today)")
    if not today.empty:
        intr_today = today[today["type"] == "intruder"]
        if not intr_today.empty:
            img_cols = st.columns(6)
            for i, (_, row) in enumerate(intr_today.head(24).iterrows()):
                img_path = row["img_path"]
                # Empty CSV cells are read back as NaN (a float), which
                # os.path.exists rejects with TypeError.
                if not isinstance(img_path, str) or not img_path or not os.path.exists(img_path):
                    continue
                with img_cols[i % 6]:
                    st.image(
                        img_path,
                        caption=str(row["ts"]).partition("T")[2][:8],
                        use_column_width=True,
                    )
| |
|
| | |
# Attendance: per-member visit history filtered by a time period.
with tab_att:
    st.markdown("## Attendance Tracker")
    member_ids = members["member_id"].tolist() if not members.empty else []
    colA, colB = st.columns([1, 2])
    with colA:
        sel_member = st.selectbox("Select member", ["-"] + member_ids)
        period = st.selectbox("Time Period", ["Today", "This Week", "This Month"])

    if sel_member != "-":
        if events.empty:
            # Keep the expected columns so the filters below work with no rows.
            dfm = pd.DataFrame(columns=["ts", "type", "member_id", "slot_idx", "details", "img_path"])
        else:
            dfm = events[(events["type"] == "visit") & (events["member_id"] == sel_member)].copy()

        now = datetime.utcnow()
        ts_text = dfm["ts"].astype(str)
        if period == "Today":
            dfm = dfm[ts_text.str.startswith(now.date().isoformat())]
        elif period == "This Week":
            # ISO timestamps sort lexicographically, so a string compare works.
            start = (now - timedelta(days=7)).date().isoformat()
            dfm = dfm[ts_text >= start]
        elif period == "This Month":
            month = now.strftime("%Y-%m")
            dfm = dfm[ts_text.str.startswith(month)]

        st.write(f"Check-ins: **{len(dfm)}**")
        if not dfm.empty:
            def _slot_label(raw):
                # Visits logged outside every slot have no slot index (NaN).
                idx = pd.to_numeric(raw, errors="coerce")
                if pd.isna(idx):
                    return "?"
                idx = int(idx)
                return f"{SLOTS[idx][0]}-{SLOTS[idx][1]}" if 0 <= idx < len(SLOTS) else "?"

            dfm = dfm.assign(Slot=dfm["slot_idx"].map(_slot_label))
            st.dataframe(dfm[["ts", "Slot", "details"]], use_container_width=True)
| |
|
| | |
# Members: roster, add-member form and face-model (re)training.
with tab_members:
    st.markdown("## Members")
    st.caption("Add members with a unique **member_id**. Optionally drop face images into `data/faces/<member_id>/*.jpg` and click **Train Face Model**.")
    st.write("### Roster")
    st.dataframe(members, use_container_width=True)
    with st.expander("➕ Add Member"):
        mid = st.text_input("Member ID")
        name = st.text_input("Name")
        phone = st.text_input("Phone")
        plan = st.text_input("Plan", value="Basic")
        if st.button("Add Member"):
            mid = mid.strip()
            # Compare as text: IDs read back from CSV may have been parsed as
            # numbers, which would never equal the text-input string.
            existing_ids = set() if members.empty else set(members["member_id"].astype(str))
            if mid and mid not in existing_ids:
                new = pd.DataFrame([{"member_id": mid, "name": name, "phone": phone, "plan": plan, "status": "active"}])
                df_all = pd.concat([members, new], ignore_index=True) if not members.empty else new
                df_all.to_csv(MEMBERS_CSV, index=False)
                st.success("Member added. Reload the app to refresh.")
            else:
                st.error("Member ID exists or invalid.")

    st.markdown("### Face Recognition")
    if st.button("Train Face Model"):
        # get_face_model is cached. Drop the cached model and rebuild through
        # the cache so the freshly trained recognizer is the one used later.
        get_face_model.clear()
        rec, label_map = get_face_model()
        if rec is not None:
            st.success(f"Trained on {len(label_map)} members")
        else:
            st.warning("No training data found or recognizer unavailable.")
| |
|
| | |
# Subscriptions: show the plan table and upsert one plan summary row.
with tab_subs:
    st.markdown("## Subscriptions")
    st.dataframe(subs, use_container_width=True)
    with st.expander("➕ Add/Update Plan Summary"):
        plan = st.text_input("Plan Name", "Basic")
        price = st.number_input("Price (₹/30 days)", 0, 100000, 999)
        active = st.number_input("Active Members", 0, 100000, 0)
        if st.button("Save Plan"):
            df = load_df(SUBS_CSV)
            plan_known = (not df.empty) and (plan in df["plan"].values)
            if plan_known:
                # Update the existing row(s) for this plan in place.
                same_plan = df["plan"] == plan
                df.loc[same_plan, ["price", "active_members"]] = [price, active]
            else:
                fresh_row = pd.DataFrame([{"plan": plan, "price": price, "active_members": active}])
                df = pd.concat([df, fresh_row], ignore_index=True)
            df.to_csv(SUBS_CSV, index=False)
            st.success("Saved. Reload to refresh.")
| |
|
| | |
# Ingest: run face recognition and fall detection over an uploaded video or a
# zip of frames, logging the results as events.
with tab_ingest:
    st.markdown("## Process CCTV Footage")
    st.caption("Upload a **video** or a **frames zip/folder**. We detect faces, recognize members if trained, log visits per 2h slot, capture intruders, and flag possible falls.")
    input_type = st.radio("Input Type", ["Video (mp4/avi)", "Frames Folder"], horizontal=True)

    face_model = None
    label_map = {}
    rec_cached = get_face_model()
    if rec_cached and rec_cached[0] is not None:
        face_model, label_map = rec_cached

    conf_thresh = st.slider("LBPH confidence threshold (lower = stricter)", 40, 120, 65)

    def process_frame(frame, ts):
        """Analyse one frame and log visit, intruder and fall events.

        Returns ``(boxes, visited_members)`` for the frame. A visit is logged
        at most once per (member, slot) during a script run, so a member who
        stays in view does not produce one event per frame.
        """
        slot_idx = slot_index_for(ts)
        boxes = detect_faces(frame)
        visited_members = set()
        for (x1, y1, x2, y2, score) in boxes:
            crop = frame[y1:y2, x1:x2].copy()
            if crop.size == 0:
                # A degenerate box yields nothing to recognise or save.
                continue
            member_id = None
            if face_model is not None and len(label_map):
                mid, conf = recognize_or_intruder(crop, face_model, label_map, conf_thresh)
                if mid is not None and conf <= conf_thresh:
                    member_id = mid
            if member_id:
                visited_members.add(member_id)
                visit_key = (member_id, slot_idx)
                if visit_key not in process_frame.logged_visits:
                    process_frame.logged_visits.add(visit_key)
                    add_event("visit", member_id=member_id, slot_idx=slot_idx, details="face_conf_ok", img=None, ts=ts.isoformat())
            else:
                # NOTE(review): unknown faces are still logged once per frame.
                add_event("intruder", member_id="", slot_idx=slot_idx, details="unknown_face", img=crop, ts=ts.isoformat())
        fell, debug = detect_fall(frame, history=process_frame.history, ts=ts)
        if fell:
            add_event("anomaly_fall", slot_idx=slot_idx, details=str(debug), img=frame, ts=ts.isoformat())
        return boxes, visited_members

    # State carried across frames: hip-height history and visits already logged.
    process_frame.history = {}
    process_frame.logged_visits = set()

    # NOTE(review): timestamps are derived from the processing start time, not
    # from when the footage was recorded.
    if input_type == "Video (mp4/avi)":
        vid = st.file_uploader("Upload video", type=["mp4", "avi", "mov", "mkv"])
        fps = st.number_input("Assumed FPS (if missing)", 1, 60, 10)
        if st.button("Run Video Analysis") and vid is not None:
            t0 = datetime.utcnow()
            suffix = os.path.splitext(vid.name)[1] or ".mp4"
            tmp_path = os.path.join(DATA_DIR, f"upload_{uuid.uuid4().hex}{suffix}")
            with open(tmp_path, "wb") as f:
                f.write(vid.read())
            cap = cv2.VideoCapture(tmp_path)
            try:
                # Use the container's frame rate. Fall back to the assumed
                # value only when the video does not report one.
                native_fps = cap.get(cv2.CAP_PROP_FPS)
                effective_fps = float(native_fps) if native_fps and native_fps > 0 else float(fps)
                frame_idx = 0
                while True:
                    ok, frame = cap.read()
                    if not ok:
                        break
                    ts = t0 + timedelta(seconds=frame_idx / effective_fps)
                    boxes, visited = process_frame(frame, ts)
                    frame_idx += 1
            finally:
                cap.release()
                # The uploaded copy is only needed while decoding.
                if os.path.exists(tmp_path):
                    os.remove(tmp_path)
            st.success("Video processed. Check Dashboard & Attendance tabs.")
    else:
        z = st.file_uploader("Upload a ZIP of frames (named by time order)", type=["zip"])
        if st.button("Run Frames Analysis") and z is not None:
            tmp_zip = os.path.join(DATA_DIR, f"frames_{uuid.uuid4().hex}.zip")
            extract_dir = tmp_zip[: -len(".zip")]
            try:
                with open(tmp_zip, "wb") as f:
                    f.write(z.read())
                shutil.unpack_archive(tmp_zip, extract_dir)
                paths = sorted(
                    p for p in glob.glob(os.path.join(extract_dir, "**/*.*"), recursive=True)
                    if p.lower().endswith((".jpg", ".png", ".jpeg"))
                )
                t0 = datetime.utcnow()
                for i, p in enumerate(paths):
                    frame = cv2.imread(p)
                    if frame is None:
                        continue
                    # Frames are assumed to be half a second apart.
                    ts = t0 + timedelta(seconds=i * 0.5)
                    process_frame(frame, ts)
            finally:
                # Event snapshots are written separately by add_event, so the
                # archive and extracted frames can be removed.
                if os.path.exists(tmp_zip):
                    os.remove(tmp_zip)
                shutil.rmtree(extract_dir, ignore_errors=True)
            st.success("Frames processed.")
| |
|
| | |
# Incidents: list every event whose type mentions "anomaly", newest first.
with tab_incidents:
    st.markdown("## Security Incidents")
    if events.empty:
        st.info("No incidents recorded yet.")
    else:
        is_anomaly = events["type"].str.contains("anomaly", na=False)
        anomalies = events[is_anomaly]
        st.write(f"Anomalies recorded: **{len(anomalies)}**")
        newest_first = anomalies.sort_values("ts", ascending=False)
        st.dataframe(newest_first, use_container_width=True)
| |
|
| | |
# Footer: raw event log download on the left, storage hint on the right.
st.markdown("---")
c1, c2 = st.columns(2)
if os.path.exists(EVENTS_CSV):
    with c1, open(EVENTS_CSV, "rb") as fh:
        st.download_button("Download events.csv", data=fh, file_name="events.csv")
with c2:
    st.caption("Pro tip: mount a Space Storage to persist `/data` across runs.")
| |
|