gibil commited on
Commit
aa938e5
·
verified ·
1 Parent(s): de3494f

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +56 -59
app.py CHANGED
@@ -1,11 +1,10 @@
1
  import os
 
 
2
  import cv2
3
  import numpy as np
4
  import mediapipe as mp
5
-
6
  import gradio as gr
7
- import tempfile
8
- import shutil
9
 
10
 
11
  # -----------------------
@@ -13,11 +12,12 @@ import shutil
13
  # -----------------------
14
  def analyze_pushup_video(video_path: str, save_annotated: bool = True, annotated_out_path: str | None = None):
15
  """
16
- Runs MediaPipe Pose on a video, counts pushup reps, and returns:
17
  {
18
  "ok": bool,
19
  "error": str | None,
20
  "rep_count": int,
 
21
  "rep_events": list[dict],
22
  "annotated_video_path": str | None
23
  }
@@ -27,16 +27,15 @@ def analyze_pushup_video(video_path: str, save_annotated: bool = True, annotated
27
  "ok": False,
28
  "error": f"Could not find input video: {video_path}",
29
  "rep_count": 0,
 
30
  "rep_events": [],
31
  "annotated_video_path": None,
32
  }
33
 
34
- # ---------- Math helpers ----------
35
  def clamp(x, lo=0.0, hi=1.0):
36
  return max(lo, min(hi, x))
37
 
38
  def angle_deg(a, b, c):
39
- """Angle ABC in degrees using points a,b,c as (x,y)."""
40
  a = np.array(a, dtype=np.float32)
41
  b = np.array(b, dtype=np.float32)
42
  c = np.array(c, dtype=np.float32)
@@ -48,10 +47,6 @@ def analyze_pushup_video(video_path: str, save_annotated: bool = True, annotated
48
  return float(np.degrees(np.arccos(cosang)))
49
 
50
  def score_from_range(val, good_lo, good_hi, ok_lo, ok_hi):
51
- """
52
- Returns 1 if val in [good_lo, good_hi],
53
- fades to 0 by the time it reaches ok_lo/ok_hi.
54
- """
55
  if good_lo <= val <= good_hi:
56
  return 1.0
57
  if val < good_lo:
@@ -62,7 +57,6 @@ def analyze_pushup_video(video_path: str, save_annotated: bool = True, annotated
62
  def ema(prev, x, a=0.25):
63
  return x if prev is None else (a * x + (1 - a) * prev)
64
 
65
- # ---------- Pose setup ----------
66
  mp_pose = mp.solutions.pose
67
  pose = mp_pose.Pose(
68
  static_image_mode=False,
@@ -73,14 +67,14 @@ def analyze_pushup_video(video_path: str, save_annotated: bool = True, annotated
73
  min_tracking_confidence=0.5,
74
  )
75
 
76
- # ---------- Video I/O ----------
77
  cap = cv2.VideoCapture(video_path)
78
  if not cap.isOpened():
79
  pose.close()
80
  return {
81
  "ok": False,
82
- "error": "OpenCV could not open the video. Try a different mp4 encoding.",
83
  "rep_count": 0,
 
84
  "rep_events": [],
85
  "annotated_video_path": None,
86
  }
@@ -89,18 +83,6 @@ def analyze_pushup_video(video_path: str, save_annotated: bool = True, annotated
89
  W = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) or 0
90
  H = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) or 0
91
 
92
- if W <= 0 or H <= 0:
93
- cap.release()
94
- pose.close()
95
- return {
96
- "ok": False,
97
- "error": f"Bad video dimensions from OpenCV: W={W}, H={H}.",
98
- "rep_count": 0,
99
- "rep_events": [],
100
- "annotated_video_path": None,
101
- }
102
-
103
- # Output path handling
104
  annotated_path = None
105
  writer = None
106
  if save_annotated:
@@ -110,8 +92,7 @@ def analyze_pushup_video(video_path: str, save_annotated: bool = True, annotated
110
  fourcc = cv2.VideoWriter_fourcc(*"mp4v")
111
  writer = cv2.VideoWriter(annotated_path, fourcc, fps, (W, H))
112
 
113
- # ---------- Pushup detection logic ----------
114
- state = "UNKNOWN" # "UP" or "DOWN"
115
  rep_events = []
116
  current_rep = None
117
  rep_count = 0
@@ -144,7 +125,6 @@ def analyze_pushup_video(video_path: str, save_annotated: bool = True, annotated
144
  if res.pose_landmarks:
145
  lms = res.pose_landmarks.landmark
146
 
147
- # Choose side: whichever shoulder has higher visibility
148
  Ls = lms[mp_pose.PoseLandmark.LEFT_SHOULDER.value]
149
  Rs = lms[mp_pose.PoseLandmark.RIGHT_SHOULDER.value]
150
  left_side = (Ls.visibility >= Rs.visibility)
@@ -209,8 +189,6 @@ def analyze_pushup_video(video_path: str, save_annotated: bool = True, annotated
209
 
210
  rep_events.append({
211
  "rep": int(rep_count),
212
- "start_f": int(current_rep["start_f"]),
213
- "end_f": int(end_f),
214
  "start_t": float(current_rep["start_f"] / fps),
215
  "end_t": float(end_f / fps),
216
  "prob": float(rep_prob),
@@ -227,7 +205,6 @@ def analyze_pushup_video(video_path: str, save_annotated: bool = True, annotated
227
 
228
  debug_txt = f"{'L' if left_side else 'R'} vis={ema_vis:.2f} elbow={ema_elbow:.0f} straight={ema_straight:.0f} p={frame_prob:.2f} state={state}"
229
 
230
- # Overlay text on every frame
231
  cv2.putText(frame, f"Reps: {rep_count}", (20, 40),
232
  cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255, 255, 255), 2, cv2.LINE_AA)
233
  cv2.putText(frame, debug_txt[:90], (20, 75),
@@ -245,6 +222,7 @@ def analyze_pushup_video(video_path: str, save_annotated: bool = True, annotated
245
  "ok": False,
246
  "error": f"Runtime error: {type(e).__name__}: {e}",
247
  "rep_count": rep_count,
 
248
  "rep_events": rep_events,
249
  "annotated_video_path": annotated_path,
250
  }
@@ -254,51 +232,70 @@ def analyze_pushup_video(video_path: str, save_annotated: bool = True, annotated
254
  writer.release()
255
  pose.close()
256
 
 
 
 
 
257
  return {
258
  "ok": True,
259
  "error": None,
260
- "rep_count": rep_count,
 
261
  "rep_events": rep_events,
262
  "annotated_video_path": annotated_path,
263
  }
264
 
265
 
266
  # -----------------------
267
- # Gradio wrapper
268
  # -----------------------
269
- def gradio_run(video_file_path):
270
- # video_file_path is a REAL file path on the HF server (because type="filepath")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
271
  workdir = tempfile.mkdtemp()
272
  in_path = os.path.join(workdir, "input.mp4")
273
- shutil.copy(video_file_path, in_path)
274
 
275
  out_path = os.path.join(workdir, "annotated.mp4")
276
  result = analyze_pushup_video(in_path, save_annotated=True, annotated_out_path=out_path)
277
 
278
- if not result["ok"]:
279
- return {"ok": False, "error": str(result["error"]), "rep_count": 0, "avg_prob": 0.0, "rep_events": []}
280
 
281
- avg_prob = 0.0
282
- if result["rep_events"]:
283
- avg_prob = sum(r["prob"] for r in result["rep_events"]) / len(result["rep_events"])
284
-
285
- # IMPORTANT: return only JSON-ish data (browser can render it easily)
286
- return {
287
- "ok": True,
288
- "error": None,
289
- "rep_count": int(result["rep_count"]),
290
- "avg_prob": float(avg_prob),
291
- "rep_events": result["rep_events"],
292
- }
293
 
 
 
 
 
 
 
294
 
295
- demo = gr.Interface(
296
- fn=gradio_run,
297
- inputs=gr.Video(label="Upload pushup video", type="filepath"),
298
- outputs=gr.JSON(label="Result"),
299
- api_name="analyze", # gives you a stable endpoint name
300
- )
301
 
302
- if __name__ == "__main__":
303
- demo.queue()
304
- demo.launch()
 
1
  import os
2
+ import tempfile
3
+ import shutil
4
  import cv2
5
  import numpy as np
6
  import mediapipe as mp
 
7
  import gradio as gr
 
 
8
 
9
 
10
  # -----------------------
 
12
  # -----------------------
13
  def analyze_pushup_video(video_path: str, save_annotated: bool = True, annotated_out_path: str | None = None):
14
  """
15
+ Returns:
16
  {
17
  "ok": bool,
18
  "error": str | None,
19
  "rep_count": int,
20
+ "avg_rep_prob": float | None,
21
  "rep_events": list[dict],
22
  "annotated_video_path": str | None
23
  }
 
27
  "ok": False,
28
  "error": f"Could not find input video: {video_path}",
29
  "rep_count": 0,
30
+ "avg_rep_prob": None,
31
  "rep_events": [],
32
  "annotated_video_path": None,
33
  }
34
 
 
35
  def clamp(x, lo=0.0, hi=1.0):
36
  return max(lo, min(hi, x))
37
 
38
  def angle_deg(a, b, c):
 
39
  a = np.array(a, dtype=np.float32)
40
  b = np.array(b, dtype=np.float32)
41
  c = np.array(c, dtype=np.float32)
 
47
  return float(np.degrees(np.arccos(cosang)))
48
 
49
  def score_from_range(val, good_lo, good_hi, ok_lo, ok_hi):
 
 
 
 
50
  if good_lo <= val <= good_hi:
51
  return 1.0
52
  if val < good_lo:
 
57
  def ema(prev, x, a=0.25):
58
  return x if prev is None else (a * x + (1 - a) * prev)
59
 
 
60
  mp_pose = mp.solutions.pose
61
  pose = mp_pose.Pose(
62
  static_image_mode=False,
 
67
  min_tracking_confidence=0.5,
68
  )
69
 
 
70
  cap = cv2.VideoCapture(video_path)
71
  if not cap.isOpened():
72
  pose.close()
73
  return {
74
  "ok": False,
75
+ "error": "OpenCV could not open the video. Try a different MP4 encoding.",
76
  "rep_count": 0,
77
+ "avg_rep_prob": None,
78
  "rep_events": [],
79
  "annotated_video_path": None,
80
  }
 
83
  W = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) or 0
84
  H = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) or 0
85
 
 
 
 
 
 
 
 
 
 
 
 
 
86
  annotated_path = None
87
  writer = None
88
  if save_annotated:
 
92
  fourcc = cv2.VideoWriter_fourcc(*"mp4v")
93
  writer = cv2.VideoWriter(annotated_path, fourcc, fps, (W, H))
94
 
95
+ state = "UNKNOWN"
 
96
  rep_events = []
97
  current_rep = None
98
  rep_count = 0
 
125
  if res.pose_landmarks:
126
  lms = res.pose_landmarks.landmark
127
 
 
128
  Ls = lms[mp_pose.PoseLandmark.LEFT_SHOULDER.value]
129
  Rs = lms[mp_pose.PoseLandmark.RIGHT_SHOULDER.value]
130
  left_side = (Ls.visibility >= Rs.visibility)
 
189
 
190
  rep_events.append({
191
  "rep": int(rep_count),
 
 
192
  "start_t": float(current_rep["start_f"] / fps),
193
  "end_t": float(end_f / fps),
194
  "prob": float(rep_prob),
 
205
 
206
  debug_txt = f"{'L' if left_side else 'R'} vis={ema_vis:.2f} elbow={ema_elbow:.0f} straight={ema_straight:.0f} p={frame_prob:.2f} state={state}"
207
 
 
208
  cv2.putText(frame, f"Reps: {rep_count}", (20, 40),
209
  cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255, 255, 255), 2, cv2.LINE_AA)
210
  cv2.putText(frame, debug_txt[:90], (20, 75),
 
222
  "ok": False,
223
  "error": f"Runtime error: {type(e).__name__}: {e}",
224
  "rep_count": rep_count,
225
+ "avg_rep_prob": None,
226
  "rep_events": rep_events,
227
  "annotated_video_path": annotated_path,
228
  }
 
232
  writer.release()
233
  pose.close()
234
 
235
+ avg_rep_prob = None
236
+ if rep_events:
237
+ avg_rep_prob = float(sum(r["prob"] for r in rep_events) / len(rep_events))
238
+
239
  return {
240
  "ok": True,
241
  "error": None,
242
+ "rep_count": int(rep_count),
243
+ "avg_rep_prob": avg_rep_prob,
244
  "rep_events": rep_events,
245
  "annotated_video_path": annotated_path,
246
  }
247
 
248
 
249
  # -----------------------
250
+ # Gradio API wrapper
251
  # -----------------------
252
+ def api_run(uploaded_file):
253
+ """
254
+ uploaded_file can be:
255
+ - a Gradio UploadedFile object with .name
256
+ - a dict with 'path'
257
+ - a string path
258
+ We normalize it to a real filepath, copy it, run pipeline, return JSON + annotated video.
259
+ """
260
+ # get path
261
+ path = None
262
+ if uploaded_file is None:
263
+ return {"ok": False, "error": "No file uploaded."}, None
264
+
265
+ if isinstance(uploaded_file, str):
266
+ path = uploaded_file
267
+ elif isinstance(uploaded_file, dict) and "path" in uploaded_file:
268
+ path = uploaded_file["path"]
269
+ elif hasattr(uploaded_file, "name"):
270
+ path = uploaded_file.name
271
+
272
+ if not path or not os.path.exists(path):
273
+ return {"ok": False, "error": f"Upload path missing or not found: {path}"}, None
274
+
275
  workdir = tempfile.mkdtemp()
276
  in_path = os.path.join(workdir, "input.mp4")
277
+ shutil.copy(path, in_path)
278
 
279
  out_path = os.path.join(workdir, "annotated.mp4")
280
  result = analyze_pushup_video(in_path, save_annotated=True, annotated_out_path=out_path)
281
 
282
+ # Return JSON + video path (Gradio serves it)
283
+ return result, result["annotated_video_path"]
284
 
 
 
 
 
 
 
 
 
 
 
 
 
285
 
286
+ with gr.Blocks(title="Pushup Prototype API") as demo:
287
+ gr.Markdown("# Pushup Prototype (API)\nUpload a video -> get JSON result + annotated output.")
288
+ video_file = gr.File(label="Upload MP4", file_types=["video"])
289
+ run_btn = gr.Button("Analyze")
290
+ out_json = gr.JSON(label="Result JSON")
291
+ out_vid = gr.Video(label="Annotated Output")
292
 
293
+ run_btn.click(
294
+ fn=api_run,
295
+ inputs=[video_file],
296
+ outputs=[out_json, out_vid],
297
+ api_name="analyze" # IMPORTANT for GitHub JS calls
298
+ )
299
 
300
+ demo.queue()
301
+ demo.launch()