BakoAI / personal_analysis /pipeline.py
Okidi Norbert
Fix: resolve _BACKEND_ROOT NameError in pipeline
380e126
"""
Personal Analysis Pipeline Service.
Wraps the Swish basketball shot analysis algorithm and exposes
a single async-friendly entry point: run_personal_analysis().
This is completely separate from the team YOLO analysis pipeline.
"""
import os
import cv2
import uuid
import asyncio
import logging
import traceback
import math
from typing import List, Optional, Dict, Any
from concurrent.futures import ThreadPoolExecutor
import sys
# Module-level logger; the name is a fixed string rather than __name__.
logger = logging.getLogger("personal_analysis")
# Import-time diagnostics written straight to stderr (not through `logger`):
# which copy of this module was loaded, and from which working directory.
print(f"PIPELINE LOADED FROM: {__file__}", file=sys.stderr)
print(f"DEBUG: CWD = {os.getcwd()}", file=sys.stderr)
# Thread pool for CPU-bound analysis work (keeps FastAPI event loop free).
# With max_workers=2, at most two pipeline runs execute at once; additional
# submissions wait in the executor's queue.
_executor = ThreadPoolExecutor(max_workers=2)
from app.config import get_settings
from app.api.videos import get_video_info
# ── Model paths ──────────────────────────────────────────────────────────────
# Settings object fetched once at import time; its swish_ball_rim_model and
# swish_pose_model fields are read further down to locate the model files.
settings = get_settings()
def _resolve_model_path(path: str) -> str:
    """
    Resolve a model path relative to the backend root.

    • Absolute paths are returned as-is, without checking that they exist.
    • Relative paths are anchored to the backend root, i.e. the parent of the
      directory that contains this file.

    When the anchored file does not exist, diagnostics are logged and two
    fallbacks are tried in order:

    1. a recursive search for a file with the same basename under
       ``/home/user/app`` (the first match is returned);
    2. the same anchored path with ``OKIDI-DONT TOUCH`` rewritten to
       ``OKIDI-DON'T TOUCH``.

    Args:
        path: Absolute or backend-relative path to a model file.

    Returns:
        The resolved path. If nothing is found, the anchored candidate is
        still returned (after a warning), so the caller may receive a path
        that does not exist.
    """
    if os.path.isabs(path):
        return path

    # Anchor to the backend root (parent of 'personal_analysis/')
    backend_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    candidate = os.path.abspath(os.path.join(backend_root, path))
    if os.path.exists(candidate):
        return candidate

    # --- DIAGNOSTIC LOGGING ---
    logger.error(f"❌ Model file MISSING: {candidate}")
    logger.info(f"DEBUG: Current Working Directory: {os.getcwd()}")
    logger.info(f"DEBUG: Backend Root: {backend_root}")

    models_dir = os.path.join(backend_root, "models")
    if os.path.exists(models_dir):
        try:
            model_files = os.listdir(models_dir)
            logger.info(f"DEBUG: Contents of {models_dir}: {model_files}")
        except OSError as e:
            logger.error(f"DEBUG: Could not list {models_dir}: {e}")
    else:
        logger.error(f"DEBUG: Directory {models_dir} does not even exist!")

    # Search for the file anywhere in /home/user/app
    wanted = os.path.basename(path)
    logger.info(f"DEBUG: Searching for {wanted} in /home/user/app...")
    # os.walk yields nothing when the directory is absent, so this is a
    # harmless no-op outside the deployment container.
    found_paths = [
        os.path.join(root, wanted)
        for root, _dirs, walked_files in os.walk("/home/user/app")
        if wanted in walked_files
    ]
    if found_paths:
        logger.info(f"✅ FOUND at alternative locations: {found_paths}")
        return found_paths[0]
    # ---------------------------

    # Robust fallback: the checkout directory name may have lost its
    # apostrophe ("DONT" vs "DON'T"); only tried because the candidate is
    # already known not to exist.
    if "OKIDI-DONT TOUCH" in candidate:
        healed = candidate.replace("OKIDI-DONT TOUCH", "OKIDI-DON'T TOUCH")
        if os.path.exists(healed):
            return healed

    logger.warning(f"Model file not found at {candidate!r} — ensure 'python download_models.py' has been run.")
    return candidate
# Model file locations, resolved once at import time from the settings fields.
# BALL_RIM_MODEL is passed to both BallTracker and RimTracker, POSE_MODEL to
# HumanTracker, inside _run_pipeline_sync.
BALL_RIM_MODEL = _resolve_model_path(settings.swish_ball_rim_model)
POSE_MODEL = _resolve_model_path(settings.swish_pose_model)
# Import-time diagnostics on stderr showing the final resolved paths.
print(f"DEBUG: FINAL BALL_RIM_MODEL: {BALL_RIM_MODEL}", file=sys.stderr)
print(f"DEBUG: FINAL POSE_MODEL: {POSE_MODEL}", file=sys.stderr)
def _read_video(path: str):
    """Read every frame of a video into memory using OpenCV.

    Args:
        path: Path of the video file to decode.

    Returns:
        A ``(frames, fps)`` tuple: the decoded frames in order, and the frame
        rate reported by OpenCV, falling back to 30 when it reports 0.

    Raises:
        IOError: If OpenCV cannot open the file.
    """
    cap = cv2.VideoCapture(path)
    # try/finally guarantees the capture handle is released even when the
    # file cannot be opened or decoding raises part-way through.
    try:
        if not cap.isOpened():
            raise IOError(f"Cannot open video: {path}")
        fps = cap.get(cv2.CAP_PROP_FPS) or 30
        frames = []
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frames.append(frame)
    finally:
        cap.release()
    return frames, fps
def _transcode_for_browser(out_path: str) -> None:
    """Best-effort in-place re-encode of *out_path* to H.264 using ffmpeg.

    The file is moved aside, ffmpeg writes the re-encoded stream (video only,
    ``-an``) back to *out_path*, and the original is restored whenever ffmpeg
    is missing, times out or exits non-zero, so *out_path* always ends up
    holding a playable file. Never raises for ffmpeg or filesystem errors.
    """
    import subprocess

    # splitext keeps this correct for any extension and for paths that
    # happen to contain ".mp4" in a directory name.
    stem, ext = os.path.splitext(out_path)
    tmp_path = f"{stem}_tmp{ext}"

    try:
        os.replace(out_path, tmp_path)
    except OSError as exc:
        logger.warning("Skipping ffmpeg re-encode of %s: %s", out_path, exc)
        return

    try:
        result = subprocess.run(
            ["ffmpeg", "-y", "-i", tmp_path,
             # Pad to even dimensions, which yuv420p/libx264 require.
             "-vf", "pad=ceil(iw/2)*2:ceil(ih/2)*2",
             "-c:v", "libx264", "-preset", "fast", "-movflags", "+faststart",
             "-pix_fmt", "yuv420p", "-an", out_path],
            capture_output=True, timeout=300
        )
        transcoded = result.returncode == 0
        if not transcoded:
            logger.warning("ffmpeg exited with code %s for %s; keeping original encoding",
                           result.returncode, out_path)
    except (OSError, subprocess.SubprocessError, ValueError) as exc:
        # OSError: ffmpeg binary not found; TimeoutExpired: ran past 300 s.
        logger.warning("ffmpeg re-encode unavailable for %s: %s", out_path, exc)
        transcoded = False

    try:
        if transcoded:
            os.remove(tmp_path)
        else:
            # ffmpeg failed — restore the original file. os.replace overwrites
            # any partial output ffmpeg left behind, on every platform.
            os.replace(tmp_path, out_path)
    except OSError as exc:
        logger.warning("Could not clean up after re-encode of %s: %s", out_path, exc)


def _write_video(frames: list, out_path: str, fps: float = 30.0):
    """Write annotated frames to a browser-compatible MP4 file.

    Frames are first written with OpenCV (mp4v, falling back to XVID) and the
    result is then re-encoded to H.264 with ffmpeg when ffmpeg is available.

    Args:
        frames: Frames to write; the size of the first frame sets the output
            resolution. An empty list makes this a no-op.
        out_path: Destination file path.
        fps: Output frame rate.

    Returns:
        None. If no VideoWriter can be opened, an error is logged and no
        file is produced.
    """
    if not frames:
        return
    h, w = frames[0].shape[:2]

    # Primary choice: mp4v (widely supported software codec)
    # We then transcode to H.264 (libx264) using FFmpeg for the browser.
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    writer = cv2.VideoWriter(out_path, fourcc, fps, (w, h))

    # Fallback to XVID if mp4v fails (rare)
    if not writer.isOpened():
        logger.warning(f"VideoWriter failed with {fourcc}. Trying XVID fallback.")
        writer.release()
        fourcc = cv2.VideoWriter_fourcc(*"XVID")
        writer = cv2.VideoWriter(out_path, fourcc, fps, (w, h))

    if not writer.isOpened():
        logger.error(f"❌ CRITICAL: Failed to open VideoWriter with path: {out_path}")
        writer.release()
        return

    # Release the writer even if a frame fails to encode, so the container
    # is finalised and the file handle is not leaked.
    try:
        for frame in frames:
            writer.write(frame)
    finally:
        writer.release()

    # Optional: re-encode with ffmpeg for a proper web-ready H.264 stream
    # (ffmpeg ensures the moov atom is at the front for fast browser starts)
    _transcode_for_browser(out_path)
# Rough pixel → meter scale shared by the shot-distance and movement estimates.
_PIXELS_TO_METERS = 0.01
# Shots released farther than this many meters from the rim are 3-pointers.
_THREE_POINT_DISTANCE_M = 6.75
# Minimum frame-to-frame vertical wrist travel (pixels) counted as motion.
_DRIBBLE_MIN_DELTA_PX = 10


def _mean_of_truthy(values):
    """Return the mean of the truthy entries of *values*, or None if there are none."""
    kept = [v for v in values if v]
    return sum(kept) / len(kept) if kept else None


def _classify_shot_types(shot_distances):
    """Label each distance "3-Pointer" or "Mid-Range"; unknown (None) counts as "Mid-Range"."""
    return [
        "3-Pointer" if d is not None and d > _THREE_POINT_DISTANCE_M else "Mid-Range"
        for d in shot_distances
    ]


def _count_dribbles(points, wrist_idx):
    """Count up→down reversals of the wrist keypoint's y coordinate across frames."""
    dribbles = 0
    prev_wrist_y = None
    direction = None  # 'up' or 'down'
    for pts in points:
        if pts is None or len(pts) <= wrist_idx:
            continue
        wrist_y = pts[wrist_idx][1]
        # Non-positive y is skipped, exactly as the `> 0` validity check.
        if not wrist_y > 0:
            continue
        if prev_wrist_y is not None:
            diff = wrist_y - prev_wrist_y
            if diff > _DRIBBLE_MIN_DELTA_PX:  # Moving down
                if direction == 'up':
                    dribbles += 1
                direction = 'down'
            elif diff < -_DRIBBLE_MIN_DELTA_PX:  # Moving up
                direction = 'up'
        prev_wrist_y = wrist_y
    return dribbles


def _hip_center_track(points):
    """Return ``(frame_index, (x, y))`` hip-center samples from keypoints 11 and 12."""
    track = []
    for frame_idx, pts in enumerate(points):
        if pts is None or len(pts) <= 12:
            continue
        hip_l, hip_r = pts[11], pts[12]
        if hip_l[0] > 0 and hip_r[0] > 0:
            center = ((hip_l[0] + hip_r[0]) / 2, (hip_l[1] + hip_r[1]) / 2)
            track.append((frame_idx, center))
    return track


def _movement_stats(track, fps):
    """Return ``(total_meters, avg_speed_kmh, max_speed_kmh)`` for a hip-center track."""
    total_distance = 0.0
    speeds = []  # meters per second between consecutive samples
    for (f0, p0), (f1, p1) in zip(track, track[1:]):
        meters = math.hypot(p1[0] - p0[0], p1[1] - p0[1]) * _PIXELS_TO_METERS
        total_distance += meters
        frame_diff = f1 - f0
        if frame_diff > 0:
            speeds.append(meters / (frame_diff / fps))
    # m/s → km/h
    avg_speed_kmh = (sum(speeds) / len(speeds) * 3.6) if speeds else 0.0
    max_speed_kmh = (max(speeds) * 3.6) if speeds else 0.0
    return total_distance, avg_speed_kmh, max_speed_kmh


def _run_pipeline_sync(video_path: str, output_dir: str, job_id: str, shooting_arm: str = "right") -> dict:
    """
    Synchronous (blocking) pipeline — runs in a thread pool.

    Reads the whole video into memory, runs ball/rim and pose detection,
    derives release frames, shot distances, dribble and movement metrics,
    scores makes/misses, renders the annotated video and parses the form
    report written by the drawer.

    Args:
        video_path: Video file to analyse.
        output_dir: Directory (created if missing) that receives
            ``{job_id}_report.txt`` and ``{job_id}_output.mp4``.
        job_id: Identifier used in log lines and output file names.
        shooting_arm: Forwarded to the pose tracker and drawer; "right"
            (case-insensitive) selects keypoint 10 as the dribbling wrist,
            any other value selects keypoint 9.

    Returns:
        A structured results dict with ``status`` set to "completed", shot
        counts and percentage, per-shot reports, movement/dribble metrics,
        average joint angles and the annotated video URL.

    Raises:
        ValueError: If the video contains no frames.
        Exception: Anything raised by video reading, the trackers or the
            drawers propagates to the caller.
    """
    from personal_analysis.trackers.ball_tracker import BallTracker
    from personal_analysis.trackers.rim_tracker import RimTracker
    from personal_analysis.trackers.human_tracker import HumanTracker
    from personal_analysis.drawers.shot_tracker import ShotTracker
    from personal_analysis.drawers.human_tracks_drawer import HumanTracksDrawer
    from personal_analysis.utils.ball_hand import ball_hand, shot_started
    from utils import get_foot_position, measure_distance, get_center_of_bbox

    os.makedirs(output_dir, exist_ok=True)
    report_path = os.path.join(output_dir, f"{job_id}_report.txt")
    video_out_path = os.path.join(output_dir, f"{job_id}_output.mp4")

    # ── 1. Read video ──────────────────────────────────────────────────────
    logger.info(f"[{job_id}] Reading video: {video_path}")
    video_frames, fps = _read_video(video_path)
    if not video_frames:
        raise ValueError("Video has no frames")

    # ── 2. Detect ball & rim ───────────────────────────────────────────────
    logger.info(f"[{job_id}] Running ball/rim detection...")
    ball_tracker = BallTracker(model_path=BALL_RIM_MODEL)
    rim_tracker = RimTracker(model_path=BALL_RIM_MODEL)
    # get_object_tracks returns both ball and rim detections in one pass
    all_tracks = ball_tracker.get_object_tracks(video_frames)
    # Clean up ball and rim tracks separately without deleting the other:
    # each tracker gets its own per-frame dict copies to work on.
    ball_tracks = ball_tracker.remove_wrong_tracks([dict(t) for t in all_tracks])
    rim_tracks = rim_tracker.remove_wrong_tracks([dict(t) for t in all_tracks])
    interpolated_ball_tracks = ball_tracker.interpolate_missing_tracks(ball_tracks)
    ball_loco = ball_tracker.get_ball_loco(video_frames, interpolated_ball_tracks)
    rim_tracks = rim_tracker.interpolate_missing_tracks(rim_tracks)

    # ── 3. Human pose detection ────────────────────────────────────────────
    logger.info(f"[{job_id}] Running human pose detection for {shooting_arm} arm...")
    human_tracker = HumanTracker(model_path=POSE_MODEL)
    human_tracks = human_tracker.detect_frame(video_frames)
    angles = human_tracker.calc_angles(video_frames, human_tracks, shooting_arm=shooting_arm)
    points = human_tracker.get_points(video_frames, human_tracks)

    # ── 4. Shot detection ──────────────────────────────────────────────────
    logger.info(f"[{job_id}] Detecting release frames...")
    ball_left_frames = ball_hand(ball_loco, points, video_frames)
    shot_starts = shot_started(points, ball_left_frames)

    # ── 4a. Shot Distance Calculation ─────────────────────────────────────
    # One entry per release frame; None when the person or rim is not found.
    logger.info(f"[{job_id}] Calculating shot distances...")
    shot_distances = []
    for frame_idx in ball_left_frames:
        if frame_idx < len(human_tracks) and frame_idx < len(rim_tracks):
            res = human_tracks[frame_idx]
            # Get person bbox (class 0)
            human_bbox = None
            if hasattr(res, 'boxes') and len(res.boxes) > 0:
                # Find the first person
                for box in res.boxes:
                    if int(box.cls[0]) == 0:
                        human_bbox = box.xyxy[0].cpu().numpy()
                        break
            # Find rim position
            rim_pos = None
            for rim in rim_tracks[frame_idx].values():
                if rim.get("bbox"):
                    rim_pos = get_center_of_bbox(rim["bbox"])
                    break
            if human_bbox is not None and rim_pos:
                foot_pos = get_foot_position(human_bbox)
                # Basic pixel distance, converted to meters with the rough scale
                px_dist = measure_distance(foot_pos, rim_pos)
                shot_distances.append(round(px_dist * _PIXELS_TO_METERS, 1))
            else:
                shot_distances.append(None)
        else:
            shot_distances.append(None)

    # Classify shot types: 3-pointer (> 6.75m) or mid-range
    shot_types = _classify_shot_types(shot_distances)

    # ── 4b. Movement & Dribbling Metrics ──────────────────────────────────
    logger.info(f"[{job_id}] Calculating movement and dribbling metrics...")
    # Hand index for dribble detection
    wrist_idx = 10 if shooting_arm.lower() == "right" else 9
    dribble_count = _count_dribbles(points, wrist_idx)
    # Movement tracking points (using hip center)
    total_distance, avg_speed_kmh, max_speed_kmh = _movement_stats(
        _hip_center_track(points), fps
    )

    # ── 5. Make/miss tracking ──────────────────────────────────────────────
    logger.info(f"[{job_id}] Scoring shots...")
    shot_tracker = ShotTracker()
    shot_tracker.detect_shot(video_frames, interpolated_ball_tracks, rim_tracks)

    # ── 6. Draw overlays & analyse form ───────────────────────────────────
    logger.info(f"[{job_id}] Drawing analysis overlays...")
    human_drawer = HumanTracksDrawer()
    out_frames = human_drawer.draw(
        video_frames, human_tracks, angles,
        draw_boxes=False, draw_keypoints=True,
        shooting_arm=shooting_arm
    )
    out_frames = human_drawer.analysis(
        out_frames, angles, ball_left_frames, shot_starts,
        report_path,
        shot_distances=shot_distances,
        shooting_arm=shooting_arm
    )
    out_frames = shot_tracker.draw_shots(out_frames, shot_types=shot_types)

    # ── 7. Calculate aggregate joint metrics ───────────────────────────────
    # Falsy samples (None / 0) are treated as "no measurement" and skipped.
    sew_list, esh_list, knee_list, hip_list = angles
    avg_sew = _mean_of_truthy(sew_list)
    avg_esh = _mean_of_truthy(esh_list)
    avg_knee = _mean_of_truthy(knee_list)
    avg_hip = _mean_of_truthy(hip_list)

    # ── 8. Write video ─────────────────────────────────────────────────────
    logger.info(f"[{job_id}] Writing output video...")
    _write_video(out_frames, video_out_path, fps)

    # ── 9. Parse report.txt into structured data ───────────────────────────
    shot_reports = _parse_report(report_path, shot_distances, shot_types)
    shots = shot_tracker.shots
    made = sum(1 for s in shots if s["outcome"] == "make")
    missed = sum(1 for s in shots if s["outcome"] == "miss")
    total = len(shots)
    made_pct = round((made / total * 100), 1) if total > 0 else 0.0

    return {
        "job_id": job_id,
        "status": "completed",
        "shots_total": total,
        "shots_made": made,
        "shots_missed": missed,
        "made_percentage": made_pct,
        "shot_reports": shot_reports,
        "shooting_arm": shooting_arm,
        "annotated_video_url": f"/personal-output/{job_id}_output.mp4",
        # New metrics for web analysis
        "dribble_count": dribble_count,
        "total_distance_meters": round(total_distance, 1),
        "avg_speed_kmh": round(avg_speed_kmh, 1),
        "max_speed_kmh": round(max_speed_kmh, 1),
        "duration_seconds": round(len(video_frames) / fps, 1),
        # Joint metrics
        "avg_elbow_angle": round(avg_sew, 1) if avg_sew else None,
        "avg_shoulder_angle": round(avg_esh, 1) if avg_esh else None,
        "avg_knee_angle": round(avg_knee, 1) if avg_knee else None,
        "avg_hip_angle": round(avg_hip, 1) if avg_hip else None
    }
def _parse_report(report_path: str, shot_distances: Optional[list] = None, shot_types: Optional[list] = None) -> list:
    """Parse the text form-analysis report into structured shot dicts.

    The report is read as blocks separated by blank lines, one block per
    shot. Within a block:

    * a line containing "GOOD FORM" sets the verdict to "GOOD FORM"
      (otherwise "NEEDS WORK");
    * lines starting with ``ANGLE_SEW:``, ``ANGLE_ESH:``, ``ANGLE_KNEE:`` and
      ``ANGLE_HIP:`` carry the elbow, shoulder, knee and hip angles;
    * any other line mentioning one of the feedback keywords is an issue.

    Args:
        report_path: Path of the report file. A missing file yields ``[]``.
        shot_distances: Optional per-shot distances, matched to blocks by
            position; shots beyond its length get ``None``.
        shot_types: Optional per-shot type labels, matched by position;
            shots beyond its length get ``"Unknown"``.

    Returns:
        One dict per block with keys ``shot_number`` (1-based), ``verdict``,
        ``issues`` and ``metrics``. Angles that are absent or not parseable
        as a float stay at ``0.0``.
    """
    if not os.path.exists(report_path):
        return []
    with open(report_path) as f:
        content = f.read()

    issue_keywords = ("angle", "try", "shoot", "distance", "open", "close", "arc")
    # Report line prefix → key in the returned metrics dict.
    metric_keys = {
        "ANGLE_SEW:": "elbow_angle",
        "ANGLE_ESH:": "shoulder_angle",
        "ANGLE_KNEE:": "knee_angle",
        "ANGLE_HIP:": "hip_angle",
    }

    reports = []
    # Split on blank lines → one block per shot
    blocks = [b.strip() for b in content.strip().split("\n\n") if b.strip()]
    for i, block in enumerate(blocks):
        lines = [ln.strip() for ln in block.splitlines() if ln.strip()]
        verdict = "GOOD FORM" if any("GOOD FORM" in ln for ln in lines) else "NEEDS WORK"

        # Extract issues (ignore header lines and metric lines)
        issues = [
            ln for ln in lines
            if not ln.startswith("ANGLE_")
            and any(k in ln.lower() for k in issue_keywords)
        ]

        # Extract metrics
        metrics = dict.fromkeys(metric_keys.values(), 0.0)
        for ln in lines:
            for prefix, key in metric_keys.items():
                if ln.startswith(prefix):
                    try:
                        metrics[key] = float(ln.split(":")[1].strip())
                    except ValueError:
                        # Malformed number: keep the 0.0 default.
                        pass
                    break

        dist = shot_distances[i] if shot_distances and i < len(shot_distances) else None
        s_type = shot_types[i] if shot_types and i < len(shot_types) else "Unknown"
        reports.append({
            "shot_number": i + 1,
            "verdict": verdict,
            "issues": issues,
            "metrics": {**metrics, "distance": dist, "shot_type": s_type}
        })
    return reports
async def run_personal_analysis(
    video_path: str,
    output_dir: str,
    job_id: str,
    shooting_arm: str = "right"
) -> dict:
    """
    Run the blocking analysis pipeline without stalling the event loop.

    The work is submitted to the module's thread pool and awaited. Any
    exception raised by the pipeline is logged with its traceback and turned
    into a result dict with ``status`` "failed" and the error message, so the
    caller always receives a dict rather than an exception.
    """
    loop = asyncio.get_running_loop()
    pipeline_args = (video_path, output_dir, job_id, shooting_arm)
    try:
        return await loop.run_in_executor(_executor, _run_pipeline_sync, *pipeline_args)
    except Exception as exc:
        trace = traceback.format_exc()
        logger.error(f"[{job_id}] Pipeline failed: {exc}\n{trace}")
        failure = {
            "job_id": job_id,
            "status": "failed",
            "error": str(exc),
        }
        return failure