Spaces:
Runtime error
Runtime error
| import os | |
| import math | |
| import shutil | |
| import tempfile | |
| import cv2 | |
| import numpy as np | |
| import pandas as pd | |
| from scipy.signal import savgol_filter | |
| import gradio as gr | |
# ----------------------------
# Settings (optimized for speed, still robust)
# ----------------------------
# Elbow-angle thresholds in degrees. The rep detector starts a rep once the
# smoothed angle is <= DOWN_ANGLE and closes it once the angle is >= UP_ANGLE.
UP_ANGLE = 125
DOWN_ANGLE = 90
# Aggressive sampling target (effective inference rate).
# 6 fps usually gives ~5x fewer YOLO calls on 30fps videos.
TARGET_FPS = 6.0
# Minimum rep duration in seconds (keeps behavior stable when stride changes).
MIN_REP_SECONDS = 0.33
# Maximum rep duration in seconds.
# Prevents very long false reps when tracking fails.
MAX_REP_SECONDS = 8.0
# ----------------------------
# Load YOLO pose model (lazy)
# ----------------------------
# Process-wide cache so the weights are loaded at most once.
_MODEL = None


def load_pose_model():
    """Return the shared YOLO pose model, loading it on first use.

    Candidate weight files are tried in order and the first one that loads
    is cached in the module-level ``_MODEL`` and returned on every later call.

    Returns:
        The loaded ``ultralytics.YOLO`` model instance.

    Raises:
        RuntimeError: if none of the candidate weights could be loaded. The
            last underlying error is attached as ``__cause__``.
    """
    global _MODEL
    if _MODEL is not None:
        return _MODEL

    # Imported lazily so importing this module does not pull in ultralytics.
    from ultralytics import YOLO

    last_err = None
    for w in ["yolo11n-pose.pt", "yolov8n-pose.pt"]:
        try:
            model = YOLO(w)
        except Exception as e:  # best-effort fallback to the next weights file
            last_err = e
            continue
        _MODEL = model
        print("Loaded model:", w)
        return _MODEL

    raise RuntimeError(
        f"Could not load YOLO pose model. Last error: {last_err}"
    ) from last_err
| # ---------------------------- | |
| # Helpers | |
| # ---------------------------- | |
def angle_deg(a, b, c):
    """Return the angle at vertex ``b`` formed by points ``a`` and ``c``, in degrees.

    The points are converted to float32 arrays. A tiny epsilon in the
    denominator keeps degenerate (zero-length) rays from dividing by zero,
    and the cosine is clipped to [-1, 1] before taking the arc cosine.
    """
    vertex = np.asarray(b, dtype=np.float32)
    ray_to_a = np.asarray(a, dtype=np.float32) - vertex
    ray_to_c = np.asarray(c, dtype=np.float32) - vertex

    norm_product = np.linalg.norm(ray_to_a) * np.linalg.norm(ray_to_c)
    cosine = np.dot(ray_to_a, ray_to_c) / (norm_product + 1e-9)
    cosine = np.clip(cosine, -1.0, 1.0)

    return float(math.degrees(math.acos(cosine)))
def pick_best_side(kxy, kconf):
    """Choose the arm (shoulder, elbow, wrist) with the higher mean confidence.

    ``kxy`` is accepted for signature compatibility but is not read.
    ``kconf`` is indexed with the YOLO COCO keypoint indices below.

    Returns:
        ``(indices, mean_confidence)`` for the chosen arm. Ties go to the
        right arm.
    """
    left_arm = [5, 7, 9]    # L shoulder, L elbow, L wrist (YOLO COCO indices)
    right_arm = [6, 8, 10]  # R shoulder, R elbow, R wrist

    left_conf = float(np.mean(kconf[left_arm]))
    right_conf = float(np.mean(kconf[right_arm]))

    if right_conf >= left_conf:
        return right_arm, right_conf
    return left_arm, left_conf
def sigmoid(x):
    """Return the logistic function ``1 / (1 + e**-x)`` of ``x``.

    For very negative ``x`` the intermediate ``math.exp(-x)`` overflows a
    float; in that case the mathematical limit 0.0 is returned instead of
    raising ``OverflowError``. All other inputs give the same value as the
    plain formula.
    """
    try:
        return 1.0 / (1.0 + math.exp(-x))
    except OverflowError:
        # exp(-x) is astronomically large, so the quotient is effectively 0.
        return 0.0
def rep_likelihood(min_ang, max_ang, mean_conf):
    """Combine range, depth, lockout and confidence into a [0, 1] likelihood.

    Each geometric factor is a sigmoid of how far the rep's minimum and
    maximum elbow angles sit from a reference value. The confidence factor
    is ``mean_conf`` clipped to [0, 1]. The four factors are multiplied and
    the product is clipped to [0, 1].
    """
    # Total angular travel of the elbow, centred on 45 degrees.
    travel = max_ang - min_ang
    travel_factor = sigmoid((travel - 45) / 10)

    # How far below DOWN_ANGLE the elbow bent at the bottom of the rep.
    bottom_factor = sigmoid((DOWN_ANGLE - min_ang) / 8)

    # How far above UP_ANGLE the elbow extended at the top of the rep.
    top_factor = sigmoid((max_ang - UP_ANGLE) / 8)

    confidence_factor = float(np.clip(mean_conf, 0.0, 1.0))

    combined = travel_factor * bottom_factor * top_factor * confidence_factor
    return float(np.clip(combined, 0.0, 1.0))
def likelihood_to_score(p):
    """Map a likelihood in [0, 1] to an integer score in 0..100.

    The likelihood is clipped to [0, 1] and located in one of ten bands.
    Within a band the score is linearly interpolated between the band's
    score bounds and rounded to the nearest integer. A value that matches
    no band (for example NaN) scores 0.
    """
    prob = float(np.clip(p, 0.0, 1.0))

    # (likelihood_low, likelihood_high, score_low, score_high)
    bands = (
        (0.50, 1.00, 90, 100),
        (0.45, 0.50, 80, 89),
        (0.40, 0.45, 70, 79),
        (0.35, 0.40, 60, 69),
        (0.30, 0.35, 50, 59),
        (0.25, 0.30, 40, 49),
        (0.20, 0.25, 30, 39),
        (0.15, 0.20, 20, 29),
        (0.10, 0.15, 10, 19),
        (0.00, 0.10, 0, 9),
    )

    for band_lo, band_hi, score_lo, score_hi in bands:
        inside = band_lo <= prob < band_hi
        # Bands are half-open, so the top band admits exactly 1.0 explicitly.
        at_ceiling = prob == 1.0 and band_hi == 1.0
        if not (inside or at_ceiling):
            continue
        fraction = (prob - band_lo) / max(band_hi - band_lo, 1e-6)
        return int(round(score_lo + fraction * (score_hi - score_lo)))

    return 0
| # ---------------------------- | |
| # Core pipeline | |
| # ---------------------------- | |
def _sample_elbow_angles(cap, model, frame_stride):
    """Run pose inference on every ``frame_stride``-th frame of an open capture.

    Returns three parallel lists: elbow angle in degrees (NaN when no usable
    pose was found), arm keypoint confidence, and the source frame index.
    """
    angles, confs, frame_ids = [], [], []
    frame_i = 0
    while True:
        ok, frame = cap.read()
        if not ok:
            break
        if frame_i % frame_stride == 0:
            angle, conf = np.nan, 0.0
            res = model(frame, verbose=False)[0]
            if res.keypoints is not None and len(res.keypoints.xy) > 0:
                kxy_all = res.keypoints.xy.cpu().numpy()
                kconf_all = res.keypoints.conf.cpu().numpy()
                # Choose the detected person with the best mean confidence.
                pidx = int(np.argmax(np.mean(kconf_all, axis=1)))
                kxy, kconf = kxy_all[pidx], kconf_all[pidx]
                ids, side_conf = pick_best_side(kxy, kconf)
                conf = float(side_conf)
                # Low-confidence arms count as missing; they are filled in
                # later by interpolation.
                angle = (
                    np.nan
                    if side_conf < 0.2
                    else angle_deg(kxy[ids[0]], kxy[ids[1]], kxy[ids[2]])
                )
            angles.append(angle)
            confs.append(conf)
            frame_ids.append(frame_i)
        frame_i += 1
    return angles, confs, frame_ids


def _smoothing_window(effective_fps, n_samples):
    """Return an odd Savitzky-Golay window (~1 s) no longer than ``n_samples``."""
    win = max(5, int(round(effective_fps * 1.0)))
    if win % 2 == 0:
        win += 1
    # savgol_filter needs window_length <= len(x); clamp to the largest odd
    # value that fits (for an even length that is n_samples - 1).
    largest_odd = n_samples if n_samples % 2 == 1 else n_samples - 1
    return min(win, largest_odd)


def _detect_reps(angles_smooth, confs, frame_ids, fps, min_rep_frames, max_rep_frames):
    """Scan the smoothed angle series and return one dict per detected rep."""
    reps = []
    in_rep = False
    rep_min = rep_max = rep_conf_sum = rep_len = rep_start = None

    for i, raw_ang in enumerate(angles_smooth):
        ang = float(raw_ang)
        cf = float(confs[i])

        if not in_rep:
            # Waiting for the elbow to bend to the bottom position.
            if ang <= DOWN_ANGLE:
                in_rep = True
                rep_min = rep_max = ang
                rep_conf_sum = cf
                rep_len = 1
                rep_start = i
            continue

        rep_min = min(rep_min, ang)
        rep_max = max(rep_max, ang)
        rep_conf_sum += cf
        rep_len += 1

        # Abort absurdly long reps (tracking failure / stall).
        if rep_len > max_rep_frames:
            in_rep = False
            continue

        if ang >= UP_ANGLE:
            # Reaching lockout always ends the attempt; attempts shorter
            # than the minimum duration are discarded rather than counted.
            if rep_len >= min_rep_frames:
                mean_cf = float(rep_conf_sum / rep_len)
                likelihood = rep_likelihood(rep_min, rep_max, mean_cf)
                score = likelihood_to_score(likelihood)
                sf = int(frame_ids[rep_start])
                ef = int(frame_ids[i])
                reps.append({
                    "rep": len(reps) + 1,
                    "start_frame": sf,
                    "end_frame": ef,
                    "start_time_s": float(sf / fps),
                    "end_time_s": float(ef / fps),
                    "min_elbow_angle": float(rep_min),
                    "max_elbow_angle": float(rep_max),
                    "mean_kpt_conf": float(mean_cf),
                    "pushup_likelihood": float(likelihood),
                    "pushup_score": int(score),
                })
            in_rep = False

    return reps


def _write_annotated_video(video_path, annotated_path, fps, frame_size,
                           reps, frame_ids, angles_smooth):
    """Re-read the video and write a copy with rep/angle/score text overlays."""
    rep_windows = [(r["start_frame"], r["end_frame"], r["pushup_score"]) for r in reps]
    white = (255, 255, 255)

    cap = cv2.VideoCapture(video_path)
    writer = None
    try:
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        writer = cv2.VideoWriter(annotated_path, fourcc, fps, frame_size)
        frame_i = 0
        while True:
            ok, frame = cap.read()
            if not ok:
                break
            active = next((s for sf, ef, s in rep_windows if sf <= frame_i <= ef), None)
            count = sum(1 for _, ef, _ in rep_windows if ef < frame_i)
            # Show the angle of the next sampled frame at or after this one.
            j = int(min(np.searchsorted(frame_ids, frame_i), len(angles_smooth) - 1))
            ang_disp = float(angles_smooth[j])
            cv2.putText(frame, f"Reps: {count}/{len(reps)}", (20, 40),
                        cv2.FONT_HERSHEY_SIMPLEX, 1.0, white, 2)
            cv2.putText(frame, f"Elbow angle: {ang_disp:.1f}", (20, 80),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.9, white, 2)
            cv2.putText(frame, f"Rep score: {active if active is not None else '-'}", (20, 120),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.9, white, 2)
            writer.write(frame)
            frame_i += 1
    finally:
        # Release handles even if reading or writing fails part-way.
        cap.release()
        if writer is not None:
            writer.release()


def analyze_pushup_video_yolo(video_path: str, out_dir: str):
    """Count and score push-up reps in a video using YOLO pose keypoints.

    Pipeline: sample frames at roughly ``TARGET_FPS``, compute the elbow
    angle of the more confident arm, interpolate missing samples, smooth the
    series, detect reps with a down/up threshold state machine, then write a
    CSV of reps and an annotated copy of the video into ``out_dir``.

    Args:
        video_path: Path of the input video file.
        out_dir: Existing directory that receives ``pushup_reps.csv`` and
            ``pushup_annotated.mp4``.

    Returns:
        ``(summary, annotated_path, csv_path)`` where ``summary`` is a dict
        with the rep count, average score/likelihood, the per-rep events and
        the sampling settings that were used.

    Raises:
        RuntimeError: if the video cannot be opened, yields fewer than five
            sampled frames, or contains no frame with a usable pose.
    """
    model = load_pose_model()

    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            raise RuntimeError("OpenCV could not open the video. Try a different mp4 encoding.")
        fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
        w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # Compute stride to hit TARGET_FPS (effective inference rate).
        frame_stride = max(1, int(round(float(fps) / float(TARGET_FPS))))
        effective_fps = float(fps) / float(frame_stride)

        # Convert time-based rep duration limits to sampled-frame counts.
        min_rep_frames = max(2, int(math.ceil(MIN_REP_SECONDS * effective_fps)))
        max_rep_frames = max(min_rep_frames + 2,
                             int(math.ceil(MAX_REP_SECONDS * effective_fps)))
        print(
            f"[speed] video_fps={fps:.2f} target_fps={TARGET_FPS:.2f} "
            f"stride={frame_stride} effective_fps={effective_fps:.2f} "
            f"min_rep_frames={min_rep_frames} max_rep_frames={max_rep_frames}"
        )

        # 1) First pass: compute angles + confs per sampled frame.
        angles, confs, frame_ids = _sample_elbow_angles(cap, model, frame_stride)
    finally:
        # Release the capture even when inference raises.
        cap.release()

    angles = np.array(angles, dtype=np.float32)
    confs = np.array(confs, dtype=np.float32)
    frame_ids = np.array(frame_ids, dtype=np.int32)
    if len(angles) < 5:
        raise RuntimeError("Video too short or no usable frames detected.")

    # Interpolate missing angles from their valid neighbours.
    mask = np.isfinite(angles)
    if not np.any(mask):
        raise RuntimeError("No valid pose angles detected.")
    if not np.all(mask):
        angles[~mask] = np.interp(frame_ids[~mask], frame_ids[mask], angles[mask])

    # Smooth with a ~1 second window scaled by effective_fps.
    win = _smoothing_window(effective_fps, len(angles))
    angles_smooth = savgol_filter(angles, win, 2)

    # 2) Rep detection on smoothed angles.
    reps = _detect_reps(angles_smooth, confs, frame_ids, fps,
                        min_rep_frames, max_rep_frames)

    # 3) Save CSV.
    csv_path = os.path.join(out_dir, "pushup_reps.csv")
    pd.DataFrame(reps).to_csv(csv_path, index=False)

    # 4) Annotated video (kept original resolution).
    annotated_path = os.path.join(out_dir, "pushup_annotated.mp4")
    _write_annotated_video(video_path, annotated_path, fps, (w, h),
                           reps, frame_ids, angles_smooth)

    summary = {
        "ok": True,
        "error": None,
        "rep_count": int(len(reps)),
        "avg_score": int(round(float(np.mean([r["pushup_score"] for r in reps])))) if reps else 0,
        "avg_likelihood": float(np.mean([r["pushup_likelihood"] for r in reps])) if reps else 0.0,
        "rep_events": reps,
        "speed_settings": {
            "video_fps": float(fps),
            "target_fps": float(TARGET_FPS),
            "frame_stride": int(frame_stride),
            "effective_fps": float(effective_fps),
            "min_rep_frames": int(min_rep_frames),
        },
    }
    return summary, annotated_path, csv_path
| # ---------------------------- | |
| # API wrapper | |
| # ---------------------------- | |
def api_analyze(uploaded_file):
    """Gradio/API entry point: analyze an uploaded video and never raise.

    Args:
        uploaded_file: The upload as delivered by Gradio. It may be an object
            with a ``path`` or ``name`` attribute, a dict with a ``"path"``
            key, or anything whose ``str()`` is a filesystem path. ``None``
            means nothing was uploaded.

    Returns:
        ``(summary, annotated_video_path, csv_path)``. On any failure the
        summary is ``{"ok": False, "error": ..., "rep_count": 0,
        "rep_events": []}`` and both paths are ``None``.
    """
    def _failure(message):
        # Uniform error payload so API clients always get the same shape.
        return {"ok": False, "error": message, "rep_count": 0, "rep_events": []}, None, None

    if uploaded_file is None:
        return _failure("No file received.")

    # Resolve source path robustly.
    if hasattr(uploaded_file, "path") and uploaded_file.path:
        src_path = uploaded_file.path
    elif isinstance(uploaded_file, dict) and uploaded_file.get("path"):
        src_path = uploaded_file["path"]
    elif hasattr(uploaded_file, "name") and uploaded_file.name:
        src_path = uploaded_file.name
    else:
        src_path = str(uploaded_file)

    # Validate before touching the filesystem so a rejected upload leaves
    # no temporary directory behind.
    ext = os.path.splitext(src_path)[1].lower()
    allowed = {".mp4", ".mov", ".webm", ".mkv"}
    if ext and ext not in allowed:
        return _failure(f"Unsupported extension: {ext}. Use mp4/mov/webm/mkv.")

    workdir = tempfile.mkdtemp()
    try:
        in_path = os.path.join(workdir, "input.mp4")
        # Copy inside the try so a missing or unreadable upload is reported
        # as a JSON error instead of escaping as an exception.
        shutil.copy(src_path, in_path)
        summary, annotated_path, csv_path = analyze_pushup_video_yolo(in_path, out_dir=workdir)
        # On success the returned files live in workdir, so it is kept.
        return summary, annotated_path, csv_path
    except Exception as e:
        shutil.rmtree(workdir, ignore_errors=True)
        return _failure(f"{type(e).__name__}: {e}")
| # ---------------------------- | |
| # Gradio UI + API endpoint | |
| # ---------------------------- | |
# Build the UI. Components are laid out in the order they are created below.
with gr.Blocks(title="Pushup API (YOLO)") as demo:
    gr.Markdown("# Pushup Analyzer API (YOLO)\nUpload a video, get rep scores + CSV + annotated video.\n")
    # Keep gr.File to avoid Invalid file type issues
    video_file = gr.File(label="Upload video")
    btn = gr.Button("Analyze")
    # Output components, in the same order as api_analyze's 3-tuple result.
    out_json = gr.JSON(label="Results JSON")
    out_video = gr.Video(label="Annotated Output")
    out_csv = gr.File(label="CSV Output")
    # Wire the button to api_analyze and register it under api_name "analyze".
    btn.click(
        fn=api_analyze,
        inputs=[video_file],
        outputs=[out_json, out_video, out_csv],
        api_name="analyze",
    )
# Launch the app only when this file is run as a script.
if __name__ == "__main__":
    demo.launch()