"""
Personal Analysis Pipeline Service.

Wraps the swish basketball shot analysis algorithm and exposes a single
async-friendly entry point: run_personal_analysis().

This is completely separate from the team YOLO analysis pipeline.
"""
import os
import cv2
import uuid
import asyncio
import logging
import traceback
import math
from typing import List, Optional, Dict, Any
from concurrent.futures import ThreadPoolExecutor
import sys

logger = logging.getLogger("personal_analysis")
print(f"PIPELINE LOADED FROM: {__file__}", file=sys.stderr)
print(f"DEBUG: CWD = {os.getcwd()}", file=sys.stderr)

# Thread pool for CPU-bound analysis work (keeps FastAPI event loop free)
_executor = ThreadPoolExecutor(max_workers=2)

# NOTE: these project imports intentionally stay below the diagnostic prints so
# the prints are emitted even when the app package fails to import.
from app.config import get_settings
from app.api.videos import get_video_info

# ── Tuning constants ─────────────────────────────────────────────────────────
# Rough scale used everywhere in this module: 1 pixel ≈ 0.01 meters.
_METERS_PER_PIXEL = 0.01
# Shots released farther than this (in meters) are labelled "3-Pointer".
_THREE_POINT_DISTANCE_M = 6.75
# Minimum per-frame vertical wrist travel (pixels) that counts as movement.
_DRIBBLE_MIN_DELTA_PX = 10
# Keypoint indices used below (wrists 9/10, hips 11/12).
# NOTE(review): looks like the COCO-17 keypoint layout — confirm against the pose model.
_LEFT_WRIST_IDX = 9
_RIGHT_WRIST_IDX = 10
_LEFT_HIP_IDX = 11
_RIGHT_HIP_IDX = 12
# Last-resort directory that is searched for a missing model file.
_MODEL_SEARCH_ROOT = "/home/user/app"
# Report line prefix → key in the per-shot metrics dict.
_REPORT_METRIC_PREFIXES = {
    "ANGLE_SEW:": "elbow_angle",
    "ANGLE_ESH:": "shoulder_angle",
    "ANGLE_KNEE:": "knee_angle",
    "ANGLE_HIP:": "hip_angle",
}
# Keywords that mark a report line as a form issue / coaching hint.
_REPORT_ISSUE_KEYWORDS = ("angle", "try", "shoot", "distance", "open", "close", "arc")

# ── Model paths ──────────────────────────────────────────────────────────────
settings = get_settings()


def _resolve_model_path(path: str) -> str:
    """
    Resolve a model path relative to the backend root.

    • Absolute paths are used as-is.
    • Relative paths are anchored to the directory two levels above this file.

    When the anchored file does not exist, diagnostics are logged and two
    fallbacks are tried in order: a filename search under ``/home/user/app``
    and a repair of the "OKIDI-DONT TOUCH" directory name. If everything
    fails the (non-existent) anchored candidate is returned so the caller
    surfaces the error when it tries to load the model.
    """
    if os.path.isabs(path):
        return path

    # Anchor to the backend root (parent of 'personal_analysis/')
    backend_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    candidate = os.path.abspath(os.path.join(backend_root, path))
    if os.path.exists(candidate):
        return candidate

    # --- DIAGNOSTIC LOGGING ---
    logger.error(f"❌ Model file MISSING: {candidate}")
    logger.info(f"DEBUG: Current Working Directory: {os.getcwd()}")
    logger.info(f"DEBUG: Backend Root: {backend_root}")

    models_dir = os.path.join(backend_root, "models")
    if os.path.exists(models_dir):
        try:
            files = os.listdir(models_dir)
            logger.info(f"DEBUG: Contents of {models_dir}: {files}")
        except Exception as e:
            logger.error(f"DEBUG: Could not list {models_dir}: {e}")
    else:
        logger.error(f"DEBUG: Directory {models_dir} does not even exist!")

    # Search for the file anywhere in /home/user/app
    wanted = os.path.basename(path)
    logger.info(f"DEBUG: Searching for {wanted} in {_MODEL_SEARCH_ROOT}...")
    found_paths = [
        os.path.join(root, wanted)
        for root, _dirs, files in os.walk(_MODEL_SEARCH_ROOT)
        if wanted in files
    ]
    if found_paths:
        logger.info(f"✅ FOUND at alternative locations: {found_paths}")
        return found_paths[0]
    # ---------------------------

    # Robust fallback: check if we are in a 'DON'T TOUCH' mangled path environment
    # but only if the candidate doesn't exist.
    if "OKIDI-DONT TOUCH" in candidate:
        healed = candidate.replace("OKIDI-DONT TOUCH", "OKIDI-DON'T TOUCH")
        if os.path.exists(healed):
            return healed

    logger.warning(f"Model file not found at {candidate!r} — ensure 'python download_models.py' has been run.")
    return candidate


BALL_RIM_MODEL = _resolve_model_path(settings.swish_ball_rim_model)
POSE_MODEL = _resolve_model_path(settings.swish_pose_model)

print(f"DEBUG: FINAL BALL_RIM_MODEL: {BALL_RIM_MODEL}", file=sys.stderr)
print(f"DEBUG: FINAL POSE_MODEL: {POSE_MODEL}", file=sys.stderr)


def _read_video(path: str):
    """Read every frame of a video using OpenCV.

    Returns:
        (frames, fps): the list of decoded frames and the frame rate reported
        by the container, falling back to 30 when the container reports 0.

    Raises:
        IOError: if OpenCV cannot open ``path``.
    """
    cap = cv2.VideoCapture(path)
    if not cap.isOpened():
        raise IOError(f"Cannot open video: {path}")

    try:
        fps = cap.get(cv2.CAP_PROP_FPS) or 30
        frames = []
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frames.append(frame)
    finally:
        # Release the capture handle even if decoding raises part-way through.
        cap.release()
    return frames, fps


def _transcode_to_h264(out_path: str) -> None:
    """Best-effort in-place re-encode of ``out_path`` to web-ready H.264.

    The file is moved aside, ffmpeg writes the transcoded stream to
    ``out_path``, and on ANY failure (ffmpeg missing, timeout, non-zero exit)
    the original file is moved back so ``out_path`` always exists afterwards.
    """
    import subprocess

    stem, ext = os.path.splitext(out_path)
    tmp_path = f"{stem}_tmp{ext}"

    try:
        os.replace(out_path, tmp_path)
    except OSError as exc:
        logger.warning(f"Skipping H.264 transcode, could not stage {out_path}: {exc}")
        return

    transcoded = False
    try:
        result = subprocess.run(
            ["ffmpeg", "-y", "-i", tmp_path,
             "-vf", "pad=ceil(iw/2)*2:ceil(ih/2)*2",
             "-c:v", "libx264", "-preset", "fast",
             "-movflags", "+faststart", "-pix_fmt", "yuv420p", "-an",
             out_path],
            capture_output=True,
            timeout=300,
        )
        transcoded = result.returncode == 0
        if not transcoded:
            tail = (result.stderr or b"").decode("utf-8", errors="replace")[-500:]
            logger.warning(f"ffmpeg exited with code {result.returncode}; keeping mp4v file. {tail}")
    except (OSError, ValueError, subprocess.SubprocessError) as exc:
        # ffmpeg not installed, timed out, or could not be launched.
        logger.warning(f"ffmpeg transcode unavailable ({exc}); keeping mp4v file.")

    try:
        if transcoded:
            os.remove(tmp_path)
        else:
            # Restore the original mp4v file (overwrites any partial ffmpeg output).
            os.replace(tmp_path, out_path)
    except OSError as exc:
        logger.error(f"Could not finalize video file {out_path}: {exc}")


def _write_video(frames: list, out_path: str, fps: float = 30.0):
    """Write annotated frames to a browser-compatible MP4 file.

    Frames are first written with OpenCV (mp4v, falling back to XVID) and then
    re-encoded with ffmpeg when it is available. Does nothing when ``frames``
    is empty or no VideoWriter can be opened.
    """
    if not frames:
        return

    h, w = frames[0].shape[:2]

    # Primary choice: mp4v (widely supported software codec)
    # We then transcode to H.264 (libx264) using FFmpeg for the browser.
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    writer = cv2.VideoWriter(out_path, fourcc, fps, (w, h))

    # Fallback to XVID if mp4v fails (rare)
    if not writer.isOpened():
        logger.warning(f"VideoWriter failed with {fourcc}. Trying XVID fallback.")
        writer.release()
        fourcc = cv2.VideoWriter_fourcc(*"XVID")
        writer = cv2.VideoWriter(out_path, fourcc, fps, (w, h))

    if not writer.isOpened():
        logger.error(f"❌ CRITICAL: Failed to open VideoWriter with path: {out_path}")
        return

    try:
        for frame in frames:
            writer.write(frame)
    finally:
        writer.release()

    # Optional: re-encode with ffmpeg for a proper web-ready H.264 stream
    # (ffmpeg ensures the moov atom is at the front for fast browser starts)
    _transcode_to_h264(out_path)


def _classify_shot_types(shot_distances: list) -> list:
    """Label each shot "3-Pointer" (> 6.75 m) or "Mid-Range" (otherwise, including unknown)."""
    return [
        "3-Pointer" if dist is not None and dist > _THREE_POINT_DISTANCE_M else "Mid-Range"
        for dist in shot_distances
    ]


def _movement_metrics(points, fps: float, shooting_arm: str):
    """Derive dribble count, travelled distance and speeds from per-frame keypoints.

    A dribble is counted each time the shooting-arm wrist switches from moving
    up to moving down by more than the per-frame threshold. Movement is
    measured on the midpoint of the two hip keypoints.

    Returns:
        (dribble_count, total_distance_m, avg_speed_kmh, max_speed_kmh)
    """
    wrist_idx = _RIGHT_WRIST_IDX if shooting_arm.lower() == "right" else _LEFT_WRIST_IDX

    dribble_count = 0
    prev_wrist_y = None
    direction = None  # 'up' or 'down'
    movement_points = []  # (frame index, hip-center position)

    for frame_no, pts in enumerate(points):
        if pts is None or len(pts) <= wrist_idx:
            continue

        # Dribble detection: a non-positive y marks an undetected wrist.
        wrist_y = pts[wrist_idx][1]
        if wrist_y > 0:
            if prev_wrist_y is not None:
                diff = wrist_y - prev_wrist_y
                if diff > _DRIBBLE_MIN_DELTA_PX:  # Moving down
                    if direction == 'up':
                        dribble_count += 1
                    direction = 'down'
                elif diff < -_DRIBBLE_MIN_DELTA_PX:  # Moving up
                    direction = 'up'
            prev_wrist_y = wrist_y

        # Movement tracking (hip center at indices 11, 12)
        if len(pts) > _RIGHT_HIP_IDX:
            hip_l, hip_r = pts[_LEFT_HIP_IDX], pts[_RIGHT_HIP_IDX]
            if hip_l[0] > 0 and hip_r[0] > 0:
                center = [(hip_l[0] + hip_r[0]) / 2, (hip_l[1] + hip_r[1]) / 2]
                movement_points.append((frame_no, center))

    # Calculate distance and speed between consecutive hip-center samples
    total_distance = 0.0
    speeds = []
    for (frame_a, pos_a), (frame_b, pos_b) in zip(movement_points, movement_points[1:]):
        m_dist = math.hypot(pos_b[0] - pos_a[0], pos_b[1] - pos_a[1]) * _METERS_PER_PIXEL
        total_distance += m_dist
        frame_diff = frame_b - frame_a
        if frame_diff > 0:
            speeds.append(m_dist / (frame_diff / fps))

    # Speeds are in m/s; × 3.6 converts to km/h.
    avg_speed_kmh = (sum(speeds) / len(speeds) * 3.6) if speeds else 0.0
    max_speed_kmh = (max(speeds) * 3.6) if speeds else 0.0
    return dribble_count, total_distance, avg_speed_kmh, max_speed_kmh


def _mean_of_truthy(values):
    """Average the truthy entries of ``values``; None when there are none."""
    valid = [v for v in values if v]
    return sum(valid) / len(valid) if valid else None


def _run_pipeline_sync(video_path: str, output_dir: str, job_id: str, shooting_arm: str = "right") -> dict:
    """
    Synchronous (blocking) pipeline — runs in a thread pool.

    Reads the video, runs ball/rim and pose detection, detects shots, computes
    distance / movement / joint metrics, writes the annotated video and the
    text report into ``output_dir``, and returns a structured results dict.

    Raises:
        IOError: if the video cannot be opened.
        ValueError: if the video contains no frames.
    """
    from personal_analysis.trackers.ball_tracker import BallTracker
    from personal_analysis.trackers.rim_tracker import RimTracker
    from personal_analysis.trackers.human_tracker import HumanTracker
    from personal_analysis.drawers.shot_tracker import ShotTracker
    from personal_analysis.drawers.human_tracks_drawer import HumanTracksDrawer
    from personal_analysis.utils.ball_hand import ball_hand, shot_started
    # NOTE(review): `utils` is a top-level import while the other helpers live
    # under `personal_analysis.` — confirm it resolves to the intended module.
    from utils import get_foot_position, measure_distance, get_center_of_bbox

    os.makedirs(output_dir, exist_ok=True)
    report_path = os.path.join(output_dir, f"{job_id}_report.txt")
    video_out_path = os.path.join(output_dir, f"{job_id}_output.mp4")

    # ── 1. Read video ──────────────────────────────────────────────────────
    logger.info(f"[{job_id}] Reading video: {video_path}")
    video_frames, fps = _read_video(video_path)
    if not video_frames:
        raise ValueError("Video has no frames")

    # ── 2. Detect ball & rim ───────────────────────────────────────────────
    logger.info(f"[{job_id}] Running ball/rim detection...")
    ball_tracker = BallTracker(model_path=BALL_RIM_MODEL)
    rim_tracker = RimTracker(model_path=BALL_RIM_MODEL)

    # get_object_tracks returns both ball and rim detections in one pass
    all_tracks = ball_tracker.get_object_tracks(video_frames)

    # Clean up ball and rim tracks separately without deleting the other
    # (each tracker gets its own shallow copy of every per-frame dict).
    ball_tracks = ball_tracker.remove_wrong_tracks([dict(t) for t in all_tracks])
    rim_tracks = rim_tracker.remove_wrong_tracks([dict(t) for t in all_tracks])

    interpolated_ball_tracks = ball_tracker.interpolate_missing_tracks(ball_tracks)
    ball_loco = ball_tracker.get_ball_loco(video_frames, interpolated_ball_tracks)
    rim_tracks = rim_tracker.interpolate_missing_tracks(rim_tracks)

    # ── 3. Human pose detection ────────────────────────────────────────────
    logger.info(f"[{job_id}] Running human pose detection for {shooting_arm} arm...")
    human_tracker = HumanTracker(model_path=POSE_MODEL)
    human_tracks = human_tracker.detect_frame(video_frames)
    angles = human_tracker.calc_angles(video_frames, human_tracks, shooting_arm=shooting_arm)
    points = human_tracker.get_points(video_frames, human_tracks)

    # ── 4. Shot detection ──────────────────────────────────────────────────
    logger.info(f"[{job_id}] Detecting release frames...")
    ball_left_frames = ball_hand(ball_loco, points, video_frames)
    shot_starts = shot_started(points, ball_left_frames)

    # ── 4a. Shot Distance Calculation ─────────────────────────────────────
    logger.info(f"[{job_id}] Calculating shot distances...")
    shot_distances = []
    for frame_idx in ball_left_frames:
        dist_m = None  # stays None whenever the person or the rim is missing
        if frame_idx < len(human_tracks) and frame_idx < len(rim_tracks):
            res = human_tracks[frame_idx]

            # Get person bbox (class 0): first matching detection wins
            human_bbox = None
            if hasattr(res, 'boxes') and len(res.boxes) > 0:
                for box in res.boxes:
                    if int(box.cls[0]) == 0:
                        human_bbox = box.xyxy[0].cpu().numpy()
                        break

            # Find rim position: first rim entry that carries a bbox
            rim_pos = None
            for rim in rim_tracks[frame_idx].values():
                if rim.get("bbox"):
                    rim_pos = get_center_of_bbox(rim["bbox"])
                    break

            if human_bbox is not None and rim_pos:
                foot_pos = get_foot_position(human_bbox)
                # Basic pixel distance, converted to meters (rough scale 0.01)
                px_dist = measure_distance(foot_pos, rim_pos)
                dist_m = round(px_dist * _METERS_PER_PIXEL, 1)
        shot_distances.append(dist_m)

    # Classify shot types: 3-pointer (> 6.75m) or mid-range
    shot_types = _classify_shot_types(shot_distances)

    # ── 4b. Movement & Dribbling Metrics ──────────────────────────────────
    logger.info(f"[{job_id}] Calculating movement and dribbling metrics...")
    dribble_count, total_distance, avg_speed_kmh, max_speed_kmh = _movement_metrics(
        points, fps, shooting_arm
    )

    # ── 5. Make/miss tracking ──────────────────────────────────────────────
    logger.info(f"[{job_id}] Scoring shots...")
    shot_tracker = ShotTracker()
    shot_tracker.detect_shot(video_frames, interpolated_ball_tracks, rim_tracks)

    # ── 6. Draw overlays & analyse form ────────────────────────────────────
    logger.info(f"[{job_id}] Drawing analysis overlays...")
    human_drawer = HumanTracksDrawer()
    out_frames = human_drawer.draw(
        video_frames, human_tracks, angles,
        draw_boxes=False, draw_keypoints=True,
        shooting_arm=shooting_arm
    )
    out_frames = human_drawer.analysis(
        out_frames, angles, ball_left_frames, shot_starts, report_path,
        shot_distances=shot_distances,
        shooting_arm=shooting_arm
    )
    out_frames = shot_tracker.draw_shots(out_frames, shot_types=shot_types)

    # ── 7. Calculate aggregate joint metrics ───────────────────────────────
    sew_list, esh_list, knee_list, hip_list = angles
    avg_sew = _mean_of_truthy(sew_list)
    avg_esh = _mean_of_truthy(esh_list)
    avg_knee = _mean_of_truthy(knee_list)
    avg_hip = _mean_of_truthy(hip_list)

    # ── 8. Write video ─────────────────────────────────────────────────────
    logger.info(f"[{job_id}] Writing output video...")
    _write_video(out_frames, video_out_path, fps)

    # ── 9. Parse report.txt into structured data ───────────────────────────
    shot_reports = _parse_report(report_path, shot_distances, shot_types)

    shots = shot_tracker.shots
    made = sum(1 for s in shots if s["outcome"] == "make")
    missed = sum(1 for s in shots if s["outcome"] == "miss")
    total = len(shots)
    made_pct = round((made / total * 100), 1) if total > 0 else 0.0

    return {
        "job_id": job_id,
        "status": "completed",
        "shots_total": total,
        "shots_made": made,
        "shots_missed": missed,
        "made_percentage": made_pct,
        "shot_reports": shot_reports,
        "shooting_arm": shooting_arm,
        "annotated_video_url": f"/personal-output/{job_id}_output.mp4",
        # New metrics for web analysis
        "dribble_count": dribble_count,
        "total_distance_meters": round(total_distance, 1),
        "avg_speed_kmh": round(avg_speed_kmh, 1),
        "max_speed_kmh": round(max_speed_kmh, 1),
        "duration_seconds": round(len(video_frames) / fps, 1),
        # Joint metrics
        "avg_elbow_angle": round(avg_sew, 1) if avg_sew else None,
        "avg_shoulder_angle": round(avg_esh, 1) if avg_esh else None,
        "avg_knee_angle": round(avg_knee, 1) if avg_knee else None,
        "avg_hip_angle": round(avg_hip, 1) if avg_hip else None
    }


def _parse_report(report_path: str, shot_distances: Optional[list] = None, shot_types: Optional[list] = None) -> list:
    """Parse the text form-analysis report into structured shot dicts.

    The report is split on blank lines into one block per shot. Each block
    yields a verdict ("GOOD FORM" if that phrase appears, else "NEEDS WORK"),
    the issue lines, and the ``ANGLE_*`` metrics (0.0 when absent or
    unparsable). Distance and shot type are matched to blocks by position.

    Returns:
        A list of per-shot dicts; empty when the report file does not exist.
    """
    reports = []
    if not os.path.exists(report_path):
        return reports

    with open(report_path) as f:
        content = f.read()

    # Split on blank lines → one block per shot
    blocks = [b.strip() for b in content.strip().split("\n\n") if b.strip()]

    for i, block in enumerate(blocks):
        lines = [line.strip() for line in block.splitlines() if line.strip()]
        verdict = "GOOD FORM" if any("GOOD FORM" in line for line in lines) else "NEEDS WORK"

        # Extract issues (keyword lines, ignoring the ANGLE_ metric lines)
        issues = [
            line for line in lines
            if not line.startswith("ANGLE_")
            and any(keyword in line.lower() for keyword in _REPORT_ISSUE_KEYWORDS)
        ]

        # Extract metrics
        metrics = {"elbow_angle": 0.0, "shoulder_angle": 0.0, "knee_angle": 0.0, "hip_angle": 0.0}
        for line in lines:
            for prefix, key in _REPORT_METRIC_PREFIXES.items():
                if line.startswith(prefix):
                    try:
                        metrics[key] = float(line.split(":")[1].strip())
                    except ValueError:
                        # Unparsable value: keep the 0.0 default.
                        pass
                    break

        dist = shot_distances[i] if shot_distances and i < len(shot_distances) else None
        s_type = shot_types[i] if shot_types and i < len(shot_types) else "Unknown"

        reports.append({
            "shot_number": i + 1,
            "verdict": verdict,
            "issues": issues,
            "metrics": {**metrics, "distance": dist, "shot_type": s_type}
        })
    return reports


async def run_personal_analysis(
    video_path: str,
    output_dir: str,
    job_id: str,
    shooting_arm: str = "right"
) -> dict:
    """
    Async wrapper: runs the blocking pipeline in a thread pool so FastAPI's
    event loop stays responsive.

    Never raises for pipeline errors: any exception is logged with its
    traceback and reported as ``{"job_id", "status": "failed", "error"}``.
    """
    loop = asyncio.get_running_loop()
    try:
        result = await loop.run_in_executor(
            _executor,
            _run_pipeline_sync,
            video_path,
            output_dir,
            job_id,
            shooting_arm,
        )
        return result
    except Exception as e:
        logger.error(f"[{job_id}] Pipeline failed: {e}\n{traceback.format_exc()}")
        return {
            "job_id": job_id,
            "status": "failed",
            "error": str(e),
        }