gibil committed on
Commit
637375e
·
verified ·
1 Parent(s): c1e06d5

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +114 -33
app.py CHANGED
@@ -1,22 +1,37 @@
1
  import os
2
- import shutil
3
- import tempfile
4
- from typing import Optional, Dict, Any, List
5
-
6
  import cv2
7
  import numpy as np
8
  import mediapipe as mp
9
 
10
- from fastapi import FastAPI, UploadFile, File
11
- from fastapi.responses import JSONResponse
12
-
13
- app = FastAPI()
14
-
15
-
16
- def analyze_pushup_video(video_path: str, save_annotated: bool = False, annotated_out_path: Optional[str] = None) -> Dict[str, Any]:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
17
  if not os.path.exists(video_path):
18
- return {"ok": False, "error": f"Could not find input video: {video_path}", "rep_count": 0, "rep_events": [], "annotated_video_path": None}
 
19
 
 
20
  def clamp(x, lo=0.0, hi=1.0):
21
  return max(lo, min(hi, x))
22
 
@@ -41,6 +56,7 @@ def analyze_pushup_video(video_path: str, save_annotated: bool = False, annotate
41
  def ema(prev, x, a=0.25):
42
  return x if prev is None else (a * x + (1 - a) * prev)
43
 
 
44
  mp_pose = mp.solutions.pose
45
  pose = mp_pose.Pose(
46
  static_image_mode=False,
@@ -54,11 +70,22 @@ def analyze_pushup_video(video_path: str, save_annotated: bool = False, annotate
54
  cap = cv2.VideoCapture(video_path)
55
  if not cap.isOpened():
56
  pose.close()
57
- return {"ok": False, "error": "OpenCV could not open the video. Try a different mp4 encoding.", "rep_count": 0, "rep_events": [], "annotated_video_path": None}
 
 
 
 
 
 
 
 
 
 
 
58
 
59
- fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
60
- W = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) or 0
61
- H = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) or 0
62
 
63
  annotated_path = None
64
  writer = None
@@ -68,9 +95,14 @@ def analyze_pushup_video(video_path: str, save_annotated: bool = False, annotate
68
  annotated_path = annotated_out_path
69
  fourcc = cv2.VideoWriter_fourcc(*"mp4v")
70
  writer = cv2.VideoWriter(annotated_path, fourcc, fps, (W, H))
 
 
 
 
71
 
 
72
  state = "UNKNOWN"
73
- rep_events: List[Dict[str, Any]] = []
74
  current_rep = None
75
  rep_count = 0
76
 
@@ -93,10 +125,14 @@ def analyze_pushup_video(video_path: str, save_annotated: bool = False, annotate
93
  break
94
  frame_idx += 1
95
 
 
 
 
96
  rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
97
  res = pose.process(rgb)
98
 
99
  frame_prob = 0.0
 
100
 
101
  if res.pose_landmarks:
102
  lms = res.pose_landmarks.landmark
@@ -135,7 +171,7 @@ def analyze_pushup_video(video_path: str, save_annotated: bool = False, annotate
135
 
136
  s_straight = score_from_range(ema_straight, 165, 185, 145, 195)
137
  s_elbow = score_from_range(ema_elbow, 85, 175, 60, 190)
138
- s_vis = clamp((ema_vis - MIN_VIS) / (0.85 - MIN_VIS))
139
 
140
  frame_prob = clamp(0.15 + 0.45 * s_elbow + 0.30 * s_straight + 0.10 * s_vis)
141
 
@@ -146,8 +182,12 @@ def analyze_pushup_video(video_path: str, save_annotated: bool = False, annotate
146
  if ema_elbow <= DOWN_ELBOW_DEG and frame_prob >= 0.45:
147
  state = "DOWN"
148
  if current_rep is None:
149
- current_rep = {"start_f": frame_idx, "frame_probs": [], "min_elbow": float(ema_elbow), "min_straight": float(ema_straight)}
150
-
 
 
 
 
151
  elif state == "DOWN":
152
  if ema_elbow >= UP_ELBOW_DEG and frame_prob >= 0.35:
153
  end_f = frame_idx
@@ -173,6 +213,13 @@ def analyze_pushup_video(video_path: str, save_annotated: bool = False, annotate
173
  current_rep["min_elbow"] = float(min(current_rep["min_elbow"], ema_elbow))
174
  current_rep["min_straight"] = float(min(current_rep["min_straight"], ema_straight))
175
 
 
 
 
 
 
 
 
176
  if writer is not None:
177
  writer.write(frame)
178
 
@@ -181,30 +228,64 @@ def analyze_pushup_video(video_path: str, save_annotated: bool = False, annotate
181
  if writer is not None:
182
  writer.release()
183
  pose.close()
184
- return {"ok": False, "error": f"Runtime error: {type(e).__name__}: {e}", "rep_count": rep_count, "rep_events": rep_events, "annotated_video_path": annotated_path}
 
185
 
186
  cap.release()
187
  if writer is not None:
188
  writer.release()
189
  pose.close()
190
 
191
- return {"ok": True, "error": None, "rep_count": rep_count, "rep_events": rep_events, "annotated_video_path": annotated_path}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
192
 
193
 
194
- @app.post("/analyze")
195
- async def analyze(video: UploadFile = File(...)):
196
  workdir = tempfile.mkdtemp()
197
  in_path = os.path.join(workdir, "input.mp4")
198
 
199
- with open(in_path, "wb") as f:
200
- shutil.copyfileobj(video.file, f)
201
 
202
- result = analyze_pushup_video(in_path, save_annotated=False)
 
203
 
204
- # clean temp (optional)
205
- try:
206
- shutil.rmtree(workdir)
207
- except:
208
- pass
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
209
 
210
- return JSONResponse(result)
 
 
1
  import os
 
 
 
 
2
  import cv2
3
  import numpy as np
4
  import mediapipe as mp
5
 
6
+ import gradio as gr
7
+ import tempfile
8
+ import shutil
9
+ from typing import Optional, Any, Dict
10
+
11
+
12
+ # -----------------------
13
+ # Core pipeline function
14
+ # -----------------------
15
+ def analyze_pushup_video(
16
+ video_path: str,
17
+ save_annotated: bool = True,
18
+ annotated_out_path: Optional[str] = None,
19
+ ):
20
+ """
21
+ Returns dict:
22
+ {
23
+ "ok": bool,
24
+ "error": str|None,
25
+ "rep_count": int,
26
+ "rep_events": list[dict],
27
+ "annotated_video_path": str|None
28
+ }
29
+ """
30
  if not os.path.exists(video_path):
31
+ return {"ok": False, "error": f"Could not find input video: {video_path}",
32
+ "rep_count": 0, "rep_events": [], "annotated_video_path": None}
33
 
34
+ # ---------- helpers ----------
35
  def clamp(x, lo=0.0, hi=1.0):
36
  return max(lo, min(hi, x))
37
 
 
56
  def ema(prev, x, a=0.25):
57
  return x if prev is None else (a * x + (1 - a) * prev)
58
 
59
+ # ---------- pose ----------
60
  mp_pose = mp.solutions.pose
61
  pose = mp_pose.Pose(
62
  static_image_mode=False,
 
70
  cap = cv2.VideoCapture(video_path)
71
  if not cap.isOpened():
72
  pose.close()
73
+ return {"ok": False, "error": "OpenCV could not open the video. Try a different MP4 encoding.",
74
+ "rep_count": 0, "rep_events": [], "annotated_video_path": None}
75
+
76
+ fps = float(cap.get(cv2.CAP_PROP_FPS) or 30.0)
77
+ W = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH) or 0)
78
+ H = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT) or 0)
79
+
80
+ if W <= 0 or H <= 0:
81
+ cap.release()
82
+ pose.close()
83
+ return {"ok": False, "error": f"Video decode failed (W={W}, H={H}). Try re-encoding the video.",
84
+ "rep_count": 0, "rep_events": [], "annotated_video_path": None}
85
 
86
+ # Some encoders hate odd dimensions
87
+ if W % 2 == 1: W -= 1
88
+ if H % 2 == 1: H -= 1
89
 
90
  annotated_path = None
91
  writer = None
 
95
  annotated_path = annotated_out_path
96
  fourcc = cv2.VideoWriter_fourcc(*"mp4v")
97
  writer = cv2.VideoWriter(annotated_path, fourcc, fps, (W, H))
98
+ if not writer.isOpened():
99
+ # Don’t crash the whole run if writing fails
100
+ writer = None
101
+ annotated_path = None
102
 
103
+ # ---------- detection ----------
104
  state = "UNKNOWN"
105
+ rep_events = []
106
  current_rep = None
107
  rep_count = 0
108
 
 
125
  break
126
  frame_idx += 1
127
 
128
+ # Resize to the exact writer size if we adjusted odd dims
129
+ frame = frame[:H, :W]
130
+
131
  rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
132
  res = pose.process(rgb)
133
 
134
  frame_prob = 0.0
135
+ debug_txt = "No pose"
136
 
137
  if res.pose_landmarks:
138
  lms = res.pose_landmarks.landmark
 
171
 
172
  s_straight = score_from_range(ema_straight, 165, 185, 145, 195)
173
  s_elbow = score_from_range(ema_elbow, 85, 175, 60, 190)
174
+ s_vis = clamp((ema_vis - MIN_VIS) / (0.85 - MIN_VIS)) if ema_vis is not None else 0.0
175
 
176
  frame_prob = clamp(0.15 + 0.45 * s_elbow + 0.30 * s_straight + 0.10 * s_vis)
177
 
 
182
  if ema_elbow <= DOWN_ELBOW_DEG and frame_prob >= 0.45:
183
  state = "DOWN"
184
  if current_rep is None:
185
+ current_rep = {
186
+ "start_f": frame_idx,
187
+ "frame_probs": [],
188
+ "min_elbow": float(ema_elbow),
189
+ "min_straight": float(ema_straight),
190
+ }
191
  elif state == "DOWN":
192
  if ema_elbow >= UP_ELBOW_DEG and frame_prob >= 0.35:
193
  end_f = frame_idx
 
213
  current_rep["min_elbow"] = float(min(current_rep["min_elbow"], ema_elbow))
214
  current_rep["min_straight"] = float(min(current_rep["min_straight"], ema_straight))
215
 
216
+ debug_txt = f"{'L' if left_side else 'R'} vis={ema_vis:.2f} elbow={ema_elbow:.0f} straight={ema_straight:.0f} p={frame_prob:.2f} state={state}"
217
+
218
+ cv2.putText(frame, f"Reps: {rep_count}", (20, 40),
219
+ cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255, 255, 255), 2, cv2.LINE_AA)
220
+ cv2.putText(frame, debug_txt[:90], (20, 75),
221
+ cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2, cv2.LINE_AA)
222
+
223
  if writer is not None:
224
  writer.write(frame)
225
 
 
228
  if writer is not None:
229
  writer.release()
230
  pose.close()
231
+ return {"ok": False, "error": f"Runtime error: {type(e).__name__}: {e}",
232
+ "rep_count": rep_count, "rep_events": rep_events, "annotated_video_path": annotated_path}
233
 
234
  cap.release()
235
  if writer is not None:
236
  writer.release()
237
  pose.close()
238
 
239
+ return {"ok": True, "error": None, "rep_count": rep_count,
240
+ "rep_events": rep_events, "annotated_video_path": annotated_path}
241
+
242
+
243
+ # -----------------------
244
+ # Gradio wrapper
245
+ # -----------------------
246
+ def _extract_video_path(video_input: Any) -> str:
247
+ # Gradio sometimes gives a string path, sometimes a dict with "path"
248
+ if isinstance(video_input, str):
249
+ return video_input
250
+ if isinstance(video_input, dict) and "path" in video_input:
251
+ return video_input["path"]
252
+ # Some versions use "name"
253
+ if isinstance(video_input, dict) and "name" in video_input:
254
+ return video_input["name"]
255
+ raise ValueError(f"Unexpected video input type: {type(video_input)} value={video_input}")
256
 
257
 
258
def gradio_run(video_file):
    """Gradio callback: copy the uploaded video into a temp dir, run the
    pushup analysis, and return (summary text, annotated video path,
    per-rep details) for the three output components.
    """
    # Gradio passes None when the user clicks Submit without uploading a
    # video; guard early instead of letting _extract_video_path raise.
    if video_file is None:
        return "Error: no video uploaded.", None, []

    workdir = tempfile.mkdtemp()
    in_path = os.path.join(workdir, "input.mp4")

    src_path = _extract_video_path(video_file)
    shutil.copy(src_path, in_path)

    # Keep the annotated output inside the same temp dir; Gradio serves it
    # directly from this path, so the dir must not be removed here.
    out_path = os.path.join(workdir, "annotated.mp4")
    result = analyze_pushup_video(in_path, save_annotated=True, annotated_out_path=out_path)

    if not result["ok"]:
        return "Error: " + str(result["error"]), None, []

    summary = f"Rep count: {result['rep_count']}\n"
    if result["rep_events"]:
        # NOTE(review): assumes every rep event carries a "prob" key —
        # confirm against how analyze_pushup_video builds rep_events.
        avg_prob = sum(r["prob"] for r in result["rep_events"]) / len(result["rep_events"])
        summary += f"Avg rep probability: {avg_prob:.2f}\n"

    return summary, result["annotated_video_path"], result["rep_events"]
277
+
278
+
279
# -----------------------
# Gradio UI wiring
# -----------------------
# Three outputs: a text summary, the annotated video, and raw per-rep data.
_output_components = [
    gr.Textbox(label="Results"),
    gr.Video(label="Annotated output"),
    gr.JSON(label="Per-rep details"),
]

demo = gr.Interface(
    fn=gradio_run,
    inputs=gr.Video(label="Upload pushup video"),
    outputs=_output_components,
    allow_flagging="never",
)

if __name__ == "__main__":
    demo.launch()