gibil committed on
Commit
c75fd7e
·
verified ·
1 Parent(s): e2dc1fe

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +52 -66
app.py CHANGED
@@ -1,46 +1,37 @@
1
  import os
2
- import math
3
- import shutil
4
- import tempfile
5
- from typing import Any, Dict, List, Optional, Union
6
-
7
  import cv2
8
  import numpy as np
9
  import mediapipe as mp
 
 
10
  import gradio as gr
11
 
12
 
13
  # -----------------------
14
  # Core pipeline function
15
  # -----------------------
16
- def analyze_pushup_video(
17
- video_path: str,
18
- save_annotated: bool = True,
19
- annotated_out_path: Optional[str] = None,
20
- ) -> Dict[str, Any]:
21
  """
22
- Runs MediaPipe Pose on a video, counts pushup reps, and returns:
23
  {
24
  "ok": bool,
25
  "error": str | None,
26
  "rep_count": int,
27
- "avg_rep_prob": float | None,
28
  "rep_events": list[dict],
29
  "annotated_video_path": str | None
30
  }
31
  """
32
-
33
  if not os.path.exists(video_path):
34
  return {
35
  "ok": False,
36
  "error": f"Could not find input video: {video_path}",
37
  "rep_count": 0,
38
- "avg_rep_prob": None,
39
  "rep_events": [],
40
  "annotated_video_path": None,
41
  }
42
 
43
- # ---------- Helpers ----------
44
  def clamp(x, lo=0.0, hi=1.0):
45
  return max(lo, min(hi, x))
46
 
@@ -65,7 +56,6 @@ def analyze_pushup_video(
65
  def ema(prev, x, a=0.25):
66
  return x if prev is None else (a * x + (1 - a) * prev)
67
 
68
- # ---------- Pose ----------
69
  mp_pose = mp.solutions.pose
70
  pose = mp_pose.Pose(
71
  static_image_mode=False,
@@ -76,15 +66,14 @@ def analyze_pushup_video(
76
  min_tracking_confidence=0.5,
77
  )
78
 
79
- # ---------- Video ----------
80
  cap = cv2.VideoCapture(video_path)
81
  if not cap.isOpened():
82
  pose.close()
83
  return {
84
  "ok": False,
85
- "error": "OpenCV could not open the video. Try re-exporting to a standard H.264 mp4.",
86
  "rep_count": 0,
87
- "avg_rep_prob": None,
88
  "rep_events": [],
89
  "annotated_video_path": None,
90
  }
@@ -93,8 +82,8 @@ def analyze_pushup_video(
93
  W = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) or 0
94
  H = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) or 0
95
 
96
- writer = None
97
  annotated_path = None
 
98
  if save_annotated:
99
  if annotated_out_path is None:
100
  annotated_out_path = os.path.join(tempfile.mkdtemp(), "annotated.mp4")
@@ -102,10 +91,9 @@ def analyze_pushup_video(
102
  fourcc = cv2.VideoWriter_fourcc(*"mp4v")
103
  writer = cv2.VideoWriter(annotated_path, fourcc, fps, (W, H))
104
 
105
- # ---------- Detection state ----------
106
- state = "UNKNOWN" # UP/DOWN
107
- rep_events: List[Dict[str, Any]] = []
108
- current_rep: Optional[Dict[str, Any]] = None
109
  rep_count = 0
110
 
111
  ema_elbow = None
@@ -197,7 +185,6 @@ def analyze_pushup_video(
197
  rep_count += 1
198
  probs = current_rep["frame_probs"] if current_rep["frame_probs"] else [frame_prob]
199
  rep_prob = float(np.mean(probs))
200
-
201
  rep_events.append({
202
  "rep": rep_count,
203
  "start_t": float(current_rep["start_f"] / fps),
@@ -216,7 +203,6 @@ def analyze_pushup_video(
216
 
217
  debug_txt = f"{'L' if left_side else 'R'} vis={ema_vis:.2f} elbow={ema_elbow:.0f} straight={ema_straight:.0f} p={frame_prob:.2f} state={state}"
218
 
219
- # annotate frame
220
  cv2.putText(frame, f"Reps: {rep_count}", (20, 40),
221
  cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255, 255, 255), 2, cv2.LINE_AA)
222
  cv2.putText(frame, debug_txt[:90], (20, 75),
@@ -234,7 +220,7 @@ def analyze_pushup_video(
234
  "ok": False,
235
  "error": f"Runtime error: {type(e).__name__}: {e}",
236
  "rep_count": rep_count,
237
- "avg_rep_prob": None,
238
  "rep_events": rep_events,
239
  "annotated_video_path": annotated_path,
240
  }
@@ -244,72 +230,72 @@ def analyze_pushup_video(
244
  writer.release()
245
  pose.close()
246
 
247
- avg_rep_prob = None
248
- if rep_events:
249
- avg_rep_prob = float(sum(r["prob"] for r in rep_events) / len(rep_events))
250
 
251
  return {
252
  "ok": True,
253
  "error": None,
254
- "rep_count": int(rep_count),
255
- "avg_rep_prob": avg_rep_prob,
256
  "rep_events": rep_events,
257
  "annotated_video_path": annotated_path,
258
  }
259
 
260
 
261
  # -----------------------
262
- # Gradio API function
263
  # -----------------------
264
- def api_analyze(file_obj: Union[str, Dict[str, Any], None]):
265
  """
266
- Accepts Gradio File input. Depending on Gradio version, it may come in as:
267
- - a string path
268
- - a dict with {"name": "..."}
 
269
  """
270
- if file_obj is None:
271
- return {"ok": False, "error": "No file received."}
272
-
273
- # Extract path robustly
274
- if isinstance(file_obj, str):
275
- src_path = file_obj
276
- elif isinstance(file_obj, dict) and "name" in file_obj:
277
- src_path = file_obj["name"]
278
- else:
279
- return {"ok": False, "error": f"Unsupported file object: {type(file_obj)}"}
280
-
281
- if not os.path.exists(src_path):
282
- return {"ok": False, "error": f"Upload path not found on server: {src_path}"}
283
-
284
  workdir = tempfile.mkdtemp()
285
  in_path = os.path.join(workdir, "input.mp4")
 
 
 
 
 
 
 
 
286
  shutil.copy(src_path, in_path)
287
 
288
  out_path = os.path.join(workdir, "annotated.mp4")
289
  result = analyze_pushup_video(in_path, save_annotated=True, annotated_out_path=out_path)
290
 
291
- # Return only what frontend needs + a file path for annotated video
292
- return {
293
- "ok": result["ok"],
294
- "error": result["error"],
 
 
 
295
  "rep_count": result["rep_count"],
296
  "avg_rep_prob": result["avg_rep_prob"],
297
  "rep_events": result["rep_events"],
298
- "annotated_video_path": result["annotated_video_path"],
299
  }
 
300
 
301
 
302
- # -----------------------
303
- # Gradio app
304
- # -----------------------
305
- with gr.Blocks(title="Pushup Analyzer API") as demo:
306
- gr.Markdown("# Pushup Analyzer (Backend)\nThis Space provides an API + optional Gradio UI.")
307
 
308
- file_in = gr.File(label="Upload video", file_types=["video"])
309
  btn = gr.Button("Analyze")
310
- json_out = gr.JSON(label="Result JSON")
311
 
312
- # api_name is IMPORTANT: frontend will call this endpoint
313
- btn.click(fn=api_analyze, inputs=file_in, outputs=json_out, api_name="analyze")
 
 
 
 
 
 
 
314
 
315
- demo.launch()
 
 
1
  import os
 
 
 
 
 
2
  import cv2
3
  import numpy as np
4
  import mediapipe as mp
5
+ import tempfile
6
+ import shutil
7
  import gradio as gr
8
 
9
 
10
  # -----------------------
11
  # Core pipeline function
12
  # -----------------------
13
+ def analyze_pushup_video(video_path: str, save_annotated: bool = True, annotated_out_path: str | None = None):
 
 
 
 
14
  """
15
+ Returns:
16
  {
17
  "ok": bool,
18
  "error": str | None,
19
  "rep_count": int,
20
+ "avg_rep_prob": float,
21
  "rep_events": list[dict],
22
  "annotated_video_path": str | None
23
  }
24
  """
 
25
  if not os.path.exists(video_path):
26
  return {
27
  "ok": False,
28
  "error": f"Could not find input video: {video_path}",
29
  "rep_count": 0,
30
+ "avg_rep_prob": 0.0,
31
  "rep_events": [],
32
  "annotated_video_path": None,
33
  }
34
 
 
35
  def clamp(x, lo=0.0, hi=1.0):
36
  return max(lo, min(hi, x))
37
 
 
56
  def ema(prev, x, a=0.25):
57
  return x if prev is None else (a * x + (1 - a) * prev)
58
 
 
59
  mp_pose = mp.solutions.pose
60
  pose = mp_pose.Pose(
61
  static_image_mode=False,
 
66
  min_tracking_confidence=0.5,
67
  )
68
 
 
69
  cap = cv2.VideoCapture(video_path)
70
  if not cap.isOpened():
71
  pose.close()
72
  return {
73
  "ok": False,
74
+ "error": "OpenCV could not open the video. Try a different mp4 encoding.",
75
  "rep_count": 0,
76
+ "avg_rep_prob": 0.0,
77
  "rep_events": [],
78
  "annotated_video_path": None,
79
  }
 
82
  W = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) or 0
83
  H = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) or 0
84
 
 
85
  annotated_path = None
86
+ writer = None
87
  if save_annotated:
88
  if annotated_out_path is None:
89
  annotated_out_path = os.path.join(tempfile.mkdtemp(), "annotated.mp4")
 
91
  fourcc = cv2.VideoWriter_fourcc(*"mp4v")
92
  writer = cv2.VideoWriter(annotated_path, fourcc, fps, (W, H))
93
 
94
+ state = "UNKNOWN"
95
+ rep_events = []
96
+ current_rep = None
 
97
  rep_count = 0
98
 
99
  ema_elbow = None
 
185
  rep_count += 1
186
  probs = current_rep["frame_probs"] if current_rep["frame_probs"] else [frame_prob]
187
  rep_prob = float(np.mean(probs))
 
188
  rep_events.append({
189
  "rep": rep_count,
190
  "start_t": float(current_rep["start_f"] / fps),
 
203
 
204
  debug_txt = f"{'L' if left_side else 'R'} vis={ema_vis:.2f} elbow={ema_elbow:.0f} straight={ema_straight:.0f} p={frame_prob:.2f} state={state}"
205
 
 
206
  cv2.putText(frame, f"Reps: {rep_count}", (20, 40),
207
  cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255, 255, 255), 2, cv2.LINE_AA)
208
  cv2.putText(frame, debug_txt[:90], (20, 75),
 
220
  "ok": False,
221
  "error": f"Runtime error: {type(e).__name__}: {e}",
222
  "rep_count": rep_count,
223
+ "avg_rep_prob": float(np.mean([r["prob"] for r in rep_events])) if rep_events else 0.0,
224
  "rep_events": rep_events,
225
  "annotated_video_path": annotated_path,
226
  }
 
230
  writer.release()
231
  pose.close()
232
 
233
+ avg_prob = float(np.mean([r["prob"] for r in rep_events])) if rep_events else 0.0
 
 
234
 
235
  return {
236
  "ok": True,
237
  "error": None,
238
+ "rep_count": rep_count,
239
+ "avg_rep_prob": avg_prob,
240
  "rep_events": rep_events,
241
  "annotated_video_path": annotated_path,
242
  }
243
 
244
 
245
  # -----------------------
246
+ # API function (Space endpoint)
247
  # -----------------------
248
def api_analyze(uploaded_file):
    """Gradio endpoint: analyze an uploaded pushup video.

    Depending on the Gradio version, ``uploaded_file`` may arrive as:
      - a dict with a "path" key,
      - an object exposing a ``.name`` attribute (tempfile wrapper),
      - a plain string path.
    All three are supported.

    Returns a ``(summary_dict, annotated_video_path)`` tuple. On any
    failure (no upload, missing path, analysis error) the summary carries
    ``ok=False`` plus zeroed placeholders and the video slot is ``None``,
    so the frontend never has to handle an exception.
    """
    def _failure(msg):
        # Shape-compatible placeholder so the frontend won't crash.
        return {"ok": False, "error": msg, "rep_count": 0,
                "avg_rep_prob": 0.0, "rep_events": []}, None

    # Guard: gr.File hands us None when nothing was uploaded; without this
    # check shutil.copy(None, ...) raises instead of returning JSON.
    if uploaded_file is None:
        return _failure("No file received.")

    # Resolve the server-side path of the upload.
    if isinstance(uploaded_file, dict) and "path" in uploaded_file:
        src_path = uploaded_file["path"]
    else:
        # gradio can pass a file object with .name, or a bare string path
        src_path = getattr(uploaded_file, "name", None) or str(uploaded_file)

    if not src_path or not os.path.exists(src_path):
        return _failure(f"Upload path not found on server: {src_path}")

    # Work in a fresh temp dir so concurrent requests never collide.
    workdir = tempfile.mkdtemp()
    in_path = os.path.join(workdir, "input.mp4")
    shutil.copy(src_path, in_path)

    out_path = os.path.join(workdir, "annotated.mp4")
    result = analyze_pushup_video(in_path, save_annotated=True, annotated_out_path=out_path)

    if not result["ok"]:
        return _failure(result["error"])

    summary = {
        "ok": True,
        "error": None,
        "rep_count": result["rep_count"],
        "avg_rep_prob": result["avg_rep_prob"],
        "rep_events": result["rep_events"],
    }
    return summary, result["annotated_video_path"]
282
 
283
 
284
# Gradio UI + API wiring: one File input, one button, JSON + video outputs.
+ with gr.Blocks(title="Pushup API") as demo:
285
+ gr.Markdown("# Pushup Analyzer API\nUpload a video, get rep count + confidence.\n")



286

287
+ video_file = gr.File(label="Upload .mp4", file_types=["video"])
288
  btn = gr.Button("Analyze")

289

290
+ out_json = gr.JSON(label="Results JSON")
291
+ out_video = gr.Video(label="Annotated Output")
292
+
# Wire the button to api_analyze; api_name exposes a stable /analyze endpoint
# that the external frontend calls.
293
+ btn.click(
294
+ fn=api_analyze,
295
+ inputs=[video_file],
296
+ outputs=[out_json, out_video],
297
+ api_name="analyze", # IMPORTANT: gives you a stable endpoint name
298
+ )
299

300
+ if __name__ == "__main__":
301
+ demo.launch()