# GymFit / app.py
# SuriRaja's picture
# Update app.py
# b6e693d verified
import os
import io
import cv2
import glob
import time
import base64
import shutil
import uuid
import numpy as np
import pandas as pd
import streamlit as st
from datetime import datetime, timedelta
from collections import defaultdict, deque
# Optional but lightweight classical face recognizer
# Requires opencv-contrib-python
def make_lbph():
    """Build an OpenCV LBPH face recognizer, or return None if that fails.

    Any exception raised while creating the recognizer (for example when the
    ``cv2.face`` module is not available) is treated as "no recognizer".
    """
    try:
        recognizer = cv2.face.LBPHFaceRecognizer_create()
    except Exception:
        recognizer = None
    return recognizer
# -------------------- THEME & STYLES --------------------
# Page-level Streamlit settings: browser title, wide layout and tab icon.
st.set_page_config(page_title="FitHub AI - Gym Ops", layout="wide", page_icon="💪")
# Dark-theme stylesheet. It declares the colour variables and the .card /
# .metric / .label classes that metric_card's HTML relies on, plus button styling.
DARK_CSS = """
<style>
:root {
--bg: #0f172a;
--panel: #111827;
--muted: #1f2937;
--card: #0b1220;
--text: #e5e7eb;
--subtext: #9ca3af;
--primary: #7c3aed;
--primary-2: #a78bfa;
--good: #22c55e;
--warn: #f59e0b;
--bad: #ef4444;
}
html, body, [class^="css"] {
background-color: var(--bg);
color: var(--text);
}
section.main > div { padding-top: 0.5rem; }
.block-container { padding-top: 1rem; max-width: 1400px; }
.card {
background: linear-gradient(180deg, rgba(124,58,237,0.06), rgba(10,10,10,0.4));
border: 1px solid rgba(167,139,250,0.25);
border-radius: 16px;
padding: 18px 18px 14px 18px;
box-shadow: 0 8px 20px rgba(0,0,0,0.35);
}
.metric { font-size: 28px; font-weight: 800; letter-spacing: .5px; }
.label { color: var(--subtext); font-size: 12px; text-transform: uppercase; }
.stButton>button {
background: radial-gradient(120% 120% at 50% 0%, var(--primary), #5b21b6);
border: 0;
color: white;
padding: .6rem 1rem;
border-radius: 12px;
}
.stDownloadButton>button { border-radius: 12px; }
</style>
"""
# Inject the stylesheet as raw HTML.
st.markdown(DARK_CSS, unsafe_allow_html=True)
# Second, smaller stylesheet: h2 heading weight/spacing and the hr.sep separator rule.
st.markdown("""
<style>
h2, .stMarkdown h2 { letter-spacing: .3px; font-weight: 800; }
hr.sep { border: 0; height: 1px; background: rgba(255,255,255,0.08); margin: 8px 0 16px; }
</style>
""", unsafe_allow_html=True)
# -------------------- CONFIG --------------------
# Two-hour attendance windows as ("HH:MM" start, "HH:MM" end) pairs, in day order.
SLOTS = [
    ("05:00", "07:00"), ("07:00", "09:00"),
    ("09:00", "11:00"), ("11:00", "13:00"),
    ("13:00", "15:00"), ("15:00", "17:00"),
    ("17:00", "19:00"), ("19:00", "21:00"),
]


def slot_index_for(ts: datetime):
    """Return the index in SLOTS whose window contains ``ts``, else None.

    Only the time-of-day part of ``ts`` is compared. A window includes its
    start time and excludes its end time.
    """
    moment = ts.time()
    for index, (start_text, end_text) in enumerate(SLOTS):
        opens = datetime.strptime(start_text, "%H:%M").time()
        closes = datetime.strptime(end_text, "%H:%M").time()
        if opens <= moment < closes:
            return index
    return None
# On-disk layout: everything lives under DATA_DIR.
DATA_DIR = "data"
FACES_DIR = os.path.join(DATA_DIR, "faces")
INTRUDERS_DIR = os.path.join(DATA_DIR, "intruders")
EVENTS_CSV = os.path.join(DATA_DIR, "events.csv")
MEMBERS_CSV = os.path.join(DATA_DIR, "members.csv")
SUBS_CSV = os.path.join(DATA_DIR, "subscriptions.csv")

# Create the directory tree; existing directories are left alone.
for _folder in (FACES_DIR, INTRUDERS_DIR, DATA_DIR):
    os.makedirs(_folder, exist_ok=True)

# Ensure CSVs exist: a missing file is created with just its header row.
_CSV_HEADERS = {
    MEMBERS_CSV: ["member_id", "name", "phone", "plan", "status"],
    SUBS_CSV: ["plan", "price", "active_members"],
    EVENTS_CSV: ["ts", "type", "member_id", "slot_idx", "details", "img_path"],
}
for _csv_path, _header in _CSV_HEADERS.items():
    if os.path.exists(_csv_path):
        continue
    pd.DataFrame(columns=_header).to_csv(_csv_path, index=False)
# -------------------- HELPERS --------------------
def load_df(path):
    """Read the CSV at ``path``; return an empty DataFrame if the file is missing."""
    if not os.path.exists(path):
        return pd.DataFrame()
    return pd.read_csv(path)
def save_df(df, path):
    """Write ``df`` to ``path`` as CSV without the index column."""
    df.to_csv(path_or_buf=path, index=False)
def add_event(evt_type, member_id=None, slot_idx=None, details="", img=None, ts=None):
    """Append one event row to EVENTS_CSV, optionally saving a snapshot image.

    Args:
        evt_type: Event type string stored in the ``type`` column.
        member_id: Member identifier; falsy values are stored as "".
        slot_idx: Slot index; ``None`` is stored as "".
        details: Free-form text for the ``details`` column.
        img: Optional image array written as a JPEG. Images for "intruder"
            events go to INTRUDERS_DIR, all others to DATA_DIR.
        ts: Timestamp value for the ``ts`` column; defaults to the current
            UTC time in ISO format.
    """
    ts = ts or datetime.utcnow().isoformat()
    img_path = ""
    if img is not None:
        target_dir = INTRUDERS_DIR if evt_type == "intruder" else DATA_DIR
        # Full-length UUID: a truncated 8-character name can collide (and
        # silently overwrite) once many snapshots have been written.
        candidate = os.path.join(target_dir, uuid.uuid4().hex + ".jpg")
        try:
            saved = bool(cv2.imwrite(candidate, img))
        except cv2.error:
            # e.g. an empty crop; log the event without an image instead of crashing.
            saved = False
        if saved:
            img_path = candidate
    columns = ["ts", "type", "member_id", "slot_idx", "details", "img_path"]
    row = {
        "ts": ts,
        "type": evt_type,
        "member_id": member_id or "",
        "slot_idx": slot_idx if slot_idx is not None else "",
        "details": details,
        "img_path": img_path,
    }
    # Append the single row instead of re-reading and rewriting the whole file:
    # the CCTV loop may call this for every frame, and a full rewrite per event
    # makes that quadratic in the number of events.
    needs_header = (not os.path.exists(EVENTS_CSV)) or os.path.getsize(EVENTS_CSV) == 0
    pd.DataFrame([row], columns=columns).to_csv(
        EVENTS_CSV, mode="a", header=needs_header, index=False
    )
# -------------------- ML: DETECTORS --------------------
# Both detectors default to None. They are only bound when the mediapipe import
# and BOTH constructors succeed, so a partial failure leaves both disabled.
mp_face = None
mp_pose = None
try:
    import mediapipe as mp
    _face_detector = mp.solutions.face_detection.FaceDetection(model_selection=0, min_detection_confidence=0.5)
    _pose_estimator = mp.solutions.pose.Pose(static_image_mode=False, min_detection_confidence=0.5, min_tracking_confidence=0.5)
except Exception:
    # Any failure here simply disables detection (both names stay None).
    pass
else:
    mp_face, mp_pose = _face_detector, _pose_estimator
def detect_faces(frame_bgr):
    """Detect faces in a BGR frame and return their pixel bounding boxes.

    Returns:
        A list of ``(x1, y1, x2, y2, score)`` tuples. Coordinates are clamped
        to the frame, and boxes with no area after clamping are dropped.
        Returns an empty list when the face detector is unavailable or
        nothing is detected.
    """
    if mp_face is None:
        return []
    img_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
    res = mp_face.process(img_rgb)
    if not res.detections:
        return []
    h, w = frame_bgr.shape[:2]
    boxes = []
    for det in res.detections:
        bbox = det.location_data.relative_bounding_box
        # Relative boxes can extend past the frame on any side; clamp every
        # corner to [0, w] x [0, h], not just the minimum of the top-left one.
        x1 = min(max(int(bbox.xmin * w), 0), w)
        y1 = min(max(int(bbox.ymin * h), 0), h)
        x2 = min(max(int((bbox.xmin + bbox.width) * w), 0), w)
        y2 = min(max(int((bbox.ymin + bbox.height) * h), 0), h)
        if x2 <= x1 or y2 <= y1:
            # A zero-area box yields an empty crop, which the resize/imwrite
            # calls downstream cannot handle.
            continue
        boxes.append((x1, y1, x2, y2, float(det.score[0])))
    return boxes
def detect_fall(frame_bgr, history: dict, person_id=0, ts=None):
    """Heuristically decide whether the person in the frame has fallen.

    A fall is flagged when either
      * the hip-to-shoulder line is within 25 degrees of horizontal (in either
        direction) AND the nose is within 80 px vertically of the hips, or
      * the hip y-coordinate grew by more than 40 px over the last four
        samples kept in ``history["hip_y"]`` (image y grows downward).

    Args:
        frame_bgr: BGR image array.
        history: Mutable dict carried between calls; its ``"hip_y"`` deque
            (max 10 entries) is created and appended to here.
        person_id: Accepted for interface compatibility; not used.
        ts: Accepted for interface compatibility; not used.

    Returns:
        ``(fallen, debug)`` where ``debug`` holds ``"angle"`` and ``"hip_y"``,
        or ``(False, {})`` when the pose model is unavailable or no pose is found.
    """
    if mp_pose is None:
        return False, {}
    img_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
    res = mp_pose.process(img_rgb)
    if not res.pose_landmarks:
        return False, {}
    h, w = frame_bgr.shape[:2]
    lm = res.pose_landmarks.landmark

    def p(i):
        # Landmark i in pixel coordinates.
        return np.array([lm[i].x * w, lm[i].y * h])

    nose = p(0)
    lhip, rhip = p(23), p(24)
    lsh, rsh = p(11), p(12)
    hip = (lhip + rhip) / 2
    shoulder = (lsh + rsh) / 2
    body_vec = shoulder - hip
    # abs(atan2) lies in [0, 180]: ~90 is upright, ~0 or ~180 is lying flat
    # depending on which side the head is on.
    angle = float(abs(np.degrees(np.arctan2(body_vec[1], body_vec[0]))))
    # Fold onto [0, 90] so both lying orientations count as horizontal; testing
    # only ``angle < 25`` missed a torso whose shoulders are left of the hips.
    tilt_from_horizontal = min(angle, 180.0 - angle)

    q = history.setdefault("hip_y", deque(maxlen=10))
    q.append(float(hip[1]))
    sudden_drop = len(q) >= 5 and (q[-5] - q[-1]) < -40
    horizontal = tilt_from_horizontal < 25
    low_head = abs(nose[1] - hip[1]) < 80
    fallen = bool((horizontal and low_head) or sudden_drop)
    return fallen, {"angle": angle, "hip_y": float(hip[1])}
def train_lbph_from_faces():
    """Train an LBPH recognizer from ``FACES_DIR/<member_id>/*.jpg``.

    Each member directory with at least one readable image gets one integer
    label. Directories and files are visited in sorted order so labels are
    stable between runs.

    Returns:
        ``(recognizer, label_to_member)``, or ``(None, {})`` when the
        recognizer cannot be created or no training images are found.
    """
    rec = make_lbph()
    if rec is None:
        return None, {}
    images, labels = [], []
    label_to_member = {}
    for member_id in sorted(os.listdir(FACES_DIR)):
        member_dir = os.path.join(FACES_DIR, member_id)
        if not os.path.isdir(member_dir):
            continue
        # Escape the directory part so IDs containing glob metacharacters
        # ("[", "*", "?") still match their own files.
        pattern = os.path.join(glob.escape(member_dir), "*.jpg")
        member_images = []
        for img_path in sorted(glob.glob(pattern)):
            img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
            if img is not None:
                member_images.append(img)
        if not member_images:
            # Do not register a label with no samples behind it; otherwise the
            # map (and the "Trained on N members" count) includes untrained members.
            continue
        label = len(label_to_member)
        label_to_member[label] = member_id
        images.extend(member_images)
        labels.extend([label] * len(member_images))
    if not images:
        return None, {}
    rec.train(images, np.array(labels, dtype=np.int32))
    return rec, label_to_member
@st.cache_resource(show_spinner=False)
def get_face_model():
    """Return the ``(recognizer, label_map)`` pair built by train_lbph_from_faces.

    Decorated with ``st.cache_resource`` (spinner disabled).
    """
    trained_pair = train_lbph_from_faces()
    return trained_pair
def recognize_or_intruder(face_img_bgr, rec, label_map, conf_thresh=65):
    """Predict which member a face crop belongs to.

    The crop is resized to 200x200, converted to grayscale and passed to
    ``rec.predict``.

    Args:
        face_img_bgr: BGR face crop.
        rec: Recognizer exposing ``predict(gray_image) -> (label, confidence)``.
        label_map: Mapping from integer label to member id.
        conf_thresh: Not applied here; the caller compares the returned
            confidence against its own threshold.

    Returns:
        ``(member_id, confidence)``. ``member_id`` is ``None`` when the label
        is not in ``label_map``. ``(None, 1000.0)`` is returned when the crop
        is missing or empty, or when preprocessing or prediction fails.
    """
    # An empty crop (zero-area box) would make cv2.resize raise; report it as
    # "no match" like any other failed prediction.
    if face_img_bgr is None or getattr(face_img_bgr, "size", 0) == 0:
        return None, 1000.0
    try:
        gray = cv2.cvtColor(cv2.resize(face_img_bgr, (200, 200)), cv2.COLOR_BGR2GRAY)
        pred, conf = rec.predict(gray)
    except Exception:
        # Deliberately best-effort: one bad crop must not abort a whole run.
        return None, 1000.0
    return label_map.get(int(pred)), conf
# -------------------- SIDEBAR --------------------
# Static sidebar: product blurb plus the list of configured time slots.
with st.sidebar:
    st.markdown("### FitHub • Admin")
    st.write("AI-powered gym operations — visitors, intruders, attendance, and anomaly detection from CCTV frames.")
    st.markdown("---")
    st.markdown("**Time Slots (2h, 5am–9pm)**")
    for i, (a, b) in enumerate(SLOTS):
        # Separate start and end; without a dash the caption read "05:0007:00".
        st.caption(f"{i+1}. {a}–{b}")
    st.markdown("---")
    st.caption("Upload videos or a frames folder. The app detects faces, recognizes members if trained, logs visitors, flags intruders, and detects possible falls.")
# -------------------- DATA LOAD --------------------
def metric_card(label, value, delta=None, help_txt=None):
    """Render one metric as a styled HTML card.

    ``label`` and ``value`` are interpolated into the markup as-is. ``delta``
    adds an extra line when truthy. ``help_txt`` is accepted but not used.
    """
    if delta:
        delta_html = f'<div class="label">{delta}</div>'
    else:
        delta_html = ""
    # Built line by line so the emitted markup has no leading indentation.
    html = (
        "\n"
        '<div class="card">\n'
        f'<div class="label">{label}</div>\n'
        f'<div class="metric">{value}</div>\n'
        f"{delta_html}\n"
        "</div>\n"
    )
    st.markdown(html, unsafe_allow_html=True)
def load_df(path):
    """Read the CSV at ``path``, or return an empty DataFrame if it does not exist.

    NOTE(review): this rebinds the ``load_df`` already defined in the HELPERS
    section with the same behaviour.
    """
    file_present = os.path.exists(path)
    return pd.read_csv(path) if file_present else pd.DataFrame()
# Load the three CSV tables that the tabs below read from.
members, subs, events = (
    load_df(csv_path) for csv_path in (MEMBERS_CSV, SUBS_CSV, EVENTS_CSV)
)
# -------------------- TABS --------------------
_TAB_TITLES = [
    "📊 Dashboard",
    "🧑‍🤝‍🧑 Attendance",
    "👥 Members",
    "💳 Subscriptions",
    "📹 Process CCTV",
    "🚨 Incidents",
]
(
    tab_dash,
    tab_att,
    tab_members,
    tab_subs,
    tab_ingest,
    tab_incidents,
) = st.tabs(_TAB_TITLES)
# -------------------- DASHBOARD --------------------
# Headline metrics, today's per-slot attendance chart, a low-attendance table
# and today's intruder snapshots.
with tab_dash:
    st.markdown("## Admin Dashboard")

    month = datetime.utcnow().strftime("%Y-%m")
    today_str = datetime.utcnow().date().isoformat()
    if events.empty:
        month_events = pd.DataFrame()
        today = pd.DataFrame()
        today_visits = pd.DataFrame()
    else:
        ts_text = events["ts"].astype(str)
        month_events = events[ts_text.str.startswith(month)]
        today = events[ts_text.str.startswith(today_str)]
        # Check-ins are "visit" events only; intruder and anomaly rows live in
        # the same table and must not inflate attendance figures.
        today_visits = today[today["type"] == "visit"]

    c1, c2, c3, c4 = st.columns(4)
    with c1:
        metric_card("Total Members", len(members))
    with c2:
        metric_card("Monthly Visitors", len(month_events[month_events["type"] == "visit"]) if not month_events.empty else 0)
    with c3:
        metric_card("Active Subscriptions", int(subs["active_members"].sum()) if not subs.empty else 0)
    with c4:
        metric_card("Check-ins Today", len(today_visits))

    st.markdown("### Expected vs Actual per Slot (Today)")
    expected_rate = st.slider("Expected % of active members per slot", 0, 100, 18, 1)
    active_members = int(subs["active_members"].sum()) if not subs.empty else len(members)
    expected = [int(active_members * expected_rate / 100) for _ in SLOTS]
    slot_counts = [0] * len(SLOTS)
    if not today_visits.empty:
        # slot_idx is blank for events outside every slot; drop those rows.
        slot_values = pd.to_numeric(today_visits["slot_idx"], errors="coerce").dropna().astype(int)
        for idx in slot_values.values:
            if 0 <= idx < len(slot_counts):
                slot_counts[idx] += 1
    df = pd.DataFrame({
        "Slot": [f"{a}-{b}" for a, b in SLOTS],
        "Expected": expected,
        "Actual": slot_counts,
    })
    import altair as alt
    df_long = df.melt("Slot", var_name="Type", value_name="Count")
    chart = (
        alt.Chart(df_long)
        .mark_bar()
        .encode(
            x=alt.X("Slot:N", sort=list(df["Slot"])),
            y=alt.Y("Count:Q"),
            color=alt.Color("Type:N"),
            tooltip=["Slot", "Type", "Count"],
        )
    )
    st.altair_chart(chart, use_container_width=True)

    st.markdown("### Less Attendance (Last 7 days)")
    # Rendered unconditionally so the control does not vanish when there are no visits.
    threshold = st.number_input("Threshold check-ins/week", 0, 20, 2)
    checkins = pd.Series(dtype="int64")
    if not events.empty:
        week_ago = datetime.utcnow() - timedelta(days=7)
        recent = pd.to_datetime(events["ts"], errors="coerce") >= week_ago
        vis = events[recent & (events["type"] == "visit")]
        if not vis.empty:
            checkins = vis.groupby("member_id").size()
    if not members.empty:
        # Members with no visit at all are the lowest attendees; counting only
        # ids present in the event log left them out of the table entirely.
        roster_ids = pd.Index(members["member_id"].dropna().unique())
        all_ids = roster_ids if checkins.empty else checkins.index.union(roster_ids)
        checkins = checkins.reindex(all_ids, fill_value=0)
    low_df = checkins.rename_axis("member_id").reset_index(name="checkins_7d")
    low_df = low_df[low_df["checkins_7d"] < threshold]
    st.dataframe(low_df, use_container_width=True)

    st.markdown("### Intruders (Today)")
    if not today.empty:
        intr_today = today[today["type"] == "intruder"]
        if not intr_today.empty:
            img_cols = st.columns(6)
            for i, (_, row) in enumerate(intr_today.head(24).iterrows()):
                img_path = row["img_path"]
                # A blank img_path comes back from the CSV as NaN (a float);
                # only real, existing paths can be displayed.
                if not isinstance(img_path, str) or not img_path or not os.path.exists(img_path):
                    continue
                ts_label = str(row["ts"])
                caption = ts_label.split("T")[1][:8] if "T" in ts_label else ts_label
                with img_cols[i % 6]:
                    st.image(img_path, caption=caption, use_column_width=True)
# -------------------- ATTENDANCE --------------------
# Per-member visit history, filtered by a selectable period.
with tab_att:
    st.markdown("## Attendance Tracker")
    member_ids = members["member_id"].tolist() if not members.empty else []
    colA, colB = st.columns([1, 2])
    with colA:
        sel_member = st.selectbox("Select member", ["-"] + member_ids)
        period = st.selectbox("Time Period", ["Today", "This Week", "This Month"])
    if sel_member != "-":
        if events.empty:
            # Keep the columns used below so an empty log shows "Check-ins: 0"
            # instead of raising KeyError on a column-less DataFrame.
            dfm = pd.DataFrame(columns=["ts", "slot_idx", "details"])
        else:
            dfm = events[(events["type"] == "visit") & (events["member_id"] == sel_member)].copy()
        ts_text = dfm["ts"].astype(str)
        now = datetime.utcnow()
        if period == "Today":
            dfm = dfm[ts_text.str.startswith(now.date().isoformat())]
        elif period == "This Week":
            # ISO-8601 strings sort chronologically, so a string comparison is enough.
            start = (now - timedelta(days=7)).date().isoformat()
            dfm = dfm[ts_text >= start]
        elif period == "This Month":
            month = now.strftime("%Y-%m")
            dfm = dfm[ts_text.str.startswith(month)]
        dfm = dfm.copy()
        st.write(f"Check-ins: **{len(dfm)}**")
        if not dfm.empty:

            def _slot_label(raw):
                # Visits logged outside every slot have a blank (NaN) slot_idx.
                if pd.isna(raw):
                    return "?"
                i = int(raw)
                return f"{SLOTS[i][0]}-{SLOTS[i][1]}" if 0 <= i < len(SLOTS) else "?"

            dfm["Slot"] = pd.to_numeric(dfm["slot_idx"], errors="coerce").map(_slot_label)
            st.dataframe(dfm[["ts", "Slot", "details"]], use_container_width=True)
# -------------------- MEMBERS --------------------
# Roster view, an add-member form, and the face-model training trigger.
with tab_members:
    st.markdown("## Members")
    st.caption("Add members with a unique **member_id**. Optionally drop face images into `data/faces/<member_id>/*.jpg` and click **Train Face Model**.")
    st.write("### Roster")
    st.dataframe(members, use_container_width=True)
    with st.expander("➕ Add Member"):
        mid = st.text_input("Member ID")
        name = st.text_input("Name")
        phone = st.text_input("Phone")
        plan = st.text_input("Plan", value="Basic")
        if st.button("Add Member"):
            mid = mid.strip()
            # Compare as text: ids read back from the CSV may be numeric, in
            # which case the typed string would never match and duplicates
            # would be accepted.
            known_ids = set() if members.empty else set(members["member_id"].astype(str))
            if mid and mid not in known_ids:
                new = pd.DataFrame([{"member_id": mid, "name": name, "phone": phone, "plan": plan, "status": "active"}])
                df_all = pd.concat([members, new], ignore_index=True) if not members.empty else new
                df_all.to_csv(MEMBERS_CSV, index=False)
                st.success("Member added. Reload the app to refresh.")
            else:
                st.error("Member ID exists or invalid.")
    st.markdown("### Face Recognition")
    if st.button("Train Face Model"):
        # The CCTV tab takes its model from the cached get_face_model(). Training
        # a separate throw-away model here left that cache stale, so drop the
        # cached entry and retrain through the cached accessor.
        get_face_model.clear()
        rec, label_map = get_face_model()
        if rec:
            st.success(f"Trained on {len(label_map)} members")
        else:
            st.warning("No training data found or recognizer unavailable.")
# -------------------- SUBSCRIPTIONS --------------------
# Plan summary table plus a form that inserts or updates one plan row.
with tab_subs:
    st.markdown("## Subscriptions")
    st.dataframe(subs, use_container_width=True)
    with st.expander("➕ Add/Update Plan Summary"):
        plan = st.text_input("Plan Name", "Basic")
        price = st.number_input("Price (₹/30 days)", 0, 100000, 999)
        active = st.number_input("Active Members", 0, 100000, 0)
        if st.button("Save Plan"):
            # Re-read the file so the write is based on what is on disk now.
            df = load_df(SUBS_CSV)
            plan_known = (not df.empty) and (plan in df["plan"].values)
            if plan_known:
                same_plan = df["plan"] == plan
                df.loc[same_plan, ["price", "active_members"]] = [price, active]
            else:
                addition = pd.DataFrame([{"plan": plan, "price": price, "active_members": active}])
                df = pd.concat([df, addition], ignore_index=True)
            df.to_csv(SUBS_CSV, index=False)
            st.success("Saved. Reload to refresh.")
# -------------------- PROCESSING --------------------
# Runs face recognition and fall detection over an uploaded video or a zip of
# frames, and logs the resulting events.
with tab_ingest:
    st.markdown("## Process CCTV Footage")
    st.caption("Upload a **video** or a **frames zip/folder**. We detect faces, recognize members if trained, log visits per 2h slot, capture intruders, and flag possible falls.")
    input_type = st.radio("Input Type", ["Video (mp4/avi)", "Frames Folder"], horizontal=True)
    face_model = None
    label_map = {}
    rec_cached = get_face_model()
    if rec_cached and rec_cached[0] is not None:
        face_model, label_map = rec_cached
    conf_thresh = st.slider("LBPH confidence threshold (lower = stricter)", 40, 120, 65)

    def process_frame(frame, ts):
        """Analyse one frame stamped ``ts`` and log the resulting events.

        Returns ``(boxes, visited_members)``: the detected face boxes and the
        set of member ids recognised in this frame.
        """
        slot_idx = slot_index_for(ts)
        boxes = detect_faces(frame)
        visited_members = set()
        for (x1, y1, x2, y2, score) in boxes:
            crop = frame[y1:y2, x1:x2].copy()
            if crop.size == 0:
                # Degenerate box: nothing to recognise or save.
                continue
            member_id = None
            if face_model is not None and len(label_map):
                mid, conf = recognize_or_intruder(crop, face_model, label_map, conf_thresh)
                if mid is not None and conf <= conf_thresh:
                    member_id = mid
            if member_id:
                visited_members.add(member_id)
                # Log a member once per day and slot for this run. Without this
                # a member in view for 10 s at 10 FPS produced ~100 "visit" rows.
                visit_key = (member_id, ts.date(), slot_idx)
                if visit_key not in process_frame.logged_visits:
                    process_frame.logged_visits.add(visit_key)
                    add_event("visit", member_id=member_id, slot_idx=slot_idx, details="face_conf_ok", img=None, ts=ts.isoformat())
            else:
                # Unknown faces cannot be matched across frames, so each
                # sighting is still logged individually.
                add_event("intruder", member_id="", slot_idx=slot_idx, details="unknown_face", img=crop, ts=ts.isoformat())
        fell, debug = detect_fall(frame, history=process_frame.history, ts=ts)
        if fell:
            add_event("anomaly_fall", slot_idx=slot_idx, details=str(debug), img=frame, ts=ts.isoformat())
        return boxes, visited_members

    # State shared across the frames of one analysis run.
    process_frame.history = {}
    process_frame.logged_visits = set()

    if input_type == "Video (mp4/avi)":
        vid = st.file_uploader("Upload video", type=["mp4", "avi", "mov", "mkv"])
        fps = st.number_input("Assumed FPS (if missing)", 1, 60, 10)
        if st.button("Run Video Analysis") and vid is not None:
            t0 = datetime.utcnow()
            # Keep the upload's own extension rather than always using ".mp4".
            suffix = os.path.splitext(getattr(vid, "name", "") or "")[1] or ".mp4"
            tmp_path = os.path.join(DATA_DIR, f"upload_{uuid.uuid4().hex}{suffix}")
            cap = None
            try:
                with open(tmp_path, "wb") as f:
                    f.write(vid.read())
                cap = cv2.VideoCapture(tmp_path)
                if not cap.isOpened():
                    st.error("Could not open the uploaded video.")
                else:
                    # Prefer the FPS the container reports; the user-supplied
                    # value is only the fallback its label promises.
                    native_fps = cap.get(cv2.CAP_PROP_FPS)
                    frame_rate = float(native_fps) if native_fps and native_fps > 0 else float(fps)
                    frame_idx = 0
                    while True:
                        ok, frame = cap.read()
                        if not ok:
                            break
                        ts = t0 + timedelta(seconds=frame_idx / frame_rate)
                        boxes, visited = process_frame(frame, ts)
                        frame_idx += 1
                    st.success("Video processed. Check Dashboard & Attendance tabs.")
            finally:
                # Release the capture and delete the temporary copy even on
                # error; uploads otherwise pile up inside DATA_DIR.
                if cap is not None:
                    cap.release()
                if os.path.exists(tmp_path):
                    os.remove(tmp_path)
    else:
        z = st.file_uploader("Upload a ZIP of frames (named by time order)", type=["zip"])
        if st.button("Run Frames Analysis") and z is not None:
            tmp_zip = os.path.join(DATA_DIR, f"frames_{uuid.uuid4().hex}.zip")
            extract_dir = os.path.splitext(tmp_zip)[0]
            try:
                with open(tmp_zip, "wb") as f:
                    f.write(z.read())
                try:
                    shutil.unpack_archive(tmp_zip, extract_dir)
                except shutil.ReadError:
                    st.error("Could not read the uploaded archive.")
                else:
                    paths = sorted(
                        p
                        for p in glob.glob(os.path.join(extract_dir, "**/*.*"), recursive=True)
                        if p.lower().endswith((".jpg", ".png", ".jpeg"))
                    )
                    t0 = datetime.utcnow()
                    for i, p in enumerate(paths):
                        frame = cv2.imread(p)
                        if frame is None:
                            continue
                        ts = t0 + timedelta(seconds=i * 0.5)  # assume 2 FPS
                        process_frame(frame, ts)
                    st.success("Frames processed.")
            finally:
                # Snapshots worth keeping were copied out by add_event; the
                # archive and extracted frames are scratch data.
                if os.path.exists(tmp_zip):
                    os.remove(tmp_zip)
                shutil.rmtree(extract_dir, ignore_errors=True)
# -------------------- INCIDENTS --------------------
# Lists every event whose type contains "anomaly", newest first.
with tab_incidents:
    st.markdown("## Security Incidents")
    if events.empty:
        st.info("No incidents recorded yet.")
    else:
        is_anomaly = events["type"].str.contains("anomaly", na=False)
        anomalies = events[is_anomaly]
        st.write(f"Anomalies recorded: **{len(anomalies)}**")
        newest_first = anomalies.sort_values("ts", ascending=False)
        st.dataframe(newest_first, use_container_width=True)
# -------------------- UTIL: EXPORT --------------------
# Footer row: events.csv download button on the left, a storage hint on the right.
st.markdown("---")
c1, c2 = st.columns(2)
if os.path.exists(EVENTS_CSV):
    with c1, open(EVENTS_CSV, "rb") as fh:
        st.download_button("Download events.csv", data=fh, file_name="events.csv")
with c2:
    st.caption("Pro tip: mount a Space Storage to persist `/data` across runs.")