Pozify / app.py
NLag
refactor: update artifact URLs and improve router device handling for better classification performance
c8f5fea
Raw
History Blame Contribute Delete
11.2 kB
from __future__ import annotations
import json
from queue import Queue
import re
import shutil
import sys
import tempfile
from pathlib import Path
from threading import Thread
from typing import Any
import gradio as gr
from fastapi import File, Form, HTTPException, UploadFile
from fastapi.responses import FileResponse, HTMLResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
sys.path.insert(0, str(Path(__file__).parent / "src"))
from pozify.exercise_catalog import USER_SELECTABLE_EXERCISES
from pozify.pipeline import run_pipeline
BASE_DIR = Path(__file__).parent
WEB_DIR = BASE_DIR / "web"
RUNS_ROOT = BASE_DIR / "runs"
APP_DESCRIPTION = (
"Upload a short workout clip, tune the athlete context, and generate an annotated "
"form-review report with structured artifacts."
)
server = gr.Server(
title="Pozify",
summary="Video-based workout form review API",
description=APP_DESCRIPTION,
)
server.mount("/static", StaticFiles(directory=WEB_DIR), name="static")
@server.get("/healthz", include_in_schema=False)
def healthz() -> dict[str, str]:
return {"status": "ok", "service": "pozify"}
@server.get("/", response_class=HTMLResponse, include_in_schema=False)
def index() -> FileResponse:
return FileResponse(WEB_DIR / "index.html")
@server.get("/api/config")
def config() -> dict[str, Any]:
return {
"description": APP_DESCRIPTION,
"goals": [
"strength",
"hypertrophy",
"endurance",
"mobility",
"beginner_practice",
],
"experience_levels": ["beginner", "intermediate"],
"exercises": ["auto", *USER_SELECTABLE_EXERCISES],
"limitations": ["wrist_discomfort", "knee_discomfort", "shoulder_discomfort"],
"equipment": ["bodyweight", "dumbbell", "barbell", "unknown"],
}
@server.get("/api/artifacts/{run_id}/{filename}", include_in_schema=False)
def artifact(run_id: str, filename: str) -> FileResponse:
artifact_path = (RUNS_ROOT / run_id / filename).resolve()
runs_root = RUNS_ROOT.resolve()
if runs_root not in artifact_path.parents or not artifact_path.is_file():
raise HTTPException(status_code=404, detail="Artifact not found.")
return FileResponse(artifact_path)
def _artifact_url(run_id: str, path: str | None) -> str | None:
if not path:
return None
if not isinstance(path, str):
return None
artifact_path = Path(path).resolve()
run_root = (RUNS_ROOT / run_id).resolve()
if run_root not in artifact_path.parents or not artifact_path.is_file():
return None
return f"/api/artifacts/{run_id}/{artifact_path.name}"
def _artifact_link(run_id: str, name: str, path: str | None) -> dict[str, str] | None:
url = _artifact_url(run_id, path)
if url is None:
return None
return {"name": name, "url": url}
def _artifact_urls(result: dict[str, Any]) -> list[dict[str, str]]:
run_id = result["run_id"]
run_dir = Path(str(result["run_dir"]))
links: list[dict[str, str]] = []
artifact_files = [
"final_report.json",
"video_manifest.json",
"pose_sequence.json",
"exercise_classification.json",
"reps.json",
"rep_debug.json",
"rep_analysis.json",
"variation.json",
"issue_markers.json",
"coach_summary.json",
"verification.json",
"manifest.json",
]
for filename in artifact_files:
link = _artifact_link(run_id, filename, str(run_dir / filename))
if link is not None:
links.append(link)
video_link = _artifact_link(
run_id,
"annotated_video.mp4",
result.get("annotated_video_path"),
)
if video_link is not None:
links.append(video_link)
for thumbnail in result.get("issue_thumbnail_paths", []):
if not isinstance(thumbnail, dict):
continue
path = thumbnail.get("path")
issue = thumbnail.get("issue", "issue")
rep_id = thumbnail.get("rep_id", "?")
if not isinstance(path, str):
continue
link = _artifact_link(run_id, f"thumbnail_rep_{rep_id}_{issue}.jpg", path)
if link is not None:
links.append(link)
for clip in result.get("issue_clip_paths", []):
if not isinstance(clip, dict):
continue
path = clip.get("path")
issue = clip.get("issue", "issue")
rep_id = clip.get("rep_id", "?")
if not isinstance(path, str):
continue
link = _artifact_link(run_id, f"clip_rep_{rep_id}_{issue}.mp4", path)
if link is not None:
links.append(link)
return links
def _parse_limitations(limitations: str) -> list[str]:
try:
parsed_limitations = json.loads(limitations)
except json.JSONDecodeError as exc:
raise HTTPException(
status_code=400, detail="Limitations must be valid JSON."
) from exc
if not isinstance(parsed_limitations, list) or not all(
isinstance(item, str) for item in parsed_limitations
):
raise HTTPException(
status_code=400, detail="Limitations must be a JSON list of strings."
)
return parsed_limitations
def _profile_input(
*,
goal: str,
experience_level: str,
intended_exercise: str,
intended_variation: str,
limitations: str,
equipment: str,
) -> dict[str, Any]:
return {
"goal": goal,
"experience_level": experience_level,
"intended_exercise": intended_exercise,
"intended_variation": intended_variation or None,
"known_limitations": _parse_limitations(limitations),
"equipment": equipment,
}
def _parse_bool_form(value: str) -> bool:
return value.strip().lower() in {"1", "true", "yes", "on"}
def _safe_upload_stem(filename: str) -> str:
stem = Path(filename).stem or "upload"
safe_stem = re.sub(r"[^A-Za-z0-9_.-]+", "-", stem).strip(".-_")
return safe_stem[:48] or "upload"
def _run_analysis_pipeline(
video_path: str | None,
profile_input: dict[str, Any],
bypass_verifier: bool = True,
progress: Any | None = None,
) -> dict[str, Any]:
return run_pipeline(
video_path=video_path,
profile_input=profile_input,
bypass_verifier=bypass_verifier,
progress=progress,
)
async def _save_upload(video: UploadFile | None) -> str | None:
video_path: str | None = None
if video is not None and video.filename:
suffix = Path(video.filename).suffix or ".mp4"
prefix = f"pozify-{_safe_upload_stem(video.filename)}-"
with tempfile.NamedTemporaryFile(
delete=False,
prefix=prefix,
suffix=suffix,
) as temp_video:
shutil.copyfileobj(video.file, temp_video)
video_path = temp_video.name
return video_path
def _analysis_response(result: dict[str, Any]) -> dict[str, Any]:
issue_thumbnail_urls = []
for thumbnail in result.get("issue_thumbnail_paths", []):
if not isinstance(thumbnail, dict):
continue
path = thumbnail.get("path")
if not isinstance(path, str):
continue
url = _artifact_url(result["run_id"], path)
if url is not None:
issue_thumbnail_urls.append({**thumbnail, "url": url})
issue_clip_urls = []
for clip in result.get("issue_clip_paths", []):
if not isinstance(clip, dict):
continue
path = clip.get("path")
if not isinstance(path, str):
continue
url = _artifact_url(result["run_id"], path)
if url is not None:
issue_clip_urls.append({**clip, "url": url})
return {
"run_id": result["run_id"],
"run_dir": result["run_dir"],
"annotated_video_url": _artifact_url(
result["run_id"], result["annotated_video_path"]
),
"issue_thumbnail_urls": issue_thumbnail_urls,
"issue_clip_urls": issue_clip_urls,
"artifact_urls": _artifact_urls(result),
"final_report_url": f"/api/artifacts/{result['run_id']}/final_report.json",
"report": result["final_report"],
}
@server.post("/api/analyze")
async def analyze_api(
video: UploadFile | None = File(default=None),
goal: str = Form(default="beginner_practice"),
experience_level: str = Form(default="beginner"),
intended_exercise: str = Form(default="auto"),
intended_variation: str = Form(default=""),
limitations: str = Form(default="[]"),
equipment: str = Form(default="bodyweight"),
bypass_verifier: str = Form(default="true"),
) -> dict[str, Any]:
profile = _profile_input(
goal=goal,
experience_level=experience_level,
intended_exercise=intended_exercise,
intended_variation=intended_variation,
limitations=limitations,
equipment=equipment,
)
video_path = await _save_upload(video)
try:
result = _run_analysis_pipeline(
video_path,
profile,
_parse_bool_form(bypass_verifier),
)
finally:
if video_path is not None:
Path(video_path).unlink(missing_ok=True)
return _analysis_response(result)
@server.post("/api/analyze/stream")
async def analyze_stream_api(
video: UploadFile | None = File(default=None),
goal: str = Form(default="beginner_practice"),
experience_level: str = Form(default="beginner"),
intended_exercise: str = Form(default="auto"),
intended_variation: str = Form(default=""),
limitations: str = Form(default="[]"),
equipment: str = Form(default="bodyweight"),
bypass_verifier: str = Form(default="true"),
) -> StreamingResponse:
profile = _profile_input(
goal=goal,
experience_level=experience_level,
intended_exercise=intended_exercise,
intended_variation=intended_variation,
limitations=limitations,
equipment=equipment,
)
video_path = await _save_upload(video)
events: Queue[dict[str, Any] | None] = Queue()
def worker() -> None:
try:
result = _run_analysis_pipeline(
video_path,
profile,
_parse_bool_form(bypass_verifier),
events.put,
)
events.put({"type": "complete", "result": _analysis_response(result)})
except Exception as exc: # pragma: no cover - surfaced to browser clients
events.put({"type": "error", "detail": str(exc)})
finally:
if video_path is not None:
Path(video_path).unlink(missing_ok=True)
events.put(None)
def event_stream() -> Any:
thread = Thread(target=worker, daemon=True)
thread.start()
while True:
event = events.get()
if event is None:
break
yield f"{json.dumps(event)}\n"
thread.join(timeout=1)
return StreamingResponse(
event_stream(),
media_type="application/x-ndjson",
headers={"Cache-Control": "no-cache"},
)
if __name__ == "__main__":
server.launch(_frontend=False)