gibil commited on
Commit
d14fed8
·
verified ·
1 Parent(s): 79ee78e

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +22 -57
app.py CHANGED
@@ -1,3 +1,5 @@
 
 
1
  import os
2
  import math
3
  import shutil
@@ -11,22 +13,12 @@ import gradio as gr
11
 
12
 
13
  # ----------------------------
14
- # Settings
15
  # ----------------------------
16
  UP_ANGLE = 155
17
  DOWN_ANGLE = 105
18
-
19
- # Target processing FPS (how often we run YOLO)
20
- # 10 is a good balance for speed vs accuracy.
21
- TARGET_FPS = 10
22
-
23
- # Minimum rep duration in seconds (more robust than hardcoding frames)
24
- # 0.25s is a safe filter against noise but won't kill real reps.
25
- MIN_REP_SECONDS = 0.25
26
-
27
- # YOLO inference resize (no cropping, only downscale).
28
- # 640 is typically safe with small accuracy loss, big speed gain on high-res videos.
29
- MAX_INFER_SIDE = 640
30
 
31
 
32
  # ----------------------------
@@ -53,7 +45,7 @@ def load_pose_model():
53
 
54
 
55
  # ----------------------------
56
- # Helpers
57
  # ----------------------------
58
  def angle_deg(a, b, c):
59
  a = np.asarray(a, dtype=np.float32)
@@ -66,8 +58,8 @@ def angle_deg(a, b, c):
66
  return float(math.degrees(math.acos(cosv)))
67
 
68
  def pick_best_side(kxy, kconf):
69
- left = [5, 7, 9] # L shoulder, L elbow, L wrist
70
- right = [6, 8, 10] # R shoulder, R elbow, R wrist
71
  if float(np.mean(kconf[right])) >= float(np.mean(kconf[left])):
72
  return right, float(np.mean(kconf[right]))
73
  return left, float(np.mean(kconf[left]))
@@ -103,16 +95,6 @@ def likelihood_to_score(p):
103
  return int(round(s_lo + t * (s_hi - s_lo)))
104
  return 0
105
 
106
- def resize_for_inference(frame, max_side=640):
107
- h, w = frame.shape[:2]
108
- m = max(h, w)
109
- if m <= max_side:
110
- return frame
111
- scale = max_side / float(m)
112
- new_w = int(round(w * scale))
113
- new_h = int(round(h * scale))
114
- return cv2.resize(frame, (new_w, new_h), interpolation=cv2.INTER_AREA)
115
-
116
 
117
  # ----------------------------
118
  # Core pipeline
@@ -128,17 +110,6 @@ def analyze_pushup_video_yolo(video_path: str, out_dir: str):
128
  w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) or 0
129
  h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) or 0
130
 
131
- # Compute stride from target FPS
132
- # Example: fps=30, target=10 => stride=3
133
- frame_stride = max(1, int(round(float(fps) / float(TARGET_FPS))))
134
-
135
- # Make MIN_REP_FRAMES consistent in real time, not in raw frames
136
- # We count "sampled frames", so this should be based on effective fps = fps / stride
137
- effective_fps = float(fps) / float(frame_stride)
138
- min_rep_frames = max(3, int(math.ceil(MIN_REP_SECONDS * effective_fps)))
139
-
140
- print(f"Video fps={fps:.2f}, TARGET_FPS={TARGET_FPS}, stride={frame_stride}, effective_fps={effective_fps:.2f}, MIN_REP_FRAMES={min_rep_frames}")
141
-
142
  # 1) First pass: compute angles + confs per sampled frame
143
  angles, confs, frame_ids = [], [], []
144
  frame_i = 0
@@ -148,13 +119,11 @@ def analyze_pushup_video_yolo(video_path: str, out_dir: str):
148
  if not ok:
149
  break
150
 
151
- if frame_i % frame_stride != 0:
152
  frame_i += 1
153
  continue
154
 
155
- infer_frame = resize_for_inference(frame, MAX_INFER_SIDE)
156
-
157
- res = model(infer_frame, verbose=False)[0]
158
  if res.keypoints is None or len(res.keypoints.xy) == 0:
159
  angles.append(np.nan)
160
  confs.append(0.0)
@@ -200,7 +169,7 @@ def analyze_pushup_video_yolo(video_path: str, out_dir: str):
200
  raise RuntimeError("No valid pose angles detected.")
201
 
202
  win = min(31, (len(angles) // 2) * 2 + 1)
203
- win = max(win, 5)
204
  angles_smooth = savgol_filter(angles, win, 2)
205
 
206
  # 2) Rep detection on smoothed angles
@@ -225,7 +194,7 @@ def analyze_pushup_video_yolo(video_path: str, out_dir: str):
225
  rep_len += 1
226
 
227
  if ang >= UP_ANGLE:
228
- if rep_len >= min_rep_frames:
229
  mean_cf = float(rep_conf_sum / rep_len)
230
  likelihood = rep_likelihood(rep_min, rep_max, mean_cf)
231
  score = likelihood_to_score(likelihood)
@@ -253,7 +222,7 @@ def analyze_pushup_video_yolo(video_path: str, out_dir: str):
253
  df = pd.DataFrame(reps)
254
  df.to_csv(csv_path, index=False)
255
 
256
- # 4) Annotated video (keep original resolution)
257
  annotated_path = os.path.join(out_dir, "pushup_annotated.mp4")
258
  cap = cv2.VideoCapture(video_path)
259
  fourcc = cv2.VideoWriter_fourcc(*"mp4v")
@@ -274,11 +243,11 @@ def analyze_pushup_video_yolo(video_path: str, out_dir: str):
274
  ang_disp = float(angles_smooth[j])
275
 
276
  cv2.putText(frame, f"Reps: {count}/{len(reps)}", (20, 40),
277
- cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255, 255, 255), 2)
278
  cv2.putText(frame, f"Elbow angle: {ang_disp:.1f}", (20, 80),
279
- cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255, 255, 255), 2)
280
  cv2.putText(frame, f"Rep score: {active if active is not None else '-'}", (20, 120),
281
- cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255, 255, 255), 2)
282
 
283
  writer.write(frame)
284
  frame_i += 1
@@ -293,21 +262,13 @@ def analyze_pushup_video_yolo(video_path: str, out_dir: str):
293
  "avg_score": int(round(float(np.mean([r["pushup_score"] for r in reps])))) if reps else 0,
294
  "avg_likelihood": float(np.mean([r["pushup_likelihood"] for r in reps])) if reps else 0.0,
295
  "rep_events": reps,
296
- "speed_settings": {
297
- "video_fps": float(fps),
298
- "target_fps": int(TARGET_FPS),
299
- "frame_stride": int(frame_stride),
300
- "effective_fps": float(effective_fps),
301
- "min_rep_frames": int(min_rep_frames),
302
- "max_infer_side": int(MAX_INFER_SIDE),
303
- }
304
  }
305
 
306
  return summary, annotated_path, csv_path
307
 
308
 
309
  # ----------------------------
310
- # API wrapper
311
  # ----------------------------
312
  def api_analyze(uploaded_file):
313
  if uploaded_file is None:
@@ -316,6 +277,7 @@ def api_analyze(uploaded_file):
316
  workdir = tempfile.mkdtemp()
317
  in_path = os.path.join(workdir, "input.mp4")
318
 
 
319
  src_path = None
320
  if hasattr(uploaded_file, "path") and uploaded_file.path:
321
  src_path = uploaded_file.path
@@ -326,6 +288,7 @@ def api_analyze(uploaded_file):
326
  else:
327
  src_path = str(uploaded_file)
328
 
 
329
  ext = os.path.splitext(src_path)[1].lower()
330
  allowed = {".mp4", ".mov", ".webm", ".mkv"}
331
  if ext and ext not in allowed:
@@ -346,7 +309,9 @@ def api_analyze(uploaded_file):
346
  with gr.Blocks(title="Pushup API (YOLO)") as demo:
347
  gr.Markdown("# Pushup Analyzer API (YOLO)\nUpload a video, get rep scores + CSV + annotated video.\n")
348
 
 
349
  video_file = gr.File(label="Upload video")
 
350
  btn = gr.Button("Analyze")
351
  out_json = gr.JSON(label="Results JSON")
352
  out_video = gr.Video(label="Annotated Output")
@@ -360,4 +325,4 @@ with gr.Blocks(title="Pushup API (YOLO)") as demo:
360
  )
361
 
362
  if __name__ == "__main__":
363
- demo.launch()
 
1
+
2
+ You said:
3
  import os
4
  import math
5
  import shutil
 
13
 
14
 
15
  # ----------------------------
16
+ # Settings (same as Colab)
17
  # ----------------------------
18
  UP_ANGLE = 155
19
  DOWN_ANGLE = 105
20
+ MIN_REP_FRAMES = 8
21
+ FRAME_STRIDE = 1
 
 
 
 
 
 
 
 
 
 
22
 
23
 
24
  # ----------------------------
 
45
 
46
 
47
  # ----------------------------
48
+ # Helpers (from your script)
49
  # ----------------------------
50
  def angle_deg(a, b, c):
51
  a = np.asarray(a, dtype=np.float32)
 
58
  return float(math.degrees(math.acos(cosv)))
59
 
60
  def pick_best_side(kxy, kconf):
61
+ left = [5, 7, 9] # L shoulder, L elbow, L wrist (YOLO COCO indices)
62
+ right = [6, 8, 10] # R shoulder, R elbow, R wrist
63
  if float(np.mean(kconf[right])) >= float(np.mean(kconf[left])):
64
  return right, float(np.mean(kconf[right]))
65
  return left, float(np.mean(kconf[left]))
 
95
  return int(round(s_lo + t * (s_hi - s_lo)))
96
  return 0
97
 
 
 
 
 
 
 
 
 
 
 
98
 
99
  # ----------------------------
100
  # Core pipeline
 
110
  w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) or 0
111
  h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) or 0
112
 
 
 
 
 
 
 
 
 
 
 
 
113
  # 1) First pass: compute angles + confs per sampled frame
114
  angles, confs, frame_ids = [], [], []
115
  frame_i = 0
 
119
  if not ok:
120
  break
121
 
122
+ if frame_i % FRAME_STRIDE != 0:
123
  frame_i += 1
124
  continue
125
 
126
+ res = model(frame, verbose=False)[0]
 
 
127
  if res.keypoints is None or len(res.keypoints.xy) == 0:
128
  angles.append(np.nan)
129
  confs.append(0.0)
 
169
  raise RuntimeError("No valid pose angles detected.")
170
 
171
  win = min(31, (len(angles) // 2) * 2 + 1)
172
+ win = max(win, 5) # savgol requires >= 5 for polyorder=2 comfortably
173
  angles_smooth = savgol_filter(angles, win, 2)
174
 
175
  # 2) Rep detection on smoothed angles
 
194
  rep_len += 1
195
 
196
  if ang >= UP_ANGLE:
197
+ if rep_len >= MIN_REP_FRAMES:
198
  mean_cf = float(rep_conf_sum / rep_len)
199
  likelihood = rep_likelihood(rep_min, rep_max, mean_cf)
200
  score = likelihood_to_score(likelihood)
 
222
  df = pd.DataFrame(reps)
223
  df.to_csv(csv_path, index=False)
224
 
225
+ # 4) Annotated video
226
  annotated_path = os.path.join(out_dir, "pushup_annotated.mp4")
227
  cap = cv2.VideoCapture(video_path)
228
  fourcc = cv2.VideoWriter_fourcc(*"mp4v")
 
243
  ang_disp = float(angles_smooth[j])
244
 
245
  cv2.putText(frame, f"Reps: {count}/{len(reps)}", (20, 40),
246
+ cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255,255,255), 2)
247
  cv2.putText(frame, f"Elbow angle: {ang_disp:.1f}", (20, 80),
248
+ cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255,255,255), 2)
249
  cv2.putText(frame, f"Rep score: {active if active is not None else '-'}", (20, 120),
250
+ cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255,255,255), 2)
251
 
252
  writer.write(frame)
253
  frame_i += 1
 
262
  "avg_score": int(round(float(np.mean([r["pushup_score"] for r in reps])))) if reps else 0,
263
  "avg_likelihood": float(np.mean([r["pushup_likelihood"] for r in reps])) if reps else 0.0,
264
  "rep_events": reps,
 
 
 
 
 
 
 
 
265
  }
266
 
267
  return summary, annotated_path, csv_path
268
 
269
 
270
  # ----------------------------
271
+ # API wrapper (robust file handling like your old one)
272
  # ----------------------------
273
  def api_analyze(uploaded_file):
274
  if uploaded_file is None:
 
277
  workdir = tempfile.mkdtemp()
278
  in_path = os.path.join(workdir, "input.mp4")
279
 
280
+ # Resolve source path robustly
281
  src_path = None
282
  if hasattr(uploaded_file, "path") and uploaded_file.path:
283
  src_path = uploaded_file.path
 
288
  else:
289
  src_path = str(uploaded_file)
290
 
291
+ # Optional extension check (same idea as your old code)
292
  ext = os.path.splitext(src_path)[1].lower()
293
  allowed = {".mp4", ".mov", ".webm", ".mkv"}
294
  if ext and ext not in allowed:
 
309
  with gr.Blocks(title="Pushup API (YOLO)") as demo:
310
  gr.Markdown("# Pushup Analyzer API (YOLO)\nUpload a video, get rep scores + CSV + annotated video.\n")
311
 
312
+ # IMPORTANT: keep this as gr.File to avoid the “Invalid file type: ['video']” problem you hit before
313
  video_file = gr.File(label="Upload video")
314
+
315
  btn = gr.Button("Analyze")
316
  out_json = gr.JSON(label="Results JSON")
317
  out_video = gr.Video(label="Annotated Output")
 
325
  )
326
 
327
  if __name__ == "__main__":
328
+ demo.launch()