gibil committed on
Commit
a8034fd
·
verified ·
1 Parent(s): 1846d40

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +268 -245
app.py CHANGED
@@ -1,254 +1,282 @@
1
  import os
 
 
 
 
2
  import cv2
3
  import numpy as np
4
- import mediapipe as mp
5
- import tempfile
6
- import shutil
7
  import gradio as gr
8
 
9
 
10
- # -----------------------
11
- # Core pipeline function
12
- # -----------------------
13
- def analyze_pushup_video(video_path: str, save_annotated: bool = True, annotated_out_path: str | None = None):
14
- if not os.path.exists(video_path):
15
- return {
16
- "ok": False,
17
- "error": f"Could not find input video: {video_path}",
18
- "rep_count": 0,
19
- "avg_rep_prob": 0.0,
20
- "rep_events": [],
21
- "annotated_video_path": None,
22
- }
23
-
24
- def clamp(x, lo=0.0, hi=1.0):
25
- return max(lo, min(hi, x))
26
-
27
- def angle_deg(a, b, c):
28
- a = np.array(a, dtype=np.float32)
29
- b = np.array(b, dtype=np.float32)
30
- c = np.array(c, dtype=np.float32)
31
- ba = a - b
32
- bc = c - b
33
- denom = (np.linalg.norm(ba) * np.linalg.norm(bc) + 1e-9)
34
- cosang = float(np.dot(ba, bc) / denom)
35
- cosang = max(-1.0, min(1.0, cosang))
36
- return float(np.degrees(np.arccos(cosang)))
37
-
38
- def score_from_range(val, good_lo, good_hi, ok_lo, ok_hi):
39
- if good_lo <= val <= good_hi:
40
- return 1.0
41
- if val < good_lo:
42
- return clamp((val - ok_lo) / (good_lo - ok_lo))
43
- return clamp((ok_hi - val) / (ok_hi - good_hi))
44
-
45
- def ema(prev, x, a=0.25):
46
- return x if prev is None else (a * x + (1 - a) * prev)
47
-
48
- mp_pose = mp.solutions.pose
49
- pose = mp_pose.Pose(
50
- static_image_mode=False,
51
- model_complexity=1,
52
- smooth_landmarks=True,
53
- enable_segmentation=False,
54
- min_detection_confidence=0.5,
55
- min_tracking_confidence=0.5,
56
- )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
57
 
58
  cap = cv2.VideoCapture(video_path)
59
  if not cap.isOpened():
60
- pose.close()
61
- return {
62
- "ok": False,
63
- "error": "OpenCV could not open the video. Try a different mp4 encoding.",
64
- "rep_count": 0,
65
- "avg_rep_prob": 0.0,
66
- "rep_events": [],
67
- "annotated_video_path": None,
68
- }
69
 
70
  fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
71
- W = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) or 0
72
- H = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) or 0
73
-
74
- annotated_path = None
75
- writer = None
76
- if save_annotated:
77
- if annotated_out_path is None:
78
- annotated_out_path = os.path.join(tempfile.mkdtemp(), "annotated.mp4")
79
- annotated_path = annotated_out_path
80
- fourcc = cv2.VideoWriter_fourcc(*"mp4v")
81
- writer = cv2.VideoWriter(annotated_path, fourcc, fps, (W, H))
82
-
83
- state = "UNKNOWN"
84
- rep_events = []
85
- current_rep = None
86
- rep_count = 0
87
-
88
- ema_elbow = None
89
- ema_straight = None
90
- ema_vis = None
91
- alpha = 0.25
92
-
93
- UP_ELBOW_DEG = 155
94
- DOWN_ELBOW_DEG = 105
95
- MIN_VIS = 0.45
96
- MIN_REP_TIME_S = 0.35
97
-
98
- frame_idx = -1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
99
 
100
- try:
101
- while True:
102
- ok, frame = cap.read()
103
- if not ok:
104
- break
105
- frame_idx += 1
106
-
107
- rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
108
- res = pose.process(rgb)
109
-
110
- frame_prob = 0.0
111
- debug_txt = "No pose"
112
-
113
- if res.pose_landmarks:
114
- lms = res.pose_landmarks.landmark
115
-
116
- Ls = lms[mp_pose.PoseLandmark.LEFT_SHOULDER.value]
117
- Rs = lms[mp_pose.PoseLandmark.RIGHT_SHOULDER.value]
118
- left_side = (Ls.visibility >= Rs.visibility)
119
-
120
- if left_side:
121
- shoulder = lms[mp_pose.PoseLandmark.LEFT_SHOULDER.value]
122
- elbow = lms[mp_pose.PoseLandmark.LEFT_ELBOW.value]
123
- wrist = lms[mp_pose.PoseLandmark.LEFT_WRIST.value]
124
- hip = lms[mp_pose.PoseLandmark.LEFT_HIP.value]
125
- ankle = lms[mp_pose.PoseLandmark.LEFT_ANKLE.value]
126
- else:
127
- shoulder = lms[mp_pose.PoseLandmark.RIGHT_SHOULDER.value]
128
- elbow = lms[mp_pose.PoseLandmark.RIGHT_ELBOW.value]
129
- wrist = lms[mp_pose.PoseLandmark.RIGHT_WRIST.value]
130
- hip = lms[mp_pose.PoseLandmark.RIGHT_HIP.value]
131
- ankle = lms[mp_pose.PoseLandmark.RIGHT_ANKLE.value]
132
-
133
- vis = float(np.mean([shoulder.visibility, elbow.visibility, wrist.visibility, hip.visibility, ankle.visibility]))
134
- ema_vis = ema(ema_vis, vis, alpha)
135
-
136
- sh = (shoulder.x, shoulder.y)
137
- el = (elbow.x, elbow.y)
138
- wr = (wrist.x, wrist.y)
139
- hp = (hip.x, hip.y)
140
- ak = (ankle.x, ankle.y)
141
-
142
- elbow_deg = angle_deg(sh, el, wr)
143
- straight_deg = angle_deg(sh, hp, ak)
144
-
145
- ema_elbow = ema(ema_elbow, elbow_deg, alpha)
146
- ema_straight = ema(ema_straight, straight_deg, alpha)
147
-
148
- s_straight = score_from_range(ema_straight, 165, 185, 145, 195)
149
- s_elbow = score_from_range(ema_elbow, 85, 175, 60, 190)
150
- s_vis = clamp((ema_vis - MIN_VIS) / (0.85 - MIN_VIS))
151
-
152
- frame_prob = clamp(0.15 + 0.45 * s_elbow + 0.30 * s_straight + 0.10 * s_vis)
153
-
154
- trusted = (ema_vis is not None and ema_vis >= MIN_VIS)
155
-
156
- if trusted:
157
- if state in ["UNKNOWN", "UP"]:
158
- if ema_elbow <= DOWN_ELBOW_DEG and frame_prob >= 0.45:
159
- state = "DOWN"
160
- if current_rep is None:
161
- current_rep = {
162
- "start_f": frame_idx,
163
- "frame_probs": [],
164
- "min_elbow": float(ema_elbow),
165
- "min_straight": float(ema_straight),
166
- }
167
-
168
- elif state == "DOWN":
169
- if ema_elbow >= UP_ELBOW_DEG and frame_prob >= 0.35:
170
- end_f = frame_idx
171
- if current_rep is not None:
172
- duration_s = (end_f - current_rep["start_f"]) / fps
173
- if duration_s >= MIN_REP_TIME_S:
174
- rep_count += 1
175
- probs = current_rep["frame_probs"] if current_rep["frame_probs"] else [frame_prob]
176
- rep_prob = float(np.mean(probs))
177
- rep_events.append({
178
- "rep": rep_count,
179
- "start_t": float(current_rep["start_f"] / fps),
180
- "end_t": float(end_f / fps),
181
- "prob": float(rep_prob),
182
- "min_elbow": float(current_rep["min_elbow"]),
183
- "min_straight": float(current_rep["min_straight"]),
184
- })
185
- current_rep = None
186
- state = "UP"
187
-
188
- if current_rep is not None:
189
- current_rep["frame_probs"].append(float(frame_prob))
190
- current_rep["min_elbow"] = float(min(current_rep["min_elbow"], ema_elbow))
191
- current_rep["min_straight"] = float(min(current_rep["min_straight"], ema_straight))
192
-
193
- debug_txt = f"{'L' if left_side else 'R'} vis={ema_vis:.2f} elbow={ema_elbow:.0f} straight={ema_straight:.0f} p={frame_prob:.2f} state={state}"
194
-
195
- cv2.putText(frame, f"Reps: {rep_count}", (20, 40),
196
- cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255, 255, 255), 2, cv2.LINE_AA)
197
- cv2.putText(frame, debug_txt[:90], (20, 75),
198
- cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2, cv2.LINE_AA)
199
-
200
- if writer is not None:
201
- writer.write(frame)
202
 
203
- except Exception as e:
204
- cap.release()
205
- if writer is not None:
206
- writer.release()
207
- pose.close()
208
- return {
209
- "ok": False,
210
- "error": f"Runtime error: {type(e).__name__}: {e}",
211
- "rep_count": rep_count,
212
- "avg_rep_prob": float(np.mean([r["prob"] for r in rep_events])) if rep_events else 0.0,
213
- "rep_events": rep_events,
214
- "annotated_video_path": annotated_path,
215
- }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
216
 
217
- cap.release()
218
- if writer is not None:
219
- writer.release()
220
- pose.close()
 
 
 
221
 
222
- avg_prob = float(np.mean([r["prob"] for r in rep_events])) if rep_events else 0.0
 
223
 
224
- return {
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
225
  "ok": True,
226
  "error": None,
227
- "rep_count": rep_count,
228
- "avg_rep_prob": avg_prob,
229
- "rep_events": rep_events,
230
- "annotated_video_path": annotated_path,
231
  }
232
 
 
233
 
234
- # -----------------------
235
- # API function (Space endpoint)
236
- # -----------------------
 
237
  def api_analyze(uploaded_file):
238
- """
239
- Gradio can pass:
240
- - a FileData object with .path
241
- - a dict with "path"
242
- - a file-like with .name
243
- """
244
  workdir = tempfile.mkdtemp()
245
  in_path = os.path.join(workdir, "input.mp4")
246
 
247
  # Resolve source path robustly
248
  src_path = None
249
- if uploaded_file is None:
250
- return {"ok": False, "error": "No file received.", "rep_count": 0, "avg_rep_prob": 0.0, "rep_events": []}, None
251
-
252
  if hasattr(uploaded_file, "path") and uploaded_file.path:
253
  src_path = uploaded_file.path
254
  elif isinstance(uploaded_file, dict) and uploaded_file.get("path"):
@@ -258,46 +286,41 @@ def api_analyze(uploaded_file):
258
  else:
259
  src_path = str(uploaded_file)
260
 
261
- # Optional: enforce extension yourself (Gradio can mis-detect MIME)
262
  ext = os.path.splitext(src_path)[1].lower()
263
  allowed = {".mp4", ".mov", ".webm", ".mkv"}
264
  if ext and ext not in allowed:
265
- return {"ok": False, "error": f"Unsupported extension: {ext}. Use mp4/mov/webm/mkv.", "rep_count": 0, "avg_rep_prob": 0.0, "rep_events": []}, None
266
 
267
  shutil.copy(src_path, in_path)
268
 
269
- out_path = os.path.join(workdir, "annotated.mp4")
270
- result = analyze_pushup_video(in_path, save_annotated=True, annotated_out_path=out_path)
271
-
272
- if not result["ok"]:
273
- return {"ok": False, "error": result["error"], "rep_count": 0, "avg_rep_prob": 0.0, "rep_events": []}, None
274
-
275
- summary = {
276
- "ok": True,
277
- "error": None,
278
- "rep_count": result["rep_count"],
279
- "avg_rep_prob": result["avg_rep_prob"],
280
- "rep_events": result["rep_events"],
281
- }
282
- return summary, result["annotated_video_path"]
283
 
284
 
285
- with gr.Blocks(title="Pushup API") as demo:
286
- gr.Markdown("# Pushup Analyzer API\nUpload a video, get rep count + confidence.\n")
 
 
 
287
 
288
- # KEY CHANGE: accept any video to avoid Gradio rejecting it
289
  video_file = gr.File(label="Upload video")
290
 
291
  btn = gr.Button("Analyze")
292
  out_json = gr.JSON(label="Results JSON")
293
  out_video = gr.Video(label="Annotated Output")
 
294
 
295
  btn.click(
296
  fn=api_analyze,
297
  inputs=[video_file],
298
- outputs=[out_json, out_video],
299
  api_name="analyze",
300
  )
301
 
302
  if __name__ == "__main__":
303
- demo.launch()
 
1
  import os
2
+ import math
3
+ import shutil
4
+ import tempfile
5
+
6
  import cv2
7
  import numpy as np
8
+ import pandas as pd
9
+ from scipy.signal import savgol_filter
 
10
  import gradio as gr
11
 
12
 
13
# ----------------------------
# Settings (same as Colab)
# ----------------------------
UP_ANGLE = 155        # elbow angle (deg) at/above which the arm counts as locked out ("up")
DOWN_ANGLE = 105      # elbow angle (deg) at/below which the elbow counts as "down"
MIN_REP_FRAMES = 8    # minimum sampled frames between down and lockout for a valid rep
FRAME_STRIDE = 1      # run pose estimation on every Nth frame (1 = every frame)
20
+
21
+
22
# ----------------------------
# Load YOLO pose model (lazy)
# ----------------------------
_MODEL = None  # module-level cache so the model is constructed at most once


def load_pose_model():
    """Return the cached YOLO pose model, loading it on first call.

    Tries the yolo11 nano pose weights first, then falls back to yolov8.
    Raises RuntimeError if neither set of weights can be loaded.
    """
    global _MODEL
    if _MODEL is None:
        from ultralytics import YOLO

        failure = None
        for weights in ("yolo11n-pose.pt", "yolov8n-pose.pt"):
            try:
                _MODEL = YOLO(weights)
            except Exception as exc:  # remember the error, try the next weights file
                failure = exc
            else:
                print("Loaded model:", weights)
                return _MODEL
        raise RuntimeError(f"Could not load YOLO pose model. Last error: {failure}")
    return _MODEL
43
+
44
+
45
# ----------------------------
# Helpers (from your script)
# ----------------------------
def angle_deg(a, b, c):
    """Return the angle at vertex *b* of the polyline a-b-c, in degrees.

    Each point is a 2-element (x, y) sequence. The result lies in [0, 180].
    """
    pa, pb, pc = (np.asarray(p, dtype=np.float32) for p in (a, b, c))
    v1 = pa - pb
    v2 = pc - pb
    # the epsilon keeps the division defined when a or c coincides with b
    norm_prod = (np.linalg.norm(v1) * np.linalg.norm(v2)) + 1e-9
    cosine = float(np.clip(np.dot(v1, v2) / norm_prod, -1.0, 1.0))
    return float(math.degrees(math.acos(cosine)))
57
+
58
def pick_best_side(kxy, kconf):
    """Pick the arm (shoulder/elbow/wrist keypoint indices) with higher mean confidence.

    Uses YOLO/COCO keypoint indexing: left arm = (5, 7, 9), right arm = (6, 8, 10).
    Returns (index_list, mean_confidence); ties go to the right arm.
    """
    left_ids = [5, 7, 9]     # L shoulder, L elbow, L wrist
    right_ids = [6, 8, 10]   # R shoulder, R elbow, R wrist
    left_conf = float(np.mean(kconf[left_ids]))
    right_conf = float(np.mean(kconf[right_ids]))
    if right_conf >= left_conf:
        return right_ids, right_conf
    return left_ids, left_conf
64
+
65
def sigmoid(x):
    """Logistic function: smoothly maps any real x into the open interval (0, 1)."""
    neg_exp = math.exp(-x)
    return 1.0 / (1.0 + neg_exp)
67
+
68
def rep_likelihood(min_ang, max_ang, mean_conf):
    """Soft score in [0, 1] for how push-up-like one rep's elbow-angle excursion is.

    Four factors are combined multiplicatively:
      - travel:  total angle range covered by the rep (soft-thresholded at 45 deg)
      - depth:   how far below DOWN_ANGLE the elbow bent at the bottom
      - lockout: how far above UP_ANGLE the arm extended at the top
      - conf:    mean keypoint confidence, clipped to [0, 1]
    """
    travel = sigmoid(((max_ang - min_ang) - 45) / 10)
    depth = sigmoid((DOWN_ANGLE - min_ang) / 8)
    lockout = sigmoid((max_ang - UP_ANGLE) / 8)
    confidence = float(np.clip(mean_conf, 0.0, 1.0))
    product = travel * depth * lockout * confidence
    return float(np.clip(product, 0.0, 1.0))
75
+
76
def likelihood_to_score(p):
    """Map a likelihood p in [0, 1] onto a 0-100 score via piecewise-linear bands.

    Each (p_lo, p_hi) likelihood band is stretched linearly onto its
    (score_lo, score_hi) range; p == 1.0 lands at the top of the first band.
    Input is clipped to [0, 1] before mapping.
    """
    p = float(np.clip(p, 0.0, 1.0))
    bands = (
        (0.50, 1.00, 90, 100),
        (0.45, 0.50, 80, 89),
        (0.40, 0.45, 70, 79),
        (0.35, 0.40, 60, 69),
        (0.30, 0.35, 50, 59),
        (0.25, 0.30, 40, 49),
        (0.20, 0.25, 30, 39),
        (0.15, 0.20, 20, 29),
        (0.10, 0.15, 10, 19),
        (0.00, 0.10, 0, 9),
    )
    for p_lo, p_hi, s_lo, s_hi in bands:
        inside = p_lo <= p < p_hi
        at_top = (p == 1.0) and (p_hi == 1.0)
        if inside or at_top:
            frac = (p - p_lo) / max(p_hi - p_lo, 1e-6)
            return int(round(s_lo + frac * (s_hi - s_lo)))
    return 0  # unreachable for p in [0, 1]; kept as a safe fallback
95
+
96
+
97
# ----------------------------
# Core pipeline
# ----------------------------
def analyze_pushup_video_yolo(video_path: str, out_dir: str):
    """Analyze a push-up video with YOLO pose estimation.

    Pipeline:
      1) Run pose estimation over (sampled) frames and record the elbow
         angle of the more confident arm per sampled frame.
      2) Smooth the angle series and detect reps with a down/up state machine.
      3) Write the per-rep results to a CSV file.
      4) Re-read the video and write an annotated copy with overlays.

    Parameters
    ----------
    video_path : str
        Path to an input video readable by OpenCV.
    out_dir : str
        Existing directory where the CSV and annotated video are written.

    Returns
    -------
    tuple
        (summary_dict, annotated_video_path, csv_path)

    Raises
    ------
    RuntimeError
        If the video cannot be opened, is too short, or contains no
        usable pose detections.
    """
    model = load_pose_model()

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise RuntimeError("OpenCV could not open the video. Try a different mp4 encoding.")

    fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
    w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) or 0
    h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) or 0

    # 1) First pass: compute angles + confs per sampled frame
    angles, confs, frame_ids = [], [], []
    frame_i = 0

    while True:
        ok, frame = cap.read()
        if not ok:
            break

        if frame_i % FRAME_STRIDE != 0:
            frame_i += 1
            continue

        res = model(frame, verbose=False)[0]
        if res.keypoints is None or len(res.keypoints.xy) == 0:
            # no person detected: keep the timeline slot but mark it unusable
            angles.append(np.nan)
            confs.append(0.0)
            frame_ids.append(frame_i)
            frame_i += 1
            continue

        kxy_all = res.keypoints.xy.cpu().numpy()
        kconf_all = res.keypoints.conf.cpu().numpy()

        # choose the best person by mean keypoint confidence
        pidx = int(np.argmax(np.mean(kconf_all, axis=1)))
        kxy = kxy_all[pidx]
        kconf = kconf_all[pidx]

        ids, side_conf = pick_best_side(kxy, kconf)
        if side_conf < 0.2:
            # arm keypoints too uncertain to trust the angle on this frame
            angles.append(np.nan)
            confs.append(float(side_conf))
            frame_ids.append(frame_i)
            frame_i += 1
            continue

        a, b, c = kxy[ids[0]], kxy[ids[1]], kxy[ids[2]]
        angles.append(angle_deg(a, b, c))
        confs.append(float(side_conf))
        frame_ids.append(frame_i)
        frame_i += 1

    cap.release()

    angles = np.array(angles, dtype=np.float32)
    confs = np.array(confs, dtype=np.float32)
    frame_ids = np.array(frame_ids, dtype=np.int32)

    if len(angles) < 5:
        raise RuntimeError("Video too short or no usable frames detected.")

    # Fill unusable samples by linear interpolation over the frame index.
    mask = np.isfinite(angles)
    if np.any(mask) and not np.all(mask):
        angles[~mask] = np.interp(frame_ids[~mask], frame_ids[mask], angles[mask])
    elif not np.any(mask):
        raise RuntimeError("No valid pose angles detected.")

    # Savitzky-Golay smoothing. The window must be odd AND no longer than the
    # signal: the previous formula ((len // 2) * 2 + 1) exceeded an even-length
    # signal by one (e.g. 30 samples -> window 31) and made savgol_filter raise.
    win = min(31, len(angles) if len(angles) % 2 == 1 else len(angles) - 1)
    win = max(win, 5)  # savgol requires >= 5 for polyorder=2 comfortably
    angles_smooth = savgol_filter(angles, win, 2)

    # 2) Rep detection on smoothed angles
    reps = []
    state = "WAIT_DOWN"
    rep_min = rep_max = rep_conf_sum = rep_len = rep_start = None

    for i, ang in enumerate(angles_smooth):
        cf = float(confs[i])

        if state == "WAIT_DOWN":
            # waiting for the elbow to bend into the bottom of a rep
            if ang <= DOWN_ANGLE:
                state = "IN_DOWN"
                rep_min = rep_max = float(ang)
                rep_conf_sum = cf
                rep_len = 1
                rep_start = i
        else:
            # inside a rep: accumulate stats until lockout
            rep_min = min(rep_min, float(ang))
            rep_max = max(rep_max, float(ang))
            rep_conf_sum += cf
            rep_len += 1

            if ang >= UP_ANGLE:
                if rep_len >= MIN_REP_FRAMES:
                    mean_cf = float(rep_conf_sum / rep_len)
                    likelihood = rep_likelihood(rep_min, rep_max, mean_cf)
                    score = likelihood_to_score(likelihood)

                    sf = int(frame_ids[rep_start])
                    ef = int(frame_ids[i])

                    reps.append({
                        "rep": len(reps) + 1,
                        "start_frame": sf,
                        "end_frame": ef,
                        "start_time_s": float(sf / fps),
                        "end_time_s": float(ef / fps),
                        "min_elbow_angle": float(rep_min),
                        "max_elbow_angle": float(rep_max),
                        "mean_kpt_conf": float(mean_cf),
                        "pushup_likelihood": float(likelihood),
                        "pushup_score": int(score),
                    })

                # rearm for the next rep whether or not this one qualified
                state = "WAIT_DOWN"

    # 3) Save CSV
    csv_path = os.path.join(out_dir, "pushup_reps.csv")
    df = pd.DataFrame(reps)
    df.to_csv(csv_path, index=False)

    # 4) Annotated video
    annotated_path = os.path.join(out_dir, "pushup_annotated.mp4")
    cap = cv2.VideoCapture(video_path)
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    writer = cv2.VideoWriter(annotated_path, fourcc, fps, (w, h))

    rep_windows = [(r["start_frame"], r["end_frame"], r["pushup_score"]) for r in reps]

    frame_i = 0
    while True:
        ok, frame = cap.read()
        if not ok:
            break

        # score of the rep covering this frame (None between reps)
        active = next((s for sf, ef, s in rep_windows if sf <= frame_i <= ef), None)
        # number of reps fully completed before this frame
        count = sum(1 for _, ef, _ in rep_windows if ef < frame_i)

        # nearest sampled angle for the on-screen readout (clamped to the last sample)
        j = int(min(np.searchsorted(frame_ids, frame_i), len(angles_smooth) - 1))
        ang_disp = float(angles_smooth[j])

        cv2.putText(frame, f"Reps: {count}/{len(reps)}", (20, 40),
                    cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255,255,255), 2)
        cv2.putText(frame, f"Elbow angle: {ang_disp:.1f}", (20, 80),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255,255,255), 2)
        cv2.putText(frame, f"Rep score: {active if active is not None else '-'}", (20, 120),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255,255,255), 2)

        writer.write(frame)
        frame_i += 1

    cap.release()
    writer.release()

    summary = {
        "ok": True,
        "error": None,
        "rep_count": int(len(reps)),
        "avg_score": int(round(float(np.mean([r["pushup_score"] for r in reps])))) if reps else 0,
        "avg_likelihood": float(np.mean([r["pushup_likelihood"] for r in reps])) if reps else 0.0,
        "rep_events": reps,
    }

    return summary, annotated_path, csv_path
266
 
267
+
268
+ # ----------------------------
269
+ # API wrapper (robust file handling like your old one)
270
+ # ----------------------------
271
  def api_analyze(uploaded_file):
272
+ if uploaded_file is None:
273
+ return {"ok": False, "error": "No file received.", "rep_count": 0, "rep_events": []}, None, None
274
+
 
 
 
275
  workdir = tempfile.mkdtemp()
276
  in_path = os.path.join(workdir, "input.mp4")
277
 
278
  # Resolve source path robustly
279
  src_path = None
 
 
 
280
  if hasattr(uploaded_file, "path") and uploaded_file.path:
281
  src_path = uploaded_file.path
282
  elif isinstance(uploaded_file, dict) and uploaded_file.get("path"):
 
286
  else:
287
  src_path = str(uploaded_file)
288
 
289
+ # Optional extension check (same idea as your old code)
290
  ext = os.path.splitext(src_path)[1].lower()
291
  allowed = {".mp4", ".mov", ".webm", ".mkv"}
292
  if ext and ext not in allowed:
293
+ return {"ok": False, "error": f"Unsupported extension: {ext}. Use mp4/mov/webm/mkv.", "rep_count": 0, "rep_events": []}, None, None
294
 
295
  shutil.copy(src_path, in_path)
296
 
297
+ try:
298
+ summary, annotated_path, csv_path = analyze_pushup_video_yolo(in_path, out_dir=workdir)
299
+ return summary, annotated_path, csv_path
300
+ except Exception as e:
301
+ return {"ok": False, "error": f"{type(e).__name__}: {e}", "rep_count": 0, "rep_events": []}, None, None
 
 
 
 
 
 
 
 
 
302
 
303
 
304
# ----------------------------
# Gradio UI + API endpoint
# ----------------------------
with gr.Blocks(title="Pushup API (YOLO)") as demo:
    gr.Markdown("# Pushup Analyzer API (YOLO)\nUpload a video, get rep scores + CSV + annotated video.\n")

    # IMPORTANT: keep this as gr.File (not gr.Video) to avoid the
    # "Invalid file type: ['video']" upload rejection hit previously.
    video_file = gr.File(label="Upload video")

    btn = gr.Button("Analyze")
    out_json = gr.JSON(label="Results JSON")        # summary dict returned by api_analyze
    out_video = gr.Video(label="Annotated Output")  # annotated mp4 path, or None on error
    out_csv = gr.File(label="CSV Output")           # per-rep CSV path, or None on error

    # Wired as both a UI button and a named API endpoint ("analyze").
    btn.click(
        fn=api_analyze,
        inputs=[video_file],
        outputs=[out_json, out_video, out_csv],
        api_name="analyze",
    )

if __name__ == "__main__":
    demo.launch()