Spaces:
Runtime error
Runtime error
Update app.py
Browse files
app.py
CHANGED
|
@@ -11,12 +11,22 @@ import gradio as gr
|
|
| 11 |
|
| 12 |
|
| 13 |
# ----------------------------
|
| 14 |
-
# Settings
|
| 15 |
# ----------------------------
|
| 16 |
UP_ANGLE = 155
|
| 17 |
DOWN_ANGLE = 105
|
| 18 |
-
|
| 19 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 20 |
|
| 21 |
|
| 22 |
# ----------------------------
|
|
@@ -43,7 +53,7 @@ def load_pose_model():
|
|
| 43 |
|
| 44 |
|
| 45 |
# ----------------------------
|
| 46 |
-
# Helpers
|
| 47 |
# ----------------------------
|
| 48 |
def angle_deg(a, b, c):
|
| 49 |
a = np.asarray(a, dtype=np.float32)
|
|
@@ -56,8 +66,8 @@ def angle_deg(a, b, c):
|
|
| 56 |
return float(math.degrees(math.acos(cosv)))
|
| 57 |
|
| 58 |
def pick_best_side(kxy, kconf):
|
| 59 |
-
left = [5, 7, 9]
|
| 60 |
-
right = [6, 8, 10]
|
| 61 |
if float(np.mean(kconf[right])) >= float(np.mean(kconf[left])):
|
| 62 |
return right, float(np.mean(kconf[right]))
|
| 63 |
return left, float(np.mean(kconf[left]))
|
|
@@ -93,6 +103,16 @@ def likelihood_to_score(p):
|
|
| 93 |
return int(round(s_lo + t * (s_hi - s_lo)))
|
| 94 |
return 0
|
| 95 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 96 |
|
| 97 |
# ----------------------------
|
| 98 |
# Core pipeline
|
|
@@ -108,6 +128,17 @@ def analyze_pushup_video_yolo(video_path: str, out_dir: str):
|
|
| 108 |
w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) or 0
|
| 109 |
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) or 0
|
| 110 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 111 |
# 1) First pass: compute angles + confs per sampled frame
|
| 112 |
angles, confs, frame_ids = [], [], []
|
| 113 |
frame_i = 0
|
|
@@ -117,11 +148,13 @@ def analyze_pushup_video_yolo(video_path: str, out_dir: str):
|
|
| 117 |
if not ok:
|
| 118 |
break
|
| 119 |
|
| 120 |
-
if frame_i %
|
| 121 |
frame_i += 1
|
| 122 |
continue
|
| 123 |
|
| 124 |
-
|
|
|
|
|
|
|
| 125 |
if res.keypoints is None or len(res.keypoints.xy) == 0:
|
| 126 |
angles.append(np.nan)
|
| 127 |
confs.append(0.0)
|
|
@@ -167,7 +200,7 @@ def analyze_pushup_video_yolo(video_path: str, out_dir: str):
|
|
| 167 |
raise RuntimeError("No valid pose angles detected.")
|
| 168 |
|
| 169 |
win = min(31, (len(angles) // 2) * 2 + 1)
|
| 170 |
-
win = max(win, 5)
|
| 171 |
angles_smooth = savgol_filter(angles, win, 2)
|
| 172 |
|
| 173 |
# 2) Rep detection on smoothed angles
|
|
@@ -192,7 +225,7 @@ def analyze_pushup_video_yolo(video_path: str, out_dir: str):
|
|
| 192 |
rep_len += 1
|
| 193 |
|
| 194 |
if ang >= UP_ANGLE:
|
| 195 |
-
if rep_len >=
|
| 196 |
mean_cf = float(rep_conf_sum / rep_len)
|
| 197 |
likelihood = rep_likelihood(rep_min, rep_max, mean_cf)
|
| 198 |
score = likelihood_to_score(likelihood)
|
|
@@ -220,7 +253,7 @@ def analyze_pushup_video_yolo(video_path: str, out_dir: str):
|
|
| 220 |
df = pd.DataFrame(reps)
|
| 221 |
df.to_csv(csv_path, index=False)
|
| 222 |
|
| 223 |
-
# 4) Annotated video
|
| 224 |
annotated_path = os.path.join(out_dir, "pushup_annotated.mp4")
|
| 225 |
cap = cv2.VideoCapture(video_path)
|
| 226 |
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
|
|
@@ -241,11 +274,11 @@ def analyze_pushup_video_yolo(video_path: str, out_dir: str):
|
|
| 241 |
ang_disp = float(angles_smooth[j])
|
| 242 |
|
| 243 |
cv2.putText(frame, f"Reps: {count}/{len(reps)}", (20, 40),
|
| 244 |
-
cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255,255,255), 2)
|
| 245 |
cv2.putText(frame, f"Elbow angle: {ang_disp:.1f}", (20, 80),
|
| 246 |
-
cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255,255,255), 2)
|
| 247 |
cv2.putText(frame, f"Rep score: {active if active is not None else '-'}", (20, 120),
|
| 248 |
-
cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255,255,255), 2)
|
| 249 |
|
| 250 |
writer.write(frame)
|
| 251 |
frame_i += 1
|
|
@@ -260,13 +293,21 @@ def analyze_pushup_video_yolo(video_path: str, out_dir: str):
|
|
| 260 |
"avg_score": int(round(float(np.mean([r["pushup_score"] for r in reps])))) if reps else 0,
|
| 261 |
"avg_likelihood": float(np.mean([r["pushup_likelihood"] for r in reps])) if reps else 0.0,
|
| 262 |
"rep_events": reps,
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 263 |
}
|
| 264 |
|
| 265 |
return summary, annotated_path, csv_path
|
| 266 |
|
| 267 |
|
| 268 |
# ----------------------------
|
| 269 |
-
# API wrapper
|
| 270 |
# ----------------------------
|
| 271 |
def api_analyze(uploaded_file):
|
| 272 |
if uploaded_file is None:
|
|
@@ -275,7 +316,6 @@ def api_analyze(uploaded_file):
|
|
| 275 |
workdir = tempfile.mkdtemp()
|
| 276 |
in_path = os.path.join(workdir, "input.mp4")
|
| 277 |
|
| 278 |
-
# Resolve source path robustly
|
| 279 |
src_path = None
|
| 280 |
if hasattr(uploaded_file, "path") and uploaded_file.path:
|
| 281 |
src_path = uploaded_file.path
|
|
@@ -286,7 +326,6 @@ def api_analyze(uploaded_file):
|
|
| 286 |
else:
|
| 287 |
src_path = str(uploaded_file)
|
| 288 |
|
| 289 |
-
# Optional extension check (same idea as your old code)
|
| 290 |
ext = os.path.splitext(src_path)[1].lower()
|
| 291 |
allowed = {".mp4", ".mov", ".webm", ".mkv"}
|
| 292 |
if ext and ext not in allowed:
|
|
@@ -307,9 +346,7 @@ def api_analyze(uploaded_file):
|
|
| 307 |
with gr.Blocks(title="Pushup API (YOLO)") as demo:
|
| 308 |
gr.Markdown("# Pushup Analyzer API (YOLO)\nUpload a video, get rep scores + CSV + annotated video.\n")
|
| 309 |
|
| 310 |
-
# IMPORTANT: keep this as gr.File to avoid the “Invalid file type: ['video']” problem you hit before
|
| 311 |
video_file = gr.File(label="Upload video")
|
| 312 |
-
|
| 313 |
btn = gr.Button("Analyze")
|
| 314 |
out_json = gr.JSON(label="Results JSON")
|
| 315 |
out_video = gr.Video(label="Annotated Output")
|
|
|
|
| 11 |
|
| 12 |
|
| 13 |
# ----------------------------
|
| 14 |
+
# Settings
|
| 15 |
# ----------------------------
|
| 16 |
UP_ANGLE = 155
|
| 17 |
DOWN_ANGLE = 105
|
| 18 |
+
|
| 19 |
+
# Target processing FPS (how often we run YOLO)
|
| 20 |
+
# 10 is a good balance for speed vs accuracy.
|
| 21 |
+
TARGET_FPS = 10
|
| 22 |
+
|
| 23 |
+
# Minimum rep duration in seconds (more robust than hardcoding frames)
|
| 24 |
+
# 0.25s is a safe filter against noise but won't kill real reps.
|
| 25 |
+
MIN_REP_SECONDS = 0.25
|
| 26 |
+
|
| 27 |
+
# YOLO inference resize (no cropping, only downscale).
|
| 28 |
+
# 640 is typically safe with small accuracy loss, big speed gain on high-res videos.
|
| 29 |
+
MAX_INFER_SIDE = 640
|
| 30 |
|
| 31 |
|
| 32 |
# ----------------------------
|
|
|
|
| 53 |
|
| 54 |
|
| 55 |
# ----------------------------
|
| 56 |
+
# Helpers
|
| 57 |
# ----------------------------
|
| 58 |
def angle_deg(a, b, c):
|
| 59 |
a = np.asarray(a, dtype=np.float32)
|
|
|
|
| 66 |
return float(math.degrees(math.acos(cosv)))
|
| 67 |
|
| 68 |
def pick_best_side(kxy, kconf):
|
| 69 |
+
left = [5, 7, 9] # L shoulder, L elbow, L wrist
|
| 70 |
+
right = [6, 8, 10] # R shoulder, R elbow, R wrist
|
| 71 |
if float(np.mean(kconf[right])) >= float(np.mean(kconf[left])):
|
| 72 |
return right, float(np.mean(kconf[right]))
|
| 73 |
return left, float(np.mean(kconf[left]))
|
|
|
|
| 103 |
return int(round(s_lo + t * (s_hi - s_lo)))
|
| 104 |
return 0
|
| 105 |
|
| 106 |
+
def resize_for_inference(frame, max_side=640):
|
| 107 |
+
h, w = frame.shape[:2]
|
| 108 |
+
m = max(h, w)
|
| 109 |
+
if m <= max_side:
|
| 110 |
+
return frame
|
| 111 |
+
scale = max_side / float(m)
|
| 112 |
+
new_w = int(round(w * scale))
|
| 113 |
+
new_h = int(round(h * scale))
|
| 114 |
+
return cv2.resize(frame, (new_w, new_h), interpolation=cv2.INTER_AREA)
|
| 115 |
+
|
| 116 |
|
| 117 |
# ----------------------------
|
| 118 |
# Core pipeline
|
|
|
|
| 128 |
w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) or 0
|
| 129 |
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) or 0
|
| 130 |
|
| 131 |
+
# Compute stride from target FPS
|
| 132 |
+
# Example: fps=30, target=10 => stride=3
|
| 133 |
+
frame_stride = max(1, int(round(float(fps) / float(TARGET_FPS))))
|
| 134 |
+
|
| 135 |
+
# Make MIN_REP_FRAMES consistent in real time, not in raw frames
|
| 136 |
+
# We count "sampled frames", so this should be based on effective fps = fps / stride
|
| 137 |
+
effective_fps = float(fps) / float(frame_stride)
|
| 138 |
+
min_rep_frames = max(3, int(math.ceil(MIN_REP_SECONDS * effective_fps)))
|
| 139 |
+
|
| 140 |
+
print(f"Video fps={fps:.2f}, TARGET_FPS={TARGET_FPS}, stride={frame_stride}, effective_fps={effective_fps:.2f}, MIN_REP_FRAMES={min_rep_frames}")
|
| 141 |
+
|
| 142 |
# 1) First pass: compute angles + confs per sampled frame
|
| 143 |
angles, confs, frame_ids = [], [], []
|
| 144 |
frame_i = 0
|
|
|
|
| 148 |
if not ok:
|
| 149 |
break
|
| 150 |
|
| 151 |
+
if frame_i % frame_stride != 0:
|
| 152 |
frame_i += 1
|
| 153 |
continue
|
| 154 |
|
| 155 |
+
infer_frame = resize_for_inference(frame, MAX_INFER_SIDE)
|
| 156 |
+
|
| 157 |
+
res = model(infer_frame, verbose=False)[0]
|
| 158 |
if res.keypoints is None or len(res.keypoints.xy) == 0:
|
| 159 |
angles.append(np.nan)
|
| 160 |
confs.append(0.0)
|
|
|
|
| 200 |
raise RuntimeError("No valid pose angles detected.")
|
| 201 |
|
| 202 |
win = min(31, (len(angles) // 2) * 2 + 1)
|
| 203 |
+
win = max(win, 5)
|
| 204 |
angles_smooth = savgol_filter(angles, win, 2)
|
| 205 |
|
| 206 |
# 2) Rep detection on smoothed angles
|
|
|
|
| 225 |
rep_len += 1
|
| 226 |
|
| 227 |
if ang >= UP_ANGLE:
|
| 228 |
+
if rep_len >= min_rep_frames:
|
| 229 |
mean_cf = float(rep_conf_sum / rep_len)
|
| 230 |
likelihood = rep_likelihood(rep_min, rep_max, mean_cf)
|
| 231 |
score = likelihood_to_score(likelihood)
|
|
|
|
| 253 |
df = pd.DataFrame(reps)
|
| 254 |
df.to_csv(csv_path, index=False)
|
| 255 |
|
| 256 |
+
# 4) Annotated video (keep original resolution)
|
| 257 |
annotated_path = os.path.join(out_dir, "pushup_annotated.mp4")
|
| 258 |
cap = cv2.VideoCapture(video_path)
|
| 259 |
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
|
|
|
|
| 274 |
ang_disp = float(angles_smooth[j])
|
| 275 |
|
| 276 |
cv2.putText(frame, f"Reps: {count}/{len(reps)}", (20, 40),
|
| 277 |
+
cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255, 255, 255), 2)
|
| 278 |
cv2.putText(frame, f"Elbow angle: {ang_disp:.1f}", (20, 80),
|
| 279 |
+
cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255, 255, 255), 2)
|
| 280 |
cv2.putText(frame, f"Rep score: {active if active is not None else '-'}", (20, 120),
|
| 281 |
+
cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255, 255, 255), 2)
|
| 282 |
|
| 283 |
writer.write(frame)
|
| 284 |
frame_i += 1
|
|
|
|
| 293 |
"avg_score": int(round(float(np.mean([r["pushup_score"] for r in reps])))) if reps else 0,
|
| 294 |
"avg_likelihood": float(np.mean([r["pushup_likelihood"] for r in reps])) if reps else 0.0,
|
| 295 |
"rep_events": reps,
|
| 296 |
+
"speed_settings": {
|
| 297 |
+
"video_fps": float(fps),
|
| 298 |
+
"target_fps": int(TARGET_FPS),
|
| 299 |
+
"frame_stride": int(frame_stride),
|
| 300 |
+
"effective_fps": float(effective_fps),
|
| 301 |
+
"min_rep_frames": int(min_rep_frames),
|
| 302 |
+
"max_infer_side": int(MAX_INFER_SIDE),
|
| 303 |
+
}
|
| 304 |
}
|
| 305 |
|
| 306 |
return summary, annotated_path, csv_path
|
| 307 |
|
| 308 |
|
| 309 |
# ----------------------------
|
| 310 |
+
# API wrapper
|
| 311 |
# ----------------------------
|
| 312 |
def api_analyze(uploaded_file):
|
| 313 |
if uploaded_file is None:
|
|
|
|
| 316 |
workdir = tempfile.mkdtemp()
|
| 317 |
in_path = os.path.join(workdir, "input.mp4")
|
| 318 |
|
|
|
|
| 319 |
src_path = None
|
| 320 |
if hasattr(uploaded_file, "path") and uploaded_file.path:
|
| 321 |
src_path = uploaded_file.path
|
|
|
|
| 326 |
else:
|
| 327 |
src_path = str(uploaded_file)
|
| 328 |
|
|
|
|
| 329 |
ext = os.path.splitext(src_path)[1].lower()
|
| 330 |
allowed = {".mp4", ".mov", ".webm", ".mkv"}
|
| 331 |
if ext and ext not in allowed:
|
|
|
|
| 346 |
with gr.Blocks(title="Pushup API (YOLO)") as demo:
|
| 347 |
gr.Markdown("# Pushup Analyzer API (YOLO)\nUpload a video, get rep scores + CSV + annotated video.\n")
|
| 348 |
|
|
|
|
| 349 |
video_file = gr.File(label="Upload video")
|
|
|
|
| 350 |
btn = gr.Button("Analyze")
|
| 351 |
out_json = gr.JSON(label="Results JSON")
|
| 352 |
out_video = gr.Video(label="Annotated Output")
|