Spaces:
Runtime error
Runtime error
File size: 12,273 Bytes
3d2ecec a8034fd 3d2ecec a8034fd 637375e a8034fd 010ddfd a8034fd cf45b0f 010ddfd bca95c6 a8034fd 010ddfd a8034fd 010ddfd a8034fd 3d2ecec a8034fd d7f9742 a8034fd 010ddfd bca95c6 010ddfd bca95c6 010ddfd a22abd7 010ddfd a8034fd 010ddfd a8034fd a22abd7 a8034fd 3d2ecec a8034fd 3d2ecec a8034fd 010ddfd a8034fd a22abd7 bca95c6 a8034fd a22abd7 a8034fd bca95c6 a8034fd 010ddfd a8034fd 010ddfd a8034fd 3d2ecec a8034fd 3d2ecec a8034fd aa938e5 a8034fd d14fed8 a8034fd d14fed8 a8034fd d14fed8 a8034fd d7f9742 a8034fd 010ddfd d7f9742 637375e a8034fd 637375e a8034fd 010ddfd a8034fd c75fd7e a8034fd 3d2ecec c75fd7e d14fed8 6d3e171 c75fd7e 6d3e171 c75fd7e 6d3e171 c75fd7e 6d3e171 a8034fd 92c7c48 3259094 3d2ecec a8034fd 3259094 637375e a8034fd 637375e 010ddfd 1b733bd d14fed8 6d3e171 c75fd7e a8034fd c75fd7e a8034fd 92c7c48 c75fd7e 3d2ecec c75fd7e a22abd7 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 |
import os
import math
import shutil
import tempfile
import cv2
import numpy as np
import pandas as pd
from scipy.signal import savgol_filter
import gradio as gr
# ----------------------------
# Settings (optimized for speed, still robust)
# ----------------------------
# Elbow angle (degrees) at or above which a rep in progress counts as locked out.
UP_ANGLE: int = 125
# Elbow angle (degrees) at or below which the bottom of a rep is recognised.
DOWN_ANGLE: int = 90
# Target pose-inference rate in frames per second; frames in between are skipped.
# 6 fps usually means ~5x fewer YOLO calls on 30 fps videos.
TARGET_FPS: float = 6.0
# Shortest rep, in seconds, that is still counted. Expressed in time rather than
# frames so the behaviour stays stable when the sampling stride changes.
MIN_REP_SECONDS: float = 0.33
# Longest rep, in seconds, before the attempt is abandoned.
# Prevents very long false reps when tracking fails.
MAX_REP_SECONDS: float = 8.0
# ----------------------------
# Load YOLO pose model (lazy)
# ----------------------------
_MODEL = None


def load_pose_model():
    """Return the shared YOLO pose model, loading it on first use.

    The model is cached in the module-level ``_MODEL`` so later calls return
    the same instance. Weights are tried in order (``yolo11n-pose.pt`` first,
    then ``yolov8n-pose.pt``) and the first one that loads is kept.

    Returns:
        The loaded ``ultralytics.YOLO`` model.

    Raises:
        RuntimeError: If none of the candidate weights could be loaded. The
            last underlying exception is attached as ``__cause__``.
    """
    global _MODEL
    if _MODEL is not None:
        return _MODEL

    # Imported here rather than at module top so that ultralytics is only
    # needed once a model is actually requested.
    from ultralytics import YOLO

    last_err = None
    for weights in ("yolo11n-pose.pt", "yolov8n-pose.pt"):
        try:
            _MODEL = YOLO(weights)
        except Exception as e:  # any load failure: fall through to the next candidate
            last_err = e
            continue
        print("Loaded model:", weights)
        return _MODEL

    raise RuntimeError(
        f"Could not load YOLO pose model. Last error: {last_err}"
    ) from last_err
# ----------------------------
# Helpers
# ----------------------------
def angle_deg(a, b, c):
    """Return the angle at vertex ``b`` formed by points ``a``-``b``-``c``, in degrees.

    Args:
        a, b, c: Point coordinates (any array-like of equal length); they are
            converted to ``float32`` arrays.

    Returns:
        The angle between the vectors ``b->a`` and ``b->c`` as a Python float
        in the range [0, 180].
    """
    a = np.asarray(a, dtype=np.float32)
    b = np.asarray(b, dtype=np.float32)
    c = np.asarray(c, dtype=np.float32)

    ba = a - b
    bc = c - b

    # The small epsilon avoids a division by zero when either arm has zero
    # length; in that case the cosine evaluates to 0 and the result is 90.
    denom = (np.linalg.norm(ba) * np.linalg.norm(bc)) + 1e-9
    # Clip to the valid acos domain to absorb floating-point rounding.
    cosv = np.clip(np.dot(ba, bc) / denom, -1.0, 1.0)
    return float(math.degrees(math.acos(cosv)))
def pick_best_side(kxy, kconf):
    """Choose the arm (left or right) whose keypoints have the higher mean confidence.

    Args:
        kxy: Keypoint coordinates for one person. Not used for the decision;
            accepted so the call signature stays unchanged.
        kconf: Per-keypoint confidences for the same person, indexable with
            the YOLO COCO keypoint indices used below.

    Returns:
        ``(indices, mean_conf)`` where ``indices`` is the
        ``[shoulder, elbow, wrist]`` index list of the chosen side and
        ``mean_conf`` is that side's mean confidence as a float. The right
        side wins ties.
    """
    left = [5, 7, 9]     # L shoulder, L elbow, L wrist (YOLO COCO indices)
    right = [6, 8, 10]   # R shoulder, R elbow, R wrist

    # Compute each mean once and reuse it for both the comparison and the result.
    left_conf = float(np.mean(kconf[left]))
    right_conf = float(np.mean(kconf[right]))

    if right_conf >= left_conf:
        return right, right_conf
    return left, left_conf
def sigmoid(x):
    """Return the logistic function ``1 / (1 + e**-x)`` of a scalar ``x``.

    The two branches are algebraically identical; splitting on the sign keeps
    the argument of ``math.exp`` non-positive, so very negative inputs return
    0.0 instead of raising ``OverflowError``.
    """
    if x >= 0:
        return 1.0 / (1.0 + math.exp(-x))
    z = math.exp(x)
    return z / (1.0 + z)
def rep_likelihood(min_ang, max_ang, mean_conf):
    """Estimate how likely a detected elbow-angle excursion is a real push-up.

    The result is the product of four factors, each in [0, 1]:

    * range score: logistic in the angle range ``max_ang - min_ang``,
      equal to 0.5 at a 45 degree range;
    * depth score: logistic in ``DOWN_ANGLE - min_ang``, equal to 0.5 when the
      minimum angle is exactly ``DOWN_ANGLE`` and higher for deeper bends;
    * lockout score: logistic in ``max_ang - UP_ANGLE``, equal to 0.5 when the
      maximum angle is exactly ``UP_ANGLE`` and higher for fuller extension;
    * confidence score: ``mean_conf`` clipped to [0, 1].

    Args:
        min_ang: Smallest elbow angle seen during the rep, in degrees.
        max_ang: Largest elbow angle seen during the rep, in degrees.
        mean_conf: Mean keypoint confidence over the rep.

    Returns:
        The combined likelihood as a float in [0, 1].
    """
    ang_range = max_ang - min_ang
    range_score = sigmoid((ang_range - 45) / 10)
    depth_score = sigmoid((DOWN_ANGLE - min_ang) / 8)
    lockout_score = sigmoid((max_ang - UP_ANGLE) / 8)
    conf_score = float(np.clip(mean_conf, 0.0, 1.0))
    combined = range_score * depth_score * lockout_score * conf_score
    return float(np.clip(combined, 0.0, 1.0))
def likelihood_to_score(p):
    """Map a push-up likelihood in [0, 1] to an integer score in [0, 100].

    The likelihood is clipped to [0, 1] and then looked up in a table of
    half-open likelihood buckets ``[lo, hi)``. Within a bucket the score is
    linearly interpolated between the bucket's score bounds and rounded.
    A likelihood of exactly 1.0 is treated as part of the top bucket.

    Args:
        p: Likelihood value; values outside [0, 1] are clipped.

    Returns:
        The integer score, or 0 when no bucket matches (e.g. ``p`` is NaN).
    """
    p = float(np.clip(p, 0.0, 1.0))

    # (likelihood_lo, likelihood_hi, score_lo, score_hi)
    buckets = [
        (0.50, 1.00, 90, 100),
        (0.45, 0.50, 80, 89),
        (0.40, 0.45, 70, 79),
        (0.35, 0.40, 60, 69),
        (0.30, 0.35, 50, 59),
        (0.25, 0.30, 40, 49),
        (0.20, 0.25, 30, 39),
        (0.15, 0.20, 20, 29),
        (0.10, 0.15, 10, 19),
        (0.00, 0.10, 0, 9),
    ]

    for lo, hi, s_lo, s_hi in buckets:
        # The second clause lets p == 1.0 fall into the top bucket even though
        # bucket ranges exclude their upper bound.
        if (lo <= p < hi) or (p == 1.0 and hi == 1.0):
            t = (p - lo) / max(hi - lo, 1e-6)
            return int(round(s_lo + t * (s_hi - s_lo)))
    return 0
# ----------------------------
# Core pipeline
# ----------------------------
def analyze_pushup_video_yolo(video_path: str, out_dir: str):
    """Detect and score push-up reps in a video and write the result files.

    Steps:
      1. Sample frames at roughly ``TARGET_FPS`` and compute an elbow angle
         plus keypoint confidence for each sampled frame.
      2. Interpolate missing angles and smooth the series.
      3. Detect reps on the smoothed angles and score each one.
      4. Write ``pushup_reps.csv`` and ``pushup_annotated.mp4`` into ``out_dir``.

    Args:
        video_path: Path of the video to analyse.
        out_dir: Directory for the output files (created if missing).

    Returns:
        ``(summary, annotated_path, csv_path)`` where ``summary`` is a dict
        with the rep count, average score/likelihood, the per-rep events and
        the sampling settings that were used.

    Raises:
        RuntimeError: If the video cannot be opened, yields fewer than five
            sampled frames, contains no usable pose angles, or the annotated
            video cannot be written.
    """
    model = load_pose_model()

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        raise RuntimeError("OpenCV could not open the video. Try a different mp4 encoding.")

    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        # Fall back to 30 fps when the container reports no usable frame rate.
        if not fps or not math.isfinite(fps) or fps <= 0:
            fps = 30.0

        # Compute stride to hit TARGET_FPS (effective inference rate)
        frame_stride = max(1, int(round(float(fps) / float(TARGET_FPS))))
        effective_fps = float(fps) / float(frame_stride)

        # Convert time-based rep duration limits to counts of sampled frames.
        min_rep_frames = max(2, int(math.ceil(MIN_REP_SECONDS * effective_fps)))
        max_rep_frames = max(
            min_rep_frames + 2, int(math.ceil(MAX_REP_SECONDS * effective_fps))
        )

        print(
            f"[speed] video_fps={fps:.2f} target_fps={TARGET_FPS:.2f} "
            f"stride={frame_stride} effective_fps={effective_fps:.2f} "
            f"min_rep_frames={min_rep_frames} max_rep_frames={max_rep_frames}"
        )

        # 1) First pass: angles + confidences per sampled frame
        angles, confs, frame_ids = _sample_elbow_angles(cap, model, frame_stride)
    finally:
        # Release the capture even if inference raised.
        cap.release()

    if len(angles) < 5:
        raise RuntimeError("Video too short or no usable frames detected.")

    angles_smooth = _smooth_angles(angles, frame_ids, effective_fps)

    # 2) Rep detection on smoothed angles
    reps = _detect_reps(
        angles_smooth, confs, frame_ids, fps, min_rep_frames, max_rep_frames
    )

    # 3) Save CSV. Columns are fixed so a header is written even with zero reps.
    os.makedirs(out_dir, exist_ok=True)
    csv_path = os.path.join(out_dir, "pushup_reps.csv")
    rep_columns = [
        "rep",
        "start_frame",
        "end_frame",
        "start_time_s",
        "end_time_s",
        "min_elbow_angle",
        "max_elbow_angle",
        "mean_kpt_conf",
        "pushup_likelihood",
        "pushup_score",
    ]
    pd.DataFrame(reps, columns=rep_columns).to_csv(csv_path, index=False)

    # 4) Annotated video (kept original resolution)
    annotated_path = os.path.join(out_dir, "pushup_annotated.mp4")
    _write_annotated_video(
        video_path, annotated_path, fps, reps, frame_ids, angles_smooth
    )

    summary = {
        "ok": True,
        "error": None,
        "rep_count": int(len(reps)),
        "avg_score": int(round(float(np.mean([r["pushup_score"] for r in reps])))) if reps else 0,
        "avg_likelihood": float(np.mean([r["pushup_likelihood"] for r in reps])) if reps else 0.0,
        "rep_events": reps,
        "speed_settings": {
            "video_fps": float(fps),
            "target_fps": float(TARGET_FPS),
            "frame_stride": int(frame_stride),
            "effective_fps": float(effective_fps),
            "min_rep_frames": int(min_rep_frames),
            "max_rep_frames": int(max_rep_frames),
        },
    }
    return summary, annotated_path, csv_path


def _elbow_angle_from_frame(model, frame):
    """Run pose inference on one frame; return ``(elbow_angle_or_nan, confidence)``."""
    res = model(frame, verbose=False)[0]
    kpts = res.keypoints
    # No detection (or no confidences available): mark the sample as missing.
    if kpts is None or kpts.conf is None or len(kpts.xy) == 0:
        return np.nan, 0.0

    kxy_all = kpts.xy.cpu().numpy()
    kconf_all = kpts.conf.cpu().numpy()

    # choose best person by mean confidence
    pidx = int(np.argmax(np.mean(kconf_all, axis=1)))
    kxy = kxy_all[pidx]
    kconf = kconf_all[pidx]

    ids, side_conf = pick_best_side(kxy, kconf)
    if side_conf < 0.2:
        # Arm keypoints too uncertain: keep the confidence, drop the angle.
        return np.nan, float(side_conf)

    shoulder, elbow, wrist = kxy[ids[0]], kxy[ids[1]], kxy[ids[2]]
    return angle_deg(shoulder, elbow, wrist), float(side_conf)


def _sample_elbow_angles(cap, model, frame_stride):
    """Read every frame from ``cap`` and analyse each ``frame_stride``-th one.

    Returns ``(angles, confs, frame_ids)`` as float32, float32 and int32 arrays
    with one entry per sampled frame; missing angles are NaN.
    """
    angles, confs, frame_ids = [], [], []
    frame_i = 0
    while True:
        ok, frame = cap.read()
        if not ok:
            break
        if frame_i % frame_stride == 0:
            angle, conf = _elbow_angle_from_frame(model, frame)
            angles.append(angle)
            confs.append(conf)
            frame_ids.append(frame_i)
        frame_i += 1

    return (
        np.array(angles, dtype=np.float32),
        np.array(confs, dtype=np.float32),
        np.array(frame_ids, dtype=np.int32),
    )


def _smooth_angles(angles, frame_ids, effective_fps):
    """Fill NaN gaps in ``angles`` (in place) and return the smoothed series.

    Raises ``RuntimeError`` when no sample has a finite angle.
    """
    mask = np.isfinite(angles)
    if not np.any(mask):
        raise RuntimeError("No valid pose angles detected.")
    if not np.all(mask):
        # Linear interpolation over frame index for the missing samples.
        angles[~mask] = np.interp(frame_ids[~mask], frame_ids[mask], angles[mask])

    # ~1 second window scaled by effective_fps, at least 5 samples, forced odd.
    win = max(5, int(round(effective_fps * 1.0)))
    if win % 2 == 0:
        win += 1

    # savgol_filter needs an odd window no longer than the signal, so cap at the
    # largest odd number <= len(angles). The caller guarantees len(angles) >= 5,
    # which keeps the window above the polynomial order of 2.
    n = len(angles)
    largest_odd = n if n % 2 else n - 1
    win = min(win, largest_odd)

    return savgol_filter(angles, win, 2)


def _detect_reps(angles_smooth, confs, frame_ids, fps, min_rep_frames, max_rep_frames):
    """Run the down->up state machine over the smoothed angles and score each rep.

    A rep starts at the first sample at or below ``DOWN_ANGLE`` and ends at the
    next sample at or above ``UP_ANGLE``. Returns a list of per-rep dicts.
    """
    reps = []
    state = "WAIT_DOWN"
    rep_min = rep_max = rep_conf_sum = 0.0
    rep_len = rep_start = 0

    for i, raw_ang in enumerate(angles_smooth):
        ang = float(raw_ang)
        cf = float(confs[i])

        if state == "WAIT_DOWN":
            if ang <= DOWN_ANGLE:
                state = "IN_DOWN"
                rep_min = rep_max = ang
                rep_conf_sum = cf
                rep_len = 1
                rep_start = i
            continue

        # state == "IN_DOWN": accumulate statistics for the rep in progress.
        rep_min = min(rep_min, ang)
        rep_max = max(rep_max, ang)
        rep_conf_sum += cf
        rep_len += 1

        # Abort absurdly long reps (tracking failure / stall)
        if rep_len > max_rep_frames:
            state = "WAIT_DOWN"
            continue

        if ang >= UP_ANGLE:
            if rep_len >= min_rep_frames:
                mean_cf = float(rep_conf_sum / rep_len)
                likelihood = rep_likelihood(rep_min, rep_max, mean_cf)
                score = likelihood_to_score(likelihood)
                sf = int(frame_ids[rep_start])
                ef = int(frame_ids[i])
                reps.append({
                    "rep": len(reps) + 1,
                    "start_frame": sf,
                    "end_frame": ef,
                    "start_time_s": float(sf / fps),
                    "end_time_s": float(ef / fps),
                    "min_elbow_angle": float(rep_min),
                    "max_elbow_angle": float(rep_max),
                    "mean_kpt_conf": float(mean_cf),
                    "pushup_likelihood": float(likelihood),
                    "pushup_score": int(score),
                })
            # NOTE(review): reset on every lockout, including reps too short to
            # count; the original nesting of this line was not recoverable, so
            # confirm this is the intended behaviour.
            state = "WAIT_DOWN"

    return reps


def _write_annotated_video(video_path, annotated_path, fps, reps, frame_ids, angles_smooth):
    """Re-read the video and write a copy with rep count, angle and score overlaid.

    Raises ``RuntimeError`` when the writer cannot be opened or no frame could
    be read.
    """
    rep_windows = [(r["start_frame"], r["end_frame"], r["pushup_score"]) for r in reps]
    last_idx = len(angles_smooth) - 1

    cap = cv2.VideoCapture(video_path)
    writer = None
    try:
        frame_i = 0
        while True:
            ok, frame = cap.read()
            if not ok:
                break

            if writer is None:
                # Size the writer from the first decoded frame so the output
                # matches what is actually written.
                height, width = frame.shape[:2]
                fourcc = cv2.VideoWriter_fourcc(*"mp4v")
                writer = cv2.VideoWriter(annotated_path, fourcc, fps, (width, height))
                if not writer.isOpened():
                    raise RuntimeError("OpenCV could not open the annotated video for writing.")

            # Score of the rep covering this frame (if any) and reps finished so far.
            active = next((s for sf, ef, s in rep_windows if sf <= frame_i <= ef), None)
            count = sum(1 for _, ef, _ in rep_windows if ef < frame_i)

            # Index of the first sampled frame at or after this one, clamped to the end.
            j = int(min(np.searchsorted(frame_ids, frame_i), last_idx))
            ang_disp = float(angles_smooth[j])

            cv2.putText(frame, f"Reps: {count}/{len(reps)}", (20, 40),
                        cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255, 255, 255), 2)
            cv2.putText(frame, f"Elbow angle: {ang_disp:.1f}", (20, 80),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255, 255, 255), 2)
            cv2.putText(frame, f"Rep score: {active if active is not None else '-'}", (20, 120),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255, 255, 255), 2)

            writer.write(frame)
            frame_i += 1
    finally:
        cap.release()
        if writer is not None:
            writer.release()

    if writer is None:
        raise RuntimeError("No frames could be read while writing the annotated video.")
# ----------------------------
# API wrapper
# ----------------------------
def api_analyze(uploaded_file):
    """Gradio/API entry point: analyse an uploaded video and return the outputs.

    Args:
        uploaded_file: The upload as handed over by Gradio. Accepted forms are
            an object with a ``path`` or ``name`` attribute, a dict with a
            ``"path"`` key, or anything whose ``str()`` is a file path.

    Returns:
        ``(summary, annotated_path, csv_path)``. On any failure the summary is
        ``{"ok": False, "error": <message>, "rep_count": 0, "rep_events": []}``
        and both paths are ``None``; this function does not raise for bad or
        unreadable uploads.
    """

    def _failure(message):
        # Single place that builds the error payload returned to the UI/API.
        return {"ok": False, "error": message, "rep_count": 0, "rep_events": []}, None, None

    if uploaded_file is None:
        return _failure("No file received.")

    # Resolve source path robustly
    if hasattr(uploaded_file, "path") and uploaded_file.path:
        src_path = uploaded_file.path
    elif isinstance(uploaded_file, dict) and uploaded_file.get("path"):
        src_path = uploaded_file["path"]
    elif hasattr(uploaded_file, "name") and uploaded_file.name:
        src_path = uploaded_file.name
    else:
        src_path = str(uploaded_file)

    # Validate before creating any temp directory so rejected uploads leave
    # nothing behind. Files without an extension are let through.
    ext = os.path.splitext(src_path)[1].lower()
    allowed = {".mp4", ".mov", ".webm", ".mkv"}
    if ext and ext not in allowed:
        return _failure(f"Unsupported extension: {ext}. Use mp4/mov/webm/mkv.")

    workdir = tempfile.mkdtemp()
    in_path = os.path.join(workdir, "input.mp4")
    try:
        # The copy is inside the try so a missing/unreadable upload is reported
        # as an error payload instead of an uncaught exception.
        shutil.copy(src_path, in_path)
        summary, annotated_path, csv_path = analyze_pushup_video_yolo(in_path, out_dir=workdir)
        # On success the work directory is kept: the returned paths point into it.
        return summary, annotated_path, csv_path
    except Exception as e:
        # Nothing in the work directory is returned on failure, so remove it.
        shutil.rmtree(workdir, ignore_errors=True)
        return _failure(f"{type(e).__name__}: {e}")
# ----------------------------
# Gradio UI + API endpoint
# ----------------------------
# Build the UI. Components and the click handler are created inside the Blocks
# context so they are registered on `demo`.
with gr.Blocks(title="Pushup API (YOLO)") as demo:
    gr.Markdown("# Pushup Analyzer API (YOLO)\nUpload a video, get rep scores + CSV + annotated video.\n")

    # Keep a generic gr.File input to avoid "Invalid file type" issues on upload.
    video_file = gr.File(label="Upload video")
    btn = gr.Button("Analyze")

    out_json = gr.JSON(label="Results JSON")
    out_video = gr.Video(label="Annotated Output")
    out_csv = gr.File(label="CSV Output")

    # api_name registers the handler under the name "analyze" for API clients.
    btn.click(
        fn=api_analyze,
        inputs=[video_file],
        outputs=[out_json, out_video, out_csv],
        api_name="analyze",
    )

if __name__ == "__main__":
    demo.launch()
|