gibil commited on
Commit
3d2ecec
·
verified ·
1 Parent(s): 1dded61

Create app.py

Browse files
Files changed (1) hide show
  1. app.py +298 -0
app.py ADDED
@@ -0,0 +1,298 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import math
3
+ import cv2
4
+ import numpy as np
5
+ import mediapipe as mp
6
+
7
+ import gradio as gr
8
+ import tempfile
9
+ import shutil
10
+
11
+
12
+ # -----------------------
13
+ # Core pipeline function
14
+ # -----------------------
15
+ def analyze_pushup_video(video_path: str, save_annotated: bool = True, annotated_out_path: str | None = None):
16
+ """
17
+ Runs MediaPipe Pose on a video, counts pushup reps, and returns:
18
+ {
19
+ "ok": bool,
20
+ "error": str | None,
21
+ "rep_count": int,
22
+ "rep_events": list[dict],
23
+ "annotated_video_path": str | None
24
+ }
25
+ """
26
+ if not os.path.exists(video_path):
27
+ return {
28
+ "ok": False,
29
+ "error": f"Could not find input video: {video_path}",
30
+ "rep_count": 0,
31
+ "rep_events": [],
32
+ "annotated_video_path": None,
33
+ }
34
+
35
+ # ---------- Math helpers ----------
36
+ def clamp(x, lo=0.0, hi=1.0):
37
+ return max(lo, min(hi, x))
38
+
39
+ def angle_deg(a, b, c):
40
+ """Angle ABC in degrees using points a,b,c as (x,y)."""
41
+ a = np.array(a, dtype=np.float32)
42
+ b = np.array(b, dtype=np.float32)
43
+ c = np.array(c, dtype=np.float32)
44
+ ba = a - b
45
+ bc = c - b
46
+ denom = (np.linalg.norm(ba) * np.linalg.norm(bc) + 1e-9)
47
+ cosang = float(np.dot(ba, bc) / denom)
48
+ cosang = max(-1.0, min(1.0, cosang))
49
+ return float(np.degrees(np.arccos(cosang)))
50
+
51
+ def score_from_range(val, good_lo, good_hi, ok_lo, ok_hi):
52
+ """
53
+ Returns 1 if val in [good_lo, good_hi],
54
+ fades to 0 by the time it reaches ok_lo/ok_hi.
55
+ """
56
+ if good_lo <= val <= good_hi:
57
+ return 1.0
58
+ if val < good_lo:
59
+ return clamp((val - ok_lo) / (good_lo - ok_lo))
60
+ else:
61
+ return clamp((ok_hi - val) / (ok_hi - good_hi))
62
+
63
+ def ema(prev, x, a=0.25):
64
+ return x if prev is None else (a * x + (1 - a) * prev)
65
+
66
+ # ---------- Pose setup ----------
67
+ mp_pose = mp.solutions.pose
68
+ pose = mp_pose.Pose(
69
+ static_image_mode=False,
70
+ model_complexity=1,
71
+ smooth_landmarks=True,
72
+ enable_segmentation=False,
73
+ min_detection_confidence=0.5,
74
+ min_tracking_confidence=0.5,
75
+ )
76
+
77
+ # ---------- Video I/O ----------
78
+ cap = cv2.VideoCapture(video_path)
79
+ if not cap.isOpened():
80
+ pose.close()
81
+ return {
82
+ "ok": False,
83
+ "error": "OpenCV could not open the video. Try a different mp4 encoding.",
84
+ "rep_count": 0,
85
+ "rep_events": [],
86
+ "annotated_video_path": None,
87
+ }
88
+
89
+ fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
90
+ W = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) or 0
91
+ H = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) or 0
92
+
93
+ # Output path handling
94
+ annotated_path = None
95
+ writer = None
96
+ if save_annotated:
97
+ if annotated_out_path is None:
98
+ annotated_out_path = os.path.join(tempfile.mkdtemp(), "annotated.mp4")
99
+ annotated_path = annotated_out_path
100
+ fourcc = cv2.VideoWriter_fourcc(*"mp4v")
101
+ writer = cv2.VideoWriter(annotated_path, fourcc, fps, (W, H))
102
+
103
+ # ---------- Pushup detection logic ----------
104
+ state = "UNKNOWN" # "UP" or "DOWN"
105
+ rep_events = []
106
+ current_rep = None
107
+ rep_count = 0
108
+
109
+ ema_elbow = None
110
+ ema_straight = None
111
+ ema_vis = None
112
+ alpha = 0.25
113
+
114
+ UP_ELBOW_DEG = 155
115
+ DOWN_ELBOW_DEG = 105
116
+ MIN_VIS = 0.45
117
+ MIN_REP_TIME_S = 0.35
118
+
119
+ frame_idx = -1
120
+
121
+ try:
122
+ while True:
123
+ ok, frame = cap.read()
124
+ if not ok:
125
+ break
126
+ frame_idx += 1
127
+
128
+ rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
129
+ res = pose.process(rgb)
130
+
131
+ frame_prob = 0.0
132
+ debug_txt = "No pose"
133
+
134
+ if res.pose_landmarks:
135
+ lms = res.pose_landmarks.landmark
136
+
137
+ # Choose side: whichever shoulder has higher visibility
138
+ Ls = lms[mp_pose.PoseLandmark.LEFT_SHOULDER.value]
139
+ Rs = lms[mp_pose.PoseLandmark.RIGHT_SHOULDER.value]
140
+ left_side = (Ls.visibility >= Rs.visibility)
141
+
142
+ if left_side:
143
+ shoulder = lms[mp_pose.PoseLandmark.LEFT_SHOULDER.value]
144
+ elbow = lms[mp_pose.PoseLandmark.LEFT_ELBOW.value]
145
+ wrist = lms[mp_pose.PoseLandmark.LEFT_WRIST.value]
146
+ hip = lms[mp_pose.PoseLandmark.LEFT_HIP.value]
147
+ ankle = lms[mp_pose.PoseLandmark.LEFT_ANKLE.value]
148
+ else:
149
+ shoulder = lms[mp_pose.PoseLandmark.RIGHT_SHOULDER.value]
150
+ elbow = lms[mp_pose.PoseLandmark.RIGHT_ELBOW.value]
151
+ wrist = lms[mp_pose.PoseLandmark.RIGHT_WRIST.value]
152
+ hip = lms[mp_pose.PoseLandmark.RIGHT_HIP.value]
153
+ ankle = lms[mp_pose.PoseLandmark.RIGHT_ANKLE.value]
154
+
155
+ vis = float(np.mean([shoulder.visibility, elbow.visibility, wrist.visibility, hip.visibility, ankle.visibility]))
156
+ ema_vis = ema(ema_vis, vis, alpha)
157
+
158
+ sh = (shoulder.x, shoulder.y)
159
+ el = (elbow.x, elbow.y)
160
+ wr = (wrist.x, wrist.y)
161
+ hp = (hip.x, hip.y)
162
+ ak = (ankle.x, ankle.y)
163
+
164
+ elbow_deg = angle_deg(sh, el, wr)
165
+ straight_deg = angle_deg(sh, hp, ak)
166
+
167
+ ema_elbow = ema(ema_elbow, elbow_deg, alpha)
168
+ ema_straight = ema(ema_straight, straight_deg, alpha)
169
+
170
+ s_straight = score_from_range(ema_straight, 165, 185, 145, 195)
171
+ s_elbow = score_from_range(ema_elbow, 85, 175, 60, 190)
172
+ s_vis = clamp((ema_vis - MIN_VIS) / (0.85 - MIN_VIS))
173
+
174
+ frame_prob = clamp(0.15 + 0.45 * s_elbow + 0.30 * s_straight + 0.10 * s_vis)
175
+
176
+ trusted = (ema_vis is not None and ema_vis >= MIN_VIS)
177
+
178
+ if trusted:
179
+ if state in ["UNKNOWN", "UP"]:
180
+ if ema_elbow <= DOWN_ELBOW_DEG and frame_prob >= 0.45:
181
+ state = "DOWN"
182
+ if current_rep is None:
183
+ current_rep = {
184
+ "start_f": frame_idx,
185
+ "frame_probs": [],
186
+ "min_elbow": float(ema_elbow),
187
+ "min_straight": float(ema_straight),
188
+ }
189
+
190
+ elif state == "DOWN":
191
+ if ema_elbow >= UP_ELBOW_DEG and frame_prob >= 0.35:
192
+ end_f = frame_idx
193
+ if current_rep is not None:
194
+ duration_s = (end_f - current_rep["start_f"]) / fps
195
+ if duration_s >= MIN_REP_TIME_S:
196
+ rep_count += 1
197
+ probs = current_rep["frame_probs"] if current_rep["frame_probs"] else [frame_prob]
198
+ rep_prob = float(np.mean(probs))
199
+
200
+ rep_events.append({
201
+ "rep": rep_count,
202
+ "start_f": int(current_rep["start_f"]),
203
+ "end_f": int(end_f),
204
+ "start_t": float(current_rep["start_f"] / fps),
205
+ "end_t": float(end_f / fps),
206
+ "prob": float(rep_prob),
207
+ "min_elbow": float(current_rep["min_elbow"]),
208
+ "min_straight": float(current_rep["min_straight"]),
209
+ })
210
+ current_rep = None
211
+ state = "UP"
212
+
213
+ if current_rep is not None:
214
+ current_rep["frame_probs"].append(float(frame_prob))
215
+ current_rep["min_elbow"] = float(min(current_rep["min_elbow"], ema_elbow))
216
+ current_rep["min_straight"] = float(min(current_rep["min_straight"], ema_straight))
217
+
218
+ debug_txt = f"{'L' if left_side else 'R'} vis={ema_vis:.2f} elbow={ema_elbow:.0f} straight={ema_straight:.0f} p={frame_prob:.2f} state={state}"
219
+
220
+ # Simple overlay
221
+ cv2.putText(frame, f"Reps: {rep_count}", (20, 40),
222
+ cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255, 255, 255), 2, cv2.LINE_AA)
223
+ cv2.putText(frame, debug_txt[:90], (20, 75),
224
+ cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2, cv2.LINE_AA)
225
+
226
+ else:
227
+ cv2.putText(frame, f"Reps: {rep_count}", (20, 40),
228
+ cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255, 255, 255), 2, cv2.LINE_AA)
229
+ cv2.putText(frame, debug_txt[:90], (20, 75),
230
+ cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2, cv2.LINE_AA)
231
+
232
+ if writer is not None:
233
+ writer.write(frame)
234
+
235
+ except Exception as e:
236
+ cap.release()
237
+ if writer is not None:
238
+ writer.release()
239
+ pose.close()
240
+ return {
241
+ "ok": False,
242
+ "error": f"Runtime error: {type(e).__name__}: {e}",
243
+ "rep_count": rep_count,
244
+ "rep_events": rep_events,
245
+ "annotated_video_path": annotated_path,
246
+ }
247
+
248
+ cap.release()
249
+ if writer is not None:
250
+ writer.release()
251
+ pose.close()
252
+
253
+ return {
254
+ "ok": True,
255
+ "error": None,
256
+ "rep_count": rep_count,
257
+ "rep_events": rep_events,
258
+ "annotated_video_path": annotated_path,
259
+ }
260
+
261
+
262
+ # -----------------------
263
+ # Gradio wrapper
264
+ # -----------------------
265
+ def gradio_run(video_file):
266
+ # video_file is a temp path provided by Gradio
267
+ workdir = tempfile.mkdtemp()
268
+ in_path = os.path.join(workdir, "input.mp4")
269
+ shutil.copy(video_file, in_path)
270
+
271
+ out_path = os.path.join(workdir, "annotated.mp4")
272
+ result = analyze_pushup_video(in_path, save_annotated=True, annotated_out_path=out_path)
273
+
274
+ if not result["ok"]:
275
+ return "Error: " + str(result["error"]), None, []
276
+
277
+ summary = f"Rep count: {result['rep_count']}\n"
278
+ if result["rep_events"]:
279
+ avg_prob = sum(r["prob"] for r in result["rep_events"]) / len(result["rep_events"])
280
+ summary += f"Avg rep probability: {avg_prob:.2f}\n"
281
+
282
+ return summary, result["annotated_video_path"], result["rep_events"]
283
+
284
+
285
+ demo = gr.Interface(
286
+ fn=gradio_run,
287
+ inputs=gr.Video(label="Upload pushup video"),
288
+ outputs=[
289
+ gr.Textbox(label="Results"),
290
+ gr.Video(label="Annotated output"),
291
+ gr.JSON(label="Per-rep details"),
292
+ ],
293
+ title="Pushup Prototype",
294
+ description="Uploads a video, counts reps, and gives per-rep likelihood.",
295
+ )
296
+
297
+ if __name__ == "__main__":
298
+ demo.launch()