Spaces:
Runtime error
Runtime error
Update app.py
Browse files
app.py
CHANGED
|
@@ -11,12 +11,21 @@ import gradio as gr
|
|
| 11 |
|
| 12 |
|
| 13 |
# ----------------------------
|
| 14 |
-
# Settings (
|
| 15 |
# ----------------------------
|
| 16 |
UP_ANGLE = 155
|
| 17 |
DOWN_ANGLE = 105
|
| 18 |
-
|
| 19 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 20 |
|
| 21 |
|
| 22 |
# ----------------------------
|
|
@@ -43,7 +52,7 @@ def load_pose_model():
|
|
| 43 |
|
| 44 |
|
| 45 |
# ----------------------------
|
| 46 |
-
# Helpers
|
| 47 |
# ----------------------------
|
| 48 |
def angle_deg(a, b, c):
|
| 49 |
a = np.asarray(a, dtype=np.float32)
|
|
@@ -56,8 +65,8 @@ def angle_deg(a, b, c):
|
|
| 56 |
return float(math.degrees(math.acos(cosv)))
|
| 57 |
|
| 58 |
def pick_best_side(kxy, kconf):
|
| 59 |
-
left = [5, 7, 9]
|
| 60 |
-
right = [6, 8, 10]
|
| 61 |
if float(np.mean(kconf[right])) >= float(np.mean(kconf[left])):
|
| 62 |
return right, float(np.mean(kconf[right]))
|
| 63 |
return left, float(np.mean(kconf[left]))
|
|
@@ -93,6 +102,17 @@ def likelihood_to_score(p):
|
|
| 93 |
return int(round(s_lo + t * (s_hi - s_lo)))
|
| 94 |
return 0
|
| 95 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 96 |
|
| 97 |
# ----------------------------
|
| 98 |
# Core pipeline
|
|
@@ -108,6 +128,21 @@ def analyze_pushup_video_yolo(video_path: str, out_dir: str):
|
|
| 108 |
w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) or 0
|
| 109 |
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) or 0
|
| 110 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 111 |
# 1) First pass: compute angles + confs per sampled frame
|
| 112 |
angles, confs, frame_ids = [], [], []
|
| 113 |
frame_i = 0
|
|
@@ -117,11 +152,13 @@ def analyze_pushup_video_yolo(video_path: str, out_dir: str):
|
|
| 117 |
if not ok:
|
| 118 |
break
|
| 119 |
|
| 120 |
-
if frame_i %
|
| 121 |
frame_i += 1
|
| 122 |
continue
|
| 123 |
|
| 124 |
-
|
|
|
|
|
|
|
| 125 |
if res.keypoints is None or len(res.keypoints.xy) == 0:
|
| 126 |
angles.append(np.nan)
|
| 127 |
confs.append(0.0)
|
|
@@ -160,14 +197,16 @@ def analyze_pushup_video_yolo(video_path: str, out_dir: str):
|
|
| 160 |
if len(angles) < 5:
|
| 161 |
raise RuntimeError("Video too short or no usable frames detected.")
|
| 162 |
|
|
|
|
| 163 |
mask = np.isfinite(angles)
|
| 164 |
if np.any(mask) and not np.all(mask):
|
| 165 |
angles[~mask] = np.interp(frame_ids[~mask], frame_ids[mask], angles[mask])
|
| 166 |
elif not np.any(mask):
|
| 167 |
raise RuntimeError("No valid pose angles detected.")
|
| 168 |
|
|
|
|
| 169 |
win = min(31, (len(angles) // 2) * 2 + 1)
|
| 170 |
-
win = max(win, 5)
|
| 171 |
angles_smooth = savgol_filter(angles, win, 2)
|
| 172 |
|
| 173 |
# 2) Rep detection on smoothed angles
|
|
@@ -192,7 +231,7 @@ def analyze_pushup_video_yolo(video_path: str, out_dir: str):
|
|
| 192 |
rep_len += 1
|
| 193 |
|
| 194 |
if ang >= UP_ANGLE:
|
| 195 |
-
if rep_len >=
|
| 196 |
mean_cf = float(rep_conf_sum / rep_len)
|
| 197 |
likelihood = rep_likelihood(rep_min, rep_max, mean_cf)
|
| 198 |
score = likelihood_to_score(likelihood)
|
|
@@ -220,7 +259,7 @@ def analyze_pushup_video_yolo(video_path: str, out_dir: str):
|
|
| 220 |
df = pd.DataFrame(reps)
|
| 221 |
df.to_csv(csv_path, index=False)
|
| 222 |
|
| 223 |
-
# 4) Annotated video
|
| 224 |
annotated_path = os.path.join(out_dir, "pushup_annotated.mp4")
|
| 225 |
cap = cv2.VideoCapture(video_path)
|
| 226 |
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
|
|
@@ -260,13 +299,21 @@ def analyze_pushup_video_yolo(video_path: str, out_dir: str):
|
|
| 260 |
"avg_score": int(round(float(np.mean([r["pushup_score"] for r in reps])))) if reps else 0,
|
| 261 |
"avg_likelihood": float(np.mean([r["pushup_likelihood"] for r in reps])) if reps else 0.0,
|
| 262 |
"rep_events": reps,
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 263 |
}
|
| 264 |
|
| 265 |
return summary, annotated_path, csv_path
|
| 266 |
|
| 267 |
|
| 268 |
# ----------------------------
|
| 269 |
-
# API wrapper
|
| 270 |
# ----------------------------
|
| 271 |
def api_analyze(uploaded_file):
|
| 272 |
if uploaded_file is None:
|
|
@@ -286,7 +333,6 @@ def api_analyze(uploaded_file):
|
|
| 286 |
else:
|
| 287 |
src_path = str(uploaded_file)
|
| 288 |
|
| 289 |
-
# Optional extension check (same idea as your old code)
|
| 290 |
ext = os.path.splitext(src_path)[1].lower()
|
| 291 |
allowed = {".mp4", ".mov", ".webm", ".mkv"}
|
| 292 |
if ext and ext not in allowed:
|
|
@@ -307,7 +353,7 @@ def api_analyze(uploaded_file):
|
|
| 307 |
with gr.Blocks(title="Pushup API (YOLO)") as demo:
|
| 308 |
gr.Markdown("# Pushup Analyzer API (YOLO)\nUpload a video, get rep scores + CSV + annotated video.\n")
|
| 309 |
|
| 310 |
-
#
|
| 311 |
video_file = gr.File(label="Upload video")
|
| 312 |
|
| 313 |
btn = gr.Button("Analyze")
|
|
@@ -323,4 +369,4 @@ with gr.Blocks(title="Pushup API (YOLO)") as demo:
|
|
| 323 |
)
|
| 324 |
|
| 325 |
if __name__ == "__main__":
|
| 326 |
-
demo.launch()
|
|
|
|
| 11 |
|
| 12 |
|
| 13 |
# ----------------------------
|
| 14 |
+
# Settings (optimized for speed, still robust)
|
| 15 |
# ----------------------------
|
| 16 |
UP_ANGLE = 155
|
| 17 |
DOWN_ANGLE = 105
|
| 18 |
+
|
| 19 |
+
# Aggressive sampling target (effective inference rate)
|
| 20 |
+
# 6 fps usually gives ~5x fewer YOLO calls on 30fps videos.
|
| 21 |
+
TARGET_FPS = 6.0
|
| 22 |
+
|
| 23 |
+
# Minimum rep duration in seconds (keeps behavior stable when stride changes)
|
| 24 |
+
MIN_REP_SECONDS = 0.33
|
| 25 |
+
|
| 26 |
+
# Downscale only (no crop) for YOLO inference
|
| 27 |
+
# 640 is a safe default across varying camera angles.
|
| 28 |
+
MAX_INFER_SIDE = 640
|
| 29 |
|
| 30 |
|
| 31 |
# ----------------------------
|
|
|
|
| 52 |
|
| 53 |
|
| 54 |
# ----------------------------
|
| 55 |
+
# Helpers
|
| 56 |
# ----------------------------
|
| 57 |
def angle_deg(a, b, c):
|
| 58 |
a = np.asarray(a, dtype=np.float32)
|
|
|
|
| 65 |
return float(math.degrees(math.acos(cosv)))
|
| 66 |
|
| 67 |
def pick_best_side(kxy, kconf):
|
| 68 |
+
left = [5, 7, 9] # L shoulder, L elbow, L wrist (YOLO COCO indices)
|
| 69 |
+
right = [6, 8, 10] # R shoulder, R elbow, R wrist
|
| 70 |
if float(np.mean(kconf[right])) >= float(np.mean(kconf[left])):
|
| 71 |
return right, float(np.mean(kconf[right]))
|
| 72 |
return left, float(np.mean(kconf[left]))
|
|
|
|
| 102 |
return int(round(s_lo + t * (s_hi - s_lo)))
|
| 103 |
return 0
|
| 104 |
|
| 105 |
+
def resize_for_inference(frame, max_side=640):
|
| 106 |
+
"""Downscale only (no crop) to speed YOLO. Keeps aspect ratio."""
|
| 107 |
+
h, w = frame.shape[:2]
|
| 108 |
+
m = max(h, w)
|
| 109 |
+
if m <= max_side:
|
| 110 |
+
return frame
|
| 111 |
+
scale = max_side / float(m)
|
| 112 |
+
new_w = int(round(w * scale))
|
| 113 |
+
new_h = int(round(h * scale))
|
| 114 |
+
return cv2.resize(frame, (new_w, new_h), interpolation=cv2.INTER_AREA)
|
| 115 |
+
|
| 116 |
|
| 117 |
# ----------------------------
|
| 118 |
# Core pipeline
|
|
|
|
| 128 |
w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) or 0
|
| 129 |
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) or 0
|
| 130 |
|
| 131 |
+
# Compute stride to hit TARGET_FPS (effective inference rate)
|
| 132 |
+
frame_stride = max(1, int(round(float(fps) / float(TARGET_FPS))))
|
| 133 |
+
effective_fps = float(fps) / float(frame_stride)
|
| 134 |
+
|
| 135 |
+
# Convert time-based minimum rep duration to sampled frames
|
| 136 |
+
min_rep_frames = int(math.ceil(MIN_REP_SECONDS * effective_fps))
|
| 137 |
+
# Clamp so we don't reject real reps at low fps
|
| 138 |
+
min_rep_frames = max(2, min_rep_frames)
|
| 139 |
+
|
| 140 |
+
print(
|
| 141 |
+
f"[speed] video_fps={fps:.2f} target_fps={TARGET_FPS:.2f} "
|
| 142 |
+
f"stride={frame_stride} effective_fps={effective_fps:.2f} "
|
| 143 |
+
f"min_rep_frames={min_rep_frames} infer_max_side={MAX_INFER_SIDE}"
|
| 144 |
+
)
|
| 145 |
+
|
| 146 |
# 1) First pass: compute angles + confs per sampled frame
|
| 147 |
angles, confs, frame_ids = [], [], []
|
| 148 |
frame_i = 0
|
|
|
|
| 152 |
if not ok:
|
| 153 |
break
|
| 154 |
|
| 155 |
+
if frame_i % frame_stride != 0:
|
| 156 |
frame_i += 1
|
| 157 |
continue
|
| 158 |
|
| 159 |
+
infer_frame = resize_for_inference(frame, MAX_INFER_SIDE)
|
| 160 |
+
|
| 161 |
+
res = model(infer_frame, verbose=False)[0]
|
| 162 |
if res.keypoints is None or len(res.keypoints.xy) == 0:
|
| 163 |
angles.append(np.nan)
|
| 164 |
confs.append(0.0)
|
|
|
|
| 197 |
if len(angles) < 5:
|
| 198 |
raise RuntimeError("Video too short or no usable frames detected.")
|
| 199 |
|
| 200 |
+
# Interpolate missing angles
|
| 201 |
mask = np.isfinite(angles)
|
| 202 |
if np.any(mask) and not np.all(mask):
|
| 203 |
angles[~mask] = np.interp(frame_ids[~mask], frame_ids[mask], angles[mask])
|
| 204 |
elif not np.any(mask):
|
| 205 |
raise RuntimeError("No valid pose angles detected.")
|
| 206 |
|
| 207 |
+
# Smooth
|
| 208 |
win = min(31, (len(angles) // 2) * 2 + 1)
|
| 209 |
+
win = max(win, 5)
|
| 210 |
angles_smooth = savgol_filter(angles, win, 2)
|
| 211 |
|
| 212 |
# 2) Rep detection on smoothed angles
|
|
|
|
| 231 |
rep_len += 1
|
| 232 |
|
| 233 |
if ang >= UP_ANGLE:
|
| 234 |
+
if rep_len >= min_rep_frames:
|
| 235 |
mean_cf = float(rep_conf_sum / rep_len)
|
| 236 |
likelihood = rep_likelihood(rep_min, rep_max, mean_cf)
|
| 237 |
score = likelihood_to_score(likelihood)
|
|
|
|
| 259 |
df = pd.DataFrame(reps)
|
| 260 |
df.to_csv(csv_path, index=False)
|
| 261 |
|
| 262 |
+
# 4) Annotated video (kept original resolution)
|
| 263 |
annotated_path = os.path.join(out_dir, "pushup_annotated.mp4")
|
| 264 |
cap = cv2.VideoCapture(video_path)
|
| 265 |
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
|
|
|
|
| 299 |
"avg_score": int(round(float(np.mean([r["pushup_score"] for r in reps])))) if reps else 0,
|
| 300 |
"avg_likelihood": float(np.mean([r["pushup_likelihood"] for r in reps])) if reps else 0.0,
|
| 301 |
"rep_events": reps,
|
| 302 |
+
"speed_settings": {
|
| 303 |
+
"video_fps": float(fps),
|
| 304 |
+
"target_fps": float(TARGET_FPS),
|
| 305 |
+
"frame_stride": int(frame_stride),
|
| 306 |
+
"effective_fps": float(effective_fps),
|
| 307 |
+
"min_rep_frames": int(min_rep_frames),
|
| 308 |
+
"max_infer_side": int(MAX_INFER_SIDE),
|
| 309 |
+
}
|
| 310 |
}
|
| 311 |
|
| 312 |
return summary, annotated_path, csv_path
|
| 313 |
|
| 314 |
|
| 315 |
# ----------------------------
|
| 316 |
+
# API wrapper
|
| 317 |
# ----------------------------
|
| 318 |
def api_analyze(uploaded_file):
|
| 319 |
if uploaded_file is None:
|
|
|
|
| 333 |
else:
|
| 334 |
src_path = str(uploaded_file)
|
| 335 |
|
|
|
|
| 336 |
ext = os.path.splitext(src_path)[1].lower()
|
| 337 |
allowed = {".mp4", ".mov", ".webm", ".mkv"}
|
| 338 |
if ext and ext not in allowed:
|
|
|
|
| 353 |
with gr.Blocks(title="Pushup API (YOLO)") as demo:
|
| 354 |
gr.Markdown("# Pushup Analyzer API (YOLO)\nUpload a video, get rep scores + CSV + annotated video.\n")
|
| 355 |
|
| 356 |
+
# Keep gr.File to avoid Invalid file type issues
|
| 357 |
video_file = gr.File(label="Upload video")
|
| 358 |
|
| 359 |
btn = gr.Button("Analyze")
|
|
|
|
| 369 |
)
|
| 370 |
|
| 371 |
if __name__ == "__main__":
|
| 372 |
+
demo.launch()
|