"""Pushup analyzer: YOLO pose estimation -> elbow-angle rep detection -> scored CSV,
annotated video, and a Gradio API endpoint."""

import math
import os
import shutil
import tempfile

import cv2
import numpy as np
import pandas as pd
from scipy.signal import savgol_filter

import gradio as gr

# ----------------------------
# Settings (optimized for speed, still robust)
# ----------------------------
UP_ANGLE = 125    # elbow angle (deg) counted as "arms extended" (top of a rep)
DOWN_ANGLE = 90   # elbow angle (deg) counted as "bottom of a rep"

# Aggressive sampling target (effective inference rate)
# 6 fps usually gives ~5x fewer YOLO calls on 30fps videos.
TARGET_FPS = 6.0

# Minimum rep duration in seconds (keeps behavior stable when stride changes)
MIN_REP_SECONDS = 0.33

# Maximum rep duration in seconds.
# Prevents very long false reps when tracking fails.
MAX_REP_SECONDS = 8.0

# ----------------------------
# Load YOLO pose model (lazy)
# ----------------------------
_MODEL = None


def load_pose_model():
    """Lazily load and cache a YOLO pose model.

    Tries yolo11n-pose first, then falls back to yolov8n-pose.

    Returns:
        The loaded ultralytics YOLO model (cached in module global `_MODEL`).

    Raises:
        RuntimeError: if neither set of weights could be loaded.
    """
    global _MODEL
    if _MODEL is not None:
        return _MODEL
    from ultralytics import YOLO  # local import keeps startup fast when unused
    last_err = None
    for w in ["yolo11n-pose.pt", "yolov8n-pose.pt"]:
        try:
            _MODEL = YOLO(w)
            print("Loaded model:", w)
            return _MODEL
        except Exception as e:
            last_err = e
    raise RuntimeError(f"Could not load YOLO pose model. Last error: {last_err}")


# ----------------------------
# Helpers
# ----------------------------
def angle_deg(a, b, c):
    """Return the angle at vertex b (degrees) formed by points a-b-c."""
    a = np.asarray(a, dtype=np.float32)
    b = np.asarray(b, dtype=np.float32)
    c = np.asarray(c, dtype=np.float32)
    ba = a - b
    bc = c - b
    # epsilon avoids division by zero for degenerate (coincident) keypoints
    denom = (np.linalg.norm(ba) * np.linalg.norm(bc)) + 1e-9
    cosv = np.clip(np.dot(ba, bc) / denom, -1.0, 1.0)
    return float(math.degrees(math.acos(cosv)))


def pick_best_side(kxy, kconf):
    """Choose the body side (left/right arm) with higher mean keypoint confidence.

    Args:
        kxy:   (17, 2) keypoint coordinates (COCO order).
        kconf: (17,) per-keypoint confidences.

    Returns:
        (indices, mean_conf): indices of [shoulder, elbow, wrist] for the chosen
        side and that side's mean confidence.
    """
    left = [5, 7, 9]    # L shoulder, L elbow, L wrist (YOLO COCO indices)
    right = [6, 8, 10]  # R shoulder, R elbow, R wrist
    if float(np.mean(kconf[right])) >= float(np.mean(kconf[left])):
        return right, float(np.mean(kconf[right]))
    return left, float(np.mean(kconf[left]))


def sigmoid(x):
    """Numerically stable logistic function (avoids math.exp overflow)."""
    # For very negative x, math.exp(-x) overflows; branch on sign instead.
    if x >= 0:
        return 1.0 / (1.0 + math.exp(-x))
    ex = math.exp(x)
    return ex / (1.0 + ex)


def rep_likelihood(min_ang, max_ang, mean_conf):
    """Score how pushup-like a rep is in [0, 1].

    Combines angular range, bottom depth, top lockout, and keypoint confidence;
    each component is a soft threshold (sigmoid), multiplied together.
    """
    ang_range = max_ang - min_ang
    range_score = sigmoid((ang_range - 45) / 10)        # want a wide elbow swing
    depth_score = sigmoid((DOWN_ANGLE - min_ang) / 8)   # went deep enough
    lockout_score = sigmoid((max_ang - UP_ANGLE) / 8)   # extended enough at top
    conf_score = float(np.clip(mean_conf, 0.0, 1.0))
    return float(np.clip(range_score * depth_score * lockout_score * conf_score, 0.0, 1.0))


def likelihood_to_score(p):
    """Map a likelihood in [0, 1] to a 0-100 integer score via linear interpolation
    inside fixed buckets (higher likelihood -> higher bucket)."""
    p = float(np.clip(p, 0.0, 1.0))
    buckets = [
        (0.50, 1.00, 90, 100),
        (0.45, 0.50, 80, 89),
        (0.40, 0.45, 70, 79),
        (0.35, 0.40, 60, 69),
        (0.30, 0.35, 50, 59),
        (0.25, 0.30, 40, 49),
        (0.20, 0.25, 30, 39),
        (0.15, 0.20, 20, 29),
        (0.10, 0.15, 10, 19),
        (0.00, 0.10, 0, 9),
    ]
    for lo, hi, s_lo, s_hi in buckets:
        # second clause admits p == 1.0 into the top bucket (intervals are half-open)
        if (lo <= p < hi) or (p == 1.0 and hi == 1.0):
            t = (p - lo) / max(hi - lo, 1e-6)
            return int(round(s_lo + t * (s_hi - s_lo)))
    return 0


# ----------------------------
# Core pipeline
# ----------------------------
def analyze_pushup_video_yolo(video_path: str, out_dir: str):
    """Analyze a pushup video: detect reps, score them, and write outputs.

    Pipeline:
        1. Sample frames at ~TARGET_FPS and extract the best-side elbow angle.
        2. Interpolate gaps, smooth (~1 s Savitzky-Golay window).
        3. State-machine rep detection (down below DOWN_ANGLE, up above UP_ANGLE).
        4. Write a CSV of reps and an annotated mp4.

    Args:
        video_path: path to the input video.
        out_dir: directory for the CSV and annotated video.

    Returns:
        (summary_dict, annotated_video_path, csv_path)

    Raises:
        RuntimeError: if the video cannot be opened, is too short, or no valid
            pose angles are detected.
    """
    model = load_pose_model()
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise RuntimeError("OpenCV could not open the video. Try a different mp4 encoding.")

    fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
    w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) or 0
    h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) or 0

    # Compute stride to hit TARGET_FPS (effective inference rate)
    frame_stride = max(1, int(round(float(fps) / float(TARGET_FPS))))
    effective_fps = float(fps) / float(frame_stride)

    # Convert time-based rep duration limits to sampled-frame counts
    min_rep_frames = max(2, int(math.ceil(MIN_REP_SECONDS * effective_fps)))
    max_rep_frames = int(math.ceil(MAX_REP_SECONDS * effective_fps))
    max_rep_frames = max(min_rep_frames + 2, max_rep_frames)

    print(
        f"[speed] video_fps={fps:.2f} target_fps={TARGET_FPS:.2f} "
        f"stride={frame_stride} effective_fps={effective_fps:.2f} "
        f"min_rep_frames={min_rep_frames} max_rep_frames={max_rep_frames}"
    )

    # 1) First pass: compute angles + confs per sampled frame
    angles, confs, frame_ids = [], [], []
    frame_i = 0
    while True:
        ok, frame = cap.read()
        if not ok:
            break
        if frame_i % frame_stride != 0:
            frame_i += 1
            continue

        res = model(frame, verbose=False)[0]
        # `keypoints.conf` can be None when nothing is detected; guard both.
        if (
            res.keypoints is None
            or res.keypoints.conf is None
            or len(res.keypoints.xy) == 0
        ):
            angles.append(np.nan)
            confs.append(0.0)
            frame_ids.append(frame_i)
            frame_i += 1
            continue

        kxy_all = res.keypoints.xy.cpu().numpy()
        kconf_all = res.keypoints.conf.cpu().numpy()
        # choose best person by mean confidence
        pidx = int(np.argmax(np.mean(kconf_all, axis=1)))
        kxy = kxy_all[pidx]
        kconf = kconf_all[pidx]

        ids, side_conf = pick_best_side(kxy, kconf)
        if side_conf < 0.2:
            # side too unreliable; record NaN and interpolate later
            angles.append(np.nan)
            confs.append(float(side_conf))
            frame_ids.append(frame_i)
            frame_i += 1
            continue

        a, b, c = kxy[ids[0]], kxy[ids[1]], kxy[ids[2]]
        angles.append(angle_deg(a, b, c))
        confs.append(float(side_conf))
        frame_ids.append(frame_i)
        frame_i += 1

    cap.release()

    angles = np.array(angles, dtype=np.float32)
    confs = np.array(confs, dtype=np.float32)
    frame_ids = np.array(frame_ids, dtype=np.int32)

    if len(angles) < 5:
        raise RuntimeError("Video too short or no usable frames detected.")

    # Interpolate missing angles over original frame indices
    mask = np.isfinite(angles)
    if np.any(mask) and not np.all(mask):
        angles[~mask] = np.interp(frame_ids[~mask], frame_ids[mask], angles[mask])
    elif not np.any(mask):
        raise RuntimeError("No valid pose angles detected.")

    # Smooth with a ~1 second window scaled by effective_fps.
    # savgol_filter requires an odd window_length <= len(angles); the previous
    # clamp ((len // 2) * 2 + 1) exceeded len for even lengths and crashed.
    win = max(5, int(round(effective_fps * 1.0)))
    if win % 2 == 0:
        win += 1
    max_win = len(angles) if len(angles) % 2 == 1 else len(angles) - 1
    win = min(win, max_win)
    angles_smooth = savgol_filter(angles, win, 2)

    # 2) Rep detection on smoothed angles (two-state machine)
    reps = []
    state = "WAIT_DOWN"
    rep_min = rep_max = rep_conf_sum = rep_len = rep_start = None
    for i, ang in enumerate(angles_smooth):
        cf = float(confs[i])
        if state == "WAIT_DOWN":
            if ang <= DOWN_ANGLE:
                state = "IN_DOWN"
                rep_min = rep_max = float(ang)
                rep_conf_sum = cf
                rep_len = 1
                rep_start = i
        else:
            rep_min = min(rep_min, float(ang))
            rep_max = max(rep_max, float(ang))
            rep_conf_sum += cf
            rep_len += 1

            # Abort absurdly long reps (tracking failure / stall)
            if rep_len > max_rep_frames:
                state = "WAIT_DOWN"
                continue

            if ang >= UP_ANGLE:
                if rep_len >= min_rep_frames:
                    mean_cf = float(rep_conf_sum / rep_len)
                    likelihood = rep_likelihood(rep_min, rep_max, mean_cf)
                    score = likelihood_to_score(likelihood)
                    sf = int(frame_ids[rep_start])
                    ef = int(frame_ids[i])
                    reps.append({
                        "rep": len(reps) + 1,
                        "start_frame": sf,
                        "end_frame": ef,
                        "start_time_s": float(sf / fps),
                        "end_time_s": float(ef / fps),
                        "min_elbow_angle": float(rep_min),
                        "max_elbow_angle": float(rep_max),
                        "mean_kpt_conf": float(mean_cf),
                        "pushup_likelihood": float(likelihood),
                        "pushup_score": int(score),
                    })
                # Too-short reps are discarded; either way arm the next rep.
                state = "WAIT_DOWN"

    # 3) Save CSV
    csv_path = os.path.join(out_dir, "pushup_reps.csv")
    pd.DataFrame(reps).to_csv(csv_path, index=False)

    # 4) Annotated video (kept original resolution)
    annotated_path = os.path.join(out_dir, "pushup_annotated.mp4")
    cap = cv2.VideoCapture(video_path)
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    writer = cv2.VideoWriter(annotated_path, fourcc, fps, (w, h))

    rep_windows = [(r["start_frame"], r["end_frame"], r["pushup_score"]) for r in reps]
    frame_i = 0
    while True:
        ok, frame = cap.read()
        if not ok:
            break
        # score of the rep containing this frame, if any
        active = next((s for sf, ef, s in rep_windows if sf <= frame_i <= ef), None)
        # number of reps fully completed before this frame
        count = sum(1 for _, ef, _ in rep_windows if ef < frame_i)
        # nearest sampled angle at or after this frame (clamped to last sample)
        j = int(min(np.searchsorted(frame_ids, frame_i), len(angles_smooth) - 1))
        ang_disp = float(angles_smooth[j])

        cv2.putText(frame, f"Reps: {count}/{len(reps)}", (20, 40),
                    cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255, 255, 255), 2)
        cv2.putText(frame, f"Elbow angle: {ang_disp:.1f}", (20, 80),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255, 255, 255), 2)
        cv2.putText(frame, f"Rep score: {active if active is not None else '-'}", (20, 120),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255, 255, 255), 2)
        writer.write(frame)
        frame_i += 1

    cap.release()
    writer.release()

    summary = {
        "ok": True,
        "error": None,
        "rep_count": int(len(reps)),
        "avg_score": int(round(float(np.mean([r["pushup_score"] for r in reps])))) if reps else 0,
        "avg_likelihood": float(np.mean([r["pushup_likelihood"] for r in reps])) if reps else 0.0,
        "rep_events": reps,
        "speed_settings": {
            "video_fps": float(fps),
            "target_fps": float(TARGET_FPS),
            "frame_stride": int(frame_stride),
            "effective_fps": float(effective_fps),
            "min_rep_frames": int(min_rep_frames),
            "max_rep_frames": int(max_rep_frames),
        },
    }
    return summary, annotated_path, csv_path


# ----------------------------
# API wrapper
# ----------------------------
def api_analyze(uploaded_file):
    """Gradio endpoint: accept an uploaded video and run the analyzer.

    Returns (results_json, annotated_video_path_or_None, csv_path_or_None);
    errors are reported inside the JSON rather than raised.
    """
    if uploaded_file is None:
        return {"ok": False, "error": "No file received.",
                "rep_count": 0, "rep_events": []}, None, None

    workdir = tempfile.mkdtemp()
    in_path = os.path.join(workdir, "input.mp4")

    # Resolve source path robustly across Gradio file-object variants
    src_path = None
    if hasattr(uploaded_file, "path") and uploaded_file.path:
        src_path = uploaded_file.path
    elif isinstance(uploaded_file, dict) and uploaded_file.get("path"):
        src_path = uploaded_file["path"]
    elif hasattr(uploaded_file, "name") and uploaded_file.name:
        src_path = uploaded_file.name
    else:
        src_path = str(uploaded_file)

    ext = os.path.splitext(src_path)[1].lower()
    allowed = {".mp4", ".mov", ".webm", ".mkv"}
    if ext and ext not in allowed:
        return {"ok": False,
                "error": f"Unsupported extension: {ext}. Use mp4/mov/webm/mkv.",
                "rep_count": 0, "rep_events": []}, None, None

    # Copy inside the try so a bad/missing source path also yields error-JSON
    # instead of an unhandled exception.
    try:
        shutil.copy(src_path, in_path)
        summary, annotated_path, csv_path = analyze_pushup_video_yolo(in_path, out_dir=workdir)
        return summary, annotated_path, csv_path
    except Exception as e:
        return {"ok": False, "error": f"{type(e).__name__}: {e}",
                "rep_count": 0, "rep_events": []}, None, None


# ----------------------------
# Gradio UI + API endpoint
# ----------------------------
with gr.Blocks(title="Pushup API (YOLO)") as demo:
    gr.Markdown("# Pushup Analyzer API (YOLO)\nUpload a video, get rep scores + CSV + annotated video.\n")
    # Keep gr.File to avoid Invalid file type issues
    video_file = gr.File(label="Upload video")
    btn = gr.Button("Analyze")
    out_json = gr.JSON(label="Results JSON")
    out_video = gr.Video(label="Annotated Output")
    out_csv = gr.File(label="CSV Output")
    btn.click(
        fn=api_analyze,
        inputs=[video_file],
        outputs=[out_json, out_video, out_csv],
        api_name="analyze",
    )

if __name__ == "__main__":
    demo.launch()