Spaces:
Running on Zero
Running on Zero
NLag
refactor: update artifact URLs and improve router device handling for better classification performance
c8f5fea | from __future__ import annotations | |
| import json | |
| from queue import Queue | |
| import re | |
| import shutil | |
| import sys | |
| import tempfile | |
| from pathlib import Path | |
| from threading import Thread | |
| from typing import Any | |
| import gradio as gr | |
| from fastapi import File, Form, HTTPException, UploadFile | |
| from fastapi.responses import FileResponse, HTMLResponse, StreamingResponse | |
| from fastapi.staticfiles import StaticFiles | |
| sys.path.insert(0, str(Path(__file__).parent / "src")) | |
| from pozify.exercise_catalog import USER_SELECTABLE_EXERCISES | |
| from pozify.pipeline import run_pipeline | |
# Filesystem layout, all anchored at the directory containing this module.
BASE_DIR = Path(__file__).parent
WEB_DIR = BASE_DIR.joinpath("web")
RUNS_ROOT = BASE_DIR.joinpath("runs")

# Shared blurb: passed to the server below and echoed by ``config()``.
APP_DESCRIPTION = (
    "Upload a short workout clip, tune the athlete context, and generate an "
    "annotated form-review report with structured artifacts."
)

server = gr.Server(
    title="Pozify",
    summary="Video-based workout form review API",
    description=APP_DESCRIPTION,
)

# Expose the contents of WEB_DIR under the /static URL prefix.
server.mount("/static", StaticFiles(directory=WEB_DIR), name="static")
def healthz() -> dict[str, str]:
    """Return a constant payload reporting the service name and an "ok" status."""
    payload = dict(status="ok", service="pozify")
    return payload
def index() -> FileResponse:
    """Return ``index.html`` from ``WEB_DIR`` as a file response."""
    landing_page = WEB_DIR / "index.html"
    return FileResponse(landing_page)
def config() -> dict[str, Any]:
    """Return the app description and the selectable option lists as one mapping.

    The ``exercises`` list always starts with ``"auto"`` followed by every
    entry of ``USER_SELECTABLE_EXERCISES``.
    """
    goals = ["strength", "hypertrophy", "endurance", "mobility", "beginner_practice"]
    exercises = ["auto"]
    exercises.extend(USER_SELECTABLE_EXERCISES)

    # Keys are inserted in the same order as before so serialized output is unchanged.
    options: dict[str, Any] = {}
    options["description"] = APP_DESCRIPTION
    options["goals"] = goals
    options["experience_levels"] = ["beginner", "intermediate"]
    options["exercises"] = exercises
    options["limitations"] = [
        "wrist_discomfort",
        "knee_discomfort",
        "shoulder_discomfort",
    ]
    options["equipment"] = ["bodyweight", "dumbbell", "barbell", "unknown"]
    return options
def artifact(run_id: str, filename: str) -> FileResponse:
    """Return a stored run artifact as a file response.

    The requested path is resolved and must end up strictly below the resolved
    ``RUNS_ROOT``; this rejects ``..`` segments and symlinks that would escape
    the runs directory.

    Raises:
        HTTPException: 404 when the resolved path is outside ``RUNS_ROOT`` or
            is not an existing regular file.
    """
    root = RUNS_ROOT.resolve()
    target = (RUNS_ROOT / run_id / filename).resolve()

    # Containment is checked first; the filesystem is only probed for paths
    # that already passed it.
    if root in target.parents and target.is_file():
        return FileResponse(target)
    raise HTTPException(status_code=404, detail="Artifact not found.")
| def _artifact_url(run_id: str, path: str | None) -> str | None: | |
| if not path: | |
| return None | |
| if not isinstance(path, str): | |
| return None | |
| artifact_path = Path(path).resolve() | |
| run_root = (RUNS_ROOT / run_id).resolve() | |
| if run_root not in artifact_path.parents or not artifact_path.is_file(): | |
| return None | |
| return f"/api/artifacts/{run_id}/{artifact_path.name}" | |
def _artifact_link(run_id: str, name: str, path: str | None) -> dict[str, str] | None:
    """Return a ``{"name", "url"}`` entry for an artifact, or ``None``.

    ``None`` is returned whenever ``_artifact_url`` yields no URL for ``path``.
    """
    resolved_url = _artifact_url(run_id, path)
    return None if resolved_url is None else {"name": name, "url": resolved_url}
def _artifact_urls(result: dict[str, Any]) -> list[dict[str, str]]:
    """Collect download links for every artifact of a run that exists on disk.

    Links are emitted in a fixed order: the well-known JSON artifacts in the
    run directory, the annotated video, issue thumbnails, then issue clips.
    Candidates for which ``_artifact_link`` returns ``None`` are skipped.

    Args:
        result: Pipeline result. ``run_id`` and ``run_dir`` are required;
            ``annotated_video_path``, ``issue_thumbnail_paths`` and
            ``issue_clip_paths`` are optional and may be missing or ``None``.

    Returns:
        A list of ``{"name": ..., "url": ...}`` dictionaries.
    """
    run_id = result["run_id"]
    run_dir = Path(str(result["run_dir"]))

    artifact_files = [
        "final_report.json",
        "video_manifest.json",
        "pose_sequence.json",
        "exercise_classification.json",
        "reps.json",
        "rep_debug.json",
        "rep_analysis.json",
        "variation.json",
        "issue_markers.json",
        "coach_summary.json",
        "verification.json",
        "manifest.json",
    ]

    # (display name, filesystem path) pairs, in output order.
    candidates: list[tuple[str, Any]] = [
        (filename, str(run_dir / filename)) for filename in artifact_files
    ]
    candidates.append(("annotated_video.mp4", result.get("annotated_video_path")))

    # Thumbnails and clips share one record shape and differ only in the
    # result key and the display-name pattern.
    media_groups = (
        ("issue_thumbnail_paths", "thumbnail_rep_{rep_id}_{issue}.jpg"),
        ("issue_clip_paths", "clip_rep_{rep_id}_{issue}.mp4"),
    )
    for result_key, name_pattern in media_groups:
        # `or []` also covers a key that is present but set to None, which
        # `.get(key, [])` would try to iterate and fail on.
        for entry in result.get(result_key) or []:
            if not isinstance(entry, dict):
                continue
            path = entry.get("path")
            if not isinstance(path, str):
                continue
            name = name_pattern.format(
                rep_id=entry.get("rep_id", "?"),
                issue=entry.get("issue", "issue"),
            )
            candidates.append((name, path))

    links = (_artifact_link(run_id, name, path) for name, path in candidates)
    return [link for link in links if link is not None]
| def _parse_limitations(limitations: str) -> list[str]: | |
| try: | |
| parsed_limitations = json.loads(limitations) | |
| except json.JSONDecodeError as exc: | |
| raise HTTPException( | |
| status_code=400, detail="Limitations must be valid JSON." | |
| ) from exc | |
| if not isinstance(parsed_limitations, list) or not all( | |
| isinstance(item, str) for item in parsed_limitations | |
| ): | |
| raise HTTPException( | |
| status_code=400, detail="Limitations must be a JSON list of strings." | |
| ) | |
| return parsed_limitations | |
def _profile_input(
    *,
    goal: str,
    experience_level: str,
    intended_exercise: str,
    intended_variation: str,
    limitations: str,
    equipment: str,
) -> dict[str, Any]:
    """Assemble the profile dictionary from the raw form values.

    An empty ``intended_variation`` becomes ``None``. ``limitations`` is
    decoded by ``_parse_limitations``, which raises ``HTTPException`` (400)
    on malformed input.
    """
    variation = intended_variation if intended_variation else None
    known_limitations = _parse_limitations(limitations)

    # Insertion order matches the original literal.
    profile: dict[str, Any] = {}
    profile["goal"] = goal
    profile["experience_level"] = experience_level
    profile["intended_exercise"] = intended_exercise
    profile["intended_variation"] = variation
    profile["known_limitations"] = known_limitations
    profile["equipment"] = equipment
    return profile
| def _parse_bool_form(value: str) -> bool: | |
| return value.strip().lower() in {"1", "true", "yes", "on"} | |
| def _safe_upload_stem(filename: str) -> str: | |
| stem = Path(filename).stem or "upload" | |
| safe_stem = re.sub(r"[^A-Za-z0-9_.-]+", "-", stem).strip(".-_") | |
| return safe_stem[:48] or "upload" | |
def _run_analysis_pipeline(
    video_path: str | None,
    profile_input: dict[str, Any],
    bypass_verifier: bool = True,
    progress: Any | None = None,
) -> dict[str, Any]:
    """Call ``run_pipeline`` with the given arguments passed by keyword.

    Returns whatever ``run_pipeline`` returns.
    """
    pipeline_kwargs: dict[str, Any] = {
        "video_path": video_path,
        "profile_input": profile_input,
        "bypass_verifier": bypass_verifier,
        "progress": progress,
    }
    return run_pipeline(**pipeline_kwargs)
| async def _save_upload(video: UploadFile | None) -> str | None: | |
| video_path: str | None = None | |
| if video is not None and video.filename: | |
| suffix = Path(video.filename).suffix or ".mp4" | |
| prefix = f"pozify-{_safe_upload_stem(video.filename)}-" | |
| with tempfile.NamedTemporaryFile( | |
| delete=False, | |
| prefix=prefix, | |
| suffix=suffix, | |
| ) as temp_video: | |
| shutil.copyfileobj(video.file, temp_video) | |
| video_path = temp_video.name | |
| return video_path | |
def _entries_with_urls(run_id: str, entries: Any) -> list[dict[str, Any]]:
    """Return copies of the media records that resolve to a URL, with ``url`` added.

    Records that are not dictionaries, lack a string ``path``, or for which
    ``_artifact_url`` returns ``None`` are dropped. A missing or ``None``
    ``entries`` value yields an empty list.
    """
    enriched: list[dict[str, Any]] = []
    for entry in entries or []:
        if not isinstance(entry, dict):
            continue
        path = entry.get("path")
        if not isinstance(path, str):
            continue
        url = _artifact_url(run_id, path)
        if url is not None:
            enriched.append({**entry, "url": url})
    return enriched


def _analysis_response(result: dict[str, Any]) -> dict[str, Any]:
    """Build the JSON payload returned to clients from a pipeline result.

    Args:
        result: Pipeline result. ``run_id``, ``run_dir`` and ``final_report``
            are required. ``annotated_video_path``, ``issue_thumbnail_paths``
            and ``issue_clip_paths`` are optional and may be missing or
            ``None``.

    Returns:
        A dictionary holding the run identifiers, artifact URLs and the final
        report. ``annotated_video_url`` is ``None`` when no servable annotated
        video exists.

    Raises:
        KeyError: If a required key is missing from ``result``.
    """
    run_id = result["run_id"]
    return {
        "run_id": run_id,
        "run_dir": result["run_dir"],
        # `.get` matches `_artifact_urls`, which treats the annotated video
        # as optional; `_artifact_url` maps a missing path to None.
        "annotated_video_url": _artifact_url(
            run_id, result.get("annotated_video_path")
        ),
        "issue_thumbnail_urls": _entries_with_urls(
            run_id, result.get("issue_thumbnail_paths")
        ),
        "issue_clip_urls": _entries_with_urls(
            run_id, result.get("issue_clip_paths")
        ),
        "artifact_urls": _artifact_urls(result),
        "final_report_url": f"/api/artifacts/{run_id}/final_report.json",
        "report": result["final_report"],
    }
async def analyze_api(
    video: UploadFile | None = File(default=None),
    goal: str = Form(default="beginner_practice"),
    experience_level: str = Form(default="beginner"),
    intended_exercise: str = Form(default="auto"),
    intended_variation: str = Form(default=""),
    limitations: str = Form(default="[]"),
    equipment: str = Form(default="bodyweight"),
    bypass_verifier: str = Form(default="true"),
) -> dict[str, Any]:
    """Run one analysis and return the complete response payload.

    The profile is built, and therefore validated, before the upload is
    written to disk. The temporary upload is deleted once the pipeline
    returns or raises.
    """
    athlete_context = dict(
        goal=goal,
        experience_level=experience_level,
        intended_exercise=intended_exercise,
        intended_variation=intended_variation,
        limitations=limitations,
        equipment=equipment,
    )
    profile = _profile_input(**athlete_context)

    uploaded_path = await _save_upload(video)
    try:
        # NOTE(review): this is a plain synchronous call inside an async
        # handler -- presumably it occupies the event loop for the whole
        # run; confirm whether that is intended.
        outcome = _run_analysis_pipeline(
            uploaded_path,
            profile,
            _parse_bool_form(bypass_verifier),
        )
    finally:
        if uploaded_path is not None:
            Path(uploaded_path).unlink(missing_ok=True)
    return _analysis_response(outcome)
async def analyze_stream_api(
    video: UploadFile | None = File(default=None),
    goal: str = Form(default="beginner_practice"),
    experience_level: str = Form(default="beginner"),
    intended_exercise: str = Form(default="auto"),
    intended_variation: str = Form(default=""),
    limitations: str = Form(default="[]"),
    equipment: str = Form(default="bodyweight"),
    bypass_verifier: str = Form(default="true"),
) -> StreamingResponse:
    """Run one analysis on a background thread and stream its events as NDJSON.

    Every object the pipeline passes to its ``progress`` callback is written
    as one JSON line. The stream ends with either a ``complete`` event that
    carries the response payload or an ``error`` event that carries the
    exception text.
    """
    profile = _profile_input(
        goal=goal,
        experience_level=experience_level,
        intended_exercise=intended_exercise,
        intended_variation=intended_variation,
        limitations=limitations,
        equipment=equipment,
    )
    video_path = await _save_upload(video)

    # Hand-off channel between worker and generator; None marks end of stream.
    events: Queue[dict[str, Any] | None] = Queue()

    def worker() -> None:
        try:
            outcome = _run_analysis_pipeline(
                video_path,
                profile,
                _parse_bool_form(bypass_verifier),
                events.put,
            )
            payload = _analysis_response(outcome)
            events.put({"type": "complete", "result": payload})
        except Exception as exc:  # pragma: no cover - surfaced to browser clients
            events.put({"type": "error", "detail": str(exc)})
        finally:
            # Remove the upload first, then release the reader.
            if video_path is not None:
                Path(video_path).unlink(missing_ok=True)
            events.put(None)

    def event_stream() -> Any:
        # The worker is started only when the response body is first iterated.
        runner = Thread(target=worker, daemon=True)
        runner.start()
        while (event := events.get()) is not None:
            yield f"{json.dumps(event)}\n"
        runner.join(timeout=1)

    return StreamingResponse(
        event_stream(),
        media_type="application/x-ndjson",
        headers={"Cache-Control": "no-cache"},
    )
if __name__ == "__main__":
    # Launch only when this file is executed as a script, not when imported.
    server.launch(_frontend=False)