File size: 12,273 Bytes
3d2ecec
a8034fd
 
 
 
3d2ecec
 
a8034fd
 
637375e
 
 
a8034fd
010ddfd
a8034fd
cf45b0f
 
010ddfd
 
 
 
 
 
 
 
bca95c6
 
 
 
a8034fd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
010ddfd
a8034fd
 
 
 
 
 
 
 
 
 
 
 
010ddfd
 
a8034fd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3d2ecec
 
 
a8034fd
d7f9742
 
a8034fd
 
 
010ddfd
 
 
 
bca95c6
010ddfd
 
 
bca95c6
 
 
010ddfd
 
 
a22abd7
010ddfd
 
a8034fd
 
 
 
 
 
 
 
 
010ddfd
a8034fd
 
 
a22abd7
a8034fd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3d2ecec
a8034fd
3d2ecec
a8034fd
 
 
 
 
 
 
010ddfd
a8034fd
 
 
 
 
 
a22abd7
bca95c6
 
 
 
 
a8034fd
 
a22abd7
a8034fd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
bca95c6
 
 
 
 
a8034fd
010ddfd
a8034fd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
010ddfd
a8034fd
 
 
 
3d2ecec
a8034fd
 
 
 
 
 
 
3d2ecec
a8034fd
 
aa938e5
a8034fd
 
 
 
d14fed8
a8034fd
d14fed8
a8034fd
d14fed8
a8034fd
 
 
 
 
 
 
 
d7f9742
 
a8034fd
 
 
 
010ddfd
 
 
 
 
 
 
d7f9742
637375e
a8034fd
637375e
a8034fd
 
010ddfd
a8034fd
c75fd7e
a8034fd
 
 
3d2ecec
 
c75fd7e
d14fed8
6d3e171
 
 
 
c75fd7e
6d3e171
 
c75fd7e
6d3e171
c75fd7e
6d3e171
 
 
a8034fd
92c7c48
3259094
3d2ecec
a8034fd
 
 
 
 
3259094
637375e
a8034fd
 
 
 
 
637375e
010ddfd
1b733bd
d14fed8
6d3e171
c75fd7e
 
a8034fd
c75fd7e
 
 
 
a8034fd
92c7c48
c75fd7e
3d2ecec
c75fd7e
a22abd7
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
import os
import math
import shutil
import tempfile

import cv2
import numpy as np
import pandas as pd
from scipy.signal import savgol_filter
import gradio as gr


# ----------------------------
# Settings (optimized for speed, still robust)
# ----------------------------
# Elbow-angle thresholds in degrees. A rep opens when the smoothed angle falls
# to DOWN_ANGLE or below and closes when it climbs to UP_ANGLE or above.
UP_ANGLE: int = 125
DOWN_ANGLE: int = 90

# Desired pose-inference rate in frames per second. The frame stride is
# derived from it, e.g. a 30 fps video is sampled every 5th frame (~5x fewer
# YOLO calls).
TARGET_FPS: float = 6.0

# Shortest accepted rep, in seconds. Expressed in time rather than frames so
# the behaviour stays stable when the stride changes.
MIN_REP_SECONDS: float = 0.33

# Longest accepted rep, in seconds. A rep still open after this long is
# discarded, which prevents very long false reps when tracking fails.
MAX_REP_SECONDS: float = 8.0


# ----------------------------
# Load YOLO pose model (lazy)
# ----------------------------
_MODEL = None


def load_pose_model():
    """Return the process-wide YOLO pose model, loading it on first use.

    The weight files are tried in order of preference and the first one that
    loads is cached in the module-level ``_MODEL``; later calls return the
    cached instance.

    Raises:
        RuntimeError: if none of the candidate weight files could be loaded.
    """
    global _MODEL
    if _MODEL is None:
        # Imported lazily so the dependency is only needed once a model is requested.
        from ultralytics import YOLO

        failure = None
        for weights in ("yolo11n-pose.pt", "yolov8n-pose.pt"):
            try:
                _MODEL = YOLO(weights)
                print("Loaded model:", weights)
                break
            except Exception as exc:
                # Remember the failure and fall through to the next candidate.
                failure = exc
        else:
            raise RuntimeError(f"Could not load YOLO pose model. Last error: {failure}")

    return _MODEL


# ----------------------------
# Helpers
# ----------------------------
def angle_deg(a, b, c):
    """Return the angle at vertex ``b`` between rays ``b->a`` and ``b->c``, in degrees.

    Inputs are converted to float32 arrays. A tiny epsilon in the denominator
    keeps coincident points from dividing by zero, and the cosine is clamped
    to [-1, 1] before ``acos`` to absorb rounding error.
    """
    vertex = np.asarray(b, dtype=np.float32)
    arm_a = np.asarray(a, dtype=np.float32) - vertex
    arm_c = np.asarray(c, dtype=np.float32) - vertex

    scale = (np.linalg.norm(arm_a) * np.linalg.norm(arm_c)) + 1e-9
    cosine = np.clip(np.dot(arm_a, arm_c) / scale, -1.0, 1.0)
    radians = math.acos(cosine)
    return float(math.degrees(radians))

def pick_best_side(kxy, kconf):
    """Pick the arm whose shoulder/elbow/wrist keypoints are most confident.

    Args:
        kxy: keypoint coordinates (not used by the selection itself).
        kconf: per-keypoint confidences, indexable with a list of indices.

    Returns:
        ``(indices, mean_conf)`` where ``indices`` is ``[shoulder, elbow, wrist]``
        for the chosen side (YOLO COCO indices). Ties go to the right side.
    """
    left_arm = [5, 7, 9]     # L shoulder, L elbow, L wrist
    right_arm = [6, 8, 10]   # R shoulder, R elbow, R wrist

    left_mean = float(np.mean(kconf[left_arm]))
    right_mean = float(np.mean(kconf[right_arm]))

    return (right_arm, right_mean) if right_mean >= left_mean else (left_arm, left_mean)

def sigmoid(x):
    """Return the logistic function ``1 / (1 + e**-x)`` of ``x``.

    For very negative ``x`` the intermediate ``exp(-x)`` exceeds the float
    range and ``math.exp`` raises ``OverflowError``; the mathematical limit
    there is 0, so that value is returned instead of propagating the error.
    Results for every input that did not overflow are unchanged.
    """
    try:
        return 1.0 / (1.0 + math.exp(-x))
    except OverflowError:
        return 0.0

def rep_likelihood(min_ang, max_ang, mean_conf):
    """Score how push-up-like a candidate rep is, as a value in [0, 1].

    The result is the product of four factors:
      * range   - sigmoid of the elbow-angle travel, centred on 45 degrees;
      * depth   - sigmoid of how far the minimum angle went below DOWN_ANGLE;
      * lockout - sigmoid of how far the maximum angle went above UP_ANGLE;
      * conf    - the mean keypoint confidence, clamped to [0, 1].
    """
    travel = max_ang - min_ang
    factors = (
        sigmoid((travel - 45) / 10),
        sigmoid((DOWN_ANGLE - min_ang) / 8),
        sigmoid((max_ang - UP_ANGLE) / 8),
        float(np.clip(mean_conf, 0.0, 1.0)),
    )
    # Multiply left to right: range * depth * lockout * conf.
    likelihood = factors[0] * factors[1] * factors[2] * factors[3]
    return float(np.clip(likelihood, 0.0, 1.0))

def likelihood_to_score(p):
    """Map a likelihood in [0, 1] onto an integer score from 0 to 100.

    The likelihood is clamped to [0, 1] and located in a table of half-open
    bands ``[lower, upper)``; within a band the score is linearly interpolated
    between the band's score limits and rounded. ``p == 1.0`` is accepted by
    the top band. Returns 0 when no band matches.
    """
    prob = float(np.clip(p, 0.0, 1.0))

    # (lower, upper, score at lower, score at upper)
    bands = (
        (0.50, 1.00, 90, 100),
        (0.45, 0.50, 80, 89),
        (0.40, 0.45, 70, 79),
        (0.35, 0.40, 60, 69),
        (0.30, 0.35, 50, 59),
        (0.25, 0.30, 40, 49),
        (0.20, 0.25, 30, 39),
        (0.15, 0.20, 20, 29),
        (0.10, 0.15, 10, 19),
        (0.00, 0.10, 0, 9),
    )

    for lower, upper, score_lo, score_hi in bands:
        inside = lower <= prob < upper
        at_ceiling = prob == 1.0 and upper == 1.0
        if not (inside or at_ceiling):
            continue
        fraction = (prob - lower) / max(upper - lower, 1e-6)
        return int(round(score_lo + fraction * (score_hi - score_lo)))

    return 0


# ----------------------------
# Core pipeline
# ----------------------------
# Column order of the per-rep CSV; identical to the key order of each rep record.
_REP_CSV_COLUMNS = [
    "rep",
    "start_frame",
    "end_frame",
    "start_time_s",
    "end_time_s",
    "min_elbow_angle",
    "max_elbow_angle",
    "mean_kpt_conf",
    "pushup_likelihood",
    "pushup_score",
]


def _elbow_sample(model, frame):
    """Run pose inference on one frame and return ``(elbow_angle, side_conf)``.

    The angle is NaN when no person is detected or when the better arm's mean
    keypoint confidence is below 0.2.
    """
    res = model(frame, verbose=False)[0]
    if res.keypoints is None or len(res.keypoints.xy) == 0:
        return np.nan, 0.0

    kxy_all = res.keypoints.xy.cpu().numpy()
    kconf_all = res.keypoints.conf.cpu().numpy()

    # Choose the person with the highest mean keypoint confidence.
    pidx = int(np.argmax(np.mean(kconf_all, axis=1)))
    kxy, kconf = kxy_all[pidx], kconf_all[pidx]

    ids, side_conf = pick_best_side(kxy, kconf)
    if side_conf < 0.2:
        return np.nan, float(side_conf)

    shoulder, elbow, wrist = kxy[ids[0]], kxy[ids[1]], kxy[ids[2]]
    return angle_deg(shoulder, elbow, wrist), float(side_conf)


def _sample_elbow_angles(cap, model, frame_stride):
    """Read every frame from ``cap`` and run pose inference on each ``frame_stride``-th one."""
    angles, confs, frame_ids = [], [], []
    frame_i = 0
    while True:
        ok, frame = cap.read()
        if not ok:
            break
        if frame_i % frame_stride == 0:
            angle, conf = _elbow_sample(model, frame)
            angles.append(angle)
            confs.append(conf)
            frame_ids.append(frame_i)
        frame_i += 1
    return angles, confs, frame_ids


def _smoothing_window(n_samples, effective_fps):
    """Return an odd Savitzky-Golay window of ~1 second that fits in ``n_samples``."""
    win = max(5, int(round(effective_fps * 1.0)))
    if win % 2 == 0:
        win += 1
    # savgol_filter (mode="interp") requires window_length <= len(x); clamp to
    # the largest odd value that does not exceed the sample count.
    largest_odd = n_samples if n_samples % 2 == 1 else n_samples - 1
    return min(win, largest_odd)


def _detect_reps(angles_smooth, confs, frame_ids, fps, min_rep_frames, max_rep_frames):
    """Run the down/up state machine over the smoothed angles and return rep records."""
    reps = []
    state = "WAIT_DOWN"
    rep_min = rep_max = rep_conf_sum = rep_len = rep_start = None

    for i, ang in enumerate(angles_smooth):
        cf = float(confs[i])

        if state == "WAIT_DOWN":
            # A rep opens on the first sample at or below the down threshold.
            if ang <= DOWN_ANGLE:
                state = "IN_DOWN"
                rep_min = rep_max = float(ang)
                rep_conf_sum = cf
                rep_len = 1
                rep_start = i
            continue

        rep_min = min(rep_min, float(ang))
        rep_max = max(rep_max, float(ang))
        rep_conf_sum += cf
        rep_len += 1

        # Abort absurdly long reps (tracking failure / stall).
        if rep_len > max_rep_frames:
            state = "WAIT_DOWN"
            continue

        if ang >= UP_ANGLE:
            # Reaching lockout closes the rep; too-short ones are dropped silently.
            if rep_len >= min_rep_frames:
                mean_cf = float(rep_conf_sum / rep_len)
                likelihood = rep_likelihood(rep_min, rep_max, mean_cf)
                score = likelihood_to_score(likelihood)

                sf = int(frame_ids[rep_start])
                ef = int(frame_ids[i])

                reps.append({
                    "rep": len(reps) + 1,
                    "start_frame": sf,
                    "end_frame": ef,
                    "start_time_s": float(sf / fps),
                    "end_time_s": float(ef / fps),
                    "min_elbow_angle": float(rep_min),
                    "max_elbow_angle": float(rep_max),
                    "mean_kpt_conf": float(mean_cf),
                    "pushup_likelihood": float(likelihood),
                    "pushup_score": int(score),
                })

            state = "WAIT_DOWN"

    return reps


def _write_annotated_video(video_path, annotated_path, fps, frame_size, reps, frame_ids, angles_smooth):
    """Re-read the video and write a copy with rep count, elbow angle and rep score overlaid."""
    rep_windows = [(r["start_frame"], r["end_frame"], r["pushup_score"]) for r in reps]

    cap = cv2.VideoCapture(video_path)
    writer = None
    try:
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        writer = cv2.VideoWriter(annotated_path, fourcc, fps, frame_size)

        frame_i = 0
        while True:
            ok, frame = cap.read()
            if not ok:
                break

            # Score of the rep covering this frame (if any) and number of reps already finished.
            active = next((s for sf, ef, s in rep_windows if sf <= frame_i <= ef), None)
            count = sum(1 for _, ef, _ in rep_windows if ef < frame_i)

            # Map the frame to a sampled index, clamped to the last sample.
            j = int(min(np.searchsorted(frame_ids, frame_i), len(angles_smooth) - 1))
            ang_disp = float(angles_smooth[j])

            cv2.putText(frame, f"Reps: {count}/{len(reps)}", (20, 40),
                        cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255, 255, 255), 2)
            cv2.putText(frame, f"Elbow angle: {ang_disp:.1f}", (20, 80),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255, 255, 255), 2)
            cv2.putText(frame, f"Rep score: {active if active is not None else '-'}", (20, 120),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255, 255, 255), 2)

            writer.write(frame)
            frame_i += 1
    finally:
        # Always release the OpenCV handles, even if reading or writing fails.
        cap.release()
        if writer is not None:
            writer.release()


def analyze_pushup_video_yolo(video_path: str, out_dir: str):
    """Count and score push-up reps in a video using YOLO pose keypoints.

    Pipeline: sample frames at roughly TARGET_FPS, compute the elbow angle of
    the most confident arm per sampled frame, interpolate and smooth the angle
    series, detect reps with a down/up state machine, then write a CSV of the
    reps and an annotated copy of the video.

    Args:
        video_path: path of the input video, opened with OpenCV.
        out_dir: directory the outputs are written into (not created here).

    Returns:
        ``(summary, annotated_path, csv_path)`` where ``summary`` is a dict with
        the rep count, average score/likelihood, the per-rep records and the
        sampling settings; the two paths point to ``pushup_annotated.mp4`` and
        ``pushup_reps.csv`` inside ``out_dir``.

    Raises:
        RuntimeError: if the video cannot be opened, fewer than 5 frames were
            sampled, or no sampled frame produced a valid pose angle.
    """
    model = load_pose_model()

    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            raise RuntimeError("OpenCV could not open the video. Try a different mp4 encoding.")

        fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
        w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) or 0
        h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) or 0

        # Stride that brings the inference rate close to TARGET_FPS.
        frame_stride = max(1, int(round(float(fps) / float(TARGET_FPS))))
        effective_fps = float(fps) / float(frame_stride)

        # Convert the time-based rep duration limits into sampled-frame counts.
        min_rep_frames = max(2, int(math.ceil(MIN_REP_SECONDS * effective_fps)))
        max_rep_frames = max(min_rep_frames + 2, int(math.ceil(MAX_REP_SECONDS * effective_fps)))

        print(
            f"[speed] video_fps={fps:.2f} target_fps={TARGET_FPS:.2f} "
            f"stride={frame_stride} effective_fps={effective_fps:.2f} "
            f"min_rep_frames={min_rep_frames} max_rep_frames={max_rep_frames}"
        )

        # 1) First pass: angle + confidence per sampled frame.
        angles, confs, frame_ids = _sample_elbow_angles(cap, model, frame_stride)
    finally:
        # Release the capture even when inference raises.
        cap.release()

    angles = np.array(angles, dtype=np.float32)
    confs = np.array(confs, dtype=np.float32)
    frame_ids = np.array(frame_ids, dtype=np.int32)

    if len(angles) < 5:
        raise RuntimeError("Video too short or no usable frames detected.")

    # Fill missing angles by linear interpolation over the frame index.
    mask = np.isfinite(angles)
    if not np.any(mask):
        raise RuntimeError("No valid pose angles detected.")
    if not np.all(mask):
        angles[~mask] = np.interp(frame_ids[~mask], frame_ids[mask], angles[mask])

    # Smooth with a ~1 second window that is guaranteed to fit the series.
    win = _smoothing_window(len(angles), effective_fps)
    angles_smooth = savgol_filter(angles, win, 2)

    # 2) Rep detection on the smoothed angles.
    reps = _detect_reps(angles_smooth, confs, frame_ids, fps, min_rep_frames, max_rep_frames)

    # 3) Save CSV; explicit columns keep the header present even with zero reps.
    csv_path = os.path.join(out_dir, "pushup_reps.csv")
    pd.DataFrame(reps, columns=_REP_CSV_COLUMNS).to_csv(csv_path, index=False)

    # 4) Annotated video (kept at the original resolution).
    annotated_path = os.path.join(out_dir, "pushup_annotated.mp4")
    _write_annotated_video(video_path, annotated_path, fps, (w, h), reps, frame_ids, angles_smooth)

    summary = {
        "ok": True,
        "error": None,
        "rep_count": int(len(reps)),
        "avg_score": int(round(float(np.mean([r["pushup_score"] for r in reps])))) if reps else 0,
        "avg_likelihood": float(np.mean([r["pushup_likelihood"] for r in reps])) if reps else 0.0,
        "rep_events": reps,
        "speed_settings": {
            "video_fps": float(fps),
            "target_fps": float(TARGET_FPS),
            "frame_stride": int(frame_stride),
            "effective_fps": float(effective_fps),
            "min_rep_frames": int(min_rep_frames),
        }
    }

    return summary, annotated_path, csv_path


# ----------------------------
# API wrapper
# ----------------------------
def api_analyze(uploaded_file):
    """Gradio/API entry point: analyze an uploaded video and return the results.

    Args:
        uploaded_file: the upload, given as an object with a ``path`` or
            ``name`` attribute, a dict with a ``"path"`` key, or anything
            whose ``str()`` is a file path. ``None`` means nothing was sent.

    Returns:
        ``(summary, annotated_video_path, csv_path)``. On any failure the
        summary is ``{"ok": False, "error": ..., "rep_count": 0,
        "rep_events": []}`` and both paths are ``None``; this function reports
        errors through the summary rather than raising.
    """
    if uploaded_file is None:
        return {"ok": False, "error": "No file received.", "rep_count": 0, "rep_events": []}, None, None

    # Resolve the source path from whichever shape the upload arrived in.
    if hasattr(uploaded_file, "path") and uploaded_file.path:
        src_path = uploaded_file.path
    elif isinstance(uploaded_file, dict) and uploaded_file.get("path"):
        src_path = uploaded_file["path"]
    elif hasattr(uploaded_file, "name") and uploaded_file.name:
        src_path = uploaded_file.name
    else:
        src_path = str(uploaded_file)

    # Validate the extension before creating any temporary state, so a
    # rejected upload leaves nothing behind. Files without an extension pass.
    ext = os.path.splitext(src_path)[1].lower()
    allowed = {".mp4", ".mov", ".webm", ".mkv"}
    if ext and ext not in allowed:
        return {"ok": False, "error": f"Unsupported extension: {ext}. Use mp4/mov/webm/mkv.", "rep_count": 0, "rep_events": []}, None, None

    workdir = tempfile.mkdtemp()
    try:
        # The copy is inside the try so a missing/unreadable upload is
        # reported as a structured error like any analysis failure.
        in_path = os.path.join(workdir, "input.mp4")
        shutil.copy(src_path, in_path)
        summary, annotated_path, csv_path = analyze_pushup_video_yolo(in_path, out_dir=workdir)
    except Exception as e:
        # API boundary: turn every failure into an error payload, and remove
        # the working directory since none of its contents will be returned.
        shutil.rmtree(workdir, ignore_errors=True)
        return {"ok": False, "error": f"{type(e).__name__}: {e}", "rep_count": 0, "rep_events": []}, None, None

    # On success the working directory is kept: the returned paths live in it.
    return summary, annotated_path, csv_path


# ----------------------------
# Gradio UI + API endpoint
# ----------------------------
# Build the Gradio Blocks app at import time; `demo` is the module-level app
# object that is launched when this file is run as a script.
with gr.Blocks(title="Pushup API (YOLO)") as demo:
    gr.Markdown("# Pushup Analyzer API (YOLO)\nUpload a video, get rep scores + CSV + annotated video.\n")

    # The upload is a generic gr.File; kept to avoid "Invalid file type"
    # issues. api_analyze performs its own extension check on the path.
    video_file = gr.File(label="Upload video")

    btn = gr.Button("Analyze")
    # The three outputs line up, in order, with the tuple api_analyze returns:
    # (summary dict, annotated video path, CSV path).
    out_json = gr.JSON(label="Results JSON")
    out_video = gr.Video(label="Annotated Output")
    out_csv = gr.File(label="CSV Output")

    # Clicking the button runs api_analyze; the handler is also registered
    # under the api_name "analyze".
    btn.click(
        fn=api_analyze,
        inputs=[video_file],
        outputs=[out_json, out_video, out_csv],
        api_name="analyze",
    )

# Start the app only when executed directly, not when imported.
if __name__ == "__main__":
    demo.launch()