gibil committed on
Commit
d7f9742
·
verified ·
1 Parent(s): a0d73b7

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +70 -59
app.py CHANGED
@@ -6,36 +6,37 @@ import mediapipe as mp
6
  import gradio as gr
7
  import tempfile
8
  import shutil
9
- from typing import Optional, Any, Dict
10
 
11
 
12
  # -----------------------
13
  # Core pipeline function
14
  # -----------------------
15
- def analyze_pushup_video(
16
- video_path: str,
17
- save_annotated: bool = True,
18
- annotated_out_path: Optional[str] = None,
19
- ):
20
  """
21
- Returns dict:
22
  {
23
  "ok": bool,
24
- "error": str|None,
25
  "rep_count": int,
26
  "rep_events": list[dict],
27
- "annotated_video_path": str|None
28
  }
29
  """
30
  if not os.path.exists(video_path):
31
- return {"ok": False, "error": f"Could not find input video: {video_path}",
32
- "rep_count": 0, "rep_events": [], "annotated_video_path": None}
33
-
34
- # ---------- helpers ----------
 
 
 
 
 
35
def clamp(x, lo=0.0, hi=1.0):
    """Clip x into the closed interval [lo, hi] (defaults to [0, 1])."""
    if x < lo:
        return lo
    if x > hi:
        return hi
    return x
37
 
38
  def angle_deg(a, b, c):
 
39
  a = np.array(a, dtype=np.float32)
40
  b = np.array(b, dtype=np.float32)
41
  c = np.array(c, dtype=np.float32)
@@ -47,16 +48,21 @@ def analyze_pushup_video(
47
  return float(np.degrees(np.arccos(cosang)))
48
 
49
def score_from_range(val, good_lo, good_hi, ok_lo, ok_hi):
    """Score how well `val` fits a target band.

    Returns 1.0 when val lies inside [good_lo, good_hi]; outside that band
    the score ramps linearly down, reaching 0.0 at ok_lo (below the band)
    or ok_hi (above it). The result is always clipped to [0.0, 1.0].

    Fix: a degenerate ramp (ok_lo == good_lo or ok_hi == good_hi) previously
    raised ZeroDivisionError; it now scores 0.0 outside the band. Callers
    that pass distinct bounds see identical behavior.
    """
    if good_lo <= val <= good_hi:
        return 1.0
    if val < good_lo:
        span = good_lo - ok_lo
        raw = (val - ok_lo) / span if span else 0.0
    else:
        span = ok_hi - good_hi
        raw = (ok_hi - val) / span if span else 0.0
    # Inline clamp to [0, 1] so values beyond the "ok" bounds floor at 0.
    return max(0.0, min(1.0, raw))
 
55
 
56
def ema(prev, x, a=0.25):
    """Exponential moving average update; seeds with x when prev is None."""
    if prev is None:
        return x
    return a * x + (1 - a) * prev
58
 
59
- # ---------- pose ----------
60
  mp_pose = mp.solutions.pose
61
  pose = mp_pose.Pose(
62
  static_image_mode=False,
@@ -67,26 +73,34 @@ def analyze_pushup_video(
67
  min_tracking_confidence=0.5,
68
  )
69
 
 
70
  cap = cv2.VideoCapture(video_path)
71
  if not cap.isOpened():
72
  pose.close()
73
- return {"ok": False, "error": "OpenCV could not open the video. Try a different MP4 encoding.",
74
- "rep_count": 0, "rep_events": [], "annotated_video_path": None}
75
-
76
- fps = float(cap.get(cv2.CAP_PROP_FPS) or 30.0)
77
- W = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH) or 0)
78
- H = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT) or 0)
 
 
 
 
 
79
 
80
  if W <= 0 or H <= 0:
81
  cap.release()
82
  pose.close()
83
- return {"ok": False, "error": f"Video decode failed (W={W}, H={H}). Try re-encoding the video.",
84
- "rep_count": 0, "rep_events": [], "annotated_video_path": None}
85
-
86
- # Some encoders hate odd dimensions
87
- if W % 2 == 1: W -= 1
88
- if H % 2 == 1: H -= 1
89
-
 
 
90
  annotated_path = None
91
  writer = None
92
  if save_annotated:
@@ -95,13 +109,9 @@ def analyze_pushup_video(
95
  annotated_path = annotated_out_path
96
  fourcc = cv2.VideoWriter_fourcc(*"mp4v")
97
  writer = cv2.VideoWriter(annotated_path, fourcc, fps, (W, H))
98
- if not writer.isOpened():
99
- # Don’t crash the whole run if writing fails
100
- writer = None
101
- annotated_path = None
102
 
103
- # ---------- detection ----------
104
- state = "UNKNOWN"
105
  rep_events = []
106
  current_rep = None
107
  rep_count = 0
@@ -125,9 +135,6 @@ def analyze_pushup_video(
125
  break
126
  frame_idx += 1
127
 
128
- # Resize to the exact writer size if we adjusted odd dims
129
- frame = frame[:H, :W]
130
-
131
  rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
132
  res = pose.process(rgb)
133
 
@@ -137,6 +144,7 @@ def analyze_pushup_video(
137
  if res.pose_landmarks:
138
  lms = res.pose_landmarks.landmark
139
 
 
140
  Ls = lms[mp_pose.PoseLandmark.LEFT_SHOULDER.value]
141
  Rs = lms[mp_pose.PoseLandmark.RIGHT_SHOULDER.value]
142
  left_side = (Ls.visibility >= Rs.visibility)
@@ -171,7 +179,7 @@ def analyze_pushup_video(
171
 
172
  s_straight = score_from_range(ema_straight, 165, 185, 145, 195)
173
  s_elbow = score_from_range(ema_elbow, 85, 175, 60, 190)
174
- s_vis = clamp((ema_vis - MIN_VIS) / (0.85 - MIN_VIS)) if ema_vis is not None else 0.0
175
 
176
  frame_prob = clamp(0.15 + 0.45 * s_elbow + 0.30 * s_straight + 0.10 * s_vis)
177
 
@@ -188,6 +196,7 @@ def analyze_pushup_video(
188
  "min_elbow": float(ema_elbow),
189
  "min_straight": float(ema_straight),
190
  }
 
191
  elif state == "DOWN":
192
  if ema_elbow >= UP_ELBOW_DEG and frame_prob >= 0.35:
193
  end_f = frame_idx
@@ -197,8 +206,11 @@ def analyze_pushup_video(
197
  rep_count += 1
198
  probs = current_rep["frame_probs"] if current_rep["frame_probs"] else [frame_prob]
199
  rep_prob = float(np.mean(probs))
 
200
  rep_events.append({
201
- "rep": rep_count,
 
 
202
  "start_t": float(current_rep["start_f"] / fps),
203
  "end_t": float(end_f / fps),
204
  "prob": float(rep_prob),
@@ -215,6 +227,7 @@ def analyze_pushup_video(
215
 
216
  debug_txt = f"{'L' if left_side else 'R'} vis={ema_vis:.2f} elbow={ema_elbow:.0f} straight={ema_straight:.0f} p={frame_prob:.2f} state={state}"
217
 
 
218
  cv2.putText(frame, f"Reps: {rep_count}", (20, 40),
219
  cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255, 255, 255), 2, cv2.LINE_AA)
220
  cv2.putText(frame, debug_txt[:90], (20, 75),
@@ -228,39 +241,35 @@ def analyze_pushup_video(
228
  if writer is not None:
229
  writer.release()
230
  pose.close()
231
- return {"ok": False, "error": f"Runtime error: {type(e).__name__}: {e}",
232
- "rep_count": rep_count, "rep_events": rep_events, "annotated_video_path": annotated_path}
 
 
 
 
 
233
 
234
  cap.release()
235
  if writer is not None:
236
  writer.release()
237
  pose.close()
238
 
239
- return {"ok": True, "error": None, "rep_count": rep_count,
240
- "rep_events": rep_events, "annotated_video_path": annotated_path}
 
 
 
 
 
241
 
242
 
243
  # -----------------------
244
  # Gradio wrapper
245
  # -----------------------
246
- def _extract_video_path(video_input: Any) -> str:
247
- # Gradio sometimes gives a string path, sometimes a dict with "path"
248
- if isinstance(video_input, str):
249
- return video_input
250
- if isinstance(video_input, dict) and "path" in video_input:
251
- return video_input["path"]
252
- # Some versions use "name"
253
- if isinstance(video_input, dict) and "name" in video_input:
254
- return video_input["name"]
255
- raise ValueError(f"Unexpected video input type: {type(video_input)} value={video_input}")
256
-
257
-
258
  def gradio_run(video_file):
259
  workdir = tempfile.mkdtemp()
260
  in_path = os.path.join(workdir, "input.mp4")
261
-
262
- src_path = _extract_video_path(video_file)
263
- shutil.copy(src_path, in_path)
264
 
265
  out_path = os.path.join(workdir, "annotated.mp4")
266
  result = analyze_pushup_video(in_path, save_annotated=True, annotated_out_path=out_path)
@@ -284,7 +293,9 @@ demo = gr.Interface(
284
  gr.Video(label="Annotated output"),
285
  gr.JSON(label="Per-rep details"),
286
  ],
287
- allow_flagging="never",
 
 
288
  )
289
 
290
  if __name__ == "__main__":
 
6
  import gradio as gr
7
  import tempfile
8
  import shutil
 
9
 
10
 
11
  # -----------------------
12
  # Core pipeline function
13
  # -----------------------
14
+ def analyze_pushup_video(video_path: str, save_annotated: bool = True, annotated_out_path: str | None = None):
 
 
 
 
15
  """
16
+ Runs MediaPipe Pose on a video, counts pushup reps, and returns:
17
  {
18
  "ok": bool,
19
+ "error": str | None,
20
  "rep_count": int,
21
  "rep_events": list[dict],
22
+ "annotated_video_path": str | None
23
  }
24
  """
25
  if not os.path.exists(video_path):
26
+ return {
27
+ "ok": False,
28
+ "error": f"Could not find input video: {video_path}",
29
+ "rep_count": 0,
30
+ "rep_events": [],
31
+ "annotated_video_path": None,
32
+ }
33
+
34
+ # ---------- Math helpers ----------
35
  def clamp(x, lo=0.0, hi=1.0):
36
  return max(lo, min(hi, x))
37
 
38
  def angle_deg(a, b, c):
39
+ """Angle ABC in degrees using points a,b,c as (x,y)."""
40
  a = np.array(a, dtype=np.float32)
41
  b = np.array(b, dtype=np.float32)
42
  c = np.array(c, dtype=np.float32)
 
48
  return float(np.degrees(np.arccos(cosang)))
49
 
50
def score_from_range(val, good_lo, good_hi, ok_lo, ok_hi):
    """Piecewise-linear band score in [0.0, 1.0].

    Returns 1 if val is inside [good_lo, good_hi]; below the band the score
    fades linearly to 0 at ok_lo, above it fades to 0 at ok_hi.

    Fix: guards the linear ramps against a zero-width span (ok_lo == good_lo
    or ok_hi == good_hi), which previously raised ZeroDivisionError; such a
    degenerate ramp now scores 0.0 outside the band.
    """
    if good_lo <= val <= good_hi:
        return 1.0
    if val < good_lo:
        width = good_lo - ok_lo
        t = (val - ok_lo) / width if width else 0.0
    else:
        width = ok_hi - good_hi
        t = (ok_hi - val) / width if width else 0.0
    # Clip to [0, 1] so values past the "ok" limits bottom out at 0.
    return min(1.0, max(0.0, t))
61
 
62
  def ema(prev, x, a=0.25):
63
  return x if prev is None else (a * x + (1 - a) * prev)
64
 
65
+ # ---------- Pose setup ----------
66
  mp_pose = mp.solutions.pose
67
  pose = mp_pose.Pose(
68
  static_image_mode=False,
 
73
  min_tracking_confidence=0.5,
74
  )
75
 
76
+ # ---------- Video I/O ----------
77
  cap = cv2.VideoCapture(video_path)
78
  if not cap.isOpened():
79
  pose.close()
80
+ return {
81
+ "ok": False,
82
+ "error": "OpenCV could not open the video. Try a different mp4 encoding.",
83
+ "rep_count": 0,
84
+ "rep_events": [],
85
+ "annotated_video_path": None,
86
+ }
87
+
88
+ fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
89
+ W = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) or 0
90
+ H = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) or 0
91
 
92
  if W <= 0 or H <= 0:
93
  cap.release()
94
  pose.close()
95
+ return {
96
+ "ok": False,
97
+ "error": f"Bad video dimensions from OpenCV: W={W}, H={H}.",
98
+ "rep_count": 0,
99
+ "rep_events": [],
100
+ "annotated_video_path": None,
101
+ }
102
+
103
+ # Output path handling
104
  annotated_path = None
105
  writer = None
106
  if save_annotated:
 
109
  annotated_path = annotated_out_path
110
  fourcc = cv2.VideoWriter_fourcc(*"mp4v")
111
  writer = cv2.VideoWriter(annotated_path, fourcc, fps, (W, H))
 
 
 
 
112
 
113
+ # ---------- Pushup detection logic ----------
114
+ state = "UNKNOWN" # "UP" or "DOWN"
115
  rep_events = []
116
  current_rep = None
117
  rep_count = 0
 
135
  break
136
  frame_idx += 1
137
 
 
 
 
138
  rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
139
  res = pose.process(rgb)
140
 
 
144
  if res.pose_landmarks:
145
  lms = res.pose_landmarks.landmark
146
 
147
+ # Choose side: whichever shoulder has higher visibility
148
  Ls = lms[mp_pose.PoseLandmark.LEFT_SHOULDER.value]
149
  Rs = lms[mp_pose.PoseLandmark.RIGHT_SHOULDER.value]
150
  left_side = (Ls.visibility >= Rs.visibility)
 
179
 
180
  s_straight = score_from_range(ema_straight, 165, 185, 145, 195)
181
  s_elbow = score_from_range(ema_elbow, 85, 175, 60, 190)
182
+ s_vis = clamp((ema_vis - MIN_VIS) / (0.85 - MIN_VIS))
183
 
184
  frame_prob = clamp(0.15 + 0.45 * s_elbow + 0.30 * s_straight + 0.10 * s_vis)
185
 
 
196
  "min_elbow": float(ema_elbow),
197
  "min_straight": float(ema_straight),
198
  }
199
+
200
  elif state == "DOWN":
201
  if ema_elbow >= UP_ELBOW_DEG and frame_prob >= 0.35:
202
  end_f = frame_idx
 
206
  rep_count += 1
207
  probs = current_rep["frame_probs"] if current_rep["frame_probs"] else [frame_prob]
208
  rep_prob = float(np.mean(probs))
209
+
210
  rep_events.append({
211
+ "rep": int(rep_count),
212
+ "start_f": int(current_rep["start_f"]),
213
+ "end_f": int(end_f),
214
  "start_t": float(current_rep["start_f"] / fps),
215
  "end_t": float(end_f / fps),
216
  "prob": float(rep_prob),
 
227
 
228
  debug_txt = f"{'L' if left_side else 'R'} vis={ema_vis:.2f} elbow={ema_elbow:.0f} straight={ema_straight:.0f} p={frame_prob:.2f} state={state}"
229
 
230
+ # Overlay text on every frame
231
  cv2.putText(frame, f"Reps: {rep_count}", (20, 40),
232
  cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255, 255, 255), 2, cv2.LINE_AA)
233
  cv2.putText(frame, debug_txt[:90], (20, 75),
 
241
  if writer is not None:
242
  writer.release()
243
  pose.close()
244
+ return {
245
+ "ok": False,
246
+ "error": f"Runtime error: {type(e).__name__}: {e}",
247
+ "rep_count": rep_count,
248
+ "rep_events": rep_events,
249
+ "annotated_video_path": annotated_path,
250
+ }
251
 
252
  cap.release()
253
  if writer is not None:
254
  writer.release()
255
  pose.close()
256
 
257
+ return {
258
+ "ok": True,
259
+ "error": None,
260
+ "rep_count": rep_count,
261
+ "rep_events": rep_events,
262
+ "annotated_video_path": annotated_path,
263
+ }
264
 
265
 
266
  # -----------------------
267
  # Gradio wrapper
268
  # -----------------------
 
 
 
 
 
 
 
 
 
 
 
 
269
  def gradio_run(video_file):
270
  workdir = tempfile.mkdtemp()
271
  in_path = os.path.join(workdir, "input.mp4")
272
+ shutil.copy(video_file, in_path)
 
 
273
 
274
  out_path = os.path.join(workdir, "annotated.mp4")
275
  result = analyze_pushup_video(in_path, save_annotated=True, annotated_out_path=out_path)
 
293
  gr.Video(label="Annotated output"),
294
  gr.JSON(label="Per-rep details"),
295
  ],
296
+ title="Pushup Prototype",
297
+ description="Uploads a video, counts reps, and gives per-rep likelihood.",
298
+ flagging_mode="never", # ✅ correct for newer gradio
299
  )
300
 
301
  if __name__ == "__main__":