# app.py — Pushup Analyzer (Gradio UI + YOLO pose estimation)
import os
import math
import shutil
import tempfile
import cv2
import numpy as np
import pandas as pd
from scipy.signal import savgol_filter
import gradio as gr
# ----------------------------
# Settings (optimized for speed, still robust)
# ----------------------------
# Elbow-angle thresholds in degrees: a rep must dip to DOWN_ANGLE or below
# (bottom position), then extend back to UP_ANGLE or above (lockout).
UP_ANGLE = 125
DOWN_ANGLE = 90
# Aggressive sampling target (effective inference rate)
# 6 fps usually gives ~5x fewer YOLO calls on 30fps videos.
TARGET_FPS = 6.0
# Minimum rep duration in seconds (keeps behavior stable when stride changes)
MIN_REP_SECONDS = 0.33
# NEW (from our efficient logic): Maximum rep duration in seconds
# Prevents very long false reps when tracking fails.
MAX_REP_SECONDS = 8.0
# ----------------------------
# Load YOLO pose model (lazy)
# ----------------------------
# Module-level cache, populated on the first call to load_pose_model().
_MODEL = None
def load_pose_model():
    """Lazily load a YOLO pose model, caching it in the module-level ``_MODEL``.

    Tries the known weight files in preference order and returns on the first
    one that loads. Raises RuntimeError (carrying the last load error) when
    none of the candidates can be loaded.
    """
    global _MODEL
    if _MODEL is not None:
        return _MODEL
    # Imported lazily so the module can be imported without ultralytics installed.
    from ultralytics import YOLO
    last_err = None
    for weights in ("yolo11n-pose.pt", "yolov8n-pose.pt"):
        try:
            _MODEL = YOLO(weights)
        except Exception as e:
            last_err = e
        else:
            print("Loaded model:", weights)
            return _MODEL
    raise RuntimeError(f"Could not load YOLO pose model. Last error: {last_err}")
# ----------------------------
# Helpers
# ----------------------------
def angle_deg(a, b, c):
a = np.asarray(a, dtype=np.float32)
b = np.asarray(b, dtype=np.float32)
c = np.asarray(c, dtype=np.float32)
ba = a - b
bc = c - b
denom = (np.linalg.norm(ba) * np.linalg.norm(bc)) + 1e-9
cosv = np.clip(np.dot(ba, bc) / denom, -1.0, 1.0)
return float(math.degrees(math.acos(cosv)))
def pick_best_side(kxy, kconf):
    """Choose the arm with the higher mean keypoint confidence.

    Returns (keypoint index list, mean confidence). Ties favor the right arm,
    matching the original comparison order. ``kxy`` is accepted for interface
    compatibility but not used here.
    """
    # YOLO COCO keypoint indices: shoulder, elbow, wrist per side.
    left = [5, 7, 9]
    right = [6, 8, 10]
    right_mean = float(np.mean(kconf[right]))
    left_mean = float(np.mean(kconf[left]))
    if right_mean >= left_mean:
        return right, right_mean
    return left, left_mean
def sigmoid(x):
    """Numerically stable logistic function 1 / (1 + e^-x).

    The naive ``1 / (1 + math.exp(-x))`` raises OverflowError once ``-x``
    exceeds roughly 709 (float64 exp limit). Branching on the sign keeps the
    exponent non-positive in both cases, so extreme inputs saturate cleanly
    to 0.0 / 1.0 instead of crashing.
    """
    if x >= 0:
        return 1.0 / (1.0 + math.exp(-x))
    # For negative x, exp(x) underflows gracefully toward 0.0.
    ex = math.exp(x)
    return ex / (1.0 + ex)
def rep_likelihood(min_ang, max_ang, mean_conf):
    """Combine travel range, depth, lockout, and confidence into one 0..1 value.

    Each geometric criterion is soft-thresholded with a sigmoid, then the four
    factors are multiplied, so a rep must satisfy all of them to score high.
    """
    travel = max_ang - min_ang
    range_score = sigmoid((travel - 45) / 10)          # enough total elbow travel
    depth_score = sigmoid((DOWN_ANGLE - min_ang) / 8)  # went low enough at the bottom
    lockout_score = sigmoid((max_ang - UP_ANGLE) / 8)  # extended far enough at the top
    conf_score = float(np.clip(mean_conf, 0.0, 1.0))
    combined = range_score * depth_score * lockout_score * conf_score
    return float(np.clip(combined, 0.0, 1.0))
def likelihood_to_score(p):
    """Map a 0..1 likelihood onto a 0..100 display score.

    The mapping is piecewise linear over fixed buckets; the upper half of the
    likelihood range (>= 0.5) spreads across scores 90-100, while lower
    likelihoods fall off quickly. Input is clipped into [0, 1] first.
    """
    p = float(np.clip(p, 0.0, 1.0))
    # (likelihood lo, likelihood hi, score lo, score hi); hi is exclusive
    # except for the top bucket, which also accepts p == 1.0 exactly.
    table = (
        (0.50, 1.00, 90, 100),
        (0.45, 0.50, 80, 89),
        (0.40, 0.45, 70, 79),
        (0.35, 0.40, 60, 69),
        (0.30, 0.35, 50, 59),
        (0.25, 0.30, 40, 49),
        (0.20, 0.25, 30, 39),
        (0.15, 0.20, 20, 29),
        (0.10, 0.15, 10, 19),
        (0.00, 0.10, 0, 9),
    )
    for lo, hi, score_lo, score_hi in table:
        in_bucket = (lo <= p < hi) or (p == 1.0 and hi == 1.0)
        if not in_bucket:
            continue
        frac = (p - lo) / max(hi - lo, 1e-6)
        return int(round(score_lo + frac * (score_hi - score_lo)))
    return 0
# ----------------------------
# Core pipeline
# ----------------------------
def analyze_pushup_video_yolo(video_path: str, out_dir: str):
    """Run the full pushup-analysis pipeline on one video file.

    Pass 1 samples frames at ~TARGET_FPS, runs YOLO pose on each sample, and
    records the elbow angle of the highest-confidence person/arm. The angle
    series is interpolated over misses, smoothed, and scanned with a
    two-state machine (WAIT_DOWN / IN_DOWN) to detect reps. A per-rep CSV
    and an annotated copy of the video are written into ``out_dir``.

    Returns:
        (summary dict, annotated video path, CSV path)
    Raises:
        RuntimeError: video cannot be opened, is too short, or yields no pose.
    """
    model = load_pose_model()
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise RuntimeError("OpenCV could not open the video. Try a different mp4 encoding.")
    fps = cap.get(cv2.CAP_PROP_FPS) or 30.0  # some containers report 0; fall back to 30
    w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) or 0
    h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) or 0
    # Compute stride to hit TARGET_FPS (effective inference rate)
    frame_stride = max(1, int(round(float(fps) / float(TARGET_FPS))))
    effective_fps = float(fps) / float(frame_stride)
    # Convert time-based rep duration limits to sampled frames (matches our efficient logic)
    min_rep_frames = int(math.ceil(MIN_REP_SECONDS * effective_fps))
    min_rep_frames = max(2, min_rep_frames)
    max_rep_frames = int(math.ceil(MAX_REP_SECONDS * effective_fps))
    max_rep_frames = max(min_rep_frames + 2, max_rep_frames)
    print(
        f"[speed] video_fps={fps:.2f} target_fps={TARGET_FPS:.2f} "
        f"stride={frame_stride} effective_fps={effective_fps:.2f} "
        f"min_rep_frames={min_rep_frames} max_rep_frames={max_rep_frames}"
    )
    # 1) First pass: compute angles + confs per sampled frame
    angles, confs, frame_ids = [], [], []
    frame_i = 0
    while True:
        ok, frame = cap.read()
        if not ok:
            break
        if frame_i % frame_stride != 0:
            # Non-sampled frame: skip inference entirely.
            frame_i += 1
            continue
        res = model(frame, verbose=False)[0]
        if res.keypoints is None or len(res.keypoints.xy) == 0:
            # No person detected: record NaN so it can be interpolated later.
            angles.append(np.nan)
            confs.append(0.0)
            frame_ids.append(frame_i)
            frame_i += 1
            continue
        kxy_all = res.keypoints.xy.cpu().numpy()
        kconf_all = res.keypoints.conf.cpu().numpy()
        # choose best person by mean confidence
        pidx = int(np.argmax(np.mean(kconf_all, axis=1)))
        kxy = kxy_all[pidx]
        kconf = kconf_all[pidx]
        ids, side_conf = pick_best_side(kxy, kconf)
        if side_conf < 0.2:
            # Arm keypoints too uncertain: treat like a miss (NaN, interpolated later).
            angles.append(np.nan)
            confs.append(float(side_conf))
            frame_ids.append(frame_i)
            frame_i += 1
            continue
        a, b, c = kxy[ids[0]], kxy[ids[1]], kxy[ids[2]]  # shoulder, elbow, wrist
        angles.append(angle_deg(a, b, c))
        confs.append(float(side_conf))
        frame_ids.append(frame_i)
        frame_i += 1
    cap.release()
    angles = np.array(angles, dtype=np.float32)
    confs = np.array(confs, dtype=np.float32)
    frame_ids = np.array(frame_ids, dtype=np.int32)
    if len(angles) < 5:
        raise RuntimeError("Video too short or no usable frames detected.")
    # Interpolate missing angles
    mask = np.isfinite(angles)
    if np.any(mask) and not np.all(mask):
        angles[~mask] = np.interp(frame_ids[~mask], frame_ids[mask], angles[mask])
    elif not np.any(mask):
        raise RuntimeError("No valid pose angles detected.")
    # Smooth (match our efficient logic: ~1 second window scaled by effective_fps)
    win = int(round(effective_fps * 1.0))
    win = max(5, win)
    if win % 2 == 0:
        win += 1  # savgol_filter requires an odd window length
    win = min(win, (len(angles) // 2) * 2 + 1)  # cap at an odd value <= series length
    angles_smooth = savgol_filter(angles, win, 2)
    # 2) Rep detection on smoothed angles (match our efficient logic)
    reps = []
    state = "WAIT_DOWN"
    rep_min = rep_max = rep_conf_sum = rep_len = rep_start = None
    for i, ang in enumerate(angles_smooth):
        cf = float(confs[i])
        if state == "WAIT_DOWN":
            # Waiting for the elbow to bend below DOWN_ANGLE (bottom of a rep).
            if ang <= DOWN_ANGLE:
                state = "IN_DOWN"
                rep_min = rep_max = float(ang)
                rep_conf_sum = cf
                rep_len = 1
                rep_start = i
        else:
            # IN_DOWN: accumulate per-rep stats until lockout or abort.
            rep_min = min(rep_min, float(ang))
            rep_max = max(rep_max, float(ang))
            rep_conf_sum += cf
            rep_len += 1
            # Abort absurdly long reps (tracking failure / stall)
            if rep_len > max_rep_frames:
                state = "WAIT_DOWN"
                continue
            if ang >= UP_ANGLE:
                # Lockout reached; record the rep only if it lasted long enough.
                if rep_len >= min_rep_frames:
                    mean_cf = float(rep_conf_sum / rep_len)
                    likelihood = rep_likelihood(rep_min, rep_max, mean_cf)
                    score = likelihood_to_score(likelihood)
                    sf = int(frame_ids[rep_start])
                    ef = int(frame_ids[i])
                    reps.append({
                        "rep": len(reps) + 1,
                        "start_frame": sf,
                        "end_frame": ef,
                        "start_time_s": float(sf / fps),
                        "end_time_s": float(ef / fps),
                        "min_elbow_angle": float(rep_min),
                        "max_elbow_angle": float(rep_max),
                        "mean_kpt_conf": float(mean_cf),
                        "pushup_likelihood": float(likelihood),
                        "pushup_score": int(score),
                    })
                state = "WAIT_DOWN"
    # 3) Save CSV
    csv_path = os.path.join(out_dir, "pushup_reps.csv")
    df = pd.DataFrame(reps)
    df.to_csv(csv_path, index=False)
    # 4) Annotated video (kept original resolution)
    annotated_path = os.path.join(out_dir, "pushup_annotated.mp4")
    cap = cv2.VideoCapture(video_path)
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    writer = cv2.VideoWriter(annotated_path, fourcc, fps, (w, h))
    rep_windows = [(r["start_frame"], r["end_frame"], r["pushup_score"]) for r in reps]
    frame_i = 0
    while True:
        ok, frame = cap.read()
        if not ok:
            break
        # Score of the rep window covering this frame, or None outside a rep.
        active = next((s for sf, ef, s in rep_windows if sf <= frame_i <= ef), None)
        # Number of reps fully completed before this frame.
        count = sum(1 for _, ef, _ in rep_windows if ef < frame_i)
        # Nearest sampled angle for on-screen display (frame_ids is sorted).
        j = int(min(np.searchsorted(frame_ids, frame_i), len(angles_smooth) - 1))
        ang_disp = float(angles_smooth[j])
        cv2.putText(frame, f"Reps: {count}/{len(reps)}", (20, 40),
                    cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255,255,255), 2)
        cv2.putText(frame, f"Elbow angle: {ang_disp:.1f}", (20, 80),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255,255,255), 2)
        cv2.putText(frame, f"Rep score: {active if active is not None else '-'}", (20, 120),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255,255,255), 2)
        writer.write(frame)
        frame_i += 1
    cap.release()
    writer.release()
    summary = {
        "ok": True,
        "error": None,
        "rep_count": int(len(reps)),
        "avg_score": int(round(float(np.mean([r["pushup_score"] for r in reps])))) if reps else 0,
        "avg_likelihood": float(np.mean([r["pushup_likelihood"] for r in reps])) if reps else 0.0,
        "rep_events": reps,
        "speed_settings": {
            "video_fps": float(fps),
            "target_fps": float(TARGET_FPS),
            "frame_stride": int(frame_stride),
            "effective_fps": float(effective_fps),
            "min_rep_frames": int(min_rep_frames),
        }
    }
    return summary, annotated_path, csv_path
# ----------------------------
# API wrapper
# ----------------------------
def api_analyze(uploaded_file):
    """Gradio handler: validate the upload, run the analyzer, return results.

    Returns a 3-tuple ``(json_payload, annotated_video_path, csv_path)``.
    On any failure the JSON payload carries ``ok=False`` and an error string,
    and both file outputs are None — the endpoint never raises to the client.
    """
    if uploaded_file is None:
        return {"ok": False, "error": "No file received.", "rep_count": 0, "rep_events": []}, None, None
    workdir = tempfile.mkdtemp()
    in_path = os.path.join(workdir, "input.mp4")
    # Resolve source path robustly across gr.File payload shapes: FileData
    # object with .path, dict with "path", tempfile wrapper with .name, or a
    # plain string path.
    if hasattr(uploaded_file, "path") and uploaded_file.path:
        src_path = uploaded_file.path
    elif isinstance(uploaded_file, dict) and uploaded_file.get("path"):
        src_path = uploaded_file["path"]
    elif hasattr(uploaded_file, "name") and uploaded_file.name:
        src_path = uploaded_file.name
    else:
        src_path = str(uploaded_file)
    ext = os.path.splitext(src_path)[1].lower()
    allowed = {".mp4", ".mov", ".webm", ".mkv"}
    if ext and ext not in allowed:
        return {"ok": False, "error": f"Unsupported extension: {ext}. Use mp4/mov/webm/mkv.", "rep_count": 0, "rep_events": []}, None, None
    try:
        # Copy inside the try: previously a missing/unreadable source path made
        # shutil.copy raise an unhandled exception instead of returning the
        # structured error payload like every other failure path here.
        shutil.copy(src_path, in_path)
        summary, annotated_path, csv_path = analyze_pushup_video_yolo(in_path, out_dir=workdir)
        return summary, annotated_path, csv_path
    except Exception as e:
        return {"ok": False, "error": f"{type(e).__name__}: {e}", "rep_count": 0, "rep_events": []}, None, None
# ----------------------------
# Gradio UI + API endpoint
# ----------------------------
# Build the Gradio UI; btn.click also exposes the handler as the named API
# endpoint "analyze" for programmatic clients.
with gr.Blocks(title="Pushup API (YOLO)") as demo:
    gr.Markdown("# Pushup Analyzer API (YOLO)\nUpload a video, get rep scores + CSV + annotated video.\n")
    # Keep gr.File to avoid Invalid file type issues
    video_file = gr.File(label="Upload video")
    btn = gr.Button("Analyze")
    out_json = gr.JSON(label="Results JSON")
    out_video = gr.Video(label="Annotated Output")
    out_csv = gr.File(label="CSV Output")
    btn.click(
        fn=api_analyze,
        inputs=[video_file],
        outputs=[out_json, out_video, out_csv],
        api_name="analyze",
    )

# Launch the web server only when run as a script (not on import).
if __name__ == "__main__":
    demo.launch()